Airflow 28

Airflow - 최종 정리

Airflow란 무엇인가? Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임웍 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임웍 Airflow에서 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름 Airflow의 장점 데이터 파이프라인을 세밀하게 제어 가능 다양한 데이터 소스와 데이터 웨어하우스를 지원 백필(Backfill)이 쉬움 Airflow 관련 중요 용어/개념 start_date, execution_date, catchup 스케일링 방식 Scale Up vs. Scale Out vs. 클라우드 버전 vs. K8s 사용 데이터 파이프라인 작성시 기억할 점 데이터 파이프라인에 관한 정보를 수집하는 것이 중요 비지니스 오너와 데이터 리니지에 주의할..

Airflow 2023.12.18

Airflow - MySQL 테이블 복사하기 (3) Backfill 실행해보기

Backfill을 커맨드라인에서 실행하는 방법 airflow dags backfill dag_id -s 2018- 07- 01 -e 2018- 08- 01 This assumes the followings: ○ catchUp이 True로 설정되어 있음 ○ execution_date을 사용해서 Incremental update가 구현되어 있음 start_date부터 시작하지만 end_date은 포함하지 않음 파이썬 arr=[0, 1, 2, 3, 4, 5, 6, 7]에서 arr[:5]=[0, 1, 2, 3, 4]인 것과 같은 개념 실행순서는 날짜/시간순은 아니고 랜덤. 만일 날짜순으로 하고 싶다면 DAG default_args의 depends_on_past를 True로 설정 default_args = { '..

Airflow 2023.12.15

Airflow - MySQL 테이블 복사하기 (2)

MySQL_to_Redshift DAG의 Task 구성 SqlToS3Operator MySQL SQL 결과 -> S3 (s3://grepp-data-engineering/{본인ID}-nps) s3://s3_bucket/s3_key 백엔드 DB에 존재하는 데이터를 얻기 위해 SELECT 쿼리를 수행하고 얻은 데이터들을 S3 버킷에 적재하는 태스크입니다. S3ToRedshiftOperator S3 -> Redshift 테이블 (s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps) COPY command is used S3의 파일들을 Redshift의 테이블에 COPY하는 태스크입니다. from airflow.providers.amazon.aws.t..

Airflow 2023.12.15

Airflow - MySQL 테이블 복사하기 (1) 전체적인 개요

OLTP 복사 거의 대부분의 경우 데이터 인프라를 처음 구축할 때 MySQL, PostgreSQL 과 같은 프로덕션 데이터베이스를 데이터 웨어하우스로 복사하는 작업을 진행합니다. 이를 OLTP(Online Transaction Processing) 복사라고 하는데 OLTP의 정의는 다음과 같습니다. Online Transaction Processing (OLTP) system. OLTP databases are optimized for fast, real-time CRUD operations (Create, Read, Update, Delete) and are designed to handle many short transactions. These systems are used for day-to-day..

Airflow 2023.12.15

Airflow - 숙제 리뷰

퀴즈 리뷰 퀴즈 풀기 더보기 Q) Airflow에서 하나의 DAG는 다수의 ()로 구성된다. ()에 들어갈 말은? Task(Operator)입니다. Q) 매일 동작하는 DAG의 Start date이 2021-02-05라면 이 DAG의 첫 실행 날짜는? 매일은 곧 Daily이므로 2021-02-06입니다. Q) 위 DAG의 경우 이때 execution_date으로 들어오는 날짜는? 2021-02-06의 execution_date로 2021-02-05가 들어옵니다. Q) Schedule interval이 "30 * * * *"으로 설정된 DAG에 대한 올바른 설명은? 매시 30분마다 한번씩 실행합니다. Q) Schedule interval이 "0 * * * *"으로 설정된 DAG의 start date이 "2..

Airflow 2023.12.15

Airflow - Backfill

Backfill과 Airflow 관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 데이터 엔지니어의 삶에 큰 영향을 준다 Incremental Update가 실패하면? 예전의 실패를 나중에서야 알게 되었다면? 아마존 리포트 데이터를 가져오는 Daily DAG에서 5월24일~5월25일 DAG가 실패했는데 데이터 웨어하우스에는 이틀치의 데이터들이 담기지 않은 상태입니다. 이와 같은 일들은 데이터 엔지니어들에게 굉장히 자주 발생하며 이를 최대한 방지하고 만회하는 것이 데이터 엔지니어들에게 요구되는 매우 중요한 역량입니다. Airflow가 거의 모든 기업에서 쓰이는 이유는 Backfill에서의 성능이 막강하기 때문 이제부터 할 이야기는 Incremental U..

Airflow 2023.12.14

Airflow - Primary Key Uniqueness 보장하기

이번 시간의 목표 1. Primary Key Uniquness 보장 방법을 살펴보기 2. Open Weathermap DAG를 Incremental Update 방식으로 구현하며 Primary Key Uniquness 보장 방법을 복습하기 Primary Key Uniqueness란? 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들) 하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음 이를 CREATE TABLE 사용시 지정 관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아줌 예 1) Users 테이블에서 email 필드 예 2) Products 테이블에서 product_id 필드 PK 선언 방식1) 필드 뒤에 붙이기 CREATE TABLE product..

Airflow 2023.12.14

Airflow - Open Weathermap DAG 구현하기

API를 사용해서 DAG를 만들어보자 Open Weathermap API 소개 위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스 무료 계정으로 api key를 받아서 이를 호출시에 사용 - https://openweathermap.org/price 만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기 먼저 Open Weathermap에 각자 등록하고 자신의 API Key를 다운로드 받거나 강의에서 제공 받은 API Key 사용하기 API Key를 open_weather_api_key라는 Variable로 저장 서울의 위도와 경도를 찾을 것 One-Call API를 사용: https://openweathermap.org/api/one-call-api - 앞서 API KEY와 서울의 위도/경도를 ..

Airflow 2023.12.14

Airflow 숙제 리뷰

airflow.cfg (1) 1. DAGs 폴더는 어디에 지정되는가? a. 기본적으로는 Airflow가 설치된 디렉토리 밑의 dags 폴더가 되며 dags_folder 키에 저장됨 2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가? a. dag_dir_list_interval (기본값은 300 = 5분) 3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가? a. api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경 4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들..

Airflow 2023.12.14