Python ETL 개선하기
데이터 웨어하우스에서 테이블을 업데이트 하는 방법은 두 가지라고 말씀 드렸습니다.
1. Full Refresh
단순하고 강력하지만 데이터의 크기가 커질수록 시간이 오래걸립니다.
이번의 숙제는 Full Refreshi 구현입니다.
2. Incremental Update
데이터가 클 경우 효과적이지만 복잡도가 증가합니다.
보통 타임스탬프 혹은 일련 번호 등의 필드가 필요합니다.
execution_date 활용 (Airflow가 Incremental Update를 쉽게 수행하도록 도와주는 기능입니다. 이후에 자세히 대해 설명드리겠습니다.)
테이블을 삭제하는 방법들 DELETE FROM vs TRUNCATE
TRUNCATE는 일괄적으로 모든 데이터를 삭제, DELETE FROM은 WHERE로 조건을 거는 것이 가능. 또 DELETE FROM은 transaction을 준수하지만 TRUNCATE는 트랜잭션과 상관없이 테이블의 모든 레코들을 삭제
Colab 숙제 리뷰
%%sql
DROP TABLE IF EXISTS (id).name_gender;
CREATE TABLE (id).name_gender (
name varchar(32) primary key,
gender varchar(8)
);
본인의 id를 딴 스키마 아래에 name_gender 테이블 생성
import psycopg2
# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
host = "(Redshift 호스트)"
redshift_user = "(id)"
redshift_pass = "(pw)"
port = (포트 번호)
dbname = "(db 이름)"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True) # autocommit 설정에 유의
return conn.cursor()
Python에서 PostreSQL 기반의 SQL은 psycopg2 라이브러리를 이용해 연결합니다. Redshift는 PostgreSQL과 호환이 되므로 역시 psycopg2를 사용합니다.
conn.set_session(autocommit=True)임에 유의하세요. 트랜잭션과 관련이 있는 코드입니다. autocommit에 대한 자세한 설명은 이후에 진행하겠습니다.
ETL 함수들
Extract
import requests
def extract(url):
f = requests.get(url)
return (f.text)
Transform
def transform(text):
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리, 즉 CSV의 헤더는 제외
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
return records
Load
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
schema = "(id)"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;") , COMMIT과 END는 동의어
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
raise
DELETE FROM으로 파이프라인을 몇번 실행하더라도 데이터가 중복되어 들어가지 않도록 멱등성(Impotency)를 준수합니다.
try 단에서 DELETE와 INSERT INTO를 실행하다 에러가 발생한다면 except 단에서 ROLLBACK을 실행해 데이터의 정합성을 지키도록 코드를 수정했습니다.
트랜잭션이란? (1)

중간에 실패하면 불완전 상황에 놓이는 작업들이 있다면 트랜잭션을 사용합니다.
위의 예시와 같이 인출은 성공했지만 송금에서 문제가 발생하는 경우를 생각해봅시다. 내 계좌에서 돈은 사라졌는데 돈을 보낸 계좌는 돈을 받지 못한 이상한 상황이 발생하게됩니다. 이런 상황을 방지하기 위해 트랜잭션을 사용합니다.
데이터의 정합성을 깨뜨릴 수 있는 작업들을 하나로 묶는 것을 트랜잭션(Transaction)이라고 합니다.
관계형 DB(RDBS)에서 사용하는 개념입니다.
트랜잭션이란? (2)
Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법
트랜잭션 실현은 BEGIN과 END 혹은 BEGIN과 COMMIT 사이에 해당 SQL들을 사용하는 방식으로 구현합니다.
ROLLBACK은 BEGIN의 이전 상태로 돌아가라는 SQL 명령입니다.
트랜잭션이란? (3)

BEGIN 이후의 SQL 결과는 COMMIT이나 END를 만나기 전까지는 임시 상태가 됩니다. 다른 세션에서는 커밋 전까지 보이지 않습니다.
트랜잭션에 들어가는 SQL을 최소화하는 것이 좋습니다.
위의 경우 auto commit을 사용한 경우입니다.
트랜잭션 구현방법 (1)
두 가지 종류의 트랜잭션이 존재
레코드 변경/삭제/추가를 바로 반영하는지 여부. autocommit이라는 파라미터로 조절가능
autocommit=True
기본적으로 모든 SQL statement가 바로 물리 테이블에 커밋됨
이를 바꾸고 싶다면 BEGIN;END; 혹은 BEGIN;COMMIT을 사용 (혹은 ROLLBACK)
autocommit=False
기본적으로 모든 SQL statement가 커밋되지 않음. 즉 모두 스테이징 상태로 존재
커넥션 객체의 .commit()과 .rollback()함수로 커밋할지 말지 결정
트랜잭션 구현방법 (2)
auto commit을 사용할 것인지 말지?
이는 개인이나 팀의 선택
Python의 경우 try/catch와 같이 사용하는 것이 일반적
try/catch로 에러가 나면 rollback을 명시적으로 실행. 에러가 안 나면 commit을 실행
try/except 사용시 유의할 점
try:
cur.execute(create_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
위의 경우는 try/except 자체는 정상적으로 작동하지만 에러가 발생했을 시 print문으로 출력하거나 슬랙에 메시지를 보내는 기능 등을 구현하지 않은 코드입니다. 만일 에러가 발생 발생해도 이를 모르고 지나가게 되며 전체적인 시스템, 프로그램이 정상작동하지 않게 되는 경우가 발생합니다.
불완전한 에러 처리의 위험성!
try:
cur.execute(create_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
불완전한 에러 처리는 굉장히 위험한 코드입니다.
except에서 raise를 호출하면 발생한 원래 exception이 위로 전파됨
ETL을 관리하는 입장에서 어떤 에러가 감춰지는 것보다는 명확하게 드러나는 것이 더 좋음
위의 경우 cur.execute 뒤에 raise를 호출하는 것이 좋음
'Airflow' 카테고리의 다른 글
| Airflow 설치 - EC2 사용 (0) | 2023.12.12 |
|---|---|
| Airflow 설치 개요 (0) | 2023.12.12 |
| Airflow 구성 (0) | 2023.12.11 |
| Airflow 소개 (0) | 2023.12.11 |
| Airflow - ETL 실습 (0) | 2023.12.11 |