터칭 데이터 2024. 1. 2. 04:12

 

 

Slack 연동하기


DAG가 실패하면 Slack으로 에러를 보내보자

 

 

 

 

 

 

 

 

 

 

 

이번 섹션에서 하려는 일

 

DAG 실행 중에 에러가 발생하면 그걸 지정된 슬랙 workspace의 채널로 보내기

 

이를 위해서 해당 슬랙 workspace에 App 설정이 필요

 

다음으로 연동을 위한 함수를 하나 만들고 (plugins/slack.py)

slack.py

from airflow.models import Variable

import logging
import requests

def on_failure_callback(context):
    """
    https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
    Define the callback to post on Slack if a failure is detected in the Workflow
    :return: operator.execute
    """
    text = str(context['task_instance'])
    text += "```" + str(context.get('exception')) +"```"
    send_message_to_a_slack_channel(text, ":scream:")


# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
    # url = "https://slack.com/api/chat.postMessage"
    url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
    r = requests.post(url, json=data, headers=headers)
    return r

 

 

이를 태스크에 적용되는 default_args의 on_failure_callback에 지정

 

from plugins import slack
    …
    default_args= {
    'on_failure_callback': slack.on_failure_callback,
}

 

 

 

 

 

 

 

 

 

 

 

 

 

먼저 어느 Workspace의 어느 Channel로 보낼 것인지 결정

 

prgms-de라는 Workspace 밑에 DataAlert이라는 App 생성

 

이 App이 #data-alert이라는 채널에 메세지를 보낼 수 있게 설정

 

 

 

 

 

 

 

 

 

 

 

 

데이터 파이프라인 문제를 슬랙에 표시 (1)


https://api.slack.com/messaging/webhooks

위를 따라해서 Incoming Webhooks App을 생성

 

 

 

 

 

 

 

 

 

 

 

 

데이터 파이프라인 문제를 슬랙에 표시 (2)

 

 

 

 

 

 

 

 

 

 

 

 

 

앞서 Webhook으로 메세지 보내기

 

curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}'

https://hooks.slack.com/services/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

 

 

위의 링크는 예시이며 더 이상 작동하지 않습니다. 여러분이 받으신 엔드포인트 URL로 바꾸셔야합니다.

 

 

 

 

 

 

 

 

 

데이터 파이프라인 실패/경고를 슬랙으로 보내는 방법

 

T016X1V5HBQ/B02QB4GGNQM/xone4l4N3gMLTQRnRBWYaZ9y를 “slack_url” Variable로 저장

 

slack에 에러 메세지를 보내는 별도 모듈로 개발: slack.py

 

이를 DAG 인스턴스를 만들 때 에러 콜백으로 지정

예제: NameGenderCSVtoRedshift_v4.py

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

실행 화면

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

실습

 

https://api.slack.com/

 

 

 

 

Your apps를 클릭합니다.

 

 

 

 

 

 

 

Create an App을 클릭합니다.

 

 

 

 

 

 

 

 

처음부터 만들 것이므로 From scratch를 선택하세요.

 

 

 

 

 

 

 

 

 

 

 

 

 

워크스페이스는 실습을 위해 임의로 생성한 Airflow실습 workspace를 생성했습니다.

 

App Name은 어떤 것이든 상관없습니다. 여기서는 DataAlert으로 정했습니다.

 

 

 

 

 

 

 

 

Incoming Webhooks를 선택합니다.

 

 

 

 

 

 

 

 

 

 

 

 

On으로 켜주신 뒤 파란색 박스로 표시된 YOUR_WEBHOOK_URL_HERE에 들어갈 webhook를 얻기 위해 Add New Webhook to Workspace를 클릭합니다.

 

 

 

 

 

 

 

 

 

 

채널로 워크스페이스 생성시 만들어둔 data-alert을 지정했습니다.

 

허용을 클릭합니다.

 

 

 

 

 

 

 

 

 

 

아까 화면으로 돌아왔는데 다른 점이 있습니다. 아까는 YOUR_WEBHOOK_URL_HERE로 되어있던 부분에 실제로 URL 엔드포인트가 개시되어 있습니다. 이를 활용해 DAG 실패시 slack에 메시지를 보낼 수 있게 되었습니다.

 

이를 Airflow variables로 저장해 활용하기 전에 커맨드 라인에서 실행해 정말로 워크스페이스의 채널로 메시지가 가는지 확인해보겠습니다.

 

 

 

 

 

 

 

PS C:\Users\User> $headers = New-Object "System.Collections.Generic.Dictionary[[String],[String]]"
>> $headers.Add("Content-Type", "application/json")
>>
>> $body = @{
>>     text = "Hello, World!!!"
>> } | ConvertTo-Json
>>
>> Invoke-WebRequest -Uri "https://hooks.slack.com/services/~~~" -Method POST -Body $body -Headers $headers


StatusCode        : 200
StatusDescription : OK
Content           : ok
(생략...)

 

Mac의 경우

curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!!!"}' https://hooks.slack.com/services/~~~

 

