터칭 데이터
Dag Dependencies - TriggerDagRunOperator & Jinja Template 본문
Dag Dependencies - TriggerDagRunOperator & Jinja Template
터칭 데이터 2024. 1. 3. 14:35
Contents
1. 지난 숙제 리뷰
2. Dag Dependencies
3. Task Grouping
4. Dynamic Dags
5. DockerOperator
Dag Dependencies
DAG간 실행 순서를 정하려면 어떻게 해야하는지 알아보자
Dag를 실행하는 방법
주기적 실행: schedule로 지정
crontab을 이용
다른 Dag에 의해 트리거
Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator)
Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
알아두면 좋은 상황에 따라 다른 태스크 실행 방식들
조건에 따라 다른 태스크로 분기 (BranchPythonOperator)
과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnlyOperator)
앞단 태스크들의 실행상황
- 어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음
두 가지 방법이 존재
Explicit trigger
TriggerDagRunOperator
DAG A가 명시적으로 DAG B를 트리거
Reactive trigger
ExternalTaskSensor
DAG B가 DAG A의 태스크가 끝나기를 대기
- 이 경우 DAG A는 이 사실을 모름
TriggerDagRunOperator (1)
DAG A의 태스크를 TriggerDagRunOperator로 구현
참고: Jinja Template이란?
Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진
Django 템플릿 엔진에서 영감을 받아 개발
Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
Flask에서 사용됨
변수는 이중 중괄호 {{ }}로 감싸서 사용
<h1>안녕하세요, {{ name }}님!</h1>
제어문은 퍼센트 기호 {% %}로 표시
<ul>
{% for item in items %}
<li>{{ item }}</li>
{% endfor %}
</ul>
참고: Jinja Template + Airflow (1)
Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능
이를 통해 재사용가능하고 사용자 정의 가능한 워크플로우 생성
예 1) execution_date을 코드 내에서 쉽게 사용: {{ ds }}
가능한 모든 시스템 변수는 여기를 참조
# BashOperator를 사용하여 템플릿 작업 정의
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ ds }}"',
dag=dag
)
참고: Jinja Template + Airflow (2)
예 2) 파라미터 등으로 넘어온 변수를 쉽게 사용 가능
# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, {{ params.name }}!"',
params={'name': 'John'}, # 사용자 정의 가능한 매개변수
dag=dag
)
Jinja Template은 어떤 Operator에서 특정 파라미터에서만 사용할 수 있습니다.
위의 예시에서는 BashOperator에서 bash_command에서만 Jinja 템플릿을 사용할 수 있었습니다. 이를 어떻게 체크할 수 있을까요?
참고: BashOperator 레퍼런스 보기
● bash_command (str) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed.
(templated)
● env (dict[str, str] | None) – If env is not None, it must be a dict that defines the environment variables for the new
process; these are used instead of inheriting the current process environment, which is the default behavior.
(templated)
● append_env (bool) – If False(default) uses the environment variables passed in env params and does not inherit the
current process environment. If True, inherits the environment variables from current passes and then environment
variable passed by the user will either update the existing inherited environment variables or the new variables gets
appended to it
● output_encoding (str) – Output encoding of bash command
● skip_on_exit_code (int | Container[int] | None) – If task exits with this exit code, leave the task in skipped state
(default: 99). If set to None, any non-zero exit code will be treated as a failure.
● cwd (str | None) – Working directory to execute the command in. If None (default), the command is run in a
temporary directory
뒤에 (templated)가 붙어 있다면 Jinja template을 사용할 수 있다는 얘기입니다.
참고: Airflow에서 사용 가능한 Jinja 변수들 몇개 살펴보기
가능한 모든 시스템 변수는 여기를 참조
{{ ds }}
execution_date에서 연도-월-일 읽어오기(10자리)
{{ ds_nodash }}
연도-월-일에서 대시(-)를 뺀 7자리
{{ ts }}
execution_date인데 시간:분:초까지
{{ dag }}
dag 이름
.을 붙여 property 조회 가능
{{ task }}
task 정보
.을 붙여 property 조회 가능
{{ dag_run }}
trigger dagrun과 관련
{{ var.value }}: {{ var.value.get('my.var', 'fallback') }}
{{ var.json }}: {{ var.json.my_dict_var.key1 }}
저장된 값이 json이라면 훨씬 더 편리한 방법
{{ conn }}: {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}
참고: Airflow에서 Jinja 변수를 사용한 예제 코드 살펴보기
Learn_Jinja
BashOperator 3개로 구성
- “airflow dags test Learn_Jinja 2023-05-30”
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
# DAG 정의
dag = DAG(
'Learn_Jinja',
schedule='0 0 * * *', # 매일 실행
start_date=datetime(2023, 6, 1),
catchup=False
)
# BashOperator를 사용하여 템플릿 작업 정의
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ ds }}"',
dag=dag
)
# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, {{ params.name }}!"',
params={'name': 'John'}, # 사용자 정의 가능한 매개변수
dag=dag
)
task3 = BashOperator(
task_id='task3',
bash_command="""echo "{{ dag }}, {{ task }}, {{ var.value.get('csv_url') }}" """,
dag=dag
)
task1 >> task2 >> task3
Jinja Template에 대한 얘기는 여기서 마무리하고 다시 TriggerDagRunOperator 이야기로 넘어가겠습니다.
TriggerDagRunOperator (2)
trigger_dag_id (str) – The dag_id to trigger (templated).
trigger_run_id (str | None) – The run ID to use for the triggered DAG run (templated). If not provided, a run ID will be automatically generated.
conf (dict | None) – Configuration for the DAG run (templated).
execution_date (str | datetime.datetime | None) – Execution date for the dag (templated).
reset_dag_run (bool) – Whether clear existing dag run if already exists. This is useful when backfill or
rerun an existing dag run. This only resets (not recreates) the dag run. Dag run conf is immutable and will not be reset on rerun of an existing dag run. When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised. When reset_dag_run=True and dag run exists, existing dag run will
be cleared to rerun.
wait_for_completion (bool) – Whether or not wait for dag run completion. (default: False)
poke_interval (int) – Poke interval to check dag run status when wait_for_completion=True. (default: 60)
TriggerDagRunOperator (3)
airflow.cfg의 dag_run_conf_overrides_params가 True로 설정되어 있어야함
'Airflow 고급 기능, dbt, Data Catalog' 카테고리의 다른 글
Dag-Dependencies 데모 (0) | 2024.01.03 |
---|---|
Dag Dependencies - 나머지 (0) | 2024.01.03 |
2 번째 챕터 (0) | 2024.01.03 |
Airflow API와 모니터링 (0) | 2024.01.03 |
Airflow와 구글시트 연동하기 - Redshift → sheet (0) | 2024.01.02 |