터칭 데이터

Airflow 숙제 리뷰 본문

Airflow

Airflow 숙제 리뷰

터칭 데이터 2023. 12. 14. 13:53

 

 

airflow.cfg (1)


1. DAGs 폴더는 어디에 지정되는가?
a. 기본적으로는 Airflow가 설치된 디렉토리 밑의 dags 폴더가 되며 dags_folder 키에 저장됨


2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?
a. dag_dir_list_interval (기본값은 300 = 5분)


3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?
a. api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경


4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까? :)
a. password, secret, passwd, authorization, api_key, apikey, access_token


5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?
a. sudo systemctl restart airflow-webserver
b. sudo systemctl restart airflow-scheduler
6. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?
a. fernet_key

 

 

 

 

 

 

docker-desktop에서 airflow.cfg 확인하기

1. airflow-webserver 컨테이너의 Files 탭 클릭


2. opt>airflow>airflow.cfg를 클릭해 확인 혹은 수정

 

 

 

 

 

 

 

 

Airflow와 타임존

 

airflow.cfg에는 두 종류의 타임존 관련 키가 존재

a. default_timezone
b. default_ui_timezone


start_date, end_date, schedule

a. default_timezone에 지정된 타임존을 따름


execution_date와 로그 시간

a. 항상 UTC를 따름
b. 즉 execution_date를 사용할 때는 타임존을 고려해서 변환후 사용필요


현재로 가장 좋은 방법은 UTC를 일관되게 사용하는 것으로 보임

 

 

 

 

 

 

 

 

 

 

 

 

dags 폴더에서 코딩시 작성한다면 주의할 점

 

Airflow는 dags 폴더를 주기적으로 스캔함

[core] dags_folder = /var/lib/airflow/dags

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. dag_dir_list_interval = 300

 

dag_dir_list_interval의 설정 시간에 따라 주기적으로 스캔하고, default로는 5분에 한 번씩 스캔합니다.

 

이때 DAG 모듈이 들어있는 모든 파일들의 메인 함수가 실행이 됨

이 경우 본의 아니게 개발 중인 테스트 코드도 실행될 수 있음

 

from airflow import DAG … 

cur.execute(“DELETE FROM ….”)

 

만일 위와 같은 코드가 airflow/dags아래 있었다고 가정하면 5분 마다 실행되며 테이블의 레코드가 삭제되는 사고가 발생합니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

세계 나라 정보 API 사용 DAG 작성 (1)

https://restcountries.com/ 에 가면 세부 사항을 찾을 수 있음

    - 별도의 API Key가 필요없음


https://restcountries.com/v3/all 를 호출하여 나라별로 다양한 정보를 얻을 수 있음

 

{"name": {"common": "South Korea", "official": "Republic of Korea", …
 "area": 100210.0,
"population": 51780579, …}

 

 

 

 

 

 

 

세계 나라 정보 API 사용 DAG 작성 (2)

Full Refresh로 구현해서 매번 국가 정보를 읽어오게 할 것!


API 결과에서 아래 3개의 정보를 추출하여 Redshift에 각자 스키마 밑에 테이블 생성

    - country -> [“name”][“official”]
    - population -> [“population”]
    - area -> [“area”]


단 이 DAG는 UTC로 매주 토요일 오전 6시 30분에 실행되게 만들어볼 것!
https://crontab.guru/


숙제는 개인 github에 repo를 만든 후 제출할 것!

 

 

 

 

 

 

 

 

 

 

CountryInfo DAG 코드 리뷰

 

별도의 데모는 하지 않겠습니다.

 

import requests

@task
def extract_transform():
    response = requests.get('https://restcountries.com/v3/all')
    countries = response.json()
    records = []
    
    for country in countries:
        name = country['name']['official']
        population = country['population']
        area = country['area']
        
        records.append([name, population, area])
    return records

with DAG(
    dag_id = 'CountryInfo',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '30 6 * * 6' # 0 - Sunday, …, 6 - Saturday
) as dag:
    results = extract_transform()
    load("keeyong", "country_info", results)

 

 

 

데이터가 적재되는 테이블의 스키마는 다음과 같을 것입니다.

 

CREATE TABLE {schema}.{table} (
    name varchar(256) primary key,
    population int,
    area float
);

 

데이터 웨어하우스는 primary key를 지정한다고 해서 Primary Key Uniqueness를 보장하지 않는다는 점 다시 한번 말씀드립니다. 이는 primary key가 존재하고 어떤 필드인지 상기시키는 것이지 이를 준수하고 최적화 하는 것은 데이터 분석가과 데이터 엔지니어의 업무임을 다시 한번 유념해주세요.