터칭 데이터
UDF(User Defined Function) 본문
Contents
1. Spark SQL 소개
2. Aggregation, JOIN, UDF
3. Spark SQL 실습
4. Hive 메타스토어 사용하기
5. 유닛 테스트
Aggregation, JOIN, UDF
다양한 Aggregation과 JOIN 방식과 UDF에 대해 살펴보자
UDF(User Defined Function) 사용해보기
데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적
Spark SQL에서도 사용 가능함
Aggregation용 UDAF(User Defined Aggregation Function)도 존재
GROUP BY에서 사용되는 SUM, AVG와 같은 함수를 만드는 것
PySpark에서 지원되지 않음. Scalar/Java를 사용해야 함
UDF란 무엇인가?
User Defined Function
DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수
Scalar 함수 vs. Aggregation 함수
● Scalar 함수 예: UPPER, LOWER, …
● Aggregation 함수 (UDAF) 예: SUM, MIN, MAX
UDF 사용 방법 (1)
함수 구현
● 파이썬 람다 함수
● 파이썬 (보통) 함수
● 파이썬 판다스 함수:
- pyspark.sql.functions.pandas_udf로 annotation
- Apache Arrow를 사용해서 파이썬 객체를 자바 객체로 변환이 훨씬 더 효율적
UDF 사용 방법 (2)
함수 등록
● pyspark.sql.functions.udf
- DataFrame에서만 사용 가능
● spark.udf.register
- SQL 모두에서 사용 가능
함수 사용
● .withColumn, .agg
● SQL
성능이 중요하다면?
Scala나 Java로 구현하는 것이 제일 좋음
파이썬을 사용해야한다면 Pandas UDF로 구현
UDF - DataFrame에 사용해보기 #1
람다 함수로 사용
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
파이썬 함수로 생성한 후 Spark SQL에서 사용
def upper(s):
return s.upper()
# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()
# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()
UDF - DataFram에 사용해보기 #2
람다 함수로 사용
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b"))
파이썬 함수로 생성한 후 Spark SQL에서 사용
def plus(x, y):
return x + y
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()
앞에서 파이썬 람다 함수나 파이썬 일반 함수로 UDF를 만들어 사용했습니다.
그런데 이 두 방식은 Scala 함수만 만들 수 있고 Aggregation 함수는 만들 수 없습니다. 또한 속도도 상대적으로 느립니다.
그래서 pyspark에서 제공하는 pandas_udf로 함수를 만드는 것이 더 빠르고 Aggregation함수 역시 만들 수 있으므로 더욱 권장되는 방식입니다.
UDF - Pandas UDF Scalar 함수 사용해보기
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf2)
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `Curated Name` FROM test""").show()
기본적으로 앞의 람다 함수와 파이썬 일반 함수와 마찬가지로 레코드를 하나씩 입력 받는 구조가 아닌 레코드 집합을 받아 프로세싱하는 방식이며 레코드 집합은 판다스의 Series 형태입니다. 그렇기 때문에 pandas를 pd로 import합니다.
import한 pandas_udf를 annotation해 대문자화 하는 upper_udf2라는 함수를 만들고 이를 등록해서 사용했습니다.
(s: pd.Series) -> pd.Series와 같이 값이 하나 들어오는 것이 아닌 값들의 집합이 들어오고 값들의 집합이 나가므로 Scala라고 부릅니다.
Aggregation이라면 값들의 집합이 들어오고 값 하나가 나가겠죠? 이어서 보도록 하겠습니다.
UDF - DataFrame/SQL에 Aggregation 사용해보기
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average', average)
spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()
UDF 실습
앞서 UDF를 실습
하나의 레코드로부터 다수의 레코드 만들어내기
Order 데이터의 items 필드에서 다수의 Order Item 레코드를 만들기
'하둡과 Spark' 카테고리의 다른 글
Spark SQL 실습 1 (JOIN) (0) | 2024.01.18 |
---|---|
UDF 실습 (0) | 2024.01.18 |
Aggregation - JOIN (0) | 2024.01.18 |
Spark SQL 소개 (0) | 2024.01.18 |
Intro (0) | 2024.01.18 |