전체 글 373

Airflow DB 살펴보기

팁: Airflow 메타데이터 DB 내용 살펴보기 airflow:airflow로 Postgres에 로그인 가능 docker exec -it learn-airflow-airflow-webserver-1 sh 혹은 learn-airflow-airflow-scheduler-1로 로그인 그 다음에 아래 명령 수행 psql -h postgres docker-compose로 연결된 컨테이너들끼리는 서비스 이름을 host이름 처럼 연동할 수 있다고 했었습니다. psql shell에서 아래 명령 수행 \dt select * FROM dag_run LIMIT 10; DELETE FROM dag_run WHERE dag_id = '기록을삭제하고싶은DAG’; 참고로 psql shell에서는 ctrl + l로 창을 clear..

Dag-Dependencies 데모

데모 TriggerDagRunOperator 실습: SourceDag TargetDag Jinja 템플릿 실습: Learn_Jinja BranchPythonOperator 실습: Learn_BranchPythonOperator LatestOnlyOperator 실습: Learn_LatestOnlyOperator Trigger Rules 실습: Learn_TriggerRules TriggerDagRunOperator 실습: TriggerDag.py의 DAG ID는 SourceDag입니다. 먼저 실행이 되어야하는 DAG를 SourceDag라고 합니다. TriggerDag from airflow import DAG from airflow.operators.trigger_dagrun import TriggerD..

Dag Dependencies - 나머지

Sensor + ExternalTaskSensor BranchPythonOperator LatestOnlyOperator Trigger Rules Sensor란 무엇인가? Sensor는 특정 조건이 충족될 때까지 대기하는 Operator Sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용 Airflow는 몇 가지 내장 Sensor를 제공 FileSensor: 지정된 위치에 파일이 생길 때까지 대기 HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기 SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기 TimeSensor: 특정 시간에 도달할 때까지 워크플로우를 일시 중지 ExternalTaskSensor: 다른 Airflow DAG의 특..

Dag Dependencies - TriggerDagRunOperator & Jinja Template

Contents 1. 지난 숙제 리뷰 2. Dag Dependencies 3. Task Grouping 4. Dynamic Dags 5. DockerOperator Dag Dependencies DAG간 실행 순서를 정하려면 어떻게 해야하는지 알아보자 Dag를 실행하는 방법 주기적 실행: schedule로 지정 crontab을 이용 다른 Dag에 의해 트리거 Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator) Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor) 알아두면 좋은 상황에 따라 다른 태스크 실행 방식들 조건에 따라 다른 태스크로 분기 (BranchPythonOperator) ..

Airflow API와 모니터링

Contents 1. ELT 구현 2. Slack 연동하기 3. 구글 시트 연동하기 (1): 시트 => Redshift 테이블 4. 구글 시트 연동하기 (2): Redshift 테이블 => 시트 5. API & Airflow 모니터링 6. 숙제 API & Airflow 모니터링 Airflow가 제공해주는 API에 대해서 알아보고 이를 이용해 모니터링 방법에 대해 알아보자 이번 섹션에서 해보고자 하는 일들 Airflow의 건강 여부 체크 (health check)을 어떻게 할지 학습 대부분의 서비스와 마찬가지로 Airflow도 얼마나 건강한지 외부에서 체크할 수 있는 Health Check라는 엔드 포인트가 있습니다. 이를 주기적으로 실행해 중요한 Airflow의 컴포넌트 2개 Web Server와 Sch..

Airflow와 구글시트 연동하기 - Redshift → sheet

Contents 1. ELT 구현 2. Slack 연동하기 3. 구글 시트 연동하기 (1): 시트 => Redshift 테이블 4. 구글 시트 연동하기 (2): Redshift 테이블 => 시트 5. API & Airflow 모니터링 6. 숙제 구글 시트 연동하기 (2) Redshift SELECT → 구글 시트 이번에는 RedShift의 SELECT 결과를 구글 시트에 COPY하는 과정을 DAG로 구현해보겠습니다. SQL 결과를 구글 시트로 복사하는 예제 개요 SELECT * FROM analytics.nps_summary 소스 코드 보기 소스 코드 보기 SQL_to_Sheet.py from airflow import DAG from airflow.operators.python import Python..

Airflow와 구글시트 연동하기 - 개요와 sheet → Redshift

1. ELT 작성과 구글시트/슬랙 연동 (후반부) ELT 구현과 구글시트/슬랙 연동과 같은 다양한 DAG를 작성해보자 Contents 1. ELT 구현 2. Slack 연동하기 3. 구글 시트 연동하기 (1): 시트 => Redshift 테이블 4. 구글 시트 연동하기 (2): Redshift 테이블 => 시트 5. API & Airflow 모니터링 6. 숙제 구글 시트를 테이블로 복사하는 예제 개요 CREATE TABLE keeyong.spreadsheet_copy_testing ( col1 int, col2 int, col3 int, col4 int ); 구글 시트 연동하기 시트 API 활성화하고 구글 서비스 어카운트 생성하고 그 내용을 JSON 파일로 다운로드 어카운트에서 생성해준 이메일을 조작하고..

Slack 연동하기

Slack 연동하기 DAG가 실패하면 Slack으로 에러를 보내보자 이번 섹션에서 하려는 일 DAG 실행 중에 에러가 발생하면 그걸 지정된 슬랙 workspace의 채널로 보내기 이를 위해서 해당 슬랙 workspace에 App 설정이 필요 다음으로 연동을 위한 함수를 하나 만들고 (plugins/slack.py) slack.py from airflow.models import Variable import logging import requests def on_failure_callback(context): """ https://airflow.apache.org/_modules/airflow/operators/slack_operator.html Define the callback to post on Sl..