터칭 데이터
Kafka Stream 예제 프로그램 본문
5. Spark Streaming 소개와 Kafka 연동
Spark Structured Streaming에 대해 알아보고 Kafka Topic을 사용하는 방법을 배워보자
Contents
1. Spark Streaming 소개
2. Spark 환경 설정
3. Streaming WordCount 예제 프로그램
4. Kafka Stream 예제 프로그램
5. 강의 마무리
Kafka Stream 예제 프로그램
Structured Streaming으로 Kafka 토픽을 처리해보자
이번에는 Input을 Kafka Topic으로 받아 처리하는 실습을 진행하겠습니다.
Kafka와 Spark Streaming 다이어그램
Spark Structured Streaming을 보통 사용함
fake_people에서 읽어들이는 Kafka Stream
fake_people 토픽을 기준으로 가장 많은 title 열 개를 계산하는 코드
코드 자체는 아주 단순하지만 실행하는 부분이 복잡함
○ 그냥 spark-submit으로는 안되고 환경설정이 좀 필요함. 뒤에서 더 설명
○ Kafka와 연결하는 Connect가 필요합니다. Java의 Jar파일로 다운 받아 세팅해야합니다.
○ spark-sumbit에서 패키지라는 옵션을 사용해 Connect Jar파일을 명시합니다.
Kafka Structured Streaming 프로그램 실행 준비
spark.jars.packages를 설정해주어야함: 두 가지 옵션이 존재
○ spark-defaults.conf 파일 수정
○ SparkSession 생성시 config로 지정
○ spark-submit 실행시 --packages 옵션 사용 => 우리는 이걸 사용해볼 예정
packages 옵션으로 지정할 값 찾기
https://mvnrepository.com/search?q=kafka+structured+streaming
설치한 Spark 버전과 같이 표시된 Scala 버전을 정확하게 만족하는 Jar파일을 사용해야합니다. 위의 메이븐 링크에서 확인한 후 선택해 사용합니다.
Kafka는 버전 0.10에서 바뀌지 않았으므로 안심하고 Kafka 0.10을 선택하고 Spark와 Scala 버전을 맞춰 선택하세요.
spark-submit 명령
spark-submit --packages 옵션에는 <groupId>태그의 값과 콜론(:)과 <artifactId>태그의 값 그리고 콜론(:)과 <versioin>태그의 값을 적어줍니다.
Kafka Streaming 코드 보기
kafka_source_streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("File Streaming Demo") \
.master("local[3]") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.getOrCreate()
schema = StructType([
StructField("id", StringType()),
StructField("name", StringType()),
StructField("title", StringType())
])
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "fake_people") \
.option("startingOffsets", "earliest") \
.load()
kafka_df.printSchema()
"""
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
"""
value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))
value_df.createOrReplaceTempView("fake_people")
value_df.printSchema()
count_df = spark.sql("SELECT value.title, COUNT(1) count FROM fake_people GROUP BY 1 ORDER BY 2 DESC LIMIT 10")
count_writer_query = count_df.writeStream \
.format("console") \
.outputMode("complete") \
.option("checkpointLocation", "chk-point-dir-json") \
.start()
print("Listening to Kafka")
count_writer_query.awaitTermination()
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
kafka_source_streaming.py
Kafka Stream 데모
앞의 내용을 실제로 데모
먼저 본인의 Spark와 Scala 버전을 확인합니다.
>spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/26 15:58:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://host.docker.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-123456789).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.0
/_/
Using Scala version 2.12.18 (Java HotSpot(TM) 64-Bit Server VM, Java 14.0.1)
Type in expressions to have them evaluated.
Type :help for more information.
저는 Spark 버전은 3.5.0이고 Scala는 2.12.18입니다.
버전에 맞는 곳을 클릭해
태그의 값을 확인해 위에서 설명한대로 --packages 옵션에 사용합니다.
kafka-stack-docker-compose 프로젝트 디렉토리에 kafka_source_streaming.py 파일을 배치합니다.
PS D:\Dev_KDT\kafka\kafka-stack-docker-compose> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 kafka_source_streaming.py
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12 kafka_source_streaming.py
명령을 실행합니다.
'Kafka와 Spark Streaming' 카테고리의 다른 글
Kafka 기본 개념 정리 (0) | 2024.01.26 |
---|---|
마무리와 숙제 (0) | 2024.01.26 |
Streaming WordCount 예제 프로그램 (0) | 2024.01.26 |
Spark 환경설정 (0) | 2024.01.26 |
Spark Streaming 소개 (0) | 2024.01.26 |