Airflow

Airflow - MySQL 테이블 복사하기 (1) 전체적인 개요

터칭 데이터 2023. 12. 15. 11:50

 

 

 

 

OLTP 복사

 

거의 대부분의 경우 데이터 인프라를 처음 구축할 때 MySQL, PostgreSQL 과 같은 프로덕션 데이터베이스를 데이터 웨어하우스로 복사하는 작업을 진행합니다.

 

이를 OLTP(Online Transaction Processing) 복사라고 하는데 OLTP의 정의는 다음과 같습니다.

 

Online Transaction Processing (OLTP) system. OLTP databases are optimized for fast, real-time CRUD operations (Create, Read, Update, Delete) and are designed to handle many short transactions. These systems are used for day-to-day operations in applications like e-commerce platforms, banking systems, and customer relationship management (CRM) systems.

 

 

참고로 데이터 웨어하우스의 특징인 OLAP(Online Analytical Processing)의 정의는 다음과 같습니다.

 

A data warehouse is typically associated with Online Analytical Processing (OLAP). OLAP systems are designed to handle complex analytical queries and are optimized for read-heavy operations. They are used for business reporting, data analysis, and business intelligence purposes.

 

 

 

 

 

 

 

 

구현하려는 ETL 소개 (1)

 

 

MySQL의 prod라는 데이터 베이스의 nps라는 테이블을 Redshift의 raw_data 스키마의 nps라는 테이블로 복사하려 합니다.

 

이를 ETL(DAG 혹은 파이프라인)로 2가지 방법으로 구현하려 합니다.

 

첫번째는 Full Refresh, 두번째는 Incremental Update 방식입니다.

 

Incremental update는 execution_date의 도움을 받아 구현하고 Backfill(백필)까지 데모해보겠습니다.

 

 

 

 

 

 

 

 

 

 

 

 

구현하려는 ETL 소개 (2)

 

 

 

MySQL(OLTP)에서 OLAP로 적재하고 싶은 레코드의 수가 적다면 INSERT INTO 커맨드로 데이터를 옮기면 간단합니다.

 

 

 

문제는 옮기고 싶은 레코드의 수가 굉장히 많은 경우 시간이 오래걸려 좋은 방법이 아니라는 점입니다. 이런 경우 COPY 커맨드를 사용해야하고 거쳐야할 몇가지 단계가 있습니다.

 

첫번째, OLTP의 데이터들을 파일 형태로 Airflow 서버(Local Disk)에 저장합니다.

두번째, 그렇게 저장된 파일들을 S3 버킷에 업로드합니다.

세번째, S3 버킷에 업로드된 파일들을 COPY 명령어로 Redshift(OLAP, 데이터 웨어하우스)에 벌크 업데이트합니다.

 

참고로 위의 단계를 밟으려면 Connection들이 필요하고 보안 상의 세팅이 조금 필요합니다. 이 단계 역시 설명드리겠습니다. 그리고 INSERT를 사용하든 COPY를 사용하든 Airflow와 MySQL의 Connection은 필요합니다.

 

 

 

우리가 사용해 볼 S3 버킷의 이름은 grepp-data-engineering입니다.

또, 옮길 데이터가 있는 곳은 MySQL의 prod 데이터 베이스의 nps 테이블에 있고

옮겨갈 목표는 Redshift의 raw_data 스키마의 nps테이블이라고 말씀 드렸었죠?

 

 

 

 

 

 

 

 

 

 

 

 

 

S3와 MySQL 정보


S3 버킷 이름: grepp-data-engineering

a. 이 S3 연결을 위해 별도 사용자를 만들고 그 사용자의 키들을 권한 인증을 위해 사용할 예정


MySQL 서버 연결 정보: 

a. Host: (호스트)

    - 이 정보는 변경될 수 있음으로 강의 페이지를 꼭 확인해볼 것! 


b. Schema: prod


c. Login: (ID)


d. Password: (패스워드)


e. Port: 3306

 

 

 

 

 

 

 

 

 

 

 

 

AWS 관련 권한 설정


Airflow DAG에서 S3 접근 (쓰기 권한)

a. IAM User를 만들고 S3 버킷에 대한 읽기/쓰기 권한 설정하고 access key와 secret key를 사용


Redshift가 S3 접근 (읽기 권한)

a. Redshift에 S3를 접근할 수 있는 역할 (Role)을 만들고 이를 Redshift에 지정

 

 

실습에서는 이미 위의 권한들 2개가 만들어져 있고 이를 사용합니다만 데모 과정에서 어떻게 권한을 만드는지까지 보여드리겠습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

MySQL Connections 설정시 유의사항

 

아래 명령을 Airflow Scheduler Docker Container에 root 유저로 로그인해서 실행

docker exec --user root -it (airflow 스케줄러 ID) sh
(airflow) sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

 

이 부분은 실행하는 시점에 따라 다른 문제들이 존재할 수 있음. 문제가 생기면 질문채널을 통해 꼭 질문!

위의 명령들은 “ModuleNotFoundError: No module named 'MySQLdb'”에러를 해결하기 위함임

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Airflow MySQL Connection 설정

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

AWS S3 접근은? (Connections)

 

 

지난 시간에 Snowflake 실습을 하며 S3 버킷의 파일들을 Copy 벌크 업데이트를 진행한 적이 있습니다. 이 때 S3 접근을 위해 AWS IAM에서 권한 설정을 했던 적이 있었는데 기억하시나요?

