터칭 데이터

Airflow API와 모니터링 본문

Airflow 고급 기능, dbt, Data Catalog

Airflow API와 모니터링

터칭 데이터 2024. 1. 3. 00:18

 

 

Contents


1. ELT 구현

 

2. Slack 연동하기

 

3. 구글 시트 연동하기 (1): 시트 => Redshift 테이블

 

4. 구글 시트 연동하기 (2): Redshift 테이블 => 시트

 

5. API & Airflow 모니터링

 

6. 숙제

 

 

 

 

 

 

 

 

 

API & Airflow 모니터링


Airflow가 제공해주는 API에 대해서 알아보고 이를 이용해 모니터링 방법에 대해 알아보자

 

 

 

 

 

 

이번 섹션에서 해보고자 하는 일들

 

Airflow의 건강 여부 체크 (health check)을 어떻게 할지 학습

대부분의 서비스와 마찬가지로 Airflow도 얼마나 건강한지 외부에서 체크할 수 있는 Health Check라는 엔드 포인트가 있습니다. 이를 주기적으로 실행해 중요한 Airflow의 컴포넌트 2개 Web Server와 Scheduler의 상태를 체크할 수 있습니다.

 

Airflow API로 외부에서 Airflow를 조작해보는 방법에 대해 학습

Airflow도 API를 갖고 있습니다. 현재 우리는 Docker compose로 Airflow를 실행하고 있는데 이때 디폴트로 외부에서 Airflow를 조작할 수 있습니다. 어떤 config를 설정해야 Airflow를 API로 조작할 수 있는지 알아보겠습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Airflow API 활성화 (1)

 

airflow.cfg의 api 섹션에서 auth_backend의 값을 변경

[api]
auth_backend = airflow.api.auth.backend.basic_auth

(가장 간단한 방법은 basic_auth)

 

 

docker-compose.yaml에는 이미 설정이 되어 있음 (environments)

AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'

 

AIRFLOW의 cfg 파일의 API 섹션의 auth_backend를 덮어 쓴다는 뜻입니다. 처음 언더스코어(_) 2개 다음은 섹션(API)이고 그 다음은 그 섹션 밑의 키(auth_backend)입니다. backend로 단수형이라면 값을 하나만 세팅할 수 있고 다수의 방법으로 로그인하고 인증한다면 backendss를 붙여주고 다수의 각 값을 콤마를 사이에 두고 적어줍니다.

 

 

아래 명령으로 확인해보기

$ docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session

 

 

 

 

 

 

 

 

 

 

Airflow API 활성화 (2)


Airflow Web UI에서 새로운 사용자 추가 (API 사용자)


Security → List Users → +
이후 화면에서 새 사용자 정보 추가 (monitor:MonitorUser1)

 

 

 

 

 

 

 

 

 

 

 

Health API 호출

API로 Airflow를 조작 하는 것이 아닌 상태를 체크합니다. 덕분에 Health 엔드 포인트는  Airflow API 활성화 여부와 관계 없이 사용할 수 있습니다.


/health API 호출

curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health

 

정상 경우 응답:

{
    "metadatabase": {
        "status": "healthy"
    },
    "scheduler": {
        "status": "healthy",
        "latest_scheduler_heartbeat": "2022-03-12T06:02:38.067178+00:00"
    }
}

 

latest_scheduler_heartbeat는 5~10분 전이어야 합니다. 예를 들어 30분 혹은 1시간이나 그 이상이라면 상태가 healthy여도 문제가 있다고 보셔야 합니다. 날짜 중간의 T와 맨 뒤의 +00:00은 UTC로 그리니치 천문대와의 시차를 얘기합니다.

 

 

 

 

 

 

 

 

 

 

 

API 사용예

 

API 레퍼런스 살펴보기

 

특정 DAG를 API로 Trigger하기

 

모든 DAG 리스트하기

 

모든 Variable 리스트하기

 

모든 Config 리스트하기

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

API 사용예 - 특정 DAG를 API로 Trigger하기


curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d 
'{"execution_date":"2023-05-24T00:00:00Z"}' 
"http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"

 

