터칭 데이터
Airflow - Open Weathermap DAG 구현하기 본문
API를 사용해서 DAG를 만들어보자
Open Weathermap API 소개
위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
무료 계정으로 api key를 받아서 이를 호출시에 사용
- https://openweathermap.org/price
만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기
먼저 Open Weathermap에 각자 등록하고 자신의 API Key를 다운로드 받거나 강의에서 제공 받은 API Key 사용하기
API Key를 open_weather_api_key라는 Variable로 저장
서울의 위도와 경도를 찾을 것
One-Call API를 사용: https://openweathermap.org/api/one-call-api
- 앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출
- 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것
날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API key}&units=metric
metric은 미터법 단위(화씨가 아닌 섭씨)
DAG 구현 (1)
Open Weathermap의 one call API를 사용해서 서울의 다음 8일간의 낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는 테이블로 저장
- https://openweathermap.org/api/one-call-api 를 호출해서 테이블을 채움
- weather_forecast라는 테이블이 대상이 됨 ■ 여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점
CREATE TABLE (스키마).weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
created_date는 Incremental Update를 위한 필드입니다. 값을 세팅해주면 그 값이 들어가고 그렇지 않다면 default GETDATE()에 따라 현재 시간이 레코드로 들어가게 됩니다.
DAG 구현 (2)
One-Call API는 결과를 JSON 형태로 리턴해줌
이를 읽어들이려면 requests.get 결과의 text를 JSON으로 변환해 주어야함
아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용
- f = requests.get(link)
- f_js = f.json()
결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어감 있음
daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
날짜 정보는 “dt”라는 필드에 들어 있음. 이는 epoch이라고 해서 1970년 1월 1일 이후 밀리세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능
- datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09
Open Weather API 호출 응답 보기
daily라는 리스트에 앞으로 8일간의 온도 정보가 들어옴
dt 필드가 날짜를 나타냄
temp 필드가 온도 정보를 나타냄
- day
- min
- max
- night
- eve
- morn
DAG 구현 (3)
Airflow Connections를 통해 만들어진 Redshift connection
기본 autocommit의 값은 False인 점을 유의
두 가지 방식의 Full Refresh 구현 방식
Full Refresh와 INSERT INTO를 사용
Full Refresh와 COPY를 사용 -> 나중에 사용해볼 예정
DAG 구현: Full Refresh
API Key는 어디에 저장해야할까?
Full Refresh
매번 테이블을 지우고 다시 빌드
DW상의 테이블은 아래처럼 정의
CREATE TABLE 각자스키마.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
코드
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text) # response.json()도 상관 없다.
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
logging.info(drop_recreate_sql)
logging.info(insert_sql)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
with DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("본인의 스키마", "weather_forecast") # 본인의 스키마 사용에 유의
autocommit을 디폴트(False)로 사용합니다.
API KEY를 airflow의 Variables로 사용합니다.
태스크가 1개 밖에 없습니다. ETL 과정들을 하나의 태스크로 만들었기 때문입니다.
except 구간에서는 raise 사용을 꼭 권장합니다.
'Airflow' 카테고리의 다른 글
Airflow - Backfill (0) | 2023.12.14 |
---|---|
Airflow - Primary Key Uniqueness 보장하기 (0) | 2023.12.14 |
Airflow 숙제 리뷰 (0) | 2023.12.14 |
Airflow - Yahoo Finance API Incremental Update (0) | 2023.12.14 |
Airflow - Yahoo Finance API Full Refresh (0) | 2023.12.13 |