터칭 데이터
Kafka Python 프로그래밍 기본과 숙제 본문
3. Kafka 소개
Kafka가 무엇인지 소개하는 시간을 가져보자
Contents
1. Kafka 역사
2. Kafka 소개
3. Kafka 아키텍처
4. Kafka 중요 개념
5. Kafka 설치
6. Kafka Python 프로그래밍
Kafka Python 프로그래밍
Kafka 프로그래밍을 위한 Python 모듈을 설치하고 기본 프로그래밍을 수행해보자
Kafka 프로그래밍 옵션들
Java:
○ Apache Kafka Java Client: 아파치 카프카의 공식 Java 클라이언트 라이브러리
○ Spring Kafka: 스프링 프레임워크와 Kafka를 통합하기 위한 라이브러리
Python:
○ Confluent Kafka Python: Confluent에서 개발한 공식 Kafka Python 클라이언트 라이브러리
○ Kafka-Python: 또다른 파이썬 기반 라이브러리
.NET:
○ Confluent Kafka .NET Client: Confluent에서 개발한 공식 Kafka .NET 클라이언트 라이브러리
Go:
○ Sarama: Go 언어용 Kafka 클라이언트 라이브러리로
Node.js:
○ node-rdkafka: librdkafka를 기반으로 한 Node.js용 Kafka 클라이언트 라이브러리
○ kafka-node: Node.js용 Kafka 클라이언트 라이브러리
Python 모듈 설치
pip3 install kafka-python
Kafka 컨테이너 내부가 아닌 본인의 로컬에서 pip3 install 해주세요!
간단한 Producer 만들기
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
for j in range(999):
print("Iteration", j)
data = {'counter': j}
producer.send('topic_test', value=data)
sleep(0.5)
잠깐: Python Lambda 함수
람다 함수는 흔히 이야기하는 함수형 언어(LISP, Haskell, …)에서는 기본 개념
○ Imperative Programming (step-by-step): Python, C/C++, Java, …
○ Functional Programming (수학 공식처럼 함수를 연결해서 계산): Erlang, Lisp,
○ Declarative Programming (원하는 결과와 어디서 그걸 얻을지만 기술): SQL
파이썬에서 람다함수를 사용하는 경우는?
○ 보통 higher-order 함수 (함수를 인자로 받는 함수)의 인자로 람다함수를 사용
■ higher-order 함수의 예: map, filter, reduce, sorted… 앞의 value_serializer
■ Pandas의 일부 함수들은 higher-order 함수: apply, assign, map, …
○ 람다함수의 포맷:
■ lambda arguments: expression
■ 예) lambda x: dumps(x).encode('utf-8')
예제 코드 살펴보기
간단한 Producer 실행
Kafka가 실행되어 있지 않다면 “NoBrokersAvailable”이란 에러 메세지 발생
Kafka 웹 콘솔에서 Topic 생성 여부 확인
http://localhost:8080/ 방문 후 왼쪽 메뉴에서 Topics 선택
Consumer 객체 만들기
일반적으로 현업에서는 enable_auto_commit을 True로 두는 것은 위험하기 때문에 False로 두는 것이 일반적입니다.
간단한 Consumer 만들기
from kafka import KafkaConsumer
from json import loads
from time import sleep
consumer = KafkaConsumer(
'topic_test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
event_data = event.value
# Do whatever you want
print(event_data)
sleep(2)
간단한 Consumer 실행
Kafka가 실행되어 있지 않다면 “NoBrokersAvailable”이란 에러 메세지 발생
Kafka Python Producer/Consumer 데모
앞서 내용을 실제로 데모해보자
999는 너무 클수도 있으니 적당히 줄여서 실행하는 것도 좋음
pip install kafka-python
먼저 pip3 install kafka-python
C:\Users\User>pip3 install kafka-python
Collecting kafka-python
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
---------------------------------------- 246.5/246.5 kB 1.3 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
WARNING: There was an error checking the latest version of pip.
Producer.py 실행
위에서 살펴본 producer.py를 루트 디렉토리에 놓은 후

D:\Dev_KDT\kafka\kafka-stack-docker-compose>python producer.py
Iteration 0
Iteration 1
Iteration 2
Iteration 3
Iteration 4
실행시켜줍니다. 0부터 998까지는 시간이 오래 걸려 0부터 4까지 반복문이 돌도록 코드를 임의로 수정했습니다.

우측 하단을 보면 의도한대로 topic_test가 보입니다. 0부터 4까지 Count는 5로 잡힙니다.
클릭해보니 별다른 Key와 기타 옵션도 주지 않았기 때문에 위와 같이 Timestamp와 Value가 보입니다.
Consumer.py 실행

D:\Dev_KDT\kafka\kafka-stack-docker-compose>python consumer.py
{'counter': 0}
{'counter': 1}
{'counter': 2}
{'counter': 3}
{'counter': 4}
우리가 조금 전에 만든 topic_test라는 Topic에서 5개의 메시지를 하나씩 읽어오는 것을 볼 수 있습니다.
'Kafka와 Spark Streaming' 카테고리의 다른 글
Topic 파라미터 설정 (0) | 2024.01.25 |
---|---|
Kafka CLI Tools (0) | 2024.01.25 |
Kafka 설치 (0) | 2024.01.24 |
Kafka 기타 기능 살펴보기 (0) | 2024.01.24 |
Kafka 중요 개념 (0) | 2024.01.24 |