터칭 데이터
Spark DataFrame 실습 3 본문
Contents
1. Spark 데이터 처리
2. Spark 데이터 구조: RDD, DataFrame, Dataset
3. 프로그램 구조
4. 개발/실습 환경 소개
5. Spark DataFrame 실습
Spark DataFrame 실습
앞서 배운 내용들을 바탕으로 다양한 코딩 연습을 해보자
실습 3. 텍스트를 파싱해서 구조화된 데이터로 변환하기
Regex를 이용해서 아래와 같이 변환해보는 것이 목표
입력: “On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling”
○ regex 패턴: “On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)”
■ \S (non-whitespace character), \d (numeric character)
정규표현식 regex로 컬러 코딩이 된 부분들만 추출하려 합니다. 컬러가 아닌 텍스트들은 모든 레코드에서 똑같습니다.
출력:
실습
!pip install pyspark==3.3.1 py4j==0.10.9.5
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
이번에는 SparkConf를 사용해 SparkSession을 생성했습니다.
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt
사용할 transfer_cost.txt 파일을 다운 받았습니다.
!ls -tl
total 288
drwxr-xr-x 1 root root 4096 Jan 12 19:20 sample_data
-rw-r--r-- 1 root root 286779 Apr 24 2022 transfer_cost.txt
!head -5 transfer_cost.txt
On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today
On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today
이번에도 역시 헤더는 없습니다.
regex로 볼드체 부분을 추출할 겁니다.
import pyspark.sql.functions as F
from pyspark.sql.types import *
schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")
StructField로 text를 주었습니다. 그리고 이전 실습들에서는 schema(schema).csv()였지만 이번에는 .text()입니다.
transfer_cost_df.show(truncate=False)
show에 truncate 옵션을 False로 주었는데 기본적으로 필드의 크기가 일정 수준을 넘어가면 잘려서 표시되는데 그냥 다 보여주도록 하는 옵션입니다.
schema에서 정의한대로 text 필드로 들어왔습니다.
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
df_with_new_columns = transfer_cost_df\
.withColumn('week', regexp_extract('text', regex_str, 1))\
.withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
.withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
.withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
.withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
정규 표현식을 적용할 DataFrame인 transfer_cost_df를 정의했으니 이제 이곳에서 정규 표현식으로 5개의 필드를 추출해 새로운 데이터 프레임에 추가하려 합니다.
이 때 withColumn 메서드를 사용합니다. 새로운 컬럼을 만들거나 존재하는 컬럼의 내용을 바꿀 수 있습니다.
첫 번째 파라미터는 추가하려는 새로운 컬럼의 이름 혹은 이미 존재하는 컬럼의 이름 입니다.
두 번재는 그 컬럼의 value 즉 어떤 값으로 채울 것인지 입니다. 예시에서 보이는 regexp는 흔히 말하는 UDF(User Defined Function)이며 UDF에 대해서는 이후에 설명하겠습니다. 다만 지금 실습 코드에서는 UDF가 아닌 pyspark.sql.functions 중 하나인 regexp_extract를 사용하겠습니다.
regexp_extract 함수에 대해 설명하겠습니다. 첫 번째 인자는 대상이 되는 데이터 프레임의 컬럼명입니다. 두 번째 인자는 어떤 방식으로 추출할지를 정의하는 정규 표현식입니다. 세 번째는 매칭 된 것들 중 몇 번째 매칭값을 가져올지입니다. 주의할 점은 0부터가 아닌 1부터 시작한다는 점입니다.
df_with_new_columns.printSchema()
root
|-- text: string (nullable = true)
|-- week: string (nullable = true)
|-- departure_zipcode: string (nullable = true)
|-- arrival_zipcode: string (nullable = true)
|-- cost: string (nullable = true)
|-- vendor: string (nullable = true)
regex를 적용시킨 뒤 스키마가 어떻게 바뀌었는지 보겠습니다. 원래는 text라는 컬럼 하나만 있었지만 week에서 vendor에 이르기까지 5개의 새로운 컬럼이 추가되었습니다. 더 이상 text라는 컬럼은 필요하지 않습니다.
final_df = df_with_new_columns.drop("text")
drop이라는 메서드로 text를 없앤 새로운 final_df라는 DataFrame을 만들었습니다.
final_df.write.csv("extracted.csv")
이렇게 만든 결과물 DataFrame을 csv 파일로 저장할 수 있습니다.
보통 저장을 HDFS나 S3와 같은 클라우드 스토리지에 하는 것이 일반적이지만 현재 우리는 실습을 LocalStandalone에서 하는 중이기 때문에 로컬에 파일로 저장합니다.
write.csv(경로)입니다.
!ls -tl
total 292
drwxr-xr-x 2 root root 4096 Jan 17 10:12 extracted.csv
drwxr-xr-x 1 root root 4096 Jan 12 19:20 sample_data
-rw-r--r-- 1 root root 286779 Apr 24 2022 transfer_cost.txt
실제 저장이 되었나 확인해보니 extracted.csv 파일이 보입니다.
그런데 drwxr-xr-x를 보면 파일이 아닌 디렉토리인데 그 이유는 큰 데이터 프레임을 프로세싱해 저장하면 HDFS 같은 곳에서는 데이터 블록 단위로 나누어 저장되기 때문입니다. 대개 128MB 단위로 나뉩니다. 그래서 디렉토리로 만들어지는 것이고 그 디렉토리로 가보면 해당 DataFrame의 내용이 여러 개의 파일로 저장되어 있습니다. 다만 이번 실습의 DataFrame은 크기가 크지 않기 때문에 파일 하나로 이루어져 있을 것입니다.
!ls -tl extracted.csv/
total 156
-rw-r--r-- 1 root root 0 Jan 17 10:12 _SUCCESS
-rw-r--r-- 1 root root 156423 Jan 17 10:12 part-00000-d559dcf0-7105-4307-9bc4-8a1fc47e4ea0-c000.csv
extracted.csv라는 디렉토리 내용을 보면 part로 시작하는 파일이 있습니다. 첫 번째 데이터 블록입니다.
!head -5 extracted.csv/part-00000-d559dcf0-7105-4307-9bc4-8a1fc47e4ea0-c000.csv
2021-01-04,85001,85002,$28.32,ABC Hauling
2021-01-04,85001,85004,$25.68,ABC Hauling
2021-01-04,85001,85007,19.86,ABC Hauling
2021-01-04,85001,85007,20.52,Haul Today
2021-01-04,85001,85010,20.72,Haul Today
part 파일의 내용을 살펴보겠습니다. part 뒤의 나머지 파일 이름 부분은 실행할 때마다 바뀌니 이 점 기억 해주세요.
우리가 정규 표현식대로 추출해 컬럼별로 정리한 형태의 레코드들을 확인할 수 있습니다.
csv가 아닌 json 형태로 저장하기
final_df.write.csv("extracted.csv")
아까는 write.csv()에 이름(경로)을 주었습니다.
final_df.write.format("json").save("extracted.json")
이번에는 롱폼 형태로 format(json)과 save(경로)를 주었습니다.
extracted.json이라는 디렉토리가 만들어졌고
!ls -tl extracted.json/
total 428
-rw-r--r-- 1 root root 0 Jan 17 10:24 _SUCCESS
-rw-r--r-- 1 root root 436305 Jan 17 10:24 part-00000-31371704-6038-4b2c-9aeb-6450321a5f8a-c000.json
part 파일이 만들어져 있습니다.
!head -1 extracted.json/part-00000-31371704-6038-4b2c-9aeb-6450321a5f8a-c000.json
{"week":"2021-01-04","departure_zipcode":"85001","arrival_zipcode":"85002","cost":"$28.32","vendor":"ABC Hauling"}
데이터들이 json 형태로 저장된 것을 볼 수 있습니다.
'하둡과 Spark' 카테고리의 다른 글
요약 (0) | 2024.01.18 |
---|---|
Spark DataFrame 실습 4 (0) | 2024.01.18 |
Spark DataFrame 실습 2 (0) | 2024.01.17 |
Spark DataFrame 실습 1 (0) | 2024.01.17 |
Local Standalone REP 데모 - 윈도우(Windows) (0) | 2024.01.16 |