Airflow

Airflow - Backfill

터칭 데이터 2023. 12. 14. 17:43

 

 

Backfill과 Airflow


관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 데이터 엔지니어의 삶에 큰 영향을 준다

 

 

 

 

 

 

 

 

 

 

 

Incremental Update가 실패하면?

 

 

예전의 실패를 나중에서야 알게 되었다면?

 

아마존 리포트 데이터를 가져오는 Daily DAG에서 5월24일~5월25일 DAG가 실패했는데 데이터 웨어하우스에는 이틀치의 데이터들이 담기지 않은 상태입니다. 이와 같은 일들은 데이터 엔지니어들에게 굉장히 자주 발생하며 이를 최대한 방지하고 만회하는 것이 데이터 엔지니어들에게 요구되는 매우 중요한 역량입니다.

 

Airflow가 거의 모든 기업에서 쓰이는 이유는 Backfill에서의 성능이 막강하기 때문

 

 

 

 

 

 

 

 

이제부터 할 이야기는 Incremental Update시에만 의미가 있음

 

다시 한번 가능하면 Full Refresh를 사용하는 것이 좋음

문제가 생겨도 다시 실행하면 됨

 

Incremental Update는 효율성이 더 좋을 수 있지만 운영/유지보수의 난이도가 올라감

실수등으로 데이터가 빠지는 일이 생길 수 있음

과거 데이터를 다시 다 읽어와야하는 경우 다시 모두 재실행을 해주어야함

 

 

Backfill은 Full Refresh를 사용하는 경우 필요와 의미가 전혀 없습니다. Full Refresh로 최대한 버티다가 필요한 경우에만 Incremental Update 사용을 권장합니다.

 

 

 

 

 

 

 

 

 

 

Backfill의 용이성 여부 -> 데이터 엔지니어 삶에 직접적인 영향! 


Backfill의 정의

실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미

 

Backfill 해결은 Incremental Update에서 복잡해짐

Full Refresh에서는 간단. 그냥 다시 실행하면 됨

 

즉 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가?

이게 잘 디자인된 것이 바로 Airflow

 

 

 

 

 

 

 

 

 

 

 

보통 Daily DAG를 작성한다고 하면 어떻게 할까?

지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어옴

 

프로덕션 DB에서 어제 날짜의 데이터를 가져오는 경우의 코드

from datetime import datetime, timedelta

# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')

# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

 

맨 아래 sql 변수에 담긴 쿼리는 데이터 웨어하우스가 아닌 프로덕션 DB에서 사용되는 쿼리입니다.

 

그런데 위와 같은 대처는 문제가 발생한 당일에 문제를 파악하고 수정했을 때 의미가 있습니다.

 

 

 

 

 

 

 

 

만일 문제 파악이나 문제 수정을 뒤늦게 한다면?

from datetime import datetime, timedelta

# y = datetime.now() - timedelta(1) # 당일날 알지 못했으므로 의미가 없는 코드
# yesterday = datetime.strftime(y, '%Y-%m-%d') # 당일날 알지 못했으므로 의미가 없는 코드
yesterday = '2023-01-01'

# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

 

예를 들어 1월1일의 데이터를 가져오지 못한 것을 당일날 알지 못하고 먼 훗날 12월31일에 알게되었다면 datetime과 timedelta로 어제 날짜(12월30일)를 파악하는 것이 아무 의미 없습니다.

 

그렇게 되다보니 어쩔 수 없이 yesterday를 1월1일로 해킹(하드 코딩)으로 처리하게 된 코드입니다.

 

그런데 만약 하루가 아닌 1년치 데이터를 Backfill해야 한다면 어떻게 될까요?

 

 

 

 

 

 

 

 

 

그런데 지난 1년치 데이터를 Backfill 해야한다면?

위와 같이 읽어와야 하는 데이터의 날짜를 개발자가 직접 계산하는 방식은 언젠가는 한계에 봉착하기 마련입니다.

 

