터칭 데이터

Spark DataFrame 실습 3 본문

하둡과 Spark

Spark DataFrame 실습 3

터칭 데이터 2024. 1. 17. 19:25

 

 

Contents

 

1. Spark 데이터 처리

 

2. Spark 데이터 구조: RDD, DataFrame, Dataset

 

3. 프로그램 구조

 

4. 개발/실습 환경 소개

 

5. Spark DataFrame 실습

 

 

 

 

 

 

 

 

 

Spark DataFrame 실습


앞서 배운 내용들을 바탕으로 다양한 코딩 연습을 해보자

 

 

 

 

 

 

 

 

 

실습 3. 텍스트를 파싱해서 구조화된 데이터로 변환하기

Regex를 이용해서 아래와 같이 변환해보는 것이 목표

 

입력: “On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
    ○ regex 패턴: “On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)
        ■ \S (non-whitespace character), \d (numeric character)

 

정규표현식 regex로 컬러 코딩이 된 부분들만 추출하려 합니다. 컬러가 아닌 텍스트들은 모든 레코드에서 똑같습니다.

 

 

 

출력:

 

 

 

 

 

 

 

 

 

 

 

 

실습

 

 

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

 

 

 

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()

 

이번에는 SparkConf를 사용해  SparkSession을 생성했습니다.

 

 

 

 

!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt

 

사용할 transfer_cost.txt 파일을 다운 받았습니다.

 

 

 

 

!ls -tl

total 288
drwxr-xr-x 1 root root   4096 Jan 12 19:20 sample_data
-rw-r--r-- 1 root root 286779 Apr 24  2022 transfer_cost.txt

 

 

 

 

!head -5 transfer_cost.txt

On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today
On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today

 

이번에도 역시 헤더는 없습니다.

regex로 볼드체 부분을 추출할 겁니다.

 

 

 

 

import pyspark.sql.functions as F
from pyspark.sql.types import *

schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")

 

StructField로 text를 주었습니다. 그리고 이전 실습들에서는 schema(schema).csv()였지만 이번에는 .text()입니다.

 

 

transfer_cost_df.show(truncate=False)

 

show에 truncate 옵션을 False로 주었는데 기본적으로 필드의 크기가 일정 수준을 넘어가면 잘려서 표시되는데 그냥 다 보여주도록 하는 옵션입니다.

 

schema에서 정의한대로 text 필드로 들어왔습니다.

 

 

 

 

 

 

 

 

from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'

df_with_new_columns = transfer_cost_df\
    .withColumn('week', regexp_extract('text', regex_str, 1))\
    .withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
    .withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
    .withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
    .withColumn('vendor', regexp_extract(col('text'), regex_str, 5))

 

정규 표현식을 적용할 DataFrame인 transfer_cost_df를 정의했으니 이제 이곳에서 정규 표현식으로 5개의 필드를 추출해 새로운 데이터 프레임에 추가하려 합니다.

 

이 때 withColumn 메서드를 사용합니다. 새로운 컬럼을 만들거나 존재하는 컬럼의 내용을 바꿀 수 있습니다.

 

첫 번째 파라미터는 추가하려는 새로운 컬럼의 이름 혹은 이미 존재하는 컬럼의 이름 입니다.

 

두 번재는 그 컬럼의 value 즉 어떤 값으로 채울 것인지 입니다. 예시에서 보이는 regexp는 흔히 말하는 UDF(User Defined Function)이며 UDF에 대해서는 이후에 설명하겠습니다. 다만 지금 실습 코드에서는 UDF가 아닌 pyspark.sql.functions 중 하나인 regexp_extract를 사용하겠습니다.

 

regexp_extract 함수에 대해 설명하겠습니다. 첫 번째 인자는 대상이 되는 데이터 프레임의 컬럼명입니다. 두 번째 인자는 어떤 방식으로 추출할지를 정의하는 정규 표현식입니다. 세 번째는 매칭 된 것들 중 몇 번째 매칭값을 가져올지입니다. 주의할 점은 0부터가 아닌 1부터 시작한다는 점입니다.

 

 

 

 

 

 

df_with_new_columns.printSchema()

root
 |-- text: string (nullable = true)
 |-- week: string (nullable = true)
 |-- departure_zipcode: string (nullable = true)
 |-- arrival_zipcode: string (nullable = true)
 |-- cost: string (nullable = true)
 |-- vendor: string (nullable = true)

 

regex를 적용시킨 뒤 스키마가 어떻게 바뀌었는지 보겠습니다. 원래는 text라는 컬럼 하나만 있었지만 week에서 vendor에 이르기까지 5개의 새로운 컬럼이 추가되었습니다. 더 이상 text라는 컬럼은 필요하지 않습니다.

 

 

 

 

