터칭 데이터
Dag Dependencies - 나머지 본문
Sensor + ExternalTaskSensor
BranchPythonOperator
LatestOnlyOperator
Trigger Rules
Sensor란 무엇인가?
Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
Sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
Airflow는 몇 가지 내장 Sensor를 제공
FileSensor: 지정된 위치에 파일이 생길 때까지 대기
HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기
SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
TimeSensor: 특정 시간에 도달할 때까지 워크플로우를 일시 중지
ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기
기본적으로 주기적으로 poke를 하는 것
Sensor는 크게 reschedule과 poke 방식으로 나뉘는데 poke는 주기적 체크를 말합니다.
poke는 체크 주기를 명확하게 보장하고 reschedule은 별도의 체크가 필요 없을 때 더 유용합니다.
worker를 하나 붙잡고 poke간에 sleep를 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재: mode
- mode의 값은 reschedule 혹은 poke가 됨
ExternalTaskSensor (1)
DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
먼저 동일한 schedule_interval을 사용
이 경우 두 태스크들의 Execution Date이 동일해야함. 아니면 매칭이 안됨!
그래서 TriggerDagRunOperator보다 조건이 까다롭고 poke로 워커가 낭비되는 경우가 많으며 DAG A입장에서는 누군가가 나를 기다린 다는 것을 알 수 없기 때문에 사용하기 조금 힘듭니다.
웬만해서는 사용을 비권장합니다.
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule'
)
ExternalTaskSensor (2)
만일 DAG A와 DAG B가 서로 다른 schedule interval을 갖는다면 ?
예를 들어 DAG A가 DAG B보다 5분 먼저 실행된다면?
- execution_delta를 사용
- execution_date_fn을 사용하면 조금더 복잡하게 컨트롤 가능
만일 두개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExternalTaskSensor는 사용불가
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule',
execution_delta=timedelta(minutes=5)
)
BranchPythonOperator
상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터
미리 정해준 Operator들 중에 선택하는 형태로 돌아감 (뒤에서 예제를 볼 예정)
TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음
from airflow.operators.python import BranchPythonOperator
# 상황에 따라 뒤에 실행되어야 하는 태스크를 리턴
def skip_or_cont_trigger():
if Variable.get("mode", "dev") == "dev":
return []
else:
return ["trigger_b"]
# "mode"라는 Variable의 값이 "dev"이면 trigger_b 태스크를 스킵
branching = BranchPythonOperator(
task_id='branching',
python_callable=skip_or_cont_trigger,
)
BranchPythonOperator 실습
Learn_BranchPythonOperator.py -
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime
default_args = {
'start_date': datetime(2023, 1, 1)
}
dag = DAG(
'Learn_BranchPythonOperator',
schedule='@daily',
default_args=default_args)
def decide_branch(**context):
current_hour = datetime.now().hour
print(f"current_hour: {current_hour}")
if current_hour < 12:
return 'morning_task'
else:
return 'afternoon_task'
branching_operator = BranchPythonOperator(
task_id='branching_task',
python_callable=decide_branch,
dag=dag
)
morning_task = EmptyOperator(
task_id='morning_task',
dag=dag
)
afternoon_task = EmptyOperator(
task_id='afternoon_task',
dag=dag
)
branching_operator >> morning_task
branching_operator >> afternoon_task

LatestOnlyOperator (1)
Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨
t1 >> t3 >> [t2, t4]

LatestOnlyOperator (2)
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id='latest_only_example',
schedule=timedelta(hours=48), # 매 48시간마다 실행되는 DAG로 설정
start_date=datetime(2023, 6, 14),
catchup=True) as dag:
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id = 'latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')
t1 >> t2 >> [t3, t4]
LatestOnlyOperator (3)
앞서 소스 코드 살펴보기
Learn_LatestOnlyOperator.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from datetime import datetime
from datetime import timedelta
with DAG(
dag_id='Learn_LatestOnlyOperator',
schedule=timedelta(hours=48), # 매 48시간마다 실행되는 DAG로 설정
start_date=datetime(2023, 6, 14),
catchup=True) as dag:
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id='latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')
t1 >> t2 >> [t3, t4]

Trigger Rules이란?
Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶다면?
보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가
Operator에 trigger_rule이란 파라미터로 결정 가능
trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능
all_success (기본값), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
Trigger Rule의 가능값 (airflow.utils.trigger_rule.TriggerRule)
ALL_SUCCESS: (default) all parents have succeeded
ALL_FAILED: all parents are in a failed or upstream_failed state
ALL_DONE: all parents are done with their execution (성공실패 여부와 관계없이)
ONE_FAILED:
fires as soon as at least one parent has failed, it does not wait for all parents to be done
ONE_SUCCESS:
fires as soon as at least one parent succeeds, it does not wait for all parents to be done
NONE_FAILED:
all parents have not failed (or upstream_failed) i.e. all parents have succeeded or been skipped
NONE_FAILED_MIN_ONE_SUCCESS
one parent at least is done but none failed
Trigger Rule 사용 예

from airflow.utils.trigger_rule import TriggerRule
with DAG("trigger_rules", default_args=default_args, schedule=timedelta(1)) as dag:
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1")
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
t3 = BashOperator(task_id="exit", bash_command="exit 1")
Trigger Rule 예제와 실행 예
Learn_TriggerRules 실습
Learn_TriggerRule.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
default_args = {
'start_date': datetime(2023, 6, 15)
}
with DAG("Learn_TriggerRule", default_args=default_args, schedule=timedelta(1)) as dag:
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1")
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
실제 실행 후 Web UI에서의 모습

'Airflow 고급 기능, dbt, Data Catalog' 카테고리의 다른 글
Airflow DB 살펴보기 (0) | 2024.01.03 |
---|---|
Dag-Dependencies 데모 (0) | 2024.01.03 |
Dag Dependencies - TriggerDagRunOperator & Jinja Template (0) | 2024.01.03 |
2 번째 챕터 (0) | 2024.01.03 |
Airflow API와 모니터링 (0) | 2024.01.03 |