터칭 데이터

ksqlDB 사용해보기 본문

Kafka와 Spark Streaming

ksqlDB 사용해보기

터칭 데이터 2024. 1. 25. 14:01

 

 

4. Kafka 기본 프로그래밍


Kafka로 데이터를 생성하고 소비하는 코드를 작성해보자

 

 

 

 

 

 

Contents

1. Client tool 사용
2. Topic 파라미터 설정
3. Consumer 옵션 살펴보기
4. ksqlDB 사용해보기
5. 숙제

 

 

 

 

 

 

ksqlDB 사용해보기

 

Topic 데이터를 SQL을 사용해서 접근해보자

 

 

 

 

 

 

ksqlDB

 

REST API나 ksql 클라이언트 툴을 사용해서 Topic을 테이블처럼 SQL로 조작

방법 1) REST API를 ksql DB 서버에 보내 ksql DB가 연결된 Kafka Cluster의 Topic들을 SQL DB의 테이블이나 뷰처럼 처리

 

방법 2) ksql이라는 커맨드라인 유틸리티를 사용해 동일한 일을 수행 가능 (시간 관계상 이것만 실습)

 

여기서는 ksql을 사용하는 간단한 데모

○ docker ps 후 confluentinc/cp-ksqldb-server의 Container ID 복사
○ docker exec -it ContainerID sh
○ ksql 실행후 아래 두 개의 명령 실행

    ■ CREATE STREAM my_stream (id STRING, name STRING, title STRING) with 
       (kafka_topic='fake_people', value_format='JSON');
    ■ SELECT * FROM my_stream;

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

ksqlDB 실행 스크린 샷

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

실습

 

 

docker ps로 ksqldb-server의 컨테이너 ID를 알아낸 뒤

 

docker exec -it (ID) sh
sh-4.4$

 

 

 

sh-4.4$ ksql

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v7.3.2, Server v7.3.2 located at http://localhost:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

 

ksql DB가 실행되어 있는 컨테이너로 들어왔고 ksql shell 유틸리티가 설치되어 있고 모든 인증이 끝나 있는 상태이므로 ksql만 치면 됩니다.

 

 

 

 

ksql> CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');

 

CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');

쿼리를 실행합니다.

 

 

ksql> SELECT * FROM my_stream;
+--------------------------------------+--------------------------------------+--------------------------------------+
|ID                                    |NAME                                  |TITLE                                 |
+--------------------------------------+--------------------------------------+--------------------------------------+
(생략..)
Query Completed
Query terminated

 

latest 형태로 새로 생긴 레코드만 읽고 싶다면 EMIT CHANGES를 붙일 수 있습니다.

 

producer가 레코드를 만들 때 timestamp가 뭐였는지 Topic안으로 들어갔을 때 timestamp가 알고 싶다면 ROWTIME이나 EVENTTIME을 찍을 수 있습니다.

 

 

 

ksql> SELECT *, ROWTIME FROM my_stream EMIT CHANGES;
+----------------------------+----------------------------+----------------------------+----------------------------+
|ID                          |NAME                        |TITLE                       |ROWTIME                     |
+----------------------------+----------------------------+----------------------------+----------------------------+

 

ROWTIME은 Producer가 레코드를 만들 때의 timestamp입니다.

EMIT CHANGES는 새로 생긴 레코드만 읽어옵니다.

 

현재는 읽어오는 것이 아무것도 없을겁니다.

 

 

 

Ctrl + C로 빠져나온 뒤

 

EMIT CHANGE를 빼고 10개만 읽어옵니다.

ksql> SELECT *, ROWTIME FROM my_stream LIMIT 10;
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|ID                                                               |NAME                                                             |TITLE                                                            |ROWTIME                                                          |
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|4d555f67-6f8d-4b9b-8534-4d3322ca22fb                             |Theresa Cooper                                                   |Sound Technician, Broadcasting/Film/Video                        |1706177748964                                                    |
|31db29a5-0923-44a5-972e-acb17fc9eb13                             |Margaret Dawson                                                  |Mechanical Engineer                                              |1706177748967                                                    |
|e331a572-dad4-42b3-ace1-39f32bcef4dd                             |Ronald Williams                                                  |Civil Service Administrator                                      |1706177748946                                                    |
|927c3fee-ed95-4e69-8629-ef7e38203196                             |Joshua Cunningham                                                |Psychologist, Prison And Probation Services                      |1706177748966                                                    |
|5c394f97-fa18-4ece-b081-5ad9f521e089                             |Alexis Prince                                                    |Homeopath                                                        |1706177748967                                                    |
|2d54d0f8-55eb-434e-8a23-54203aefb8b4                             |Wendy Jimenez                                                    |Energy Engineer                                                  |1706177748968                                                    |
|852e619e-e374-4504-a00f-f509390d47b3                             |Elizabeth Spencer                                                |Medical Sales Representative                                     |1706177748969                                                    |
|b0b3906e-4399-4d17-ac6a-80debe857586                             |Christina Mueller                                                |Copy                                                             |1706177748970                                                    |
|842be4da-cb4d-4c6d-a6d0-e7b27e38238b                             |Miss Julie Maynard PhD                                           |Personal Assistant                                               |1706177748965                                                    |
|af1f8a90-0c1f-40c6-8b33-9606992a4a61                             |Nancy Carson                                                     |Psychiatrist                                                     |1706177748966                                                    |
Limit Reached
Query terminated

 

1970년 1월1일 이후로 몇 밀리세컨드나 지났는지의 형태로 출력됩니다.

 

 

 

 

 

 

'Kafka와 Spark Streaming' 카테고리의 다른 글

Spark Streaming 소개  (0) 2024.01.26
숙제  (0) 2024.01.25
Consumer 옵션 살펴보기  (0) 2024.01.25
Topic 파라미터 설정  (0) 2024.01.25
Kafka CLI Tools  (0) 2024.01.25