터칭 데이터
Dag-Dependencies 데모 본문
데모
TriggerDagRunOperator 실습:
Jinja 템플릿 실습: Learn_Jinja
BranchPythonOperator 실습: Learn_BranchPythonOperator
LatestOnlyOperator 실습: Learn_LatestOnlyOperator
Trigger Rules 실습: Learn_TriggerRules
TriggerDagRunOperator 실습:
TriggerDag.py의 DAG ID는 SourceDag입니다.
먼저 실행이 되어야하는 DAG를 SourceDag라고 합니다.
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
dag = DAG(
dag_id='SourceDag',
start_date=datetime(2023, 6, 19),
schedule='@daily'
)
trigger_task = TriggerDagRunOperator(
task_id='trigger_task',
trigger_dag_id='TargetDag',
conf={'path': 'value1'},
execution_date='{{ ds }}',
reset_dag_run=True,
dag=dag
)
특정 DAG가 실행되고 그 다음에 실행되는 DAG를 TargetDag라고 합니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
dag = DAG(
'TargetDag',
schedule='@once', # 매일 실행
start_date=datetime(2023, 6, 1),
)
task1 = BashOperator(
task_id='task1',
bash_command="""echo '{{ ds }}, {{ dag_run.conf.get("path", "none") }}' """,
dag=dag
)
PS D:\dev_kdt\airflow-advanced\learn-airflow\dags\trigger_dags> dir
디렉터리: D:\dev_kdt\airflow-advanced\learn-airflow\dags\trigger_dags
Mode LastWriteTime Length Name
---- ------------- ------ ----
d----- 2024-01-01(월) 오후 9:11 __pycache__
-a---- 2024-01-01(월) 오후 8:21 363 TargetDag.py
-a---- 2024-01-01(월) 오후 8:21 434 TriggerDag.py
trigger_dags에 있는 TargetDag와 TriggerDag들을 실습에 사용하겠습니다.
먼저 Airflow Scheduler 컨테이너 로그인합니다.
(airflow)airflow dags list | grep SourceDag
SourceDag | trigger_dags/TriggerDag.py | airflow | True
(airflow)airflow tasks list SourceDag
trigger_task
DAG를 찾고 태스크들을 살펴보았습니다.
(airflow)airflow tasks test SourceDag trigger_task 2023-06-18
execution_date는 2023-06-18입니다.
Airflow WebUI로 가서
Target DAG를 활성화한 뒤
Auto Refresh로 성공한 것을 확인한 후
logs탭으로 가보면
execution_date로 넘겨준 날짜와 conf라는 파라미터에 path key의 값이 value1이 넘어온 것을 볼 수 있습니다.
Jinja 실습
Web UI에서 Learn_Jinja DAG를 실행합니다.
task 3개가 실행되었습니다.
코드를 살펴보면
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
# DAG 정의
dag = DAG(
'Learn_Jinja',
schedule='0 0 * * *', # 매일 실행
start_date=datetime(2023, 6, 1),
catchup=False
)
# BashOperator를 사용하여 템플릿 작업 정의
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ ds }}"',
dag=dag
)
# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, {{ params.name }}!"',
params={'name': 'John'}, # 사용자 정의 가능한 매개변수
dag=dag
)
task3 = BashOperator(
task_id='task3',
bash_command="""echo "{{ dag }}, {{ task }}, {{ var.value.get('csv_url') }}" """,
dag=dag
)
task1 >> task2 >> task3
task1은 execution_date를 echo로 print하는 역할을 합니다.
task2는 params로 들어온 params의 키 name과 값 John을 사용해 echo로 안녕하세요 param.name(John)을 print합니다.
task3는 dag, task와 csv_url이라는 Variables를 프린트합니다.
에서 각 task1, 2, 3를 클릭해 log를 살펴보면
task1
현재 실행된 execution_date이 보입니다.
task2
안녕하세요, John이 보입니다.
task3
DAG, Task, csv_url이 보입니다.
BranchPythonOperator
Learn_BranchPythonOperator
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime
default_args = {
'start_date': datetime(2023, 1, 1)
}
dag = DAG(
'Learn_BranchPythonOperator',
schedule='@daily',
default_args=default_args)
def decide_branch(**context):
current_hour = datetime.now().hour
print(f"current_hour: {current_hour}")
if current_hour < 12:
return 'morning_task'
else:
return 'afternoon_task'
branching_operator = BranchPythonOperator(
task_id='branching_task',
python_callable=decide_branch,
dag=dag
)
morning_task = EmptyOperator(
task_id='morning_task',
dag=dag
)
afternoon_task = EmptyOperator(
task_id='afternoon_task',
dag=dag
)
branching_operator >> morning_task
branching_operator >> afternoon_task
WebUI 우측상단의 UTC 시간에 따라 12시 이전이면 morning_task를 그렇지 않다면 afternoon_task를 실행합니다.
20시 이후에 실행되었다면 morning_task를 skip했다고 할 것입니다.
LatestOnlyOperator
Learn_LatestOnlyOperator
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from datetime import datetime
from datetime import timedelta
with DAG(
dag_id='Learn_LatestOnlyOperator',
schedule=timedelta(hours=48), # 매 48시간마다 실행되는 DAG로 설정
start_date=datetime(2023, 6, 14),
catchup=True) as dag:
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id='latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')
t1 >> t2 >> [t3, t4]
48시간 마다 실행되는 DAG입니다. 시작시간은 2023_06_14입니다. 2023_06_16에 execution_date를 2023-06-14가 되어 첫 실행될 것입니다.
만일 2023-06-18이 execution_date으로 주어진다면 현재 catchup=True이기 때문에 2023-06-16 한 번, 2023-06-18 한 번으로 총 2번 시행되어야 하지만 2023-06-16 시행분은 t2에서 막힐 것입니다. execution_date 2023-06-18에는 2023-06-16은 과거 데이터를 처리하는 것이므로 t2가 이를 막기 때문입니다. 즉 결과적으로 t1, t2는 실행되지만 t3, t4는 실행되지 않을 것입니다.
(airflow)airflow dags test Learn_LatestOnlyOperator 2023-06-15
[2024-01-03 08:26:00,806] {skipmixin.py:211} INFO - Skipping tasks ['task3', 'task4']
[2024-01-03 08:26:00,806] {skipmixin.py:211} INFO - Skipping tasks ['task3', 'task4']
[2024-01-03 08:26:00,862] {taskinstance.py:1323} INFO - Marking task as SUCCESS. dag_id=Learn_LatestOnlyOperator, task_id=latest_only, execution_date=20231230T000000, start_date=, end_date=20240103T082600
[2024-01-03 08:26:00,862] {taskinstance.py:1323} INFO - Marking task as SUCCESS. dag_id=Learn_LatestOnlyOperator, task_id=latest_only, execution_date=20231230T000000, start_date=, end_date=20240103T082600
[2024-01-03 08:26:00,879] {dag.py:3642} INFO - latest_only ran successfully!
execution_date를 2023-06-15로 주었을 때 task3, task4는 skip되었습니다.
Web UI에서 DAG를 직접 실행하고 latest_only의 log를 살펴보면 위와 같이 skip된 task가 없을 것입니다.
Trigger Rules 실습
Learn_TriggerRule.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
default_args = {
'start_date': datetime(2023, 6, 15)
}
with DAG("Learn_TriggerRule", default_args=default_args, schedule=timedelta(1)) as dag:
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1")
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
sleep, print_date, exit이 모두 실행이 끝나야 final_task가 실행됩니다.
'Airflow 고급 기능, dbt, Data Catalog' 카테고리의 다른 글
Task Groups (0) | 2024.01.03 |
---|---|
Airflow DB 살펴보기 (0) | 2024.01.03 |
Dag Dependencies - 나머지 (0) | 2024.01.03 |
Dag Dependencies - TriggerDagRunOperator & Jinja Template (0) | 2024.01.03 |
2 번째 챕터 (0) | 2024.01.03 |