터칭 데이터

Bucketing과 Partitioning 본문

하둡과 Spark

Bucketing과 Partitioning

터칭 데이터 2024. 1. 20. 18:33

 

 

Contents


1. Spark 파일 포맷

Parquet, Avro, csv, json

 

2. Execution Plan

Spark이 우리가 만든 코드를 어떻게 실행할 지 보겠습니다.

Action과 Transformation의 차이를 살펴봅니다.

하나의 액션이 하나의 잡이고 하나의 잡이 다수의 스테이지를 만들 수 있고 하나의 스테이지가 다수의 태스크를 만들 수 있습니다.

 

그리고 이런 Execution Plan을 Spark Web UI로 살펴볼 수 있습니다.

 

3. Bucketing과 Partitioning

데이터를 처리하기 쉬운 형태로 HDFS에 저장하는 방식에 대해 살펴보겠습니다.

 

 

 

 

 

 

 

 

 

 

 

Bucketing과 Partitioning


HDFS 데이터를 처리 형태에 맞춰 최적화할 수 있다면 처리 시간을 단축하고 리소스를 덜 사용할 수 있다

 

 

 

 

 

 

 

 

Bucketing과 File System Partitioning 소개

 

둘다 Hive 메타스토어의 사용이 필요: saveAsTable

 

데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것

 

Bucketing

● 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지?
● 있다면 데이터를 이 특정 컬럼(들)을 기준으로 테이블로 저장
    ▪ 이 때의 버킷의 수도 지정

셔플링을 줄이는 것이 목표입니다.

 

File System Partitioning

● 원래 Hive에서 많이 사용
● 데이터의 특정 컬럼(들)을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
    ▪ 위의 컬럼들을 Partition Key라고 부름

 

우리가 이전에 봤던 partition을 얘기하는 것이 아닙니다! 이전의 partition은 데이터 프레임을 메모리에서 나눠 저장하는 단위였고 File System Partitioning은 파일 시스템에 저장되는 데이터를 특정 컬럼 혹은 컬럼들의 집합으로 나눠서 저장하는 것을 이야기 합니다. 내가 어떤 데이터를 쓰는 패턴이 특정 키를 중심으로 필터링 하는 등의 경우가 많다면 처음부터 데이터를 저장할 때 그 키를 중심으로 데이터를 저장함으로써 로딩과 필터링 부담을 줄일 수 있겠죠?

 

나눠 저장하는데 사용되는 키를 파티션 키(Partition Key)라고 부릅니다. 이 키는 하나의 컬럼일 필요는 없으며 다수의 컬럼이어도 상관 없습니다.

 

 

 

 

 

 

 

 

 

 

Bucketing

DataFrame을 특정 ID를 기준으로 나눠서 테이블로 저장

● 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
    ▪ DataFrameWriter의 bucketBy 함수 사용
    • Bucket의 수와 기준 ID 지정
● 데이터의 특성을 잘 알고 있는 경우 사용 가능

 

 

 

 

 

 

 

 

 

 

 

 

 

 

File System Partitioning

 

데이터를 Partition Key 기반 폴더 (“Partition") 구조로 물리적으로 나눠 저장

● DataFrame에서 이야기하는 Partition
● Hive에서 사용하는 Partitioning을 말함

 

Partitioning의 예와 잇점

● 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
    1. 데이터 자체를 연도-월-일의 폴더 구조로 저장(연 밑에 월들, 월 밑에 일들)
    2. 보통 위의 구조로 이미 저장되는 경우가 많음
● 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
● 데이터 관리도 쉬워짐 (Retention Policy 적용시)

 

DataFrameWriter의 partitionBy 사용

● Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!

그러므로 Partition Key는 Cardinality가 낮은 것을 사용해야 합니다. Cardinality는 가능한 값의 경우의 수입니다.

 

 

 

 

 

 

 

 

 

 

 

 

File System Partitioning

 

 

 

 

 

 

 

 

 

 

 

데모


Bucketing Demo

py 파일로 Colab에서 실습 X

 


Partition Demo

 

 

 

 

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

 

from pyspark.sql import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Spark FS Partition Demo") \
        .master("local[3]") \
        .enableHiveSupport() \
        .getOrCreate()

 

 

 

!wget https://pyspark-test-sj.s3.us-west-2.amazonaws.com/appl_stock.csv

 

 

 

df = spark.read.csv("appl_stock.csv", header=True, inferSchema=True)
df.printSchema()

 

 

 

 

df.show(10)

 

 

 

df = df.withColumn("year", year(df.Date)) \
    .withColumn("month", month(df.Date))

 

 

 

 

spark.sql("DROP TABLE IF EXISTS appl_stock")

 

 

 

 

 

df.write.partitionBy("year", "month").saveAsTable("appl_stock")

 

 

 

 

!ls -tl

 

 

 

 

 

!ls -tl spark-warehouse/

 

 

 

 

 

!ls -tl spark-warehouse/appl_stock/

 

 

 

 

!ls -tl spark-warehouse/appl_stock/year\=2010/

 

 

 

 

!ls -tl spark-warehouse/appl_stock/year\=2010/month\=12/

 

 

 

 

 

 

 

 

 

 

 

 

How to Read from Partitioned table

 

df = spark.read.table("appl_stock").where("year = 2016 and month = 12")

 

 

 

 

df.show(10)

 

 

 

 

 

spark.sql("SELECT * FROM appl_stock WHERE year = 2016 and month = 12").show(10)

 

 

 

 

 

 

'하둡과 Spark' 카테고리의 다른 글

Intro  (0) 2024.01.22
요약  (0) 2024.01.22
Spark 내부동작 (Execution Plan) - 실습  (0) 2024.01.20
Spark 내부동작 (Execution Plan)  (0) 2024.01.20
Spark 파일포맷  (0) 2024.01.20