터칭 데이터

Spark 파일포맷 본문

하둡과 Spark

Spark 파일포맷

터칭 데이터 2024. 1. 20. 09:00

 

 

 

Contents


1. Spark 파일 포맷

Parquet, Avro, csv, json

 

2. Execution Plan

Spark이 우리가 만든 코드를 어떻게 실행할 지 보겠습니다.

Action과 Transformation의 차이를 살펴봅니다.

하나의 액션이 하나의 잡이고 하나의 잡이 다수의 스테이지를 만들 수 있고 하나의 스테이지가 다수의 태스크를 만들 수 있습니다.

 

그리고 이런 Execution Plan을 Spark Web UI로 살펴볼 수 있습니다.

 

3. Bucketing과 Partitioning

데이터를 처리하기 쉬운 형태로 HDFS에 저장하는 방식에 대해 살펴보겠습니다.

 

 

 

 

 

 

 

 

 

 

 

Spark 파일포맷


Spark에서 사용가능한 파일 포맷을 알아보자

 

 

 

 

 

 

 

 

데이터는 디스크에 파일로 저장됨: 일에 맞게 최적화 필요

 

 

 

 

 

 

 

 

 

 

 

Spark의 주요 파일 타입

 

1: 압축되면 Splittable 하지 않음 (압축 방식에 따라 다름 - snappy 압축이라면 Splittable)
2: Spark의 기본 파일 포맷

 

gzip으로 압축된 CSV, JSON 파일은 Splittable하지 않기 때문에 하나의 executor가 일단 처리하게 되며 메모리 에러가 날
확률이 높음

 

 

 

 

 

 

 

 

 

 

 

 

Parquet: Spark의 기본 파일 포맷

트위터와 클라우데라에서 공동 개발 (Doug Cutting)

 

 

 

 

 

 

 

 

 

 

 

DataFrame에서 다른 포맷 사용 방법 실습

 

DataFrame.write.format("avro"). …

 

DataFrame.write.format("parquet"). …

 

 

 

 

실습

 

 

 

 

 

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

 

 

from pyspark.sql import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Spark Writing Demo") \
        .master("local[3]") \
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
        .getOrCreate()

 

이번에는 config로 spark.jars.packages로 경로를 지정해줬는데 avro는 기본적으로 spark과 로딩이 되지 않기 때문에 별도로 지정을 해줬습니다.

 

 

 

!wget https://pyspark-test-sj.s3.us-west-2.amazonaws.com/appl_stock.csv

 

애플의 주식가격 데이터입니다.

 

 

 

 

df = spark.read \
    .format("csv") \
    .load("appl_stock.csv")

 

 

 

print("Num Partitions before: " + str(df.rdd.getNumPartitions()))
df.groupBy(spark_partition_id()).count().show()

 

파티션의 수와 파티션별 레코드의 수 확인

 

 

 

 

 

 

 

df2 = df.repartition(4)
print("Num Partitions after: " + str(df2.rdd.getNumPartitions()))
df2.groupBy(spark_partition_id()).count().show()

 

repartition으로 4개를 만들고 동일한 작업 수행

 

레코드의 수가 거의 4등분으로 분배

 

 

 

 

 

df3 = df2.coalesce(2)
print("Num Partitions after: " + str(df3.rdd.getNumPartitions()))
df3.groupBy(spark_partition_id()).count().show()

 

coalesce는 현재보다 적은 수로 파티션을 줄입니다. 이 때 셔플링을 최소화하는 방향으로 파티션의 수를 줄입니다.

 

 

 

 

 

 

 

 

df.write \
    .format("avro") \
    .mode("overwrite") \
    .option("path", "dataOutput/avro/") \
    .save()

 

1개 파티션 df는 avro

 

df2.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "dataOutput/parquet/") \
    .save()

 

4개 파티션 df2는 parquet

 

df3.write \
    .format("json") \
    .mode("overwrite") \
    .option("path", "dataOutput/json/") \
    .save()

 

2개 파티션 df3는 json으로 저장

 

 

 

 

 

 

 

 

!ls -tl dataOutput/

 

 

 

 

 

!ls -tl dataOutput/parquet/

 

기본적으로 압축이 되지만 snappy로 압축되었는데 splittable한 압축이며 블록 단위로 나눌 수 있는 유용한 방식입니다.

 

 

 

!ls -tl dataOutput/avro/

 

 

 

!ls -tl dataOutput/json/

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Schema Evolution 소개

Parquet 파일 3개로 테스트

● schema1.parquet
● schema2.parquet
● schema3.parquet

 

 

 

 

 

 

 

 

 

실습

 

 

 

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

 

 

from pyspark.sql import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Spark Schema Evolution Demo") \
        .master("local[3]") \
        .getOrCreate()

 

 

 

!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/schema1.parquet
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/schema2.parquet
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/schema3.parquet

 

 

 

 

 

 

df1 = spark.read. \
    parquet("schema1.parquet")
    
df1.printSchema()
df1.show()

(생략)

 

 

 

df2 = spark.read. \
    parquet("schema2.parquet")
    
df2.printSchema()
df2.show()

(생략)

 

 

 

 

 

df3 = spark.read. \
    parquet("schema3.parquet")
    
df3.printSchema()
df3.show()

(생략)

 

 

 

schema1 부터 3까지 오면서 컬럼이 한개 씩 더 늘어나는 것을 볼 수 있습니다.

 

원래는 3개의 파일을 동시에 로딩할 수 없습니다. 각 파일의 스키마가 다르기 때문이죠.

 

하지만 parquet처럼 schema evolution을 지원해주는 파일 포맷에서는 3개의 파일을 동시에 로딩하며 아래와 같은 문법을 사용할 수 있습니다.

 

df = spark.read. \
    option("mergeSchema", True). \
    parquet("*.parquet")
    
df.printSchema()

 

그러면서 옵션으로 mergeSchema를 True로 주었는데 이렇게 되면 3개의 다른 포맷(스키마)을 갖는 파일들이 마치 동일한 스키마를 갖는 파일처럼 하나의 데이터 프레임으로 로딩이 되고 스키마가 하나로 merge됩니다.

 

 

 

 

 

 

df.show(10)

 

정말로 3개의 다른 포맷을 갖고 있던 파일들이 원래 하나의 파일이었던 것 처럼 하나의 데이터 프레임으로 merge된 것을 볼 수 있습니다.

 

parquet과 avro가 갖는 schema evolution의 성능을 두 번째 실습으로 진행 해보았습니다.

 

 

 

 

 

'하둡과 Spark' 카테고리의 다른 글

Spark 내부동작 (Execution Plan) - 실습  (0) 2024.01.20
Spark 내부동작 (Execution Plan)  (0) 2024.01.20
Intro (Spark 내부동작)  (0) 2024.01.20
요약  (0) 2024.01.19
유닛테스트  (0) 2024.01.18