터칭 데이터

Dag-Dependencies 데모 본문

Airflow 고급 기능, dbt, Data Catalog

Dag-Dependencies 데모

터칭 데이터 2024. 1. 3. 17:47

 

 

 

데모

 

TriggerDagRunOperator 실습:

SourceDag
TargetDag

 

Jinja 템플릿 실습: Learn_Jinja

 

BranchPythonOperator 실습: Learn_BranchPythonOperator

 

LatestOnlyOperator 실습: Learn_LatestOnlyOperator

 

Trigger Rules 실습: Learn_TriggerRules

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

TriggerDagRunOperator 실습:

 

TriggerDag.py의 DAG ID는 SourceDag입니다.

먼저 실행이 되어야하는 DAG를 SourceDag라고 합니다.

TriggerDag

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

dag = DAG(
    dag_id='SourceDag',
    start_date=datetime(2023, 6, 19),
    schedule='@daily'
)

trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id='TargetDag',
    conf={'path': 'value1'},
    execution_date='{{ ds }}',
    reset_dag_run=True,
    dag=dag
)

 

 


TargetDag

특정 DAG가 실행되고 그 다음에 실행되는 DAG를 TargetDag라고 합니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'TargetDag',
    schedule='@once',  # 매일 실행
    start_date=datetime(2023, 6, 1),
)

task1 = BashOperator(
    task_id='task1',
    bash_command="""echo '{{ ds }}, {{ dag_run.conf.get("path", "none") }}' """,
    dag=dag
)

 

 

 

 

 

 

 

 

PS D:\dev_kdt\airflow-advanced\learn-airflow\dags\trigger_dags> dir


    디렉터리: D:\dev_kdt\airflow-advanced\learn-airflow\dags\trigger_dags


Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
d-----  2024-01-01(월)   오후 9:11                __pycache__
-a----  2024-01-01(월)   오후 8:21            363 TargetDag.py
-a----  2024-01-01(월)   오후 8:21            434 TriggerDag.py

 

trigger_dags에 있는 TargetDag와 TriggerDag들을 실습에 사용하겠습니다.

 

 

 

 

먼저 Airflow Scheduler 컨테이너 로그인합니다.

 

(airflow)airflow dags list | grep SourceDag

SourceDag                                | trigger_dags/TriggerDag.py                                                                                       | airflow | True

 

(airflow)airflow tasks list SourceDag

trigger_task

 

DAG를 찾고 태스크들을 살펴보았습니다.

 

 

 

 

 

 

(airflow)airflow tasks test SourceDag trigger_task 2023-06-18

 

execution_date는 2023-06-18입니다.

 

 

 

Airflow WebUI로 가서

Target DAG를 활성화한 뒤

Auto Refresh로 성공한 것을 확인한 후 

logs탭으로 가보면

 

execution_date로 넘겨준 날짜와 conf라는 파라미터에 path key의 값이 value1이 넘어온 것을 볼 수 있습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

Jinja 실습

 

 

Web UI에서 Learn_Jinja DAG를 실행합니다.

 

 

 

 

 

 

task 3개가 실행되었습니다.

 

 

 

코드를 살펴보면

 

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# DAG 정의
dag = DAG(
    'Learn_Jinja',
    schedule='0 0 * * *',  # 매일 실행
    start_date=datetime(2023, 6, 1),
    catchup=False
)

# BashOperator를 사용하여 템플릿 작업 정의
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "{{ ds }}"',
    dag=dag
)

# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
    task_id='task2',
    bash_command='echo "안녕하세요, {{ params.name }}!"',
    params={'name': 'John'},  # 사용자 정의 가능한 매개변수
    dag=dag
)

task3 = BashOperator(
    task_id='task3',
    bash_command="""echo "{{ dag }}, {{ task }}, {{ var.value.get('csv_url') }}" """,
    dag=dag
)

task1 >> task2 >> task3

 

task1은 execution_date를 echo로 print하는 역할을 합니다.

 

task2는 params로 들어온 params의 키 name과 값 John을 사용해 echo로 안녕하세요 param.name(John)을 print합니다.

 

task3는 dag, task와 csv_url이라는 Variables를 프린트합니다.

 

 

 

 

에서 각 task1, 2, 3를 클릭해 log를 살펴보면

 

 

task1

 

현재 실행된 execution_date이 보입니다.

 

 

 

task2

 

안녕하세요, John이 보입니다.

 

 

task3

 

DAG, Task, csv_url이 보입니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

BranchPythonOperator

 

Learn_BranchPythonOperator

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2023, 1, 1)
}

