Airflow

Airflow - Yahoo Finance API Full Refresh

터칭 데이터 2023. 12. 13. 21:25

 

 

구현 DAG의 세부 사항 - Full Refresh로 구현


1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)


2. Redshift 상의 테이블로 1에서 받은 레코드들을 적재

 

 

 

stock_info 테이블은 각자의 스키마에 생성

 

 

 

 

 

 

 

 

 

 

 

 

 

Extract/Transform: Yahoo Finance API 호출

Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱

    - 기본으로 지난 한달의 주식 가격을 리턴해줌

 

 

 

import yfinance as yf
@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []
 
    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')
        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
 return records

 

 

 

 

 

 

 

 

 

 

 

Load: Redshift의 테이블을 업데이트

Full Refresh로 구현
    - 매번 테이블을 새로 만드는 형태로 구성


트랜잭션 형태로 구성 (NameGender DAG와 동일)

 

 

 

 

 

 

 

 

 

실행 데모


앞서 코드 실행
    - Docker에 로그인해서 yfinance 모듈 설치가 필요
    - airflow dags test UpdateSymbol 2023-05-20


docker container에 루트(root) 유저로 로그인하는 방법

    - 지금까지는 airflow 일반 유저로 실습을 진행 해왔음

    - 일부 Operator는 root 유저 자격을 필요로 함

 

 

 

UpdateSymbol.py

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")
        # DROP과 CREATE를 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("keeyong", "stock_info", results)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

커맨드 라인에서 실행

Docker에서 airflow-setup-airflow-scheduler-1의 ID를 이용해 airflow로  로그인한 상태에서 진행합니다. 로그인 방법이 기억나지 않는다면 바로 이전 시간의 게시물을 참고하세요.

 

스케줄러의 ID로 로그인하셔야 합니다. 만일 웹서버의 ID로 로그인하시고 yfinance를 설치한다면 환경이 다르기 때문에 실습이 진행되지 않고 UpdateSymbol의 태스크를 조회하는 곳부터 문제가 발생할 것 입니다.

 

 

pip3 install yfinance 실행

(airflow)pip3 install yfinance

 

 

 

 

조회와 테스트

(airflow)pwd # 현재 위치 확인
/opt/airflow


(airflow)ls -tl # 현재 위치의 파일과 디렉토리들 확인

(airflow)cd dags # dags 디렉토리로 이동


(airflow)ls -tl UpdateSymbol.py
-rwxrwxrwx 1 root root 1234 Dec 13 08:27 UpdateSymbol.py # UpdateSymbol이 DAG의 ID이므로 복사해두기

 

 

DAG의 ID는 cat UpdateSymbol.py로 DAG 정의시 DAG의 ID로 무엇을 입력했는지 직접 코드상에서 확인하는 것이 더 정확합니다.

 

 

 

 

airflow tasks list (DAG의 ID)로 DAG의 태스크들 조회

 

 

만일 여기서부터 airflow dags test 까지 에러가 발생하는 경우

1. airflow dags list 로 DAGs 정상적으로 조회되는지 확인

2. 몇 DAG들이 정상조회되지 않는 경우 airflow dags list-import-errors로 원인 확인

3. 예를 들어 No module named 'oauth2client' 혹은 No module named 'pymysql' 등의 원인 파악

4. 필요한 모듈이나 API key 등을 설치 혹은 제공

5. airflow상에서 Connections를 사용하는 경우 호스트, ID, DB이름, 패스워드 재확인, 특히 포트를 확인하세요. 포트는 제대로 입력하지 않아도 test시 성공했다고 뜨기 때문에 더 유의해야합니다.

 

 

 

 

airflow dags test (DAG ID) 20xx-xx-xx

DAG의 태스크들을 테스트 해볼 수 있습니다. 명령어에서 날짜는 미래가 아닌 날짜로 적어주세요.

 

 

 

 

 

 

 

 

 

 

 

 

 

root 유저로 로그인

docker exec --user root -it (도커 스케줄러 컨테이너 ID) sh