Airflow - 숙제 리뷰
퀴즈 리뷰
Q) Airflow에서 하나의 DAG는 다수의 ()로 구성된다. ()에 들어갈 말은?
Task(Operator)입니다.
Q) 매일 동작하는 DAG의 Start date이 2021-02-05라면 이 DAG의 첫 실행 날짜는?
매일은 곧 Daily이므로 2021-02-06입니다.
Q) 위 DAG의 경우 이때 execution_date으로 들어오는 날짜는?
2021-02-06의 execution_date로 2021-02-05가 들어옵니다.
Q) Schedule interval이 "30 * * * *"으로 설정된 DAG에 대한 올바른 설명은?
매시 30분마다 한번씩 실행합니다.
Q) Schedule interval이 "0 * * * *"으로 설정된 DAG의 start date이 "2021-02-04 00:00:00"으로 잡혀있다면 이 DAG의 첫 번째 실행 날짜와 시간은 언제인가?
hourly DAG이므로 DAG의 첫 실행일시는 2021-02-04 01:00:00입니다.
Q) Airflow의 DAG가 처음 ON이 되었을 때 start_date과 현재 날짜 사이에 실행이 안된 run들이 있을 경우 이를 실행한다. 이는 (??) 파라미터에 의해 결정된다. 이 파라미터를 False로 세팅하면 과거 실행이 안된 run을 무시한다
catchup 입니다.
Q) 다음 중 Redshift에서 큰 데이터를 테이블로 복사하는 방식을 제대로 설명한 것은?
복사할 레코드들을 파일로 저장해서 S3로 올린 후에 거기서 Redshift로 벌크 복사한다
UpdateSymbol_v2의 Incremental Update 방식 수정해보기
DISTINCT 방식의 중복처리는 데이터 소스에 따라 이상 동작할 수 있음
주식 시장이 아직 끝나지 않은 상태에서 DAG(코드)가 두번 실행되거나 무언가 잘못되어 위와 같이 가격이 다른, 거래량이 다른 경우가 같은 날짜에 2개씩 들어간다면 DISTINCT로 중복이 제거되지 않습니다. 데이터 웨어하우스는 Primary Key Uniquness를 보장하지 않기 때문입니다.
DISTINCT에 기대지 않고 ROW_NUMBER() 윈도우 함수로 Incremental Update 방식을 구현하는 것이 더 안전하고 확실한 방법입니다.
ROW_NUMBER 방식의 Primary key 동일 레코드 처리: UpdateSymbol_v3
코드 보기
UpdateSymbol_v3.py
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp
import yfinance as yf
import pandas as pd
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def get_historical_prices(symbol):
ticket = yf.Ticker(symbol)
data = ticket.history()
records = []
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
def _create_table(cur, schema, table, drop_first):
if drop_first:
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint,
created_date timestamp DEFAULT GETDATE()
);""")
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
_create_table(cur, schema, table, False)
# 임시 테이블로 원본 테이블을 복사
create_t_sql = f"""CREATE TEMP TABLE t (LIKE {schema}.{table} INCLUDING DEFAULTS);
INSERT INTO t SELECT * FROM {schema}.{table};"""
cur.execute(create_t_sql)
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
# 임시 테이블 내용을 원본 테이블로 복사
cur.execute(f"DELETE FROM {schema}.{table};")
cur.execute(f"""INSERT INTO {schema}.{table}
SELECT date, "open", high, low, close, volume FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;""")
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
with DAG(
dag_id = 'UpdateSymbol_v3',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '0 10 * * *'
) as dag:
results = get_historical_prices("AAPL")
load("(자신의 스키마)", "stock_info_v3", results