Python Operator를 사용해서 Airflow DAG를 만들어보자
Operators - PythonOperator
from airflow.operators.python import PythonOperator
def python_func(**cxt):
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_date = cxt["execution_date"]
# do what you need to do
...
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
params={
'table':'delighted_nps',
'schema':'raw_data'
},
)
Operator를 정의할 때 python_callable에서 실행할 파이썬 함수를 정의할 수 있습니다. params는 실행할 파이썬 함수에서 사용할 파라미터를 정의합니다.
소스 코드 보기
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'HelloWorld',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *') # 매일 02:00시에 실행
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye # 순서를 정해주지 않으면 자기들 멋대로 print_hello → print_goodbye 순으로 진행
2개의 태스크로 구성된 데이터 파이프라인 (DAG)
- print_hello: PythonOperator로 구성되어 있으며 먼저 실행
- print_goodbye: PythonOperator로 구성되어 있으며 두번째로 실행
Airflow Decorators
데코레이터란? (Decorators)
파이썬을 비롯한 프로그래밍 언어에는 데코레이터라는 기능이 존재합니다. 다른 기능을 수행하는 함수를 입력값(인자)로 받아 명시적인 수정이나 확장없이 함수의 수정과 확장을 허용해줍니다.
def my_decorator(func):
def wrapper():
print("Before function call")
func()
print("After function call")
return wrapper
@my_decorator
def say_hello():
print("Hello!")
say_hello()
Before function call
Hello!
After function call
Airflow에서의 Decorators
위의 PythonOperators의 python_callable에 지정해준 함수들을 데코레이터(@, 어노테이트)할 수 있습니다.
from airflow.decorators import task
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
@task와 같이 데코레이터를 붙이면 파이썬 오퍼레이터라는 뜻입니다. 각각의 함수들이 태스크임을 명시한 것으로
print_hello() >> print_goodbye()
DAG는 위와 같이 작성합니다.
Task의 ID를 별도로 지정하지 않으면 함수의 이름(print_hello, print_goodbye)들이 태스크 ID가 됩니다.
Task Decorators
Task Decorator를 사용하면 훨씬 더 프로그램이 직관적
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
task를 import한뒤 데코레이터로 어노테이트했습니다.
중요한 DAG 파라미터 (not task parameters)
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
위의 dag_id, start_date, default_args 등 외에도 다른 중요한 DAG 파라미터가 존재합니다.
max_active_runs: 한번에 동시에 실행될 수 있는 DAG의 수, Incremental Update로 365번의 backfill이 필요한 경우 max_active_runs의 수가 높다면 더 빨리 backfill이 가능할 것입니다.
max_active_tasks: 동시에 몇개의 태스크가 실행가능한지를 결정합니다. 병렬 처리(구조)에서 의미를 갖습니다.
max_active_runs와 max_active_tasks의 최대치는 사용 가능한 CPU의 총합입니다. CPU가 하나라면 아무리 최대치를 높게 잡아도 의미가 없습니다.
catchup: catchup은 Full Refresh에서는 의미가 없다고 말씀드렸습니다.
DAG parameters vs. Task parameters의 차이점 이해가 중요
- 위의 파라미터들은 모두 DAG 파라미터로 DAG 객체를 만들 때 지정해주어야함
'Airflow' 카테고리의 다른 글
| Airflow - Redshift Connection 설정 & Decorators (0) | 2023.12.13 |
|---|---|
| Airflow - Name Gender 예제 프로그램 포팅 (0) | 2023.12.13 |
| Airflow 기본 프로그램 실행 (0) | 2023.12.12 |
| Airflow 설치 - 도커 사용 (0) | 2023.12.12 |
| Airflow 설치 - EC2 사용 (0) | 2023.12.12 |