https://touchingdata.tistory.com/194
Airflow - 숙제 리뷰와 트랜잭션
Python ETL 개선하기 데이터 웨어하우스에서 테이블을 업데이트 하는 방법은 두 가지라고 말씀 드렸습니다. 1. Full Refresh 단순하고 강력하지만 데이터의 크기가 커질수록 시간이 오래걸립니다. 이
touchingdata.tistory.com
지난 시간 Name Gender csv파일을 Redshift에 적재하는 파이프라인 실습을 진행한 적이 있습니다. 이 때 만든 프로그램을 Airflow로 포팅하며 Airflow에서의 프로그래밍이 어떻게 진행되는지 이해하는 시간을 가져보겠습니다.
Colab Python 코드를 Airflow로 포팅하기
1. 헤더가 레코드로 추가되는 문제 해결
2. Idempotent하게 잡을 만듬 (멱등성)
a. 여러 번 실행해도 동일한 결과가 나오게 만들기
b. 매번 새로 모든 데이터를 읽어오는 잡이라고 가정하고 구현할 것
지난번 코랩에서 실습한 ETL을 Airflow에서 사용 가능하도록 포팅한 코드는 다음과 같습니다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2
def get_Redshift_connection():
host = "(호스트)"
user = "(ID)" # 본인 ID 사용
password = "..." # 본인 Password 사용
port = (포트번호)
dbname = (DB 이름)
conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract done")
return (f.text)
def transform(text):
logging.info("Transform started")
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
def load(records):
logging.info("load started")
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
schema = "keeyong"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
def etl():
link = "(S3의 링크)"
data = extract(link)
lines = transform(data)
load(lines)
dag_second_assignment = DAG(
dag_id = 'name_gender',
catchup = False,
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *') # 적당히 조절
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
dag = dag_second_assignment)
전체적인 코드는 같지만 몇 가지 특징, 차이·개선점을 짚어보겠습니다.
Airflow상에서의 작동을 위해 airflow에서 DAG와 PythonOperator를 import 했습니다.
extract(), transform(), load() 함수들이 etl() 함수에서 작동합니다. 3개의 태스크(함수)들을 하나의 태스크로 만든 셈입니다. (extract, transform, load들을 각각 나누어 3개의 태스크로 만드는 작업을 이후 실습에서 진행하겠습니다.)
get_Redshift_connection에서 host, user, password 등의 민감한 정보가 고스란히 하드코딩으로 노출되어 있습니다. 보안상 매우 좋지 않습니다. 또한 csv 파일이 존재하는 S3 링크의 URL도 하드코딩으로 노출되어 있습니다. 코드의 재활용성이 떨어집니다. 이와 같은 정보들을 환경변수 상으로 빼주는 것이 좋을 것 같습니다.
태스크 데코레이터를 사용하는 것도 추가 개선사항이 될 수 있겠습니다.
NameGenderCSVtoRedshift.py 개선하기 # 1
params를 통해 변수 넘기기
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
params={
'table':'delighted_nps',
'schema':'raw_data'
},
)
바로 이전 시간에 PythonOperator를 정의할 때 python_callable에 지정한 함수가 받을 인자(parameter)를 params 항목으로 전달할 수 있다는 것을 기억하실 겁니다. params를 이용해 S3의 링크 등을 하드코딩 없이 전달하도록 코드를 개선할 것입니다.
execution_date 얻어내기
다음 챕터에서 살펴볼 내용입니다. Airflow가 갖고 있는 시스템 변수들을 어떻게 읽어올 수 있는지에 관한 내용입니다.
개선한 코드
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
redshift_user = "keeyong" # 본인 ID 사용
redshift_pass = "..." # 본인 Password 사용
port = 5439
dbname = "dev"
conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract done")
return (f.text)
def transform(text):
logging.info("Transform started")
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
def load(records):
logging.info("load started")
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
schema = "keeyong"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
def etl(**context):
link = context["params"]["url"]
# task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
# https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
data = extract(link)
lines = transform(data)
load(lines)
dag = DAG(
dag_id = 'name_gender_v2',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
catchup = False,
max_active_runs = 1,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
params = {
'url': "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
},
dag = dag)
etl(**context) 함수에서 S3의 링크를 context["params"]["url"] 형태로 params를 이용해 받아냈습니다.
etl()함수 아래에서 task_instance, execution_date, loggin.info(execution_date)를 보실 수 있는데 일단은 context 딕셔너리 변수 아래 Airflow가 관리 중인 다양한 시스템 변수들 중 task_instance와 execution_date에 접근할 수 있었다는 것만 알아주세요.
최하단에서 task라는 PythonOperator를 정의할 때 params의 변수를 callable 함수가 이용할 수 있다는 점 기억해주세요.
NameGenderCSVtoRedshift.py 개선하기 # 2
Xcom 객체를 사용해서 세 개의 task로 나누기
Redshift의 스키마와 테이블 이름을 params로 넘기기
아직 위와 같은 추가 개선점들이 남아 있는데 이는 이후에 살펴보겠습니다.
Connections and Variables
Connections
This is used to store some connection related info such as hostname, port number, and access credential
Postgres connection or Redshift connection info can be stored here
- 코드에 민감한 정보들이 노출된 경우 사용할 수 있습니다.
- 중요한 정보들을 환경 설정 형태로 코드 밖으로 빼낼 수 있게 해줍니다.
Variables
Used to store API keys or some configuration info
Use “access” or “secret” in the name if you want its value to be encrypted
We will practice this
- S3의 링크가 바뀌면 코드에서 일일히 링크를 수정해야 하는 번거로움이 있습니다.
- airflow 환경 설정을 이용하면 방지할 수 있습니다.
- airflow에서 key-value 스토리지 형태로 key에 대한 value를 미리 세팅할 수 있습니다.
- 필요하다면 기존의 value를 바꿔 저장할 수 있습니다.
- 이 때 value를 암호화 상태(****)로 조회되도록 설정할 수 있습니다.
- 터미널에서 airflow variables list 명령으로 variables 조회가 가능합니다.
- 또한 내용을 읽어오기 위해서 airflow variables get variables명을 입력할 수 있고, 그 값을 설정해 줄 수도 있다.

