터칭 데이터

Airflow - Yahoo Finance API Incremental Update 본문

Airflow

Airflow - Yahoo Finance API Incremental Update

터칭 데이터 2023. 12. 14. 12:06

 

 

구현 DAG의 세부 사항 - Incremental Update로 구현

1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)


2. Redshift 상의 테이블로 1에서 받은 레코드들을 적재하고 중복 제거
    a. 매일 하루치의 데이터씩 늘어남

 

 

 

 

본인의 스키마 아래에 stock_info_v2 테이블을 새로 만든 다음 실습을 진행합니다.

 

 

 

 

 

 

 

 

 

Extract/Transform: Yahoo Finance API 호출 -> 동일

Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱

    - 기본으로 지난 한달의 주식 가격을 리턴해줌

 

Extract/Transform은 지난 Full Refresh와 동일합니다.

 

 

 

 

 

 

 

 

 

 

 

Load: Redshift의 테이블을 업데이트 (1) 


Incremental Update로 구현

1. 임시 테이블 생성하면서 현재 테이블의 레코드를 복사 (CREATE TEMP TABLE … AS SELECT)
2. 임시 테이블로 Yahoo Finance API로 읽어온 레코드를 적재
3. 원본 테이블을 삭제하고 새로 생성
4. 원본 테이블에 임시 테이블의 내용을 복사 (이 때 SELECT DISTINCT *를 사용하여 중복 제거)

 

풀어서 설명하자면 TEMP 테이블은 Redshift 세션이 끊어지면 알아서 사라지기 때문에 DROP을 해줄 필요가 없습니다. 이 임시 테이블에 yfinance의 데이터를 적재하고 그 다음 원본 테이블을 삭제하고 새로 생성한 뒤 원본 테이블에 임시 테이블의 내용을 복사합니다. 이 때 중복 제거를 위해 SELECT DINTINCT를 사용합니다.

 

트랜잭션 형태로 구성 (NameGender DAG와 동일)

트랜잭션은 3~4단계에 걸어줍니다. 임시 테이블은 문제가 발생해도 원본 테이블의 정합성에 문제가 생기지 않기 때문입니다. 1~4단계에 걸어도 되지만 트랜잭션 영역은 작은 것이 좋기 때문입니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Load: Redshift의 테이블을 업데이트 (2)

def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
        _create_table(cur, schema, table, False)
        # 임시 테이블로 원본 테이블을 복사
        cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 원본 테이블 생성
        _create_table(cur, schema, table, True)
        # 임시 테이블 내용을 원본 테이블로 복사
        cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;") 
        raise
    logging.info("load done")

 

 

 

 

 

 

 

 

실행 데모

코드 설명
하루 후 코드를 실행하고 보여주는 형태의 데모