터칭 데이터

Kafka Python 프로그래밍 기본과 숙제 본문

Kafka와 Spark Streaming

Kafka Python 프로그래밍 기본과 숙제

터칭 데이터 2024. 1. 24. 17:53

 

3. Kafka 소개


Kafka가 무엇인지 소개하는 시간을 가져보자

 

 

 

 

 

 

 

Contents


1. Kafka 역사
2. Kafka 소개
3. Kafka 아키텍처
4. Kafka 중요 개념
5. Kafka 설치
6. Kafka Python 프로그래밍

 

 

 

 

 

 

 

 

 

 

Kafka Python 프로그래밍

Kafka 프로그래밍을 위한 Python 모듈을 설치하고 기본 프로그래밍을 수행해보자

 

 

 

 

 

 

 

 

Kafka 프로그래밍 옵션들

 

Java:

○ Apache Kafka Java Client: 아파치 카프카의 공식 Java 클라이언트 라이브러리
○ Spring Kafka: 스프링 프레임워크와 Kafka를 통합하기 위한 라이브러리

 

Python:

○ Confluent Kafka Python: Confluent에서 개발한 공식 Kafka Python 클라이언트 라이브러리
○ Kafka-Python: 또다른 파이썬 기반 라이브러리

 

.NET:

○ Confluent Kafka .NET Client: Confluent에서 개발한 공식 Kafka .NET 클라이언트 라이브러리

 

Go:

○ Sarama: Go 언어용 Kafka 클라이언트 라이브러리로

 

Node.js:

○ node-rdkafka: librdkafka를 기반으로 한 Node.js용 Kafka 클라이언트 라이브러리
○ kafka-node: Node.js용 Kafka 클라이언트 라이브러리

 

 

 

 

 

 

 

 

 

 

 

Python 모듈 설치

pip3 install kafka-python

 

 

Kafka 컨테이너 내부가 아닌 본인의 로컬에서 pip3 install 해주세요!

 

 

 

 

 

 

 

 

 

 

 

간단한 Producer 만들기

 

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],
   value_serializer=lambda x: dumps(x).encode('utf-8')
)

for j in range(999):
   print("Iteration", j)
   data = {'counter': j}
   producer.send('topic_test', value=data)
   sleep(0.5)

 

 

KafkaProducer

 

 

producer.send

 

 

 

 

 

 

 

 

 

 

 

 

잠깐: Python Lambda 함수

 

람다 함수는 흔히 이야기하는 함수형 언어(LISP, Haskell, …)에서는 기본 개념

○ Imperative Programming (step-by-step): Python, C/C++, Java, …
○ Functional Programming (수학 공식처럼 함수를 연결해서 계산): Erlang, Lisp, 
○ Declarative Programming (원하는 결과와 어디서 그걸 얻을지만 기술): SQL

 

파이썬에서 람다함수를 사용하는 경우는?

○ 보통 higher-order 함수 (함수를 인자로 받는 함수)의 인자로 람다함수를 사용
    ■ higher-order 함수의 예: map, filter, reduce, sorted… 앞의 value_serializer
    ■ Pandas의 일부 함수들은 higher-order 함수: apply, assign, map, …
○ 람다함수의 포맷:
    ■ lambda arguments: expression
    ■ 예) lambda x: dumps(x).encode('utf-8')

 

예제 코드 살펴보기

 

 

 

 

 

 

 

 

 

간단한 Producer 실행

Kafka가 실행되어 있지 않다면 “NoBrokersAvailable”이란 에러 메세지 발생

 

 

 

 

 

 

 

 

 

 

 

Kafka 웹 콘솔에서 Topic 생성 여부 확인

http://localhost:8080/ 방문 후 왼쪽 메뉴에서 Topics 선택

 

 

 

 

 

 

 

 

 

 

 

 

 

Consumer 객체 만들기

 

KafkaConsumer

 

일반적으로 현업에서는 enable_auto_commit을 True로 두는 것은 위험하기 때문에 False로 두는 것이 일반적입니다.

 

 

 

 

 

 

 

 

 

 

간단한 Consumer 만들기

from kafka import KafkaConsumer
from json import loads
from time import sleep

consumer = KafkaConsumer(
   'topic_test',
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='earliest',
   enable_auto_commit=True,
   group_id='my-group-id',
   value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
   event_data = event.value
   # Do whatever you want
   print(event_data)
   sleep(2)

 

 

KafkaConsumer

 

 

 

 

 

 

 

 

 

 

간단한 Consumer 실행

Kafka가 실행되어 있지 않다면 “NoBrokersAvailable”이란 에러 메세지 발생

 

 

 

 

 

 

 

 

 

 

 

Kafka Python Producer/Consumer 데모

 

앞서 내용을 실제로 데모해보자

 

999는 너무 클수도 있으니 적당히 줄여서 실행하는 것도 좋음

 

 

 

 

 

pip install kafka-python

 

먼저 pip3 install kafka-python

C:\Users\User>pip3 install kafka-python
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     ---------------------------------------- 246.5/246.5 kB 1.3 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
WARNING: There was an error checking the latest version of pip.

 

 

 

 

 

 

 

Producer.py 실행

 

위에서 살펴본 producer.py를 루트 디렉토리에 놓은 후

D:\Dev_KDT\kafka\kafka-stack-docker-compose>python producer.py
Iteration 0
Iteration 1
Iteration 2
Iteration 3
Iteration 4
 

실행시켜줍니다. 0부터 998까지는 시간이 오래 걸려 0부터 4까지 반복문이 돌도록 코드를 임의로 수정했습니다.

 

 

 
 

우측 하단을 보면 의도한대로 topic_test가 보입니다. 0부터 4까지 Count는 5로 잡힙니다.

 
 
 
 

클릭해보니 별다른 Key와 기타 옵션도 주지 않았기 때문에 위와 같이 Timestamp와 Value가 보입니다.

 

 

 
 
 
 
 
 
 

Consumer.py 실행

 
이번에는 consumer.py를 루트 디렉토리에 놓은 후
 
 
 
 
D:\Dev_KDT\kafka\kafka-stack-docker-compose>python consumer.py
{'counter': 0}
{'counter': 1}
{'counter': 2}
{'counter': 3}
{'counter': 4}
python 실행해줍니다.
 

우리가 조금 전에 만든 topic_test라는 Topic에서 5개의 메시지를 하나씩 읽어오는 것을 볼 수 있습니다.

 

 

 
 

 

 

'Kafka와 Spark Streaming' 카테고리의 다른 글

Topic 파라미터 설정  (0) 2024.01.25
Kafka CLI Tools  (0) 2024.01.25
Kafka 설치  (0) 2024.01.24
Kafka 기타 기능 살펴보기  (0) 2024.01.24
Kafka 중요 개념  (0) 2024.01.24