Windows의 경우

$headers = New-Object "System.Collections.Generic.Dictionary[[String],[String]]"
$headers.Add("Content-Type", "application/json")

$body = @{
    text = "Hello, World!!!"
} | ConvertTo-Json

Invoke-WebRequest -Uri "~~~" -Method POST -Body $body -Headers $headers

 

windows power shell 기준으로는 명령이 조금 복잡합니다. ~~~ 부분에 webhook url을 입력하고 명령을 실행해주세요.

 

우리가 입력한 메시지가 그대로 전달되는지 확인하기 위해 기존의 Hello, World!에서 Hell, World!!!로 느낌표를 3개로 만들어 전송하겠습니다.

 

 

 

 

실제로 Hell, World!!!로 메시지가 전달되었습니다.

 

일단 연동이 되었음을 확인했습니다.

 

이제는 Airflow 상에서 Variables로 변수를 설정하고 일부러 DAG를 실패하도록 하여 slack에 에러 메시지가 제대로 전달되는지 확인해보겠습니다.

 

 

 

 

 

 

 

 

 

 

 

 

Airflow

 

 

Airflow WebUI에서 Variable로 Key는 slack_url로 Value는 아까 얻어낸 webhook를 입력하고 save합니다.

 

 

 

 

 

 

 

 

 

 

Variables와 Connections 설정 (S3와 Redshift)

 

 

이번 실습에서 일부러 에러를 낼 NameGenderCSVtoRedshift_v4.py는 redshift_dev_db라는 Connections를 사용할텐데 실습을 지금까지 쭉 따라오셨다면 아마 redshift_dev_db가 존재하지 않을 겁니다. 이를 다시 설정해주세요.

 

 

 

 

 

csv_url: 
    - https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv

 

그리고 NameGenderCSVtoRedshift_v4.py DAG에서 사용하는 csv_url 역시 위와 같이 설정합니다.

 

 

 

 

터미널상에서 한번 테스트해보겠습니다.

 

PS C:\Users\User> docker exec -it learn-airflow-airflow-scheduler-1 sh
(airflow)whoami
airflow

 

docker ps 명령어로 airflow schduler의 이름을 알아내고 이를 이용해 docker에 shell 스크립트상으로 로그인합니다.

 

 

 

 

(airflow)airflow dags list | grep v4

Error: Failed to load all files. For details, run `airflow dags list-import-errors`
name_gender_v4                           | NameGenderCSVtoRedshift_v4.py                                                                                    | airflow | True

 

우리가 사용할 dags 중에서 v4가 들어간 dags를 찾았습니다.

 

name_gender_v4가 있습니다.

 

 

 

 

 

(airflow)airflow tasks list name_gender_v4

SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')
extract
load
transform

 

찾아낸 dag ID인 name_gender_v4를 이용해 task들을 조회하니 extract, load, transform가 있습니다. SELECT로 시작하는 라인은 코드 안의 main에서 print 된 것이므로 신경쓰지 않으셔도 좋습니다.

 

 

 

 

 

 

 

(airflow)airflow dags test name_gender_v4 2023-12-31

 

이제 DAG를 실행해보겠습니다.

 

정상적으로 실행되었을 겁니다.

 

 

 

 

 

 

 

에러 내보기

이제 DAG에서 에러를 일부러 내보겠습니다.

 

NameGenderCSVtoRedshift_v4.py를 열어

 

 

load 함수에서 INSERT 구문을 IINSERT로 바꾸고

 

 

 

 

 

 

slack import 부분의 주석을 해제합니다.

 

 

 

 

 

 

그리고 dag의 on_failure_callback 부분의 주석도 해제합니다.

 

그리고 저장해주세요.

 

 

 

 

 

 

 

 

 

 

 

 

그리고 Docker Container의 airflow 터미널로 돌아와

 

DAG를 실행해보겠습니다.

 

그런데 이 때 기존에 입력한 날짜와는 다른 날짜를 주어야합니다.

 

왜냐하면 한번 주었던 날짜가 성공했었다면 성공했다고 기록이 남고 다시 그 날짜로 실행하면 skip을 해버립니다. 명확하지 않은 경우이므로 혼동하기 쉽습니다. 이점 조금은 유념해주세요.

 

(airflow)airflow dags test name_gender_v4 2023-06-09
(생략..)
psycopg2.errors.SyntaxError: syntax error at or near "IINSERT"
LINE 1: IINSERT INTO (각자 스키마).name_gender VALUES ('Adaleigh', 'F')

 

실행해보지 않았을 것 같은 날짜를 주고 실행해봅시다.

 

일단 에러를 내는데 성공했습니다.

 

 

 

 

만약 슬랙에 에러 메시지가 오지 않는다면

 

DAG를 정의하는 코드에서 retries: 0 으로 해야합니다.

 

retry 도중에는 성공하지 못한 DAG를 Queue에 집어 넣게되고 retry 마저 실패하기 전 까지는 아직 실패라고 간주하지 않아 슬랙에 에러 메시지를 바로 보내지 않기 때문입니다.