전체 글 373

Airflow - 숙제 리뷰

퀴즈 리뷰 퀴즈 풀기 더보기 Q) Airflow에서 하나의 DAG는 다수의 ()로 구성된다. ()에 들어갈 말은? Task(Operator)입니다. Q) 매일 동작하는 DAG의 Start date이 2021-02-05라면 이 DAG의 첫 실행 날짜는? 매일은 곧 Daily이므로 2021-02-06입니다. Q) 위 DAG의 경우 이때 execution_date으로 들어오는 날짜는? 2021-02-06의 execution_date로 2021-02-05가 들어옵니다. Q) Schedule interval이 "30 * * * *"으로 설정된 DAG에 대한 올바른 설명은? 매시 30분마다 한번씩 실행합니다. Q) Schedule interval이 "0 * * * *"으로 설정된 DAG의 start date이 "2..

Airflow 2023.12.15

Airflow - Backfill

Backfill과 Airflow 관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 데이터 엔지니어의 삶에 큰 영향을 준다 Incremental Update가 실패하면? 예전의 실패를 나중에서야 알게 되었다면? 아마존 리포트 데이터를 가져오는 Daily DAG에서 5월24일~5월25일 DAG가 실패했는데 데이터 웨어하우스에는 이틀치의 데이터들이 담기지 않은 상태입니다. 이와 같은 일들은 데이터 엔지니어들에게 굉장히 자주 발생하며 이를 최대한 방지하고 만회하는 것이 데이터 엔지니어들에게 요구되는 매우 중요한 역량입니다. Airflow가 거의 모든 기업에서 쓰이는 이유는 Backfill에서의 성능이 막강하기 때문 이제부터 할 이야기는 Incremental U..

Airflow 2023.12.14

Airflow - Primary Key Uniqueness 보장하기

이번 시간의 목표 1. Primary Key Uniquness 보장 방법을 살펴보기 2. Open Weathermap DAG를 Incremental Update 방식으로 구현하며 Primary Key Uniquness 보장 방법을 복습하기 Primary Key Uniqueness란? 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들) 하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음 이를 CREATE TABLE 사용시 지정 관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아줌 예 1) Users 테이블에서 email 필드 예 2) Products 테이블에서 product_id 필드 PK 선언 방식1) 필드 뒤에 붙이기 CREATE TABLE product..

Airflow 2023.12.14

Airflow - Open Weathermap DAG 구현하기

API를 사용해서 DAG를 만들어보자 Open Weathermap API 소개 위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스 무료 계정으로 api key를 받아서 이를 호출시에 사용 - https://openweathermap.org/price 만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기 먼저 Open Weathermap에 각자 등록하고 자신의 API Key를 다운로드 받거나 강의에서 제공 받은 API Key 사용하기 API Key를 open_weather_api_key라는 Variable로 저장 서울의 위도와 경도를 찾을 것 One-Call API를 사용: https://openweathermap.org/api/one-call-api - 앞서 API KEY와 서울의 위도/경도를 ..

Airflow 2023.12.14

Airflow 숙제 리뷰

airflow.cfg (1) 1. DAGs 폴더는 어디에 지정되는가? a. 기본적으로는 Airflow가 설치된 디렉토리 밑의 dags 폴더가 되며 dags_folder 키에 저장됨 2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가? a. dag_dir_list_interval (기본값은 300 = 5분) 3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가? a. api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경 4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들..

Airflow 2023.12.14

Airflow - Yahoo Finance API Incremental Update

구현 DAG의 세부 사항 - Incremental Update로 구현 1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일) 2. Redshift 상의 테이블로 1에서 받은 레코드들을 적재하고 중복 제거 a. 매일 하루치의 데이터씩 늘어남 본인의 스키마 아래에 stock_info_v2 테이블을 새로 만든 다음 실습을 진행합니다. Extract/Transform: Yahoo Finance API 호출 -> 동일 Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱 - 기본으로 지난 한달의 주식 가격을 리턴해줌 Extract/Transform은 지난 Full Refresh와 동일합니다. Load: Redshift의 테이블을 업데이트 (1) Incrementa..

Airflow 2023.12.14

Airflow - Yahoo Finance API Full Refresh

구현 DAG의 세부 사항 - Full Refresh로 구현 1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일) 2. Redshift 상의 테이블로 1에서 받은 레코드들을 적재 stock_info 테이블은 각자의 스키마에 생성 Extract/Transform: Yahoo Finance API 호출 Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱 - 기본으로 지난 한달의 주식 가격을 리턴해줌 import yfinance as yf @task def get_historical_prices(symbol): ticket = yf.Ticker(symbol) data = ticket.history() records = [] for index, row in d..

Airflow 2023.12.13

Airflow 관련 기타 Q&A

PostgresHook의 autocommit 파라미터 Default 값은 False로 주어짐 이 경우 BEGIN은 아무런 영향이 없음 (no-operation) DAG에서 task를 어느 정도로 분리하는 것이 좋을까? task를 많이 만들면 전체 DAG이 실행되는데 오래 걸리고 스케줄러에 부하가 감 task를 너무 적게 만들면 모듈화가 안되고 실패시 재실행을 시간이 오래 걸림 오래 걸리는 DAG이라는 실패시 재실행이 쉽게 다수의 task로 나누는 것이 좋음 task를 적절히 나누면 실패한 부분만 재실행할 수 있음 Airflow의 Variable 관리 vs. 코드 관리 장점: 코드 푸시의 필요성이 없음 단점: 관리나 테스트가 안되어서 사고로 이어질 가능성이 있음 굉장히 중요한 SQL이라면 Variables..

Airflow 2023.12.13

Airflow - 커맨드 라인에서 Variables와 Connections 세팅

커맨드 라인에서 Airflow 로그인 https://touchingdata.tistory.com/198 Airflow 기본 프로그램 실행 Airflow 코드의 기본 구조 DAG 대표하는 객체를 먼저 만듬 - DAG 이름, 실행주기, 실행날짜, 오너 등의 여러 파라미터를 지정 다음으로 DAG를 구성하는 태스크들을 만듬 - 태스크별로 적합한 오퍼레이 touchingdata.tistory.com 지난 시간 "2. 터미널로 연결해서 커맨드라인 툴 사용해보기"에서 Airflow를 사용하는 방법을 알아봤습니다. 1. docker ps를 입력해 scheduler의 스케줄러 컨테이너 ID를 찾는다. 2. docker exec -it (스케줄러의 컨테이너 ID) sh를 입력해 airflow에 로그인한다. 지난 시간 gi..

Airflow 2023.12.13