터칭 데이터

DBT Sources 본문

Airflow 고급 기능, dbt, Data Catalog

DBT Sources

터칭 데이터 2024. 1. 5. 01:59

 

 

 

 

 

Contents


1. ELT의 미래는?
2. Database Normalization
3. dbt 소개
4. dbt 사용 시나리오
5. dbt 설치와 환경 설정
6. dbt Models: Input
7. dbt Models: Output
8. dbt Seeds
9. dbt Sources
10. dbt Snapshots
11. dbt Tests
12. dbt Documentation
13. dbt Expectations
14. 마무리

 

 

 

 

 

 

dbt Sources


dbt Sources란 무엇인가?

 

 

 

 

 

 

 

 

 

Staging 테이블을 만들 때 입력 테이블들이 자주 바뀐다면?

 

models 밑의 .sql 파일들을 일일이 찾아 바꿔주어야함

 

이 번거로움을 해결하기 위한 것이 Sources

입력 테이블에 별칭을 주고 별칭을 staging 테이블에서 사용

 

 

 

 

 

 

 

 

Sources 소개

 

기본적으로 처음 입력이 되는 ETL 테이블을 대상으로 함

별칭 제공
최신 레코드 체크 기능 제공

 

테이블 이름들에 별명(alias)을 주는 것

이를 통해 ETL단의 소스 테이블이 바뀌어도 뒤에 영향을 주지 않음
추상화를 통한 변경처리를 용이하게 하는 것
이 별명은 source 이름과 새 테이블 이름의 두 가지로 구성됨
    - 예) raw_data.user_metadata -> keeyong, metadata 

 

Source 테이블들에 새 레코드가 있는지 체크해주는 기능도 제공

 

 

 

 

 

 

 

 

 

 

 

 

Sources 실습 (1)

models/sources.yml 파일 생성

 

version: 2

sources:
    - name: keeyong
      schema: raw_data
      tables:
        - name: metadata
          identifier: user_metadata
        - name: event
          identifier: user_event
        - name: variant
          identifier: user_variant

 

 

raw_data.user_metadata는
JINJA에서 source(“keeyong”, “metadata”)로 지칭됨

 

 

 

 

 

 

 

 

 

 

 

 

Sources 실습 (2)

아래는 src_user_event.sql의 예

models 밑의 다른 파일들도 적절하게 변경

 

 

 

WITH src_user_event AS (
    SELECT * FROM raw_data.user_event {{ source ("keeyong", "event") }}
)
SELECT
    user_id,
    datestamp,
    item_id,
    clicked,
    purchased,
    paidamount
FROM
    src_user_event

 

 

 

 

 

 

 

 

 

Sources 최신성 (Freshness) 

 

특정 데이터가 소스와 비교해서 얼마나 최신성이 떨어지는지 체크하는 기능

 

dbt source freshness 명령으로 수행

 

이를 하려면 models/sources.yml의 해당 테이블 밑에 아래 추가

 

 

version: 2

sources:
    - name: keeyong
      schema: raw_data
      tables:
        - name: event
          identifier: user_event
          loaded_at_field: datestamp
          freshness:
              warn_after: { count: 1, period: hour }
              error_after: { count: 24, period: hour }

 

볼드체 부분은 freshness를 결정해주는 필드입니다.

 

 

지금 raw_data.user_event 테이블에서 datestamp의 최대값이 현재 시간보다 1시간 이상 뒤쳐져 있지만

24시간은 아니라면 warning. 24시간 이상이라면 error! 

 

 

 

 

 

 

 

 

 

 

 

Sources 최신성 (Freshness)

dbt source freshness 명령으로 수행

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

실습

 

learn_dbt/models에 sources.yml 파일을 생성한 후

 

version: 2

sources:
    - name: 각자 스키마 이름
      schema: raw_data
      tables:
        - name: metadata
          identifier: user_metadata
        - name: event
          identifier: user_event
        - name: variant
          identifier: user_variant

 

아까 보신대로 작성합니다.

 

 

 

 

 

 

 

raw_data를 접근하는 models/src에 가신 뒤

 

 

 

