터칭 데이터

유닛테스트 본문

하둡과 Spark

유닛테스트

터칭 데이터 2024. 1. 18. 23:54

 

 

Contents

 

1. Spark SQL 소개

 

2. Aggregation, JOIN, UDF

 

3. Spark SQL 실습

 

4. Hive 메타스토어 사용하기

 

5. 유닛 테스트

 

 

 

 

 

 

 

 

 

 

 

유닛 테스트


DataFrame 코드에 테스트를 추가해보자

 

 

 

 

 

유닛 테스트란?

 

코드 상의 특정 기능 (보통 메소드의 형태)을 테스트하기 위해 작성된 코드

 

보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트

 

CI/CD를 사용하려면 전체 코드의 테스트 커버러지가 굉장히 중요해짐

 

각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적

● JUnit for Java
● NUnit for .NET
● unittest for Python

 

unittest를 사용해볼 예정

 

 

 

 

 

https://www.udemy.com/course/apache-spark-programming-in-python-for-beginners/learn/lecture/20554784#overview

 

 

 

 

 

 

 

 

 

 

 

 

실습

 

 

 

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

 

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Unit Test") \
    .getOrCreate()

 

 

 

!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv

 

 

 

 

df = spark.read.option("header", True).csv("name_gender.csv")
df.printSchema()

 

 

 

df.count()

 

 

 

 

df.createOrReplaceTempView("namegender")
spark.sql("SELECT gender, COUNT(1) count FROM namegender GROUP BY 1").show()

 

 

 

 

 

 

upper_udf_f UDF를 테스트해볼 예정

 

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as pd

# Define the UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
    return s.str.upper()

upperUDF = spark.udf.register("upper_udf", upper_udf_f)

 

1) 입력된 레코드들이 정말 대문자로 return되는지 테스트와

 

 

 

load_gender와 get_gender_count 함수를 테스트해볼 예정

def load_gender(spark, file_path):
    return spark.read.option("header", True).csv(file_path)

def get_gender_count(spark, df, field_to_count):
    df.createOrReplaceTempView("namegender_test")
    return spark.sql(f"SELECT {field_to_count}, COUNT(1) count FROM namegender_test GROUP BY 1")

 

2) namegender_test의 COUNT가 100개 인지

 

3) 각 gender의 수가 65, 28, 7과 같이 나오는지 테스트 총 2개를 진행하려 합니다.

 

 

 

 

 

 

 

 

 

 

 

df = load_gender(spark, "name_gender.csv")
get_gender_count(spark, df, "gender").show()
df.select(upperUDF("name").alias("NAME")).show()

 

2)와 3)을 테스트했습니다.

 

 

 

 

 

 

 

 

 

df.select(upperUDF("name").alias("NAME")).collect()

 

1) 까지 테스트 했습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

유닛 테스트 코드 붙여보기

 

일반적으로 구글 Colab에서 테스트를 하지는 않습니다. 보통은 테스트하는 코드를 따로 만든 뒤 그 코드로 내가 테스트하고 싶은 함수를 import해 사용하는 것이 일반적입니다.

 

아래와 같이 테스트 하려는 코드와 테스트 코드가 한 몸에 같이 있지는 않습니다.

 

그러므로 보통은 CLI 환경에서 테스트하는 것이 일반적입니다.

 

from unittest import TestCase

# 일반적으로는 아래 함수가 정의된 모듈을 임포트하고 그걸 테스트
#  - upper_udf_f
#  - load_gender
#  - get_gender_count

class UtilsTestCase(TestCase):
    spark = None

    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .appName("Spark Unit Test") \
            .getOrCreate()

    def test_datafile_loading(self):
        sample_df = load_gender(self.spark, "name_gender.csv")
        result_count = sample_df.count()
        self.assertEqual(result_count, 100, "Record count should be 100")

    def test_gender_count(self):
        sample_df = load_gender(self.spark, "name_gender.csv")
        count_list = get_gender_count(self.spark, sample_df, "gender").collect()
        count_dict = dict()
        for row in count_list:
            count_dict[row["gender"]] = row["count"]
        self.assertEqual(count_dict["F"], 65, "Count for F should be 65")
        self.assertEqual(count_dict["M"], 28, "Count for M should be 28")
        self.assertEqual(count_dict["Unisex"], 7, "Count for Unisex should be 7")

    def test_upper_udf(self):
        test_data = [
            { "name": "John Kim" },
            { "name": "Johnny Kim"},
            { "name": "1234" }
        ]
        expected_results = [ "JOHN KIM", "JOHNNY KIM", "1234" ]

        upperUDF = self.spark.udf.register("upper_udf", upper_udf_f)
        test_df = self.spark.createDataFrame(test_data)
        names = test_df.select("name", upperUDF("name").alias("NAME")).collect()
        results = []
        for name in names:
            results.append(name["NAME"])
        self.assertCountEqual(results, expected_results)

    @classmethod
    def tearDownClass(cls) -> None:
        cls.spark.stop()

 

 

import unittest

unittest.main(argv=[''], verbosity=2, exit=False)

 

 

 

 

 

 

'하둡과 Spark' 카테고리의 다른 글

Intro (Spark 내부동작)  (0) 2024.01.20
요약  (0) 2024.01.19
Hive - 메타스토어 사용하기  (0) 2024.01.18
Spark SQL 실습 1 (JOIN)  (0) 2024.01.18
UDF 실습  (0) 2024.01.18