터칭 데이터
DBT Sources 본문
Contents
1. ELT의 미래는?
2. Database Normalization
3. dbt 소개
4. dbt 사용 시나리오
5. dbt 설치와 환경 설정
6. dbt Models: Input
7. dbt Models: Output
8. dbt Seeds
9. dbt Sources
10. dbt Snapshots
11. dbt Tests
12. dbt Documentation
13. dbt Expectations
14. 마무리
dbt Sources
dbt Sources란 무엇인가?
Staging 테이블을 만들 때 입력 테이블들이 자주 바뀐다면?
models 밑의 .sql 파일들을 일일이 찾아 바꿔주어야함
이 번거로움을 해결하기 위한 것이 Sources
입력 테이블에 별칭을 주고 별칭을 staging 테이블에서 사용
Sources 소개
기본적으로 처음 입력이 되는 ETL 테이블을 대상으로 함
별칭 제공
최신 레코드 체크 기능 제공
테이블 이름들에 별명(alias)을 주는 것
이를 통해 ETL단의 소스 테이블이 바뀌어도 뒤에 영향을 주지 않음
추상화를 통한 변경처리를 용이하게 하는 것
이 별명은 source 이름과 새 테이블 이름의 두 가지로 구성됨
- 예) raw_data.user_metadata -> keeyong, metadata
Source 테이블들에 새 레코드가 있는지 체크해주는 기능도 제공
Sources 실습 (1)
models/sources.yml 파일 생성
version: 2
sources:
- name: keeyong
schema: raw_data
tables:
- name: metadata
identifier: user_metadata
- name: event
identifier: user_event
- name: variant
identifier: user_variant
raw_data.user_metadata는
JINJA에서 source(“keeyong”, “metadata”)로 지칭됨
Sources 실습 (2)
아래는 src_user_event.sql의 예
models 밑의 다른 파일들도 적절하게 변경
WITH src_user_event AS (
SELECT * FROM raw_data.user_event → {{ source ("keeyong", "event") }}
)
SELECT
user_id,
datestamp,
item_id,
clicked,
purchased,
paidamount
FROM
src_user_event
Sources 최신성 (Freshness)
특정 데이터가 소스와 비교해서 얼마나 최신성이 떨어지는지 체크하는 기능
dbt source freshness 명령으로 수행
이를 하려면 models/sources.yml의 해당 테이블 밑에 아래 추가
version: 2
sources:
- name: keeyong
schema: raw_data
tables:
- name: event
identifier: user_event
loaded_at_field: datestamp
freshness:
warn_after: { count: 1, period: hour }
error_after: { count: 24, period: hour }
볼드체 부분은 freshness를 결정해주는 필드입니다.
지금 raw_data.user_event 테이블에서 datestamp의 최대값이 현재 시간보다 1시간 이상 뒤쳐져 있지만
24시간은 아니라면 warning. 24시간 이상이라면 error!
Sources 최신성 (Freshness)
dbt source freshness 명령으로 수행
실습
learn_dbt/models에 sources.yml 파일을 생성한 후
version: 2
sources:
- name: 각자 스키마 이름
schema: raw_data
tables:
- name: metadata
identifier: user_metadata
- name: event
identifier: user_event
- name: variant
identifier: user_variant
아까 보신대로 작성합니다.
raw_data를 접근하는 models/src에 가신 뒤
src_user_metadata.sql
WITH src_user_metadata AS (
SELECT * FROM {{ source("각자 스키마 이름", "metadata") }}
)
SELECT
user_id,
age,
gender,
updated_at
FROM
src_user_metadata
src_user_variant.sql
WITH src_user_variant AS (
SELECT * FROM {{ source("각자 스키마 이름", "variant") }}
)
SELECT
user_id,
variant_id
FROM
src_user_variant
src_user_event.sql
WITH src_user_event AS (
SELECT * FROM {{ source("각자 스키마 이름", "event") }}
)
SELECT
user_id,
datestamp,
item_id,
clicked,
purchased,
paidamount
FROM
src_user_event
위와 같이 3개의 파일들을 jinja template으로 바꿔 작성합니다.
그리고 root 디렉토리에서 dbt run을 실행합니다.
PS D:\Dev_KDT\dbt\learn_dbt> dbt run
제대로 매핑이 되어 들어갔습니다. 그런데 test를 붙이지 않아 맞는지 틀린지 알 수는 없습니다. 이에 관한 부분은 dbt test 과정에서 알아보겠습니다.
이제 Freshness를 체크하는 실습을 진행하겠습니다.
다시 models/sources.yml 파일을 여신 뒤
version: 2
sources:
- name: 각자 스키마 이름
schema: raw_data
tables:
- name: metadata
identifier: user_metadata
- name: event
identifier: user_event
loaded_at_field: datestamp
freshness:
warn_after: { count: 1, period: hour }
error_after: { count: 24, period: hour }
- name: variant
identifier: user_variant
볼드체 부분을 추가합니다. Freshness를 체크하는 필드입니다.
지금 raw_data.user_event 테이블에서 datestamp의 최대값이 현재 시간보다 1시간 이상 뒤쳐져 있지만
24시간은 아니라면 warning. 24시간 이상이라면 error!
PS D:\Dev_KDT\dbt\learn_dbt> dbt source freshness
16:52:16 1 of 1 START freshness of (각자 스키마).event ........................................ [RUN]
16:52:16 1 of 1 ERROR STALE freshness of (각자 스키마).event .................................. [ERROR STALE in 0.46s]
Freshness를 만족 시키지 못해 Error가 발생했습니다.
PS D:\Dev_KDT\dbt\learn_dbt> date
2024년 1월 5일 금요일 오전 1:53:16
여러분의 현재 시간에서 24시간이 지나지 않는 새로운 데이터를 INSERT INTO로 적재하면 더 이상 에러가 발생하지 않을 것입니다.
INSERT INTO raw_data.user_event VALUES (101, '2024-01-05 00:00:00', 100, 1, 0, 0);
시간을 2024년 1월5일 자정으로 주었습니다.
PS D:\Dev_KDT\dbt\learn_dbt> dbt source freshness
16:52:16 1 of 1 START freshness of (각자 스키마).event ........................................ [RUN]
16:52:16 1 of 1 ERROR STALE freshness of (각자 스키마).event .................................. [WARN in 0.46s]
다시 dbt source freshness를 실행하면 WARN이 뜹니다. datestamp 필드의 가장 큰 값이 1시간 이상 뒤쳐졌지만 24시간 이상은 아니기 때문입니다.
'Airflow 고급 기능, dbt, Data Catalog' 카테고리의 다른 글
DBT Tests, Documentation, Expectations (0) | 2024.01.05 |
---|---|
DBT Snapshots (0) | 2024.01.05 |
DBT Seeds (0) | 2024.01.05 |
데모 Input-Output (0) | 2024.01.04 |
DBT - Outnput (0) | 2024.01.04 |