src_user_metadata.sql

WITH src_user_metadata AS (
    SELECT * FROM {{ source("각자 스키마 이름", "metadata") }}
)
SELECT 
    user_id,
    age, 
    gender,
    updated_at 
FROM
    src_user_metadata

 

 

 

 

 

src_user_variant.sql

WITH src_user_variant AS (
    SELECT * FROM {{ source("각자 스키마 이름", "variant") }}
)
SELECT 
    user_id,
    variant_id
FROM
    src_user_variant

 

 

 

 

 

 

 

src_user_event.sql

WITH src_user_event AS (
    SELECT * FROM {{ source("각자 스키마 이름", "event") }}
)
SELECT
    user_id,
    datestamp,
    item_id,
    clicked,
    purchased,
    paidamount
FROM
    src_user_event

 

 

 

 

 

위와 같이 3개의 파일들을 jinja template으로 바꿔 작성합니다.

 

 

 

 

 

 

그리고 root 디렉토리에서 dbt run을 실행합니다.

PS D:\Dev_KDT\dbt\learn_dbt> dbt run

 

제대로 매핑이 되어 들어갔습니다. 그런데 test를 붙이지 않아 맞는지 틀린지 알 수는 없습니다. 이에 관한 부분은 dbt test 과정에서 알아보겠습니다.

 

 

 

 

 

 

 

 

 

 

 

이제 Freshness를 체크하는 실습을 진행하겠습니다.

 

 

 

 

다시 models/sources.yml 파일을 여신 뒤

 

version: 2

sources:
    - name: 각자 스키마 이름
      schema: raw_data
      tables:
        - name: metadata
          identifier: user_metadata
        - name: event
          identifier: user_event
          loaded_at_field: datestamp
          freshness:
              warn_after: { count: 1, period: hour }
              error_after: { count: 24, period: hour }
        - name: variant
          identifier: user_variant

 

볼드체 부분을 추가합니다. Freshness를 체크하는 필드입니다.

 

지금 raw_data.user_event 테이블에서 datestamp의 최대값이 현재 시간보다 1시간 이상 뒤쳐져 있지만

24시간은 아니라면 warning. 24시간 이상이라면 error! 

 

 

 

 

 

 

 

PS D:\Dev_KDT\dbt\learn_dbt> dbt source freshness

16:52:16  1 of 1 START freshness of (각자 스키마).event ........................................ [RUN]
16:52:16  1 of 1 ERROR STALE freshness of (각자 스키마).event .................................. [ERROR STALE in 0.46s]

 

Freshness를 만족 시키지 못해 Error가 발생했습니다.

 

 

 

 

PS D:\Dev_KDT\dbt\learn_dbt> date

2024년 1월 5일 금요일 오전 1:53:16

 

여러분의 현재 시간에서 24시간이 지나지 않는 새로운 데이터를 INSERT INTO로 적재하면 더 이상 에러가 발생하지 않을 것입니다.

 

 

INSERT INTO raw_data.user_event VALUES (101, '2024-01-05 00:00:00', 100, 1, 0, 0);

 

시간을 2024년 1월5일 자정으로 주었습니다.

 

 

 

PS D:\Dev_KDT\dbt\learn_dbt> dbt source freshness

16:52:16  1 of 1 START freshness of (각자 스키마).event ........................................ [RUN]
16:52:16  1 of 1 ERROR STALE freshness of (각자 스키마).event .................................. [WARN in 0.46s]

 

다시 dbt source freshness를 실행하면 WARN이 뜹니다. datestamp 필드의 가장 큰 값이 1시간 이상 뒤쳐졌지만 24시간 이상은 아니기 때문입니다.

 

 

 

 

 

'Airflow 고급 기능, dbt, Data Catalog' 카테고리의 다른 글

DBT Tests, Documentation, Expectations  (0) 2024.01.05
DBT Snapshots  (0) 2024.01.05
DBT Seeds  (0) 2024.01.05
데모 Input-Output  (0) 2024.01.04
DBT - Outnput  (0) 2024.01.04