터칭 데이터

Airflow와 구글시트 연동하기 - Redshift → sheet 본문

Airflow 고급 기능, dbt, Data Catalog

Airflow와 구글시트 연동하기 - Redshift → sheet

터칭 데이터 2024. 1. 2. 22:27

 

 

Contents


1. ELT 구현

 

2. Slack 연동하기

 

3. 구글 시트 연동하기 (1): 시트 => Redshift 테이블

 

4. 구글 시트 연동하기 (2): Redshift 테이블 => 시트

 

5. API & Airflow 모니터링

 

6. 숙제

 

 

 

 

 

구글 시트 연동하기 (2)

 

Redshift SELECT → 구글 시트

 

이번에는 RedShift의 SELECT 결과를 구글 시트에 COPY하는 과정을 DAG로 구현해보겠습니다.

 

 

 

 

 

 

 

 

 

 

 

 

SQL 결과를 구글 시트로 복사하는 예제 개요

 

 

 

 

SELECT * 
FROM analytics.nps_summary

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

소스 코드 보기

 

소스 코드 보기

SQL_to_Sheet.py

from airflow import DAG
from airflow.operators.python import PythonOperator

from plugins import gsheet
from datetime import datetime

def update_gsheet(**context):
    sql = context["params"]["sql"]
    sheetfilename = context["params"]["sheetfilename"]
    sheetgid = context["params"]["sheetgid"]

    gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db")


with DAG(
    dag_id = 'SQL_to_Sheet',
    start_date = datetime(2022,6,18),
    catchup=False,
    tags=['example'],
    schedule = '@once'
) as dag:

    sheet_update = PythonOperator(
        dag=dag,
        task_id='update_sql_to_sheet1',
        python_callable=update_gsheet,
        params = {
            "sql": "SELECT * FROM analytics.nps_summary",
            "sheetfilename": "spreadsheet-copy-testing",
            "sheetgid": "RedshiftToSheet"
        }
    )

 

 

 

plugins/gsheet.py의 update_sheet 메서드(함수)

def update_sheet(filename, sheetname, sql, conn_id):
    client = get_gsheet_client()
    hook = PostgresHook(postgres_conn_id=conn_id)
    sh = client.open(filename)
    df = hook.get_pandas_df(sql)
    print(sh.worksheets())
    sh.worksheet(sheetname).clear()
    add_df_to_sheet_in_bulk(sh, sheetname, df.fillna(''))

 

 

 

 

 

 

 

 

 

 

 

데모

 

앞서 데모에서 사용했던 동일한 시트에 새로운 탭을 하나 만듬

이미 필요한 이메일 주소가 해당 시트에 편집자로 공유가 되어 있기에 별도 작업이 필요 없음

 

거기에 “SELECT * FROM analytics.nps_summary”의 내용을 복사

이 과정을 PythonOperator로 구현
해당 기능은 gsheet 모듈내에 있는 update_sheet라는 함수로 구현했음

 

 

 

 

 

 

 

 

 

 

 

 

 

실습

 

 

 

지난 Sheet to Redshift 실습에서 만든 각자 자신의 스프레드에 RedshiftToSheet라는 탭을 새로 만들고  https://docs.google.com/spreadsheets/d/1hW-_16OqgctX-_lXBa0VSmQAs98uUnmfOqvDYYjuE50/edit#gid=1555071985 링크의 RedshiftTosheet 탭의 내용을 그대로 복사해 실습을 진행하겠습니다.

 

이미 바로 지난 실습에서 필요한 이메일 주소를 해당 시트에 편집자로 공유했었기에 별도 이메일 공유 작업은 필요 없습니다.

 

빠르고 쉬운 이해를 위해 복사한다고 말씀 드렸지만 사실은 Redshift에 존재하는 테이블을 Google Sheet에 옮기는 실습이므로 복사하지 않고 RedshiftToSheet 탭만 새로 만든 다음에 실습을 진행해도 좋습니다.

 

 

 

 

 

 

 

 

터미널에서 실행

docker ps로 알아낸 airflow scheduler의 이름을 이용해

 

docker exec -it learn-airflow-airflow-scheduler-1 sh

docker exec -it learn-airflow-airflow-scheduler-1 sh 명령어로 airflow에 로그인합니다.

 

 

 

 

(airflow)airflow dags list | grep SQL

MySQL_to_Redshift        | MySQL_to_Redshift.py            | airflow | True
MySQL_to_Redshift_v2     | MySQL_to_Redshift_v2.py         | airflow | True
SQL_to_Sheet             | SQL_to_Sheet.py                 | airflow | True

DAG의 이름이 잘 기억나지 않는다면 dags 목록 중에서 SQL이 들어간 dags 목록을 찾을 수 있습니다.

 

SQL_to_Sheet를 찾았으니 test 명령어로 DAG를 실행해줍시다.

 

 

 

 

 

 

 

 

 

 

 

 

(airflow)airflow dags test SQL_to_Sheet 2023-12-31

 

execution_date은 Incremental update가 아닐 때는 의미가 없으며 미래 시간을 주거나 start_date보다 앞(과거)시간을 주어도 돌아가지 않습니다. 조건을 맞춰 실행한 경우도 성공적으로 실행된 적이 있다면 실행되지 않습니다. 복습겸 한번 더 상기 시켜드립니다.

 

정상적으로 실행되었다면 

 

 

여러분의 구글 시트에 데이터가 위와 같이 잘 적재되었을 것입니다.

 

 

 

 

 

만일

 

gspread.exceptions.APIError: {'code': 403, 'message': 'Google Drive API has not been used in project 224480770186 before or it is disabled. Enable it by visiting https://console.developers.google.com/apis/api/drive.googleapis.com/overview?project=224480770186 then retry. If you enabled this API recently, wait a few minutes for the action to propagate to our systems and retry.'

 

airflow dags test 명령시 위와 같은 에러가 뜬다면 해당 링크를 방문해 Google Drive API를 활성화 시켜주세요.

 

The Google Drive API is a service that allows applications to interact with Google Drive. In your SQL_to_Sheet.py script, it seems like you are using a custom plugin gsheet to interact with Google Sheets, which is a part of Google Drive.

The function update_gsheet in your script is using this gsheet plugin to update a specific Google Sheet with data fetched using a SQL query. The gsheet.update_sheet function likely uses the Google Drive API to access and modify the Google Sheet.

Here's a breakdown of what the update_gsheet function does:

  1. It takes a SQL query, a Google Sheets filename, and a sheet ID as parameters.
  2. It calls the gsheet.update_sheet function with these parameters and a database name.
  3. The gsheet.update_sheet function likely executes the SQL query against the specified database, fetches the result, and updates the specified Google Sheet with this data.

In order to perform these operations, the gsheet plugin needs to interact with Google Drive, and for that, it needs the Google Drive API to be enabled. If the API is not enabled, the plugin cannot interact with Google Drive, and you will see the error message you provided.

 

 

 

 

이제 구글 스프레드 시트를 Airflow와 연동해 사용하는 방법들을 살펴보고 직접 실습해보았습니다.

기업에서 모든 데이터들을 DB에 적재하고 모든 직원이 DB를 다룰 수는 없기 때문에 구글 스프레드 시트를 Airflow DAG로 연동해 사용하는 방법은 굉장히 유용한 기술입니다.

 

오늘 익힌 기술들을 꼭 잊지 않고 잘 숙지하시기를 바랍니다.