터칭 데이터

Airflow - Web UI로 Variables와 Connections 세팅 본문

Airflow

Airflow - Web UI로 Variables와 Connections 세팅

터칭 데이터 2023. 12. 13. 18:33

 

 

실행 데모

앞서 5가지 버전을 실행해보자

git clone https://github.com/learndataeng/learn-airflow

 

웹 UI(이번 시간) 와 Command line 두 개를 사용

 

 

 

 

 

 

 

사전 준비 - 파일 다운로드

 

 

실습을 따라 진행했다면 위와 같이 dags 폴더에 위와 같은 3개의 DAGs가 존재할 것입니다.

(저는 D드라이브의 Dev_KDT 디렉토리의 airflow-setup 폴더에 실습을 위한 git을 clone했었습니다.)

 

터미널에서 dags가 아닌 airflow-setup으로 이동 후

 

'git clone https://github.com/learndataeng/learn-airflow' 명령을 입력합니다.

 

D:\Dev_KDT\airflow-setup>git clone https://github.com/learndataeng/learn-airflow

 

 

 

 

그러면 위와 같이 많은 Dags와 서브 폴더를 다운 받으신 걸 확인할 수 있습니다. 이후 실습에서 사용할 DAGs들을 지금 미리 다운 받았습니다.

 

이제 위의 다운받은 파일들을 airflow-setup의 dags로 복사할 것입니다.

 

D:\Dev_KDT\airflow-setup>xcopy /E /I "D:\Dev_KDT\airflow-setup\learn-airflow\dags" "D:\Dev_KDT\airflow-setup\dags"

 

CMD 상에서의 명령은 위와 같습니다.

 

/E는 디렉토리가 비어있더라도 복사하라는 명령

/I는 하나 이상의 파일 또는 디렉토리를 복사할 때 대상을 디렉토리로 간주하도록 xcopy에게 지시합니다.

 

이제 준비가 끝났습니다. 지난 시간 Airflow를 설치한 Docker에서 확인해봅시다.

 

 

 

 

 

 

 

 

 

 

Web UI

 

 

Web UI에서 확인해보면 우리가 방금 다운 받고 복사한 수많은 Dag들을 확인하실 수 있습니다.

 

 

그런데 NameGender 버전이 2가지 밖에 없습니다. 우리는 분명 dags 디렉토리에서

 

 

위와 같이 NameGender의 버전을 다섯 가지로 다운 받았습니다.

 

 

 

 

 

이유는 Connections와 Variables가 세팅이 되어있지 않기 때문입니다.

 

최상단의 DAG Import Errors를 펼쳐보시면 자세한 내용을 살펴보실 수 있습니다.

 

먼저 Name Gender DAG들의 문제를 해결해보도록 하겠습니다.

 

 

 

 

 

 

 

 

Variables 설정

 

 

Admin 메뉴의 Variables를 클릭합니다.

 

 

 

 

 

Key는 csv_url로 Value는 csv 파일이 보관된 S3의 주소를 작성한 후 Save하세요.

 

 

 

 

 

 

 

 

 

DAGs 메뉴로 돌아와 시간이 어느 정도 경과하면 8개의 Errors가 5개로 줄어드는 걸 확인하실 수 있습니다. NameGender의 3, 4, 5 3개의 버전이 csv_url Variables 정의로 해결되었기 때문입니다.

 

DAGs의 내용들이 최신화되는데는 보통의 경우는 5분 정도 조금 시간이 걸릴 수 있습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Connections 설정

 

Admin 메뉴의 Connections 메뉴에서 위와 같이 Connection을 추가합니다.

 

Connection Type에서 Redshift가 없다면 PostgreSQL을 선택하셔도 됩니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

NAME GENDER V2 실행

 

지금 당장의 실습을 위해 필요한 Connections와 Variables는 모두 설정했습니다.

 

 

 

DAGs로 돌아와 name_gender_v2를 실행합니다.

 

 

 

 

perform_etl 태스크 하나 밖에 보이지 않습니다.

 

 

 

name gender v2는 함수들을 하나의 태스크로 엮어 실행하는 버전이었습니다. 그래서 graph탭에서도 다이어그램이 하나밖에 뜨지 않습니다.

 

그런데 perform_etl 우측의 작은 네모와 다이어그램의 색이 노란색입니다. 이는 정상 실행되지 않아 retry를 대기 중이라는 뜻으로 시간 경과에 따라서는 빨간색으로 색이 바뀔 수도 있습니다.

 

이 이유는 logs를 보시면

 

 

위와 같이 Redshift 연결이 실패했기 때문인데

 

def get_Redshift_connection():
    host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
    redshift_user = "keeyong"  # 본인 ID 사용
    redshift_pass = "..."  # 본인 Password 사용
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()

 

코드를 살펴보시면 위와 같이 패스워드가 제대로 적혀있지 않기 때문입니다. (하드코딩)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

NAME GENDER V4 실행

 

 

def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

 

그렇다면 redshift_dev_db라는 ID를 갖는 Connections를 사용하는 버전 4는 정상 작동할까요?

 

 

 

 

 

name_gender_v4를 실행합니다.

 

 

 

extract와 transform은 이미 성공해 진한 녹색이고 load는 진행 중이라고 뜹니다.

 

 

 

 

 

 

 

 

 

 

 

 

NAME GENDER V5

 

from airflow.decorators import task

@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    logging.info("load started")    
    cur = get_Redshift_connection()   
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")

 

name_gender_v5는 @task 데코레이터를 사용합니다.

 

 

데코레이터를 사용해도 이전 버전들과 마찬가지로 extract, transform, load 3개의 태스크로 나누어져 있는 것을 확인할 수 있습니다.

 

 

 

 

 

 

 

 

 

이상 Web UI에서 Variables와 Connection를 세팅하는 방법을 살펴보았습니다.