Airflow - MySQL 테이블 복사하기 (2)
MySQL_to_Redshift DAG의 Task 구성
SqlToS3Operator
MySQL SQL 결과 -> S3
(s3://grepp-data-engineering/{본인ID}-nps)
s3://s3_bucket/s3_key
백엔드 DB에 존재하는 데이터를 얻기 위해 SELECT 쿼리를 수행하고 얻은 데이터들을 S3 버킷에 적재하는 태스크입니다.
S3ToRedshiftOperator
S3 -> Redshift 테이블
(s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps)
COPY command is used
S3의 파일들을 Redshift의 테이블에 COPY하는 태스크입니다.
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
위의 코드를 보시면 짐작 하시겠지만 직접 Operator들을 구현하실 필요가 없습니다.
두 Operator(태스크) 모두 Airflow가 제공하는 기능을 import한 뒤에 파라미터 값만 바꿔서 사용하면 됩니다.
Bulk Update Sequence - COPY SQL

SqlToS3Operator가 1번, 2번의 역할을 수행합니다.
S3ToRedshiftOperator가 3번 역할을 수행합니다.
바로 이전 시간의 권한 설정들이 그림에서 Airflow가 S3에 접근하고 Redshift가 S3에 접근하기 위한 사전준비였습니다.
코드 리뷰 - MySQL_to_Redshift.py 살펴보기 (Full Refresh)
MySQL_to_Redshift.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key, # S3 버킷의 어떤 path인지 지정
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False, #
replace = True, # S3 버킷과 key에 이미 똑같은 이름의 파일이 존재한다면 overwirte할지
pd_kwargs={"index": False, "header": False}, # 일련번호와 헤더 생략
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE',
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
2개의 Operator를 사용해서 구현
SqlToS3Operator
S3ToRedshiftOperator
궁극적으로 하는 일은 MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
S3를 경유해서 COPY 명령으로 복사
MySQL 테이블의 Incremental Update 방식 (1)
MySQL/PostgreSQL 테이블이라면 다음을 만족해야함
created (timestamp): Optional
modified (timestamp)
deleted (boolean): 레코드를 삭제하지 않고 deleted를 True로 설정
Incremental Update을 진행하려면 대상이 되는 테이블의 레코드가 실제로 Physically 하게 삭제되지 않아야 하므로 deleted 필드를 Boolean으로 표현
MySQL 테이블의 Incremental Update 방식 (2)
Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면
ROW_NUMBER로 직접 구현하는 경우
- 먼저 Redshift의 A 테이블의 내용을 temp_A로 복사
- MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사
■ 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
SELECT *
FROM A
WHERE DATE(modified) = DATE(execution_date)
- temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사
MySQL 테이블의 Incremental Update 방식 (3)
Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면
S3ToRedshiftOperator로 구현하는 경우
query 파라미터로 아래를 지정
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
method 파라미터로 “UPSERT”를 지정
upsert_keys 파라미터로 Primary key를 지정
■ 앞서 nps 테이블이라면 “id” 필드를 사용
코드 리뷰 - MySQL_to_Redshift_v2.py 살펴보기 (1)
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
이 작업이 성공하려면 Redshift가 S3 버킷에 대한 액세스 권한을 갖고 있어야함
권한의 생성은 Redshift에게 위 S3 버켓에 대한 액세스 권한 지정
코드 리뷰 - MySQL_to_Redshift_v2.py 살펴보기 (2)
2개의 Operator를 사용해서 구현
SqlToS3Operator: execution_date에 해당하는 레코드만 읽어오게 바뀜
■ https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
S3ToRedshiftOperator
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
method가 REPLACE에서 UPSERT로 바꿨고, 기존에는 없던 upsert_keys를 ["id"]로 정의했습니다.
id를 기준으로 이미 테이블에 레코드가 존재하면 새로운 내용으로 바꾸고(update), 없다면 그냥 적재(insert)하게 됩니다.
실습
AWS S3 Connections 설정 (IAM User 설정)
Redshift S3 Connections 설정 (IAM Role 설정)
MySQL 관련 모듈 설치 (Docker)
MySQL_to_Redshift DAG 실행
MySQL_to_Redshift_v2 DAG 실행