DAG를 실행하는 명령들은 execution_date이 필요합니다. 그래서 POST 방식으로 -d 옵션을 이용해 single quote(')로 감싸 날짜(필요한 데이터)를 JSON 포맷으로 전달합니다. api/v1/dags/뒤에는 DAG ID가 들어갑니다. (여기서는 HelloWorld)

 


{
 "detail": "DAGRun with DAG ID: 'HelloWorld' and DAGRun logical date: '2023-05-24 00:00:00+00:00' 
already exists",
 "status": 409,
 "title": "Conflict",
 "type": 
"https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/AlreadyExists"
}

 

 

 

 

 

 

 

 

 

 

 

 

API 사용예 - 모든 DAG 리스트하기

 

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags

 

더보기

 {
 "dag_id": "SQL_to_Sheet",
 "default_view": "grid",
 "description": null,
 "file_token": "...",
 "fileloc": "/opt/airflow/dags/SQL_to_Sheet.py",
 "has_import_errors": false,
 "has_task_concurrency_limits": false,
 "is_active": true,
 "is_paused": true,
 "is_subdag": false,
 "last_expired": null,
 "last_parsed_time": "2023-06-18T05:21:34.266335+00:00",
 "last_pickled": null,
 "max_active_runs": 16,
 "max_active_tasks": 16,

 "next_dagrun": "2022-06-18T00:00:00+00:00",
 "next_dagrun_create_after": "2022-06-18T00:00:00+00:00",
 "next_dagrun_data_interval_end": "2022-06-18T00:00:00",
 "next_dagrun_data_interval_start": "2022-06-18T00:00:00",
 "owners": [ "airflow" ],
 "pickle_id": null,
 "root_dag_id": null,
 "schedule_interval": {
 "__type": "CronExpression",
 "value": "@once"
 },
 "scheduler_lock": null,
 "tags": [ { "name": "example" }],
 "timetable_description": "Once, as soon as possible"

 

 

 

 

 

 

 

 

 

API 사용예 - 모든 Variable 리스트하기

 

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables


{
    "total_entries": 7,
    "variables": [
        {
            "description": null,
            "key": "api_token",
            "value": "12345667"
        },
        {
            "description": null,
            "key": "csv_url",
            "value": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
        }
    ]
}

 

 

 

 

 

 

 

 

 

 

API 사용예 - 모든 Config 리스트하기

 

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config

 

{
    "detail": "Your Airflow administrator chose not to expose the configuration, most likely for security reasons.",
    "status": 403,
    "title": "Forbidden",
    "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}

 

 

 

 

 

 

 

 

 

 

 

 

 

Variables/Connections Import/Export


airflow variables export variables.json
airflow variables import variables.json


airflow connections export connections.json
airflow connections import connections.json

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

데모

 

API 활성화 여부 점검

 

API 4개를 실행해보기

 

모니터링하기

 

 

 

 

 

 

 

 

 

 

API 활성화 여부 점검

 

docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend

airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session

 

docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend 명령으로 yaml파일에 설정된 auth.backend 키의 값들을 확인했습니다.

 

참고로 위의 명령은

 

docker exec -it learn-airflow-airflow-scheduler-1 sh
(airflow)airflow config get-value api auth_backend

airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session

 

이렇게 airflow 로그인 후에 연달아 airflow config 명령을 실행한 것과 같습니다.

 

 

 

 

 

 

특정 DAG 트리거

 

curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"

 

윈도우

$headers = New-Object "System.Collections.Generic.Dictionary[[String],[String]]"
$headers.Add("Content-Type", "application/json")
$headers.Add("Authorization", "Basic " + [System.Convert]::ToBase64String([System.Text.Encoding]::ASCII.GetBytes("airflow:airflow")))

$body = @{
    execution_date = "2023-05-24T00:00:00Z"
} | ConvertTo-Json

Invoke-WebRequest -Uri "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns" -Method POST -Body $body -Headers $headers

 

airflow scheduler안에서는 8080이 열리지 않았으므로 airflow web server에 로그인하거나 airflow 로그인을 안한 상태에서 실행하셔야 합니다.

 

 

 

 

 

 

 

 

 

모든 DAG 리스팅하기

curl -X GET --user "aiflow:airflow" http://localhost:8080/api/v1/dags

 

 

 

 

 

 

모든 Variables 리스팅하기

curl -X GET --user "aiflow:airflow" http://localhost:8080/api/v1/variables

 

 

 

 

 

 

 

모든 Connections 리스팅하기

curl -X GET --user "aiflow:airflow" http://localhost:8080/api/v1/connections

 

 

 

 

 

 

 

 

모든 Config 리스팅하기

curl -X GET --user "aiflow:airflow" http://localhost:8080/api/v1/config

 

아마 보안 이슈로 막혀있을텐데 airflow.cfg에서 컨트롤할 수 있는 섹션과 키가 있을 것입니다. 이를 docker-compose.yaml 파일에서 사용하셔야 합니다.

 

 

 

 

 

 

 

 

Variables/Connections Import/Export

airflow 바깥 터미널에서

docker exec -it learn-airflow-airflow-scheduler-1 airflow variables export var.json

var.json에서 var이름은 원하시는대로 주셔도 됩니다.

바깥으로 variables를 내보냅니다.

 

 

docker exec -it learn-airflow-airflow-scheduler-1 airflow variables import var.json

바깥의 variables를 import할 수도 있습니다.