터칭 데이터
Airflow 숙제 리뷰 본문
airflow.cfg (1)
1. DAGs 폴더는 어디에 지정되는가?
a. 기본적으로는 Airflow가 설치된 디렉토리 밑의 dags 폴더가 되며 dags_folder 키에 저장됨
2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?
a. dag_dir_list_interval (기본값은 300 = 5분)
3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?
a. api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경
4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까? :)
a. password, secret, passwd, authorization, api_key, apikey, access_token
5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?
a. sudo systemctl restart airflow-webserver
b. sudo systemctl restart airflow-scheduler
6. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?
a. fernet_key
docker-desktop에서 airflow.cfg 확인하기
1. airflow-webserver 컨테이너의 Files 탭 클릭
2. opt>airflow>airflow.cfg를 클릭해 확인 혹은 수정
Airflow와 타임존
airflow.cfg에는 두 종류의 타임존 관련 키가 존재
a. default_timezone
b. default_ui_timezone
start_date, end_date, schedule
a. default_timezone에 지정된 타임존을 따름
execution_date와 로그 시간
a. 항상 UTC를 따름
b. 즉 execution_date를 사용할 때는 타임존을 고려해서 변환후 사용필요
현재로 가장 좋은 방법은 UTC를 일관되게 사용하는 것으로 보임
dags 폴더에서 코딩시 작성한다면 주의할 점
Airflow는 dags 폴더를 주기적으로 스캔함
[core] dags_folder = /var/lib/airflow/dags
# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. dag_dir_list_interval = 300
dag_dir_list_interval의 설정 시간에 따라 주기적으로 스캔하고, default로는 5분에 한 번씩 스캔합니다.
이때 DAG 모듈이 들어있는 모든 파일들의 메인 함수가 실행이 됨
이 경우 본의 아니게 개발 중인 테스트 코드도 실행될 수 있음
from airflow import DAG …
cur.execute(“DELETE FROM ….”)
만일 위와 같은 코드가 airflow/dags아래 있었다고 가정하면 5분 마다 실행되며 테이블의 레코드가 삭제되는 사고가 발생합니다.
세계 나라 정보 API 사용 DAG 작성 (1)
https://restcountries.com/ 에 가면 세부 사항을 찾을 수 있음
- 별도의 API Key가 필요없음
https://restcountries.com/v3/all 를 호출하여 나라별로 다양한 정보를 얻을 수 있음
{"name": {"common": "South Korea", "official": "Republic of Korea", …
"area": 100210.0,
"population": 51780579, …}
세계 나라 정보 API 사용 DAG 작성 (2)
Full Refresh로 구현해서 매번 국가 정보를 읽어오게 할 것!
API 결과에서 아래 3개의 정보를 추출하여 Redshift에 각자 스키마 밑에 테이블 생성
- country -> [“name”][“official”]
- population -> [“population”]
- area -> [“area”]
단 이 DAG는 UTC로 매주 토요일 오전 6시 30분에 실행되게 만들어볼 것!
https://crontab.guru/
숙제는 개인 github에 repo를 만든 후 제출할 것!
CountryInfo DAG 코드 리뷰
별도의 데모는 하지 않겠습니다.
import requests
@task
def extract_transform():
response = requests.get('https://restcountries.com/v3/all')
countries = response.json()
records = []
for country in countries:
name = country['name']['official']
population = country['population']
area = country['area']
records.append([name, population, area])
return records
with DAG(
dag_id = 'CountryInfo',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '30 6 * * 6' # 0 - Sunday, …, 6 - Saturday
) as dag:
results = extract_transform()
load("keeyong", "country_info", results)
데이터가 적재되는 테이블의 스키마는 다음과 같을 것입니다.
CREATE TABLE {schema}.{table} (
name varchar(256) primary key,
population int,
area float
);
데이터 웨어하우스는 primary key를 지정한다고 해서 Primary Key Uniqueness를 보장하지 않는다는 점 다시 한번 말씀드립니다. 이는 primary key가 존재하고 어떤 필드인지 상기시키는 것이지 이를 준수하고 최적화 하는 것은 데이터 분석가과 데이터 엔지니어의 업무임을 다시 한번 유념해주세요.
'Airflow' 카테고리의 다른 글
Airflow - Primary Key Uniqueness 보장하기 (0) | 2023.12.14 |
---|---|
Airflow - Open Weathermap DAG 구현하기 (0) | 2023.12.14 |
Airflow - Yahoo Finance API Incremental Update (0) | 2023.12.14 |
Airflow - Yahoo Finance API Full Refresh (0) | 2023.12.13 |
Airflow 관련 기타 Q&A (0) | 2023.12.13 |