터칭 데이터
Airflow API와 모니터링 본문
Contents
1. ELT 구현
2. Slack 연동하기
3. 구글 시트 연동하기 (1): 시트 => Redshift 테이블
4. 구글 시트 연동하기 (2): Redshift 테이블 => 시트
5. API & Airflow 모니터링
6. 숙제
API & Airflow 모니터링
Airflow가 제공해주는 API에 대해서 알아보고 이를 이용해 모니터링 방법에 대해 알아보자
이번 섹션에서 해보고자 하는 일들
Airflow의 건강 여부 체크 (health check)을 어떻게 할지 학습
대부분의 서비스와 마찬가지로 Airflow도 얼마나 건강한지 외부에서 체크할 수 있는 Health Check라는 엔드 포인트가 있습니다. 이를 주기적으로 실행해 중요한 Airflow의 컴포넌트 2개 Web Server와 Scheduler의 상태를 체크할 수 있습니다.
Airflow API로 외부에서 Airflow를 조작해보는 방법에 대해 학습
Airflow도 API를 갖고 있습니다. 현재 우리는 Docker compose로 Airflow를 실행하고 있는데 이때 디폴트로 외부에서 Airflow를 조작할 수 있습니다. 어떤 config를 설정해야 Airflow를 API로 조작할 수 있는지 알아보겠습니다.
Airflow API 활성화 (1)
airflow.cfg의 api 섹션에서 auth_backend의 값을 변경
[api]
auth_backend = airflow.api.auth.backend.basic_auth
(가장 간단한 방법은 basic_auth)
docker-compose.yaml에는 이미 설정이 되어 있음 (environments)
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW의 cfg 파일의 API 섹션의 auth_backend를 덮어 쓴다는 뜻입니다. 처음 언더스코어(_) 2개 다음은 섹션(API)이고 그 다음은 그 섹션 밑의 키(auth_backend)입니다. backend로 단수형이라면 값을 하나만 세팅할 수 있고 다수의 방법으로 로그인하고 인증한다면 backends로 s를 붙여주고 다수의 각 값을 콤마를 사이에 두고 적어줍니다.
아래 명령으로 확인해보기
$ docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
Airflow API 활성화 (2)
Airflow Web UI에서 새로운 사용자 추가 (API 사용자)
Security → List Users → +
이후 화면에서 새 사용자 정보 추가 (monitor:MonitorUser1)
Health API 호출
API로 Airflow를 조작 하는 것이 아닌 상태를 체크합니다. 덕분에 Health 엔드 포인트는 Airflow API 활성화 여부와 관계 없이 사용할 수 있습니다.
/health API 호출
curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health
정상 경우 응답:
{
"metadatabase": {
"status": "healthy"
},
"scheduler": {
"status": "healthy",
"latest_scheduler_heartbeat": "2022-03-12T06:02:38.067178+00:00"
}
}
latest_scheduler_heartbeat는 5~10분 전이어야 합니다. 예를 들어 30분 혹은 1시간이나 그 이상이라면 상태가 healthy여도 문제가 있다고 보셔야 합니다. 날짜 중간의 T와 맨 뒤의 +00:00은 UTC로 그리니치 천문대와의 시차를 얘기합니다.
API 사용예
API 레퍼런스 살펴보기
특정 DAG를 API로 Trigger하기
모든 DAG 리스트하기
모든 Variable 리스트하기
모든 Config 리스트하기
…
API 사용예 - 특정 DAG를 API로 Trigger하기
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d
'{"execution_date":"2023-05-24T00:00:00Z"}'
"http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
DAG를 실행하는 명령들은 execution_date이 필요합니다. 그래서 POST 방식으로 -d 옵션을 이용해 single quote(')로 감싸 날짜(필요한 데이터)를 JSON 포맷으로 전달합니다. api/v1/dags/뒤에는 DAG ID가 들어갑니다. (여기서는 HelloWorld)
{
"detail": "DAGRun with DAG ID: 'HelloWorld' and DAGRun logical date: '2023-05-24 00:00:00+00:00'
already exists",
"status": 409,
"title": "Conflict",
"type":
"https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/AlreadyExists"
}
API 사용예 - 모든 DAG 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
{
"dag_id": "SQL_to_Sheet",
"default_view": "grid",
"description": null,
"file_token": "...",
"fileloc": "/opt/airflow/dags/SQL_to_Sheet.py",
"has_import_errors": false,
"has_task_concurrency_limits": false,
"is_active": true,
"is_paused": true,
"is_subdag": false,
"last_expired": null,
"last_parsed_time": "2023-06-18T05:21:34.266335+00:00",
"last_pickled": null,
"max_active_runs": 16,
"max_active_tasks": 16,
"next_dagrun": "2022-06-18T00:00:00+00:00",
"next_dagrun_create_after": "2022-06-18T00:00:00+00:00",
"next_dagrun_data_interval_end": "2022-06-18T00:00:00",
"next_dagrun_data_interval_start": "2022-06-18T00:00:00",
"owners": [ "airflow" ],
"pickle_id": null,
"root_dag_id": null,
"schedule_interval": {
"__type": "CronExpression",
"value": "@once"
},
"scheduler_lock": null,
"tags": [ { "name": "example" }],
"timetable_description": "Once, as soon as possible"
API 사용예 - 모든 Variable 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
{
"total_entries": 7,
"variables": [
{
"description": null,
"key": "api_token",
"value": "12345667"
},
{
"description": null,
"key": "csv_url",
"value": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
}
]
}
API 사용예 - 모든 Config 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
{
"detail": "Your Airflow administrator chose not to expose the configuration, most likely for security reasons.",
"status": 403,
"title": "Forbidden",
"type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}
Variables/Connections Import/Export
airflow variables export variables.json
airflow variables import variables.json
airflow connections export connections.json
airflow connections import connections.json
데모
API 활성화 여부 점검
API 4개를 실행해보기
모니터링하기
API 활성화 여부 점검
docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend 명령으로 yaml파일에 설정된 auth.backend 키의 값들을 확인했습니다.
참고로 위의 명령은
docker exec -it learn-airflow-airflow-scheduler-1 sh
(airflow)airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
이렇게 airflow 로그인 후에 연달아 airflow config 명령을 실행한 것과 같습니다.
특정 DAG 트리거
맥
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
윈도우
$headers = New-Object "System.Collections.Generic.Dictionary[[String],[String]]"
$headers.Add("Content-Type", "application/json")
$headers.Add("Authorization", "Basic " + [System.Convert]::ToBase64String([System.Text.Encoding]::ASCII.GetBytes("airflow:airflow")))
$body = @{
execution_date = "2023-05-24T00:00:00Z"
} | ConvertTo-Json
Invoke-WebRequest -Uri "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns" -Method POST -Body $body -Headers $headers
airflow scheduler안에서는 8080이 열리지 않았으므로 airflow web server에 로그인하거나 airflow 로그인을 안한 상태에서 실행하셔야 합니다.
모든 DAG 리스팅하기
curl -X GET --user "aiflow:airflow" http://localhost:8080/api/v1/dags
모든 Variables 리스팅하기
curl -X GET --user "aiflow:airflow" http://localhost:8080/api/v1/variables
모든 Connections 리스팅하기
curl -X GET --user "aiflow:airflow" http://localhost:8080/api/v1/connections
모든 Config 리스팅하기
curl -X GET --user "aiflow:airflow" http://localhost:8080/api/v1/config
아마 보안 이슈로 막혀있을텐데 airflow.cfg에서 컨트롤할 수 있는 섹션과 키가 있을 것입니다. 이를 docker-compose.yaml 파일에서 사용하셔야 합니다.
Variables/Connections Import/Export
airflow 바깥 터미널에서
docker exec -it learn-airflow-airflow-scheduler-1 airflow variables export var.json
var.json에서 var이름은 원하시는대로 주셔도 됩니다.
바깥으로 variables를 내보냅니다.
docker exec -it learn-airflow-airflow-scheduler-1 airflow variables import var.json
바깥의 variables를 import할 수도 있습니다.
'Airflow 고급 기능, dbt, Data Catalog' 카테고리의 다른 글
Dag Dependencies - TriggerDagRunOperator & Jinja Template (0) | 2024.01.03 |
---|---|
2 번째 챕터 (0) | 2024.01.03 |
Airflow와 구글시트 연동하기 - Redshift → sheet (0) | 2024.01.02 |
Airflow와 구글시트 연동하기 - 개요와 sheet → Redshift (0) | 2024.01.02 |
Slack 연동하기 (0) | 2024.01.02 |