DAG를 처음 만들 때 부터 Backfill이 쉽도록 구성하는 것이 핵심입니다.

 

Airflow는 시스템적으로 Backfill을 쉽게할 수 있도록 돕습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

어떻게 ETL을 구현해놓으면 이런 일이 편해질까?


시스템적으로 이걸 쉽게 해주는 방법을 구현한다

- 날짜별로 backfill 결과를 기록하고 성공 여부 기록: 나중에 결과를 쉽게 확인
- 이 날짜를 시스템에서 ETL의 인자로 제공
- 데이터 엔지니어는 읽어와야하는 데이터의 날짜를 계산하지 않고 시스템이 지정해준 날짜를 사용

 

Airflow의 접근방식

- ETL 별로 실행날짜와 결과를 메타데이터 데이터베이스에 기록
- 모든 DAG 실행에는 “execution_date”이 지정되어 있음

    execution_date으로 채워야하는 날짜와 시간이 넘어옴

- 이를 바탕으로 데이터를 갱신하도록 코드를 작성해야함
- 잇점: backfill이 쉬워짐

 

 

위에서 본 예시와 같이 풀어서 설명을 하자면 Airflow는 각 ETL별로 실행날짜와 결과를 메타데이터 DB에 기록합니다. 모든 DAG 실행에는 execution_date이 지정되어 있어 실패했던 5월24일, 25일에 읽어 왔어야 하는 데이터의 날짜들이 기록되어 개발자가 직접 날짜를 계산할 필요가 없습니다. 덕분에 execution_date를 이용해 backfill이 되도록 DAG 코드를 작성하면 끝입니다.

 

한 줄로 요약하자면 Airflow는 execution_date을 이용해 backfill을 쉽게 해줍니다.

 

얼핏 execution_date은 Incremental Update에만 필요할 것 같이 느껴지겠지만 그렇지 않습니다. catchup이 True인 경우 Full Refresh DAG의 실행 횟수에 영향을 주기 때문입니다.

 

사족으로 Airflow는 각 DAG가 Full Refresh인지 Incremental Update인지는 파악하지 못하지만 어차피 개발자가 실패한 DAG를 선별해 backfill을 진행하기 때문 전혀 문제가 되지 않습니다.

 

 

execution date와 커맨드 라인

우리가 지난 시간 yfinance API DAG를 실습했을 때 커맨드 라인에서 사용했던

airflow dags test (DAG ID) 20xx-xx-xx

명령을 기억하시나요? 여기서 맨 뒤에 붙여주었던 년-월-일이 바로 execution_date입니다.

Airflow Web UI에서는 알아서 execution date를 파악하지만 커맨드 라인은 그렇지 않기 때문에 execution_date를 명시한 것이었습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Daily Incremental Update를 구현해야 한다면?

 

예를 들어 2020년 11월 7일의 데이터부터 매일매일 하루치 데이터를 읽어온다고 가정

 

이 경우 언제부터 해당 ETL이 동작해야하나?

2020년 11월 8일

 

다르게 이야기하면 2020년 11월 8일날 동작하지만 읽어와야 하는 데이터의 날짜는?

2020년 11월 7일: 이게 start_date이 됨

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Daily Incremental Update를 구현해야 한다면?

 

 Airflow의 start_date은 시작 날짜라기는 보다는 처음 읽어와야 하는 데이터의 날짜

 

execution_date은 읽어와야하는 데이터의 날짜로 설정됨

 

 

 

Daily DAG인 경우 start_date이 2020-11-07이면 2020-11-08에 execution_date를 2020-11-07로 갖고 DAG가 처음 실행됩니다. 2020-11-09에는 execution_date를 2020-11-08로 갖고 DAG가 실행됩니다. 2020-11-10에는 execution_date를 2020-11-09로 갖고 DAG가 실행됩니다.

 

 

 

 

Let's consider a daily DAG that has a start_date of 2022-01-01.

