Spark DataFrame 실습 4
Contents
1. Spark 데이터 처리
2. Spark 데이터 구조: RDD, DataFrame, Dataset
3. 프로그램 구조
4. 개발/실습 환경 소개
5. Spark DataFrame 실습
Spark DataFrame 실습
앞서 배운 내용들을 바탕으로 다양한 코딩 연습을 해보자
실습 4. Stackoverflow 서베이 기반 인기 언어 찾기
stackoverflow CSV파일에서 다음 두 필드는 ;를 구분자로 프로그래밍 언어를 구분
LanguageHaveWorkedWith
LanguageWantToWorkWith
이를 별개 레코드로 분리하여 가장 많이 사용되는 언어 top 50와 가장 많이 쓰고 싶은 언어 top 50를 계산해보기
실습
!pip install pyspark==3.3.1 py4j==0.10.9.5
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.jars", "/usr/local/lib/python3.7/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
.getOrCreate()
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/survey_results_public.csv
!ls -tl
total 79208
drwxr-xr-x 1 root root 4096 Jan 12 19:20 sample_data
-rw-r--r-- 1 root root 81101949 Jan 15 2023 survey_results_public.csv
!head -5 survey_results_public.csv
ouput 이미지는 잘렸지만 굉장히 많은 필드가 존재합니다.
df = spark.read.csv("survey_results_public.csv", header=True).select('ResponseId', 'LanguageHaveWorkedWith', 'LanguageWantToWorkWith')
select로 필요한 3개의 필드만 가져옵니다.
df.printSchema()
root
|-- ResponseId: string (nullable = true)
|-- LanguageHaveWorkedWith: string (nullable = true)
|-- LanguageWantToWorkWith: string (nullable = true)
inferschema 등이 없으므로 string 타입인 상태입니다.
import pyspark.sql.functions as F
# LanguageHaveWorkedWith 값을 트림하고 ;를 가지고 나눠서 리스트의 형태로 language_have 필드로 설정
df2 = df.withColumn(
"language_have",
F.split(F.trim(F.col("LanguageHaveWorkedWith")), ";")
)
withColumn 함수로 language_have라는 새로운 컬럼을 만듭니다.
이 때 F.col("LanguageHaveWorkedWith")는 "LanguageHaveWorkedWith" 컬럼의 내용들을 가리킵니다.
이 컬럼 내용들을 trim()으로 각 레코드 앞 뒤의 공백 문자를 제거합니다.
그리고 그 결과를 split()함수를 이용해 세미콜론(;)를 구분으로 나누어 리스트로 만듭니다.
그렇게 나누어진 내용들을 language_have라는 새로운 컬럼의 내용으로 삼습니다.
df2.show(5)
language_have의 내용들이 LanguageHaveWorkedWith의 내용들이 ; 구분으로 나누어져 리스트 형태로 들어가있습니다.
# LanguageWantToWorkWith 값을 트림하고 ;를 가지고 나눠서 리스트의 형태로 language_want 필드로 설정
df3 = df2.withColumn(
"language_want",
F.split(F.trim(F.col("LanguageWantToWorkWith")), ";")
)
이번에는 LanguageWantToWorkWith로 language_want 컬럼을 만듭니다.
df3.printSchema()
root
|-- ResponseId: string (nullable = true)
|-- LanguageHaveWorkedWith: string (nullable = true)
|-- LanguageWantToWorkWith: string (nullable = true)
|-- language_have: array (nullable = true)
| |-- element: string (containsNull = false)
|-- language_want: array (nullable = true)
| |-- element: string (containsNull = false)
df3.show(5)
현재 많이 사용되는 언어들 찾기
df_language_have = df3.select(
df3.ResponseId,
F.explode(df3.language_have).alias("language_have")
)
df3의 ResponseId 컬럼과
df3의 language_have 컬럼을 explode시킨 결과들을 language_have라는 이름으로 하는 새로운 DataFrame을 만듭니다.
explode 함수는 리스트와 같은 형태의 인자들을 각각의 개별적인 레코드들로 만드는 함수입니다.
예를 들어 ResponseId 2번 레코드의 JavaScript와 Python을 각각 2개의 레코드로 만들게 됩니다.
ResponseId 컬럼이 같이 존재하기 때문에 explode로 분리시켜도 ResponseId 2번인 JavaScript, 2번인 Python으로 만들어질 것입니다.
df_language_have.show(10)
df_language_have.groupby("language_have").count().show(10)
language_have 컬럼으로 groupby하고 count하면 각 언어별 사용량을 알 수 있습니다.
그런데 sorting이 되어있으면 더 좋을 것 같습니다.
Sorting 두 가지 방법
sort & orderBy
ascending & descending
sort 사용
df_language_have.groupby("language_have").count().sort(F.desc("count")).collect()
sort 함수에 인자로 pyspark.sql.functions의 desc("count")을 주었습니다. "count"를 기준으로 내림차순한 결과를 반환합니다.
df_language_have.groupby("language_have").count().orderBy('count', ascending=False).collect()
orderBy 함수 인자로 정렬의 대상이 되는 컬럼명, 방향을 입력합니다.
df_language50_have = df_language_have.groupby("language_have")\
.count()\
.orderBy('count', ascending=False)\
.limit(50)
최다 사용 언어 상위 50개를 새로운 DataFrame으로 만듭니다.
df_language50_have.write.mode('overwrite').csv("language50_have")
HDFS에 csv 파일로 write합니다. mode('overwrite')은 이미 language50_have가 존재해도 덮어 쓰라는 명령입니다.
!ls -tl
total 79212
drwxr-xr-x 2 root root 4096 Jan 17 15:14 language50_have
drwxr-xr-x 1 root root 4096 Jan 12 19:20 sample_data
-rw-r--r-- 1 root root 81101949 Jan 15 2023 survey_results_public.csv
!ls -tl language50_have/
total 4
-rw-r--r-- 1 root root 0 Jan 17 15:14 _SUCCESS
-rw-r--r-- 1 root root 447 Jan 17 15:14 part-00000-5d084366-adef-4878-bbdf-b0e5ab14a0fe-c000.csv
이곳에도 part가 접두사로 붙은 데이터 블록(파일)이 존재합니다.
!cat language50_have/part-00000-5d084366-adef-4878-bbdf-b0e5ab14a0fe-c000.csv
JavaScript,53587
HTML/CSS,46259
Python,39792
SQL,38835
Java,29162
Node.js,27975
(생략..)
가장 배우고 싶은 언어들 찾기
df_language_want = df3.select(
df3.ResponseId,
F.explode(df3.language_want).alias("language_want")
)
df_language_want.show(5)
df_language_want.groupby("language_want").count().show(10)
df_language50_want = df_language_want.groupby("language_want").count().orderBy('count', ascending=False).limit(50)
df_language50_want.show(10)
df_language50_want.write.mode('overwrite').csv("language50_want")
!ls -tl language50_want/
total 4
-rw-r--r-- 1 root root 0 Jan 17 15:18 _SUCCESS
-rw-r--r-- 1 root root 449 Jan 17 15:18 part-00000-d007ac3a-a1d4-4361-b9a0-ec11ccf58b19-c000.csv