Airflow Web UI에서 위와 같이 Connections, Variables를 설정할 수 있습니다.
NameGenderCSVtoRedshift.py 개선하기 # 3
1. Variable를 이용해 CSV parameter 넘기기

csv_url:
- https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv
- 값을 암호화하려면 (*****)
NameGenderCSVtoRedshift_v3.py#L72
from airflow.models import Variable
params = {
'url': Variable.get("csv_url")
}
Variable 라이브러리의 도움으로 csv_url의 value를 받아 사용하는 모습입니다.
2. Xcom을 사용해서 3개의 태스크로 나눠보기
Pytyhon Operator들이 별개로 실행되는 경우 한 Operator의 출력이 다음 오퍼레이터의 입력이 되는 경우 Xcom을 사용하면 순서(인과관계) 상의 입출력을 관리할 수 있습니다.
Xcom이란?
태스크(Operator)들간에 데이터를 주고 받기 위한 방식
보통 한 Operator의 리턴값을 다른 Operator에서 읽어가는 형태가 됨
이 값들은 Airflow 메타 데이터 DB에 저장이 되기에 큰 데이터를 주고받는데는 사용불가
- 보통 큰 데이터는 S3등에 로드하고 그 위치를 넘기는 것이 일반적

좌측과 같이 태스크 하나로 함수들을 구성하는 경우는 extract()의 결과물 data를 transform()의 인자로, transform(data)의 결과물 lines를 load()의 인자로 주는 것을 보실 수 있는데 이럴 경우 데이터의 교환을 직접 정의해 컨트롤 하는 것이 매우 간단합니다.
우측과 같이 여러개의 태스크로 나눈 경우는 각 태스크의 리턴 값(데이터)의 교환을 개발자가 직접 컨트롤하기 곤란합니다. 이 때 Xcom이 이를 담당합니다.
모든 태스크의 리턴값은 Airflow 메타 데이터 DB에 저장됩니다. 데이터가 너무 크다면 S3와 같은 크고 저비용인 스토리지에 로드하고 그 위치를 넘깁니다.
Vraiable
from airflow.models import Variable
Variable은 두가지 함수가 있습니다. 첫번째는 get으로 key에 해당하는 value를 얻어오는 것이고 두번째는 set으로 key에 해당하는 value를 지정한 값으로 세팅하는 것입니다.
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
redshift_user = "keeyong" # 본인 ID 사용
redshift_pass = "..." # 본인 Password 사용
port = 5439
dbname = "dev"
conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
추후 위와 같이 민감한 정보들이 담기 get_Redshift_connection() 함수를 Airflow의 Connections 기능으로 개선할 것입니다.
태스크 나누기
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': 'keeyong',
'table': 'name_gender'
},
dag = dag)
이번에는 etl()함수 안에 모든 함수들을 담아 하나의 태스크로 작동시키지 않고 각각 3개의 태스크로 나누어 진행하도록 개선했습니다.
extract >> transform >> load
태스크를 나누었으므로 순서를 명시합니다.
params & Xcom
def transform(**context):
logging.info("Transform started")
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
PythonOperator에서 params를 딕셔너리를 인자로 받기 때문에 인자를 **context로 받는 것을 보실 수 있습니다.
xcom_pull을 이용해 extract의 return_value를 받아 사용하도록 개선했습니다.
'Airflow' 카테고리의 다른 글
| Airflow - Web UI로 Variables와 Connections 세팅 (0) | 2023.12.13 |
|---|---|
| Airflow - Redshift Connection 설정 & Decorators (0) | 2023.12.13 |
| Airflow - Python Operator (0) | 2023.12.13 |
| Airflow 기본 프로그램 실행 (0) | 2023.12.12 |
| Airflow 설치 - 도커 사용 (0) | 2023.12.12 |