start_date: This is set to 2022-01-01. This means that the scheduler will start scheduling tasks from this date. However, the first task won't actually be executed right away. Instead, it will be scheduled for execution after the first interval has passed.

execution_date: This is the logical date that the tasks should consider as their execution date. For a daily DAG, the execution_date of the first task will also be 2022-01-01. However, this task won't actually be executed until the next day (2022-01-02) because Airflow schedules tasks at the end of their execution period.

So, on 2022-01-02, the scheduler will execute the task for the execution_date of 2022-01-01. On 2022-01-03, it will execute the task for the execution_date of 2022-01-02, and so on.

This might seem a bit counter-intuitive, but it's designed this way because the execution_date represents the logical date and time at which the data should be processed, not the actual date and time at which the task is executed. For example, if you have a task that aggregates data for a whole day, you can't run that task until the day is over, which is why the task is scheduled for the next day.

 

 

 

 

 

 

 

 

 

 

 

만불짜리 눈물의 쿼리

 

잘못된 만남 ㅜㅜ

- 최적화되지 않은 Airflow configuration (start_date, catchup, …)
- 엄청 큰 쿼리
- BigQuery/Snowflake

    Redshift는 문제가 없음. 뭐를 하건 월별 정액 지출.


2천불짜리 쿼리가 Airflow에 의해 8번 스케줄된다면?

- 예를 들어 daily job이 2020년 8월 6일을 start_date로 설정되었고 오늘 날짜가 8월 14일이라고 가정하자. 이 때 이 잡이 Enable되는 순간 (아래 그림 참고) 이 잡은 catchup 파라미터의 값에 따라 8번 자동 실행되게 된다. 

 

 

Catchup은 가급적이면 False로 두는 것이 안전하고 Airflow는 catchup을 디폴트로 True로 간주하므로 주의

 

 

 

 

 

 

 

 

 

 

 

 

 

start_date과 execution_date 이해하기


2020-08-10 02:00:00로 start date로 설정된 daily job이 있다

- catchup이 True로 설정되어 있다고 가정 (디폴트가 True)


지금 시간이 2020-08-13 20:00:00이고 처음으로 이 job이 활성화되었다

 


질문: 이 경우 이 job은 몇번 실행될까? (execution_date)

2020-08-10 02:00:00
2020-08-11 02:00:00
2020-08-12 02:00:00
2020-08-13 02:00:00

 

 

 

Daily DAG이므로 start_date이 2020-08-10 02라면 첫 실행은 execution_date를 2020-08-10 02로 갖는 2020-08-11 02에 됩니다. 그리고 2020-08-12 02는 execution_date를 2020-08-11 02로 갖고 두번째 실행됩니다. 그렇게 2020-08-13 02에 execution_date를 2020-08-12 02로 갖고 세번째 실행이 진행됩니다. 2020-08-13 20은 아직 네번째 실행인 2020-08-14 02 이전이므로 총 실행 횟수는 세번입니다.

 

 

 

 

 

 

 

 

 

 

 

 

Backfill과 관련된 Airflow 변수들

 

start_date

DAG가 처음 실행되는 날짜가 아니라 DAG가 처음 읽어와야하는 데이터의 날짜/시간. 실제 첫 실행날짜는 start_date + DAG의 실행주기

 

execution_date

DAG가 읽어와야하는 데이터의 날짜와 시간

 

catchup

DAG가 처음 활성화된 시점이 start_date보다 미래라면 그 사이에 실행이 안된 것들을 어떻게 할 것인지 결정해주는 파라미터. True가 디폴트값이고 이 경우 실행안 된 것들을 모두 따라잡으려고 함. False가 되면 실행안된 것들을 무시함


end_date

이 값은 보통 필요하지 않으며 Backfill을 날짜 범위에 대해 하는 경우에만 필요 airflow dags backfill -s …. -e …

Incremental Update를 한다는 가정하에 의미를 갖습니다.