터칭 데이터

Dynamic Dags 본문

Airflow 고급 기능, dbt, Data Catalog

Dynamic Dags

터칭 데이터 2024. 1. 3. 18:31

 

 

Dynamic Dags

 

DAG를 템플릿에 따라 찍어내야 하는 경우가 있는데 이를 사람의 손으로 하나하나 복사 붙여넣기 하기에는 귀찮고 실수할 확률도 높다. 이를 대신 해주는 것이 Dynamic Dags


Dynamic Dags를 사용해서 코드 재사용을 최대화할 수 있다! 
Jinja template와 YAML을 사용해야함

 

 

 

 

 

 

 

Dynamic Dag란 무엇인가?

 

템플릿과 YAML을 기반으로 DAG를 동적으로 만들어보자

Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공

 

이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지

 

DAG를 계속해서 만드는 것과 한 DAG안에서 태스크를 늘리는 것 사이의 밸런스 필요

오너가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋음

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Dynamic Dag의 기본적인 아이디어

 

 

 

Yahoo finance에서 주식 종목들에 대한 정보를 가져오는 DAG를 만든다고 가정했을 때 주식 티커에 따라 하나하나 DAG를 만들 필요 없이 Dynamic Dag로 편리하게 생성합니다.

 

파라미터가 지정된 yaml 파일에서 변수를 입력 받아 jinja temlate 파일과 generator.py라는 생성기를 통해 dag_APPL.py, dag_GOOG.py 처럼 ymal 파일마다 하나의 파이썬 DAG를 만듭니다.

 

 

 

 

 

 

 

 

간단한 예제

 

https://github.com/learndataeng/learn-airflow/tree/main/dags/dynamic_dags

 

템플릿을 통한 최종 DAG 파일 생성

learn-airflow % python3 dags/dynamic_dags/generator.py

 

generator.py는 airflow가 설치된 root에서 실행해야 합니다.

 

이는 dags 폴더에 yml 파일의 수 만큼의 DAG 코드를 생성해줌

generator 실행을 언제할지 결정이 필요

 

 

 

 

 

 

 

templated_dag.jinja2

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_{{ dag_id }}",
    start_date=datetime(2023, 6, 15),
    schedule='{{ schedule }}',
    catchup={{ catchup or True }}) as dag: # yaml 파일에서 catchup 파라미터가 제공되지 않으면 default 값은 True

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("{{ symbol }}")))

 

 

 

 

 

get_price_APPL.py

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_APPL",
    start_date=datetime(2023, 6, 15),
    schedule='@daily',
    catchup=True) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("APPL")))

 

 

 

 

 

 

 

데모

 

앞서 예제 실행

 

Web UI에서 확인

 

 

 

 

 

PS D:\dev_kdt\airflow-advanced\learn-airflow\dags\dynamic_dags> dir


    디렉터리: D:\dev_kdt\airflow-advanced\learn-airflow\dags\dynamic_dags


Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
-a----  2024-01-01(월)   오후 8:21             68 config_appl.yml
-a----  2024-01-01(월)   오후 8:21             53 config_goog.yml
-a----  2024-01-01(월)   오후 8:21            516 generator.py
-a----  2024-01-01(월)   오후 8:21            484 templated_dag.jinja2

 

dynamic_dags에 config_appl, config_goog yaml파일들이 있습니다.

 

 

 

generate.py는 root 폴더 즉, learn-airflow에서 실행해야 합니다.

 

PS D:\dev_kdt\airflow-advanced\learn-airflow> python3 dags/dynamic_dags/generator.py
Python

 

 

 

 

 

'Airflow 고급 기능, dbt, Data Catalog' 카테고리의 다른 글

DBT - Database Normalization  (0) 2024.01.04
Airflow 운영과 대안  (0) 2024.01.04
Task Groups  (0) 2024.01.03
Airflow DB 살펴보기  (0) 2024.01.03
Dag-Dependencies 데모  (0) 2024.01.03