터칭 데이터

기타기능 Intro와 Broadcast Variable 본문

SparkML

기타기능 Intro와 Broadcast Variable

터칭 데이터 2024. 2. 5. 13:20

 

Spark 고급과 Spark ML


Shuffling시 Skew 처리방식과 Spark ML에 대해 배워보자

 

 

 

 

 

 

 

 

Contents


1. Spark 기타 기능과 메모리 관리
2. Spark Shuffling 최적화
3. Spark Partition 학습
4. Spark ML 소개와 ML 모델 빌딩
5. ML Pipeline과 Tuning 소개와 실습

 

 

 

 

 

 

기타 기능/개념 살펴보기

 

자주 필요하지는 않지만 알아두면 좋은 기능들을 살펴보자

 

 

 

 

 

 

살펴볼 기능과 개념

 

Broadcast Variable

 

Accumulators

 

Speculative Execution

 

Scheduler

 

Dynamic Resource Allocation

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Broadcast Variable이란 무엇인가?

룩업 테이블등을 브로드캐스팅하여 셔플링을 막는 방식으로 사용

● 브로드캐스트 조인에서 사용되는 것과 동일한 테크닉
● 대부분 룩업 테이블 (혹은 디멘션 테이블 - 10-20MB)을 Executor로 전송하는데 사용
    ▪ 많은 DB에서 스타 스키마 형태로 팩트 테이블과 디멘션 테이블을 분리
● spark.sparkContext.broadcast를 사용

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Broadcast Variable: 룩업 테이블(파일)을 UDF로 보내는 방법

 

Closure

● Serialization이 태스크 단위로 일어남
● UDF안에서 파이썬 데이터 구조를 사용하는 경우

 

❖ Broadcast

● Serialization이 Worker Node 단위로 일어남 (그 안에서 캐싱되기에 훨씬 더 효율적)
● UDF안에서 브로드캐스트된 데이터 구조를 사용하는 경우

 

❖ Broadcast 데이터셋의 특징

● Worker node로 공유되는 변경 불가 데이터
● Worker node별로 한번 공유되고 캐싱됨
● 제약점은 Task Memory안에 들어갈 수 있어야함

 

 

 

 

 

 

 

 

 

 

 

 

 

Broadcast Variable: 예제

 

 

가장 인기있는 마벨 수퍼히로를 찾는 예제를 보자

 

수퍼히로의 ID를 찾고 그에 해당하는 이름을 찾아야함

● 이때 룩업 테이블을 DataFrame으로 로딩하고 조인을 하는 것도 방법
● 아니면 룩업 테이블을 브로드캐스트하여 UDF안에서 사용하는 것도 방법

 

예제코드

broadcast-for-udf/demo.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


def my_func(code: str) -> str:
    # return prdCode.get(code)
    return bdData.value.get(code)


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Demo") \
        .master("local[3]") \
        .getOrCreate()

    prdCode = spark.read.csv("data/lookup.csv").rdd.collectAsMap()

    bdData = spark.sparkContext.broadcast(prdCode)

    data_list = [("98312", "2021-01-01", "1200", "01"),
                 ("01056", "2021-01-02", "2345", "01"),
                 ("98312", "2021-02-03", "1200", "02"),
                 ("01056", "2021-02-04", "2345", "02"),
                 ("02845", "2021-02-05", "9812", "02")]
    df = spark.createDataFrame(data_list) \
        .toDF("code", "order_date", "price", "qty")

    spark.udf.register("my_udf", my_func, StringType())
    df.withColumn("Product", expr("my_udf(code)")) \
        .show()

 

 

 

 

 

 

'SparkML' 카테고리의 다른 글

Spark Scheduler  (0) 2024.02.05
Resource Dynamic Allocation  (0) 2024.02.05
Speculative Execution  (0) 2024.02.05
Accumulators  (0) 2024.02.05
Intro  (0) 2024.02.05