https://touchingdata.tistory.com/161

 

Snowflake 실습 - AWS IAM 설정하기

실습의 단계별 목표 1. DEV 데이터 베이스 만들기 2. 3개의 스키마 만들기 3. COPY SQL을 이용해 S3의 csv파일들을 벌크 업데이트하여 raw_data 스키마 밑에 테이블들 만들기 4. 그 과정에서 AWS IAM User(사용

touchingdata.tistory.com

 

그때와 마찬가지로 이번에도 S3 버킷의 접근을 위해 AWS IAM에서 권한을 설정하는 실습을 진행해 볼 겁니다.

 

 

Access Key ID와 Secret Access Key를 사용하는 걸로 바뀜

a. 루트 사용자의 키들을 사용하면 해킹시 AWS 자원들을 마음대로 사용 가능 -> 여러번 사고가 남

 

 

우리가 사용해볼 Best Practice는: 

a. IAM(Identity and Access Management)을 사용해 별도의 사용자를 만들고
b. 그 사용자에게 해당 S3 bucket을 읽고 쓸 수 있는 권한을 제공하고
c. 그 사용자의 Access Key ID와 Secret Access Key를 사용

    - 이 키도 주기적으로 변경해서 해킹이 될 경우의 피해를 최소화

 

 

 

 

 

 

 

 

 

 

 

IAM 사용자 설정 (1) - 이름을 airflow-s3-access로 지정

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

IAM 사용자 설정 (2) - Policy를 직접 추가

 

 

 

 

“Attach policies directly” 선택


두 가지 방법이 존재

 

#1. Create policy를 선택하고 대상이 되는 S3 bucket에 대한 권한만 지정
#2. S3에 대한 모든 권한 지정 (AmazonS3FullAccess)

당연히 더 좋은 방법은 #1번이며 이를 사용해볼 예정

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

IAM 사용자 설정 (3) - Custom Policy의 내용으로 아래를 설정

 

{
 	"Version": "2012-10-17",
 	"Statement": [
 	{
    	"Effect": "Allow",
     	"Action": [
     		"s3:GetBucketLocation",
     		"s3:ListAllMyBuckets"
    	],
    	"Resource": "arn:aws:s3:::*"
 },
 {
 	"Effect": "Allow",
 	"Action": "s3:*",
 	"Resource": [
 		"arn:aws:s3:::grepp-data-engineering",
 		"arn:aws:s3:::grepp-data-engineering/*"
    		]
 		}
 	]
}

 

여기서 눈여겨 볼 부분은 grepp-data-engineering자체와 그 이하에 있는 모든 데이터들에 대한 ...-engineering/* 권한을 허용("Effect":"Allow")하는 부분 입니다.

 

위의 코드에서는 policy 이름을 적지 않았는데 전체적인 과정을 간단하게 설명하자면 IAM에서 커스텀 policy를 만들고 이름을 부여한 뒤 그 이름의 policy를 IAM User에게 적용한다고 생각하시면 됩니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

IAM 사용자 설정 (4) - IAM > Users > airflow-s3-access 선택

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Airflow에서 S3 Connection 설정

 

 

Conection Type으로는 Amazon Web Services를 선택합니다 S3나 generic을 선택할 수도 있지만 Docker나 EC2로 Airflow로 설치했다면 Amazon Web Services가 보일 겁니다.

 

 

Conn Id: aws_conn_id
This needs to be clearly used in the S3 operators


Conn Type:
s3나 Amazon Web Service나 Generic을 선택


Extra:
{ "region_name": "ap-northeast-2" }

꼭 복사 붙여넣기 해야합니다. S3가 AWS 어느 지역에 있는지 알려주는 역할을 합니다.


앞선 만든 IAM 사용자의 Access Key ID와 Secret Access Key를 사용해야함. 이 정보는 스쿨 사이트에서 해당 강의에서 찾아보기 바람

 

 

 

 

 

 

 

 

 

 

 

 

2 New Connections (MySQL & AWS)

 

 

위의 과정을 따랐다면 기존에 만든 Redshift Connection에 더해 방금의 실습으로 만들어진 aws_conn_id와 mysql_conn_id 2개의 Connection이 만들어져 있었을 겁니다. 각각은 S3와 MySQL 연결과 관련된 Airflow Connection 입니다.

 

Redshift가 S3 버킷을 읽을 수 있는 권한은 뒤의 데모 과정에서 실습 하겠습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

MySQL의 테이블 리뷰 (OLTP, Production Database)

 

CREATE TABLE prod.nps (
    id INT NOT NULL AUTO_INCREMENT primary key,
    created_at timestamp,
    score smallint
);

 

이미 테이블은 이미 MySQL쪽에 만들어져있고 레코드들이 존재하며 이를 Redshift로 복사하는 것이 우리가 해볼 실습

 

nps는 사용자가 자신이 이용한 상품이나 서비스를 타인에게 어느 정도로 추천하고 싶은지 의향을 점수로 기록한 것을 말합니다.

 

 

 

 

 

Redshift(OLAP, Data Warehouse)에 해당 테이블 생성

 

CREATE TABLE (본인의스키마).nps (
    id INT NOT NULL primary key,
    created_at timestamp,
    score smallint
);

 

 

이 테이블들은 Redshift쪽에 본인 스키마 밑에 별도로 만들고 뒤에서 실습할 DAG를 통해 MySQL쪽 테이블로부터 Redshift 테이블로 복사하는 것이 우리가 해볼 실습