터칭 데이터

Airflow - Open Weathermap DAG 구현하기 본문

Airflow

Airflow - Open Weathermap DAG 구현하기

터칭 데이터 2023. 12. 14. 15:03

 

 

 

API를 사용해서 DAG를 만들어보자

 

 

 

 

 

 

 

 

 

 

 

Open Weathermap API 소개


위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스


무료 계정으로 api key를 받아서 이를 호출시에 사용

    - https://openweathermap.org/price

 

 

 

 

 

 

 

 

 

 

 

 

만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기


먼저 Open Weathermap에 각자 등록하고 자신의 API Key를 다운로드 받거나 강의에서 제공 받은 API Key 사용하기


API Key를 open_weather_api_key라는 Variable로 저장


서울의 위도와 경도를 찾을 것


One-Call API를 사용: https://openweathermap.org/api/one-call-api

    - 앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출

    - 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것
        날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)



https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API key}&units=metric

metric은 미터법 단위(화씨가 아닌 섭씨)

 

 

 

 

 

 

 

 

 

DAG 구현 (1)

Open Weathermap의 one call API를 사용해서 서울의 다음 8일간의 낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는 테이블로 저장

    - https://openweathermap.org/api/one-call-api 를 호출해서 테이블을 채움

    - weather_forecast라는 테이블이 대상이 됨 ■ 여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점

 

 

CREATE TABLE (스키마).weather_forecast (
    date date primary key,
    temp float, -- 낮 온도
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);

 

created_date는 Incremental Update를 위한 필드입니다. 값을 세팅해주면 그 값이 들어가고 그렇지 않다면 default GETDATE()에 따라 현재 시간이 레코드로 들어가게 됩니다.

 

 

 

 

 

 

 

 

 

 

DAG 구현 (2)

One-Call API는 결과를 JSON 형태로 리턴해줌

이를 읽어들이려면 requests.get 결과의 text를 JSON으로 변환해 주어야함
아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용
    - f = requests.get(link)
    - f_js = f.json()


결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어감 있음

daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
날짜 정보는 “dt”라는 필드에 들어 있음. 이는 epoch이라고 해서 1970년 1월 1일 이후 밀리세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능
    - datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09

 

 

 

 

 

 

 

 

 

 

 

 

Open Weather API 호출 응답 보기

daily라는 리스트에 앞으로 8일간의 온도 정보가 들어옴

dt 필드가 날짜를 나타냄
temp 필드가 온도 정보를 나타냄
    - day
    - min
    - max
    - night
    - eve
    - morn

 

 

 

 

 

 

 

 

 

 

DAG 구현 (3)


Airflow Connections를 통해 만들어진 Redshift connection

기본 autocommit의 값은 False인 점을 유의


두 가지 방식의 Full Refresh 구현 방식

Full Refresh와 INSERT INTO를 사용
Full Refresh와 COPY를 사용 -> 나중에 사용해볼 예정

 

 

 

 

 

 

 

 

 

 

 

 

 

 

DAG 구현: Full Refresh

API Key는 어디에 저장해야할까?

 

Full Refresh

매번 테이블을 지우고 다시 빌드

 

DW상의 테이블은 아래처럼 정의

CREATE TABLE 각자스키마.weather_forecast (
    date date primary key,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

코드

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json


def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table):
    api_key = Variable.get("open_weather_api_key")
    # 서울의 위도/경도
    lat = 37.5665
    lon = 126.9780

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text) # response.json()도 상관 없다.
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
"""
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
    logging.info(drop_recreate_sql)
    logging.info(insert_sql)
    try:
        cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        cur.execute("Rollback;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("본인의 스키마", "weather_forecast") # 본인의 스키마 사용에 유의

 

autocommit을 디폴트(False)로 사용합니다.

 

API KEY를 airflow의 Variables로 사용합니다.

 

태스크가 1개 밖에 없습니다. ETL 과정들을 하나의 태스크로 만들었기 때문입니다.

 

except 구간에서는 raise 사용을 꼭 권장합니다.

 

 

 

 

 

'Airflow' 카테고리의 다른 글

Airflow - Backfill  (0) 2023.12.14
Airflow - Primary Key Uniqueness 보장하기  (0) 2023.12.14
Airflow 숙제 리뷰  (0) 2023.12.14
Airflow - Yahoo Finance API Incremental Update  (0) 2023.12.14
Airflow - Yahoo Finance API Full Refresh  (0) 2023.12.13