dag = DAG(
    'Learn_BranchPythonOperator',
    schedule='@daily',
    default_args=default_args)


def decide_branch(**context):
    current_hour = datetime.now().hour
    print(f"current_hour: {current_hour}")
    if current_hour < 12:
        return 'morning_task'
    else:
        return 'afternoon_task'


branching_operator = BranchPythonOperator(
    task_id='branching_task',
    python_callable=decide_branch,
    dag=dag
)


morning_task = EmptyOperator(
    task_id='morning_task',
    dag=dag
)


afternoon_task = EmptyOperator(
    task_id='afternoon_task',
    dag=dag
)

branching_operator >> morning_task
branching_operator >> afternoon_task

 

 

 

WebUI 우측상단의 UTC 시간에 따라 12시 이전이면 morning_task를 그렇지 않다면 afternoon_task를 실행합니다.

 

 

 

 

20시 이후에 실행되었다면 morning_task를 skip했다고 할 것입니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

LatestOnlyOperator 

 

Learn_LatestOnlyOperator

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from datetime import datetime
from datetime import timedelta

with DAG(
   dag_id='Learn_LatestOnlyOperator',
   schedule=timedelta(hours=48),    # 매 48시간마다 실행되는 DAG로 설정
   start_date=datetime(2023, 6, 14),
   catchup=True) as dag:

   t1 = EmptyOperator(task_id='task1')
   t2 = LatestOnlyOperator(task_id='latest_only')
   t3 = EmptyOperator(task_id='task3')
   t4 = EmptyOperator(task_id='task4')

   t1 >> t2 >> [t3, t4]

 

48시간 마다 실행되는 DAG입니다. 시작시간은 2023_06_14입니다. 2023_06_16에 execution_date를 2023-06-14가 되어 첫 실행될 것입니다.

 

만일 2023-06-18이 execution_date으로 주어진다면 현재 catchup=True이기 때문에 2023-06-16 한 번, 2023-06-18 한 번으로 총 2번 시행되어야 하지만 2023-06-16 시행분은 t2에서 막힐 것입니다. execution_date 2023-06-18에는 2023-06-16은 과거 데이터를 처리하는 것이므로 t2가 이를 막기 때문입니다. 즉 결과적으로  t1, t2는 실행되지만 t3, t4는 실행되지 않을 것입니다.

 

 

 

 

 

(airflow)airflow dags test Learn_LatestOnlyOperator 2023-06-15

[2024-01-03 08:26:00,806] {skipmixin.py:211} INFO - Skipping tasks ['task3', 'task4']
[2024-01-03 08:26:00,806] {skipmixin.py:211} INFO - Skipping tasks ['task3', 'task4']
[2024-01-03 08:26:00,862] {taskinstance.py:1323} INFO - Marking task as SUCCESS. dag_id=Learn_LatestOnlyOperator, task_id=latest_only, execution_date=20231230T000000, start_date=, end_date=20240103T082600
[2024-01-03 08:26:00,862] {taskinstance.py:1323} INFO - Marking task as SUCCESS. dag_id=Learn_LatestOnlyOperator, task_id=latest_only, execution_date=20231230T000000, start_date=, end_date=20240103T082600
[2024-01-03 08:26:00,879] {dag.py:3642} INFO - latest_only ran successfully!

 

execution_date를 2023-06-15로 주었을 때 task3, task4는 skip되었습니다.

 

 

 

 

 

Web UI에서 DAG를 직접 실행하고 latest_only의 log를 살펴보면 위와 같이 skip된 task가 없을 것입니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Trigger Rules 실습

 

Learn_TriggerRule.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

default_args = {
    'start_date': datetime(2023, 6, 15)
}

with DAG("Learn_TriggerRule", default_args=default_args, schedule=timedelta(1)) as dag:
    t1 = BashOperator(task_id="print_date", bash_command="date")
    t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
    t3 = BashOperator(task_id="exit", bash_command="exit 1")
    t4 = BashOperator(
        task_id='final_task',
        bash_command='echo DONE!',
        trigger_rule=TriggerRule.ALL_DONE
    )
    [t1, t2, t3] >> t4

 

 

 

sleep, print_date, exit이 모두 실행이 끝나야 final_task가 실행됩니다.

 

 

 

 

 

 

'Airflow 고급 기능, dbt, Data Catalog' 카테고리의 다른 글

Task Groups  (0) 2024.01.03
Airflow DB 살펴보기  (0) 2024.01.03
Dag Dependencies - 나머지  (0) 2024.01.03
Dag Dependencies - TriggerDagRunOperator & Jinja Template  (0) 2024.01.03
2 번째 챕터  (0) 2024.01.03