Airflow 기본 프로그램 실행
Airflow 코드의 기본 구조
DAG 대표하는 객체를 먼저 만듬
- DAG 이름, 실행주기, 실행날짜, 오너 등의 여러 파라미터를 지정
다음으로 DAG를 구성하는 태스크들을 만듬
- 태스크별로 적합한 오퍼레이터를 선택
- 태스크 ID를 부여하고 해야할 작업의 세부사항 지정
최종적으로 태스크들간의 실행 순서를 결정
DAG 설정 예제 (1)
DAG 설정을 할 때 가장 첫번째로 할 일은 모든 태스크(Task)들에게 공통적으로 적용되는 설정인 default_args를 설정하는 일입니다.
from datetime import datetime, timedelta
default_args = {
'owner': 'jon',
'email': ['jon@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
retries는 재시도 횟수, retry_delay는 다음 재시도까지 얼마나 기다릴지를 나타냅니다. 이 외에도 많은 파라미터를 지정할 수 있으며
on_faliure_callback, on_success_callback과 같이 태스크가 실패 혹은 성공적으로 끝났을 때 어떤 함수를 호출할지 지정할 수도 있습니다.
DAG 설정 예제 (2)
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
schedule 변수에 "0 * * * *"과 같이 첫 0에 4개의 *이 뒤이어 적혀있는데 첫번째 자리는 분으로 매 0분이라는 뜻입니다. 두번째는 시간, 세번째는 일, 네번째는 월, 다섯번째는 요일입니다.
즉 "0 * * * *"는 매 시간 0분마다 시작하는 hourly 데이터 파이프라인이라는 뜻입니다.
Schedule interval can be defined as cron expression or presets as follows:
○ None, @once, @hourly, @daily, @weekly, @monthly, @yearly
위와 같이 숫자 대신 @표현식으로 매시, 매일, 매주와 같은 설정이 가능하기도 합니다.
catchup을 설명드리면
start_date and end_date specify when this dag starts and stops:
- can be used to do one-off backfilling
- catchup의 의미를 이해하는 것이 중요
start_date를 과거로 설정했다고 가정해보겠습니다. 현재 2024년 1월1일, start_date는 2014년 1월1일이고 schedule이 yearly로 되어있다면 catchup이 True일 경우 Airflow는 지난 10년간 놓친 10번의 작업을 따라잡기 위해 DAG를 10번 실행하게 됩니다. 만일 @daily 혹은 @hourly라면 더 많은 빈도로 실행하겠죠? 이와 같이 뒤쳐진 작업을 따라잡는 것을 원한다면 catchup을 True로 그렇지 않다면 False로 적용하시면 됩니다.
Full Refresh를 하는 DAG라면 False로 권장합니다. 여러번 실행해도 한번 실행한 것과 결과는 전혀 다르지 않기 때문입니다. 만일 Incremental Update 방식이라면 catchup을 True로 두는 것도 고려할 수 있지만 그 외의 경우라면 일반적으로는 catchup을 False로 두는 것을 권장합니다.
Bash Operator를 사용한 예제 (1)
3개의 태스크로 구성
t1은 현재 시간 출력
t2는 5초간 대기 후 종료
t3는 서버의 /tmp 디렉토리의 내용 출력
t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'jon',
'start_date': datetime(2023, 5, 27, hour=0, minute=00),
'email': ['jon@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1", # DAG name
schedule="0 9 * * *",
tags=['test'],
catchUp=False,
default_args=default_args
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
t1 >> [ t2, t3 ] # t1 동작후 t2와 t3 동시에 실행하라는 실행관계 생성
How to Trigger a DAG - From the Airflow Web UI
좌측의 슬라이드 버튼을 클릭해 dag_v1을 활성화와 동시에 실행할 수 있습니다. 앞에서 살펴본 DAG 설정에 따라 실행됩니다. 활성화를 시켜야 스케줄링에 따라 작업을 실행합니다.
활성화 이후에 또 실행하고 싶다면 우측의 삼각형을 클릭하시면 됩니다.
또 다른 방법으로는 커맨드 라인에 로그인해 Airflow 명령을 실행하는 방법도 있습니다.
How to Trigger a DAG - 터미널에서 실행
먼저 Airflow 서버에 로그인하고 다음 명령 실행
airflow dags list: 서버에 설치된 모든 DAG 조회
airflow tasks list DAG이름: 그 DAG에 속한 모든 태스크 조회
airflow tasks test DAG이름 Task이름 날짜: 특정 태스크를 실행, 날짜를 적는 이유는 이후에 backfill에 대해 설명할 때 같이 설명드리겠습니다. (# test vs. run 둘다 동일한 기능이지만 run은 메타 DB에 기록되지만 test는 기록되지 않는다는 점만 다릅니다.)
- 날짜는 YYYY-MM-DD (이하의 내용은 backfill 파트에서 설명드리겠습니다. 일단 넘어가세요.)
start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우 실행 안됨
이게 바로 execution_date의 값이 됨 (execution_date은 나중에 설명)
데모
1. 웹 UI 살펴보기
2. 터미널로 연결해서 커맨드라인 툴 사용해보기 (EC2와 Docker 버전 모두)
- airflow dags list
- airflow tasks list dag_v1
- airflow tasks test dag_v1 ls 2020-08-09
- airflow dags test dag_v1 2019-12-08
- airflow dags backfill dag_v1 -s 2019-01-01 -e 2019-12-31
로그인 방식만 다를 뿐 그 외의 모든 명령은 EC2와 Docker모두 동일합니다.
1. 웹 UI 살펴보기
활성화를 시키니 Runs탭 등에 신호가 들어옵니다.
DAG탭에서 dag_v1을 클릭해봅시다.
DAG와 DAG를 이루는 Task들의 실행을 확인해볼 수 있습니다.
현재는 Grid가 선택되어 있는데 코드를 확인해보려면 Code를 클릭합니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'keeyong',
'start_date': datetime(2023, 5, 27, hour=0, minute=00),
'email': ['keeyonghan@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1", # DAG name
schedule="0 9 * * *",
tags=['test'],
catchup=False,
default_args=default_args
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
t1 >> [ t2, t3 ]
확인했다면 Grid 탭을 클릭해 돌아옵니다.
print_date, sleep, ls 태스크를 확인하실 수 있는데 어떤 순서로 확인되었는지 더 간단하고 이해하기 쉽게 확인하려면 Grid탭 옆의 Graph탭을 클릭하세요.
시각화된 상태로 확인할 수 있습니다. 각 태스크의 다이어그램에 마우스를 올리면 자세한 정보가 호버링됩니다.
Task Duration탭을 선택하면 각 태스크 실행에 소요된 시간을 확인할 수 있습니다. 현재는 간단한 예제이므로 별다른 점은 없을겁니다.
Grid 탭에서 print_date의 우측 작은 사각형을 클릭하시면 우측과 같은 화면이 뜹니다. 만일 특정 태스크 (지금은 print_date)의 재실행을 원하신다면 Clear 버튼을 클릭하시면 됩니다. (현재 예제는 print_date 실행후 sleep, ls 태스크가 병렬 실행되는 DAG이므로 사실상 sleep, ls까지 모든 태스크를 재실행 하는 셈입니다.)
초록색 박스의 Log를 클릭하면 Airflow의 디버깅에 매우 유용한 로그 기록이 뜨게됩니다.
수 많은 로그 중에 위와 같은 기록을 보실 수 있는데 이는
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
우리가 print_date 태스크를 정의할 때 적어주었던 bash_command의 'date'입니다.
다른 태스크의 로그도 살펴보겠습니다. sleep 태스크는 별다른 출력이 없는 태스크이므로 ls의 Log를 확인해보면
tmp 디렉토리 아래에 있는 내용물들을 출력한 것을 볼 수 있습니다.
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
우리가 ls 태스크를 위와 같이 정의했기 때문입니다.
2. 터미널로 연결해서 커맨드라인 툴 사용해보기
웹 UI는 전반적으로 살펴보았고 이제 커맨드 라인에서 로그인하고 실행하는 방법을 살펴보겠습니다.
Docker
먼저 로그인을 위해서는 터미널에서 docker ps라는 명령어로 컨테이너들의 ID를 알아야합니다.
C:\Users\User>docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
123412341234 apache/airflow:2.5.1 "/usr/bin/dumb-init …" About an hour ago Up About an hour (healthy) 8080/tcp airflow-setup-airflow-worker-1
234123412341 apache/airflow:2.5.1 "/usr/bin/dumb-init …" About an hour ago Up About an hour (healthy) 8080/tcp airflow-setup-airflow-triggerer-1
341234123412 apache/airflow:2.5.1 "/usr/bin/dumb-init …" About an hour ago Up About an hour (healthy) 8080/tcp airflow-setup-airflow-scheduler-1
432143214321 apache/airflow:2.5.1 "/usr/bin/dumb-init …" About an hour ago Up About an hour (healthy) 0.0.0.0:8080->8080/tcp airflow-setup-airflow-webserver-1
412341234123 postgres:13 "docker-entrypoint.s…" About an hour ago Up About an hour (healthy) 5432/tcp airflow-setup-postgres-1
123443211234 redis:latest "docker-entrypoint.s…" About an hour ago Up About an hour (healthy) 6379/tcp airflow-setup-redis-1
스케줄러의 ID를 알아야 합니다. airflow-setup-airflow-scheduler-1의 ID는 432143214321입니다. (여러분은 다를겁니다.)
이제 알아낸 ID로 로그인하려면 docker exec -it (ID) sh 를 입력합니다.
C:\Users\User>docker exec -it 432143214321 sh
(airflow)
이제 로그인이 되어 (airflow) 상태에서 Airflow 명령을 내릴 수 있습니다.
airflow dags list
(airflow)airflow dags list
모든 DAG들을 조회합니다.
dag_id | filepath | owner | paused
=========================================+==================================================================================================================+=========+=======
HelloWorld | HelloWorld.py | airflow | True
HelloWorld_v2 | HelloWorld_v2.py | airflow | True
dag_v1 | TestDAG.py | jon | False
(이하 생략..)
우리가 웹 UI를 살펴보며 켜두었던 dag_v1은 paused가 False인 것을 볼 수 있습니다.
airflow tasks list dag_v1
dag_v1라는 DAG에 있는 태스크들 확인
(airflow)airflow tasks list dag_v1
ls
print_date
sleep
airflow tasks test dag_v1 print_date 2023-12-12
dag_v1 DAG의 print_date 태스크를 2023-12-12 날짜를 기준으로 실행
(airflow)airflow tasks test dag_v1 print_date 2023-12-12
(생략..)
[2023-12-12 08:14:12,502] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'date']
[2023-12-12 08:14:12,512] {subprocess.py:86} INFO - Output:
[2023-12-12 08:14:12,514] {subprocess.py:93} INFO - Tue Dec 12 08:14:12 UTC 2023
[2023-12-12 08:14:12,515] {subprocess.py:97} INFO - Command exited with return code 0
(생략..)