final_df = df_with_new_columns.drop("text")

 

drop이라는 메서드로 text를 없앤 새로운 final_df라는 DataFrame을 만들었습니다.

 

 

 

 

 

 

 

 

final_df.write.csv("extracted.csv")

 

이렇게 만든 결과물 DataFrame을 csv 파일로 저장할 수 있습니다.

 

보통 저장을 HDFS나 S3와 같은 클라우드 스토리지에 하는 것이 일반적이지만 현재 우리는 실습을 LocalStandalone에서 하는 중이기 때문에 로컬에 파일로 저장합니다.

 

write.csv(경로)입니다.

 

 

 

 

!ls -tl

total 292
drwxr-xr-x 2 root root   4096 Jan 17 10:12 extracted.csv
drwxr-xr-x 1 root root   4096 Jan 12 19:20 sample_data
-rw-r--r-- 1 root root 286779 Apr 24  2022 transfer_cost.txt

 

실제 저장이 되었나 확인해보니 extracted.csv 파일이 보입니다.

 

그런데 drwxr-xr-x를 보면 파일이 아닌 디렉토리인데 그 이유는 큰 데이터 프레임을 프로세싱해 저장하면 HDFS 같은 곳에서는 데이터 블록 단위로 나누어 저장되기 때문입니다. 대개 128MB 단위로 나뉩니다. 그래서 디렉토리로 만들어지는 것이고 그 디렉토리로 가보면 해당 DataFrame의 내용이 여러 개의 파일로 저장되어 있습니다. 다만 이번 실습의 DataFrame은 크기가 크지 않기 때문에 파일 하나로 이루어져 있을 것입니다.

 

 

 

 

!ls -tl extracted.csv/

total 156
-rw-r--r-- 1 root root      0 Jan 17 10:12 _SUCCESS
-rw-r--r-- 1 root root 156423 Jan 17 10:12 part-00000-d559dcf0-7105-4307-9bc4-8a1fc47e4ea0-c000.csv

 

 

extracted.csv라는 디렉토리 내용을 보면 part로 시작하는 파일이 있습니다. 첫 번째 데이터 블록입니다.

 

 

 

!head -5 extracted.csv/part-00000-d559dcf0-7105-4307-9bc4-8a1fc47e4ea0-c000.csv

2021-01-04,85001,85002,$28.32,ABC Hauling
2021-01-04,85001,85004,$25.68,ABC Hauling
2021-01-04,85001,85007,19.86,ABC Hauling
2021-01-04,85001,85007,20.52,Haul Today
2021-01-04,85001,85010,20.72,Haul Today

 

part 파일의 내용을 살펴보겠습니다. part 뒤의 나머지 파일 이름 부분은 실행할 때마다 바뀌니 이 점 기억 해주세요.

 

우리가 정규 표현식대로 추출해 컬럼별로 정리한 형태의 레코드들을 확인할 수 있습니다.

 

 

 

 

 

csv가 아닌 json 형태로 저장하기

 

 

final_df.write.csv("extracted.csv")

 

아까는 write.csv()에 이름(경로)을 주었습니다.

 

 

final_df.write.format("json").save("extracted.json")

 

이번에는 롱폼 형태로 format(json)과 save(경로)를 주었습니다.

extracted.json이라는 디렉토리가 만들어졌고

 

 

 

!ls -tl extracted.json/

total 428
-rw-r--r-- 1 root root      0 Jan 17 10:24 _SUCCESS
-rw-r--r-- 1 root root 436305 Jan 17 10:24 part-00000-31371704-6038-4b2c-9aeb-6450321a5f8a-c000.json

 

part 파일이 만들어져 있습니다.

 

 

 

 

 

 

!head -1 extracted.json/part-00000-31371704-6038-4b2c-9aeb-6450321a5f8a-c000.json

{"week":"2021-01-04","departure_zipcode":"85001","arrival_zipcode":"85002","cost":"$28.32","vendor":"ABC Hauling"}

 

데이터들이 json 형태로 저장된 것을 볼 수 있습니다.

 

 

 

 

 

'하둡과 Spark' 카테고리의 다른 글

요약  (0) 2024.01.18
Spark DataFrame 실습 4  (0) 2024.01.18
Spark DataFrame 실습 2  (0) 2024.01.17
Spark DataFrame 실습 1  (0) 2024.01.17
Local Standalone REP 데모 - 윈도우(Windows)  (0) 2024.01.16