터칭 데이터
Kafka CLI Tools 본문
4. Kafka 기본 프로그래밍
Kafka로 데이터를 생성하고 소비하는 코드를 작성해보자
Contents
1. Client tool 사용
2. Topic 파라미터 설정
3. Consumer 옵션 살펴보기
4. ksqlDB 사용해보기
5. 숙제
Client Tool 사용
Kafka Client Tool을 사용해보자
Kafka CLI Tools 접근 방법
docker ps를 통해 Broker의 Container ID 혹은 Container 이름 파악
해당 컨테이너로 로그인
○ docker exec -it Broker_Container_ID sh
거기서 다양한 kafka 관련 클라이언트 툴을 사용 가능
○ kafka-topics (이번 실습에서 사용)
○ kafka-configs
○ kafka-console-consumer
○ kafka-console-producer (이번 실습에서 사용)
○ …
kafka-topics
kafka-topics는 보이는 그대로 kafka의 Topic과 관련된 기능을 제공하는 유틸리티입니다.
$ kafka-topics --bootstrap-server kafka1:9092 --list
모든 Kafka 관계된 커맨드 라인 유틸리티는 대상이 되는 Kafka 클러스터를 알아야 하기 때문에 --bootstrap-server라는 파라미터로 대상이 되는 카프카 클러스터의 브로커를 하나 지정하게 되어있습니다. kafka1 대신에 보통은 Localhost를 적어주겠지만 지금은 Docker Container안에 세팅이 된 Kafka Broker의 특성 때문에 kafka1이라고 적어줍니다.
우리가 kafka를 서치할 때 사용한 full-stack.yml 파일에서 Broker의 호스트 이름을 kafka1으로 주었었습니다.
그 kafka1의 포트 번호는 9092였습니다.
--list로 내가 지정한 Kafka Broker가 속한 Kafka 클러스터에 있는 모든 Topic들의 리스트가 보입니다.
$ kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test
특정 Topic을 지웁니다. --topic 뒤에 내가 지우고 싶은 topic의 이름을 적어줍니다.
Topic들의 리스트를 보니 6개의 토픽들이 보입니다. 지난 시간 Conduktor Web UI에서 보았던 3개의 Topic들 외에도 _로 시작하는 토픽들도 보입니다. Kafka Connect에 관한 토픽들이며 이중에서 __consumer_offsets는 Backpressure(배압)을 대비해 Consumer의 수를 늘릴 때를 대비해 Consumer의 Offset을 기록하는 역할을 합니다.
kafka-console-producer
데이터를 만드는 유틸리티입니다. 만들어진 topic에서 메시지를 소비하는 kafka-conosle-consumer의 반대 역할을 합니다.
Command line을 통해 Topic 만들고 Message 생성 가능
$ kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
만들고 싶은 혹은 메시지를 ingest하고 싶은 topic의 이름을 --뒤에 지정합니다. (--topic test_console) 만일 토픽의 이름이 이미 있다면 topic에 데이터를 ingest하게 될테고 존재하지 않는다면 topic을 새로 만들고 ingest하게 될 것입니다.
이렇게 만들어진 topic은 기본 파티션 하나에 레플리카 하나입니다.
유틸리티 명령을 입력하면 메시지를 입력할 수 있는 프롬프트가 뜹니다. 타입하고 엔터를 누르면 메시지가 입력됩니다. 더 이상 메시지를 입력하고 싶지 않다면 Ctrl + C를 누르시면 됩니다.
3개의 메시지가 만들어졌군요. (KYH, Hello World!, Bye Bye!)
kafka-console-consumer
내가 혹은 남이 만든 topic에서 메시지를 소비하는 테스트를 할 수 있습니다.
Command line을 통해 Topic에서 Message 읽기 가능
○ --from-beginning 옵션이 있으면 처음부터 읽음 (earliest). 아니면 latest로 동작
$ kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning
다른 부분은 동일하지만 --from-beginning가 눈에 띕니다. 이 옵션을 주지 않으면 디폴트로 이미 쌓여있는 메시지를 읽지 않고 새로 들어온 메시지만 읽습니다. 지난 시간에 실습한 consumer.py에서
어느 offset부터 읽을 것인지 설정했었죠? 기본은 latest로 내가 실행된 다음 생성된 것만 읽어오게 동작합니다. 우리는 그 대신 쌓여온 모든 메시지들을 처음 부터 읽어오도록 earliest로 설정했었습니다. 이 earliest와 같은 역할을 하는 유틸리티 명령 옵션이 바로 --from-beginning입니다.
consumer 코드는 기본적으로 새로 생긴 메시지를 읽어오기 위해 계속 기다립니다. 다 읽어왔다해서 끝내지 않고 계속 대기 상태로 기다립니다. 코드를 보면 아실 수 있지만 for loop 명령을 도는데 명시적으로 프로그램을 끝내주어야 동작을 멈춥니다. (Ctrl + C로 중지)
두 Console 프로세스들의 Side-by-side 실행
터미널을 하나 열고 동일 Broker 로그인 후 console-producer로 메세지 발생
Producer에서 Coffee, Hey Can You See Me? 등을 타이핑하고 엔터를 치니 Consumer 쪽에서 레코드가 바로 보입니다.
Kafka CLI Tools 데모
앞서 내용들을 데모로 실행
PS D:\Dev_KDT\kafka\kafka-stack-docker-compose> docker ps
docker ps로 실행 중인 Kafka 컨테이너에서 broker 컨테이너의 ID를 찾습니다. 실습을 그대로 따라오며 full-stack.yml로 docker up을 하셨다면 kafka1의 ID를 사용하시면 됩니다.
docker exec -it (kafka1의 컨테이너 ID) sh
PS D:\Dev_KDT\kafka\kafka-stack-docker-compose> docker exec -it (ID) sh
sh-4.4$
sh-4.4$ ls -tl
total 0
kafka는 이미 다른 곳에 설치되어 있고 지금은 폴더에 설치된 것이 아무 것도 없습니다.
sh-4.4$ kafka-topics
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions,
replica assignment, and/or
configuration for the topic.
(생략..)
kafka-topics만 칠 경우 뜨는 kafka-topics 유틸리티의 다양한 옵션 도움말들
현재 clear 명령어가 설치되어 있지 않아 창을 정리하고 싶다면 Ctrl + L을 누르세요.
sh-4.4$ kafka-topics --bootstrap-server kafka1:9092 --list
__consumer_offsets
_confluent-ksql-ksqldb-server__command_topic
_schemas
docker-connect-configs
docker-connect-offsets
docker-connect-status
topic_test
kafka-topics --bootstrap-server kafka1:9092 --list으로 Topic 들을 조회합니다.
--bootstrap-server에 대해: 모든 Kafka 관계된 커맨드 라인 유틸리티는 대상이 되는 Kafka 클러스터를 알아야 하기 때문에 --bootstrap-server라는 파라미터로 대상이 되는 카프카 클러스터의 브로커를 하나 지정하게 되어있습니다.
위의 토픽들에서 지우고 싶으신 topic이 있다면
kafka-topics --bootstrap-server kafka1:9092 --delete --topic (토픽의 이름)
kafka-topics --bootstrap-server kafka1:9092 --delete --topic (토픽의 이름)
두 콘솔을 Side-by-side로 실행해보겠습니다.
터미널 2개를 열고 두 터미널 모두 broker 컨테이너로 docker exec로 접속한 후 왼쪽에는 producer 오른쪽에는 consumer를 실습해보겠습니다.
왼쪽 Producer 터미널
sh-4.4$ kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
오른쪽 Consumer 터미널
sh-4.4$ kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console
[2024-01-25 07:21:49,390] WARN [Consumer clientId=console-consumer, groupId=console-consumer-87893] Error while fetching metadata with correlation id 2 : {test_console=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
왼쪽 Producer 터미널에서 Hello, Who Are You?를 타이핑하고 엔터를 눌러보겠습니다.
왼쪽 Producer 터미널
sh-4.4$ kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
>Hello
>Who Are You?
오른쪽 Consumer 터미널
sh-4.4$ kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console
[2024-01-25 07:21:49,390] WARN [Consumer clientId=console-consumer, groupId=console-consumer-87893] Error while fetching metadata with correlation id 2 : {test_console=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Hello
Who Are You?
왼쪽 Producer 터미널에서 입력한 단어들이 오른쪽 Consuer 터미널에서 실시간으로 보입니다.
지금 어떤 일이 내부에서 벌어지는 중인지 설명드리자면 test_console이라는 토픽에 Hello, Who Are You라는 메시지를 write했고 오른쪽에 있는 Consumer가 읽어 보여주는 중입니다.
왼쪽 Producer 터미널
sh-4.4$ kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
>Hello
>Who Are You?
>^Csh-4.4$
왼쪽 Producer 터미널에서 Ctrl + C로 끝내도
오른쪽 Consumer 터미널
sh-4.4$ kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console
[2024-01-25 07:21:49,390] WARN [Consumer clientId=console-consumer, groupId=console-consumer-87893] Error while fetching metadata with correlation id 2 : {test_console=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Hello
Who Are You?
오른쪽 Consumer 터미널은 끝나지 않습니다.
왼쪽 터미널이 끝났는지 알 수도 없고 우리가 짠 코드 자체가 계속 Consumer를 기다리도록 만들었기 때문입니다.
Conduktor Web UI에서 우리가 방금 실습한 test_console Topic의 모습이 어떨지 확인하고 실습을 마치겠습니다.
Topics 메뉴에서 test_console을 클릭하면
우리가 입력했던 Hello, Who Are You 메시지들이 타임스탬프와 함께 보입니다. Producer에서 Key는 넣지 않는 모양인지 Key는 null입니다.
'Kafka와 Spark Streaming' 카테고리의 다른 글
Consumer 옵션 살펴보기 (0) | 2024.01.25 |
---|---|
Topic 파라미터 설정 (0) | 2024.01.25 |
Kafka Python 프로그래밍 기본과 숙제 (0) | 2024.01.24 |
Kafka 설치 (0) | 2024.01.24 |
Kafka 기타 기능 살펴보기 (0) | 2024.01.24 |