터칭 데이터
Spark 내부동작 (Execution Plan) - 실습 본문
Contents
1. Spark 파일 포맷
Parquet, Avro, csv, json
2. Execution Plan
Spark이 우리가 만든 코드를 어떻게 실행할 지 보겠습니다.
Action과 Transformation의 차이를 살펴봅니다.
하나의 액션이 하나의 잡이고 하나의 잡이 다수의 스테이지를 만들 수 있고 하나의 스테이지가 다수의 태스크를 만들 수 있습니다.
그리고 이런 Execution Plan을 Spark Web UI로 살펴볼 수 있습니다.
3. Bucketing과 Partitioning
데이터를 처리하기 쉬운 형태로 HDFS에 저장하는 방식에 대해 살펴보겠습니다.
Execution Plan
Spark은 개발자가 만든 코드를 어떻게 변환하여 실행하는가?
WordCount 코드
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df = spark.read.text("shakespeare.txt")
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
spark.adaptive.enabled를 True로 해놓으면 우리가 이해하기 힘든 최적화를 하는 경우가 있기 때문에 지금 우리가 진행 중인 학습이라는 측면에서는 도움이 안됩니다. 그래서 False로 세팅했습니다.
txt 파일을 로딩했기 때문에 기본으로 주어지는 컬럼이 하나 있고 그 이름은 value입니다. value를 중심으로 split한 뒤 word들의 리스트를 만들고 각 word가 별개의 레코드가 됩니다. value 컬럼의 이름은 word로 합니다. word 컬럼을 GROUPBY한 뒤 COUNT하여 df_count라는 새로운 데이터 프레임을 생성합니다.
header가 True인 csv 로딩 때와 달리 이번에는 show에서 앞선 작업들이 실제로 실행됩니다.
이 코드는 show()까지 실행될 때가 되어서야 1개의 Job을 만듭니다.
만약 show()가 없다면Job은 0개입니다. (Lazy Execution)
WordCount Stage Visualization
WordCount Query Visualization
JOIN 코드
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.show()
Job은 3개 만들어집니다. read.json에서 large_data, small_data를 처리할 때 2개, show에서 1개 만들어집니다.
SparkSQL 챕터에서 살펴본 JOIN 방식 중 Shuffle Hashing JOIN이 발생할 겁니다. 만일 df_small이 충분히 작다면 굳이 그 방식을 사용하는 것은 엄청난 낭비일 것입니다. Broadcasting JOIN이 더 적합하며 지금의 Shuffle JOIN의 시각화를 살펴본 후 이어서 Broadcast JOIN으로 넘어가겠습니다.
JOIN Stage Visualization
JOIN Query Visualization
BROADCAST JOIN 코드
spark.sql.adaptive.autoBroadcastJoinThreshold
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, "inner")
join_df.show()
broadcast를 명시한 것 빼고는 나머지 모든 코드가 동일합니다. 명시하는 함수 호출 방법 외에도 옵션 설정으로 Spark이 알아서 최적화하며 작은 데이터 프레임을 broadcast하도록 할 수 있는데 그 때 사용하는 configuratioin이 spark.sql.adaptive.autoBroadcastJoinThreshold입니다. byte 크기이며 byte 크기 보다 작을 때 broadcast하게 됩니다.
BROADCAST JOIN Query Visualization
이러한 시각화는 Spark WEB UI에서 볼 수 있습니다. 로컬이라면 http://192.168.0.16:4040/SQL/execution/?id=0 링크에서 확인할 수 있습니다.
실습은 구글 Colab에서 진행하지 않도록 하겠습니다. Spark WEB UI를 구글 Colab에서 사용하는게 가능은 하지만 쉽지 않기 때문입니다.
'하둡과 Spark' 카테고리의 다른 글
요약 (0) | 2024.01.22 |
---|---|
Bucketing과 Partitioning (0) | 2024.01.20 |
Spark 내부동작 (Execution Plan) (0) | 2024.01.20 |
Spark 파일포맷 (0) | 2024.01.20 |
Intro (Spark 내부동작) (0) | 2024.01.20 |