Airflow

Airflow - MySQL 테이블 복사하기 (2)

터칭 데이터 2023. 12. 15. 22:14

 

 

MySQL_to_Redshift DAG의 Task 구성

 

SqlToS3Operator

MySQL SQL 결과 -> S3
(s3://grepp-data-engineering/{본인ID}-nps)
s3://s3_bucket/s3_key

 

백엔드 DB에 존재하는 데이터를 얻기 위해 SELECT 쿼리를 수행하고 얻은 데이터들을 S3 버킷에 적재하는 태스크입니다.

 

 

S3ToRedshiftOperator

S3 -> Redshift 테이블
(s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps)
COPY command is used

 

S3의 파일들을 Redshift의 테이블에 COPY하는 태스크입니다.

 

 

 

from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

 

위의 코드를 보시면 짐작 하시겠지만 직접 Operator들을 구현하실 필요가 없습니다.

두 Operator(태스크) 모두 Airflow가 제공하는 기능을 import한 뒤에 파라미터 값만 바꿔서 사용하면 됩니다.

 

 

 

 

 

 

 

 

 

 

 

 

Bulk Update Sequence - COPY SQL

 

 

 

 

 

SqlToS3Operator가 1번, 2번의 역할을 수행합니다.

 

S3ToRedshiftOperator가 3번 역할을 수행합니다.

 

 

 

바로 이전 시간의 권한 설정들이 그림에서 Airflow가 S3에 접근하고 Redshift가 S3에 접근하기 위한 사전준비였습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

코드 리뷰 - MySQL_to_Redshift.py 살펴보기 (Full Refresh)

 

MySQL_to_Redshift.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json


dag = DAG(
    dag_id = 'MySQL_to_Redshift',
    start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key, # S3 버킷의 어떤 path인지 지정
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False, # 
    replace = True, # S3 버킷과 key에 이미 똑같은 이름의 파일이 존재한다면 overwirte할지
    pd_kwargs={"index": False, "header": False}, # 일련번호와 헤더 생략
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    method = 'REPLACE',
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

 

2개의 Operator를 사용해서 구현

SqlToS3Operator
S3ToRedshiftOperator


궁극적으로 하는 일은 MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사

S3를 경유해서 COPY 명령으로 복사

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

MySQL 테이블의 Incremental Update 방식 (1)

 

MySQL/PostgreSQL 테이블이라면 다음을 만족해야함

created (timestamp): Optional
modified (timestamp)
deleted (boolean): 레코드를 삭제하지 않고 deleted를 True로 설정

 

Incremental Update을 진행하려면 대상이 되는 테이블의 레코드가 실제로 Physically 하게 삭제되지 않아야 하므로 deleted 필드를 Boolean으로 표현

 

 

 

 

 

 

 

 

MySQL 테이블의 Incremental Update 방식 (2)


Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면

 

ROW_NUMBER로 직접 구현하는 경우

- 먼저 Redshift의 A 테이블의 내용을 temp_A로 복사
- MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사


    ■ 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행

SELECT * 
FROM A 
WHERE DATE(modified) = DATE(execution_date)

 

 

- temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사

 

 

 

 

 

 

 

 

 

 

 

 

MySQL 테이블의 Incremental Update 방식 (3)

 

Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면

 

S3ToRedshiftOperator로 구현하는 경우

query 파라미터로 아래를 지정


SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)

method 파라미터로 “UPSERT”를 지정
upsert_keys 파라미터로 Primary key를 지정

    ■ 앞서 nps 테이블이라면 “id” 필드를 사용

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

코드 리뷰 - MySQL_to_Redshift_v2.py 살펴보기 (1)

 

MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사

 

이 작업이 성공하려면 Redshift가 S3 버킷에 대한 액세스 권한을 갖고 있어야함

 

권한의 생성은 Redshift에게 위 S3 버켓에 대한 액세스 권한 지정

 

 

 

 

 

 

 

 

 

 

 

 

 

 

코드 리뷰 - MySQL_to_Redshift_v2.py 살펴보기 (2)

 

2개의 Operator를 사용해서 구현

 

SqlToS3Operator: execution_date에 해당하는 레코드만 읽어오게 바뀜

    ■ https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

 

S3ToRedshiftOperator

 

 

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",    
    method = "UPSERT",
    upsert_keys = ["id"],
    dag = dag
)

 

method가 REPLACE에서 UPSERT로 바꿨고, 기존에는 없던 upsert_keys를 ["id"]로 정의했습니다.

 

id를 기준으로 이미 테이블에 레코드가 존재하면 새로운 내용으로 바꾸고(update), 없다면 그냥 적재(insert)하게 됩니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

실습


AWS S3 Connections 설정 (IAM User 설정)

 

Redshift S3 Connections 설정 (IAM Role 설정)

 

MySQL 관련 모듈 설치 (Docker)

 

MySQL_to_Redshift DAG 실행

 

MySQL_to_Redshift_v2 DAG 실행