터칭 데이터
개발환경소개 Colab 설정과 코딩 데모 본문
Contents
1. Spark 데이터 처리
2. Spark 데이터 구조: RDD, DataFrame, Dataset
3. 프로그램 구조
4. 개발/실습 환경 소개
5. Spark DataFrame 실습
개발/실습 환경 소개
Spark 개발 환경에 대해 알아보자
Spark 개발 환경 옵션
Local Standalone Spark + Spark Shell
Python IDE – PyCharm, Visual Studio
Databricks Cloud – 커뮤니티 에디션을 무료로 사용
다른 노트북 – 주피터 노트북, 구글 Colab, 아나콘다 등등
구글 Colab을 기본 환경으로 사용하고 간혹 Local Standalone 모드를 사용할 예정
Local Standalone Spark
Spark Cluster Manager로 local[n] 지정
master를 local[n]으로 지정
master는 클러스터 매니저를 지정하는데 사용
주로 개발이나 간단한 테스트 용도
하나의 JVM에서 모든 프로세스를 실행
하나의 Driver와 하나의 Executor가 실행됨
1+ 쓰레드가 Executor안에서 실행됨
Executor안에 생성되는 쓰레드 수
local:하나의 쓰레드만 생성
local[*]: 컴퓨터 CPU 수만큼 쓰레드를
Spark 잡을 실행할 때 master를 local[3]으로 지정한 경우
구글 Colab에서 Spark 사용
PySpark + Py4J를 설치
구글 Colab 가상서버 위에 로컬 모드 Spark을 실행
개발 목적으로는 충분하지만 큰 데이터의 처리는 불가
Spark Web UI는 기본적으로는 접근 불가
- ngrok을 통해 억지로 열 수는 있음
Py4J
- 파이썬에서 JVM내에 있는 자바 객체를 사용가능하게 해줌
데모: Colab 예제
데모
기본 세팅
PySpark을 로컬머신에 설치하고 노트북을 사용하기 보다는 머신러닝 관련 다양한 라이브러리가 이미 설치되었고 좋은 하드웨어를 제공해주는 Google Colab을 통해 실습을 진행한다. 이를 위해 pyspark과 Py4J 패키지를 설치한다. Py4J 패키지는 파이썬 프로그램이 자바가상머신상의 오브젝트들을 접근할 수 있게 해준다. Local Standalone Spark을 사용한다.
!pip install pyspark==3.3.1 py4j==0.10.9.5
설치를 하면 이와 동시에 Local Standalone Spark이 설치가 되고 하나의 드라이버와 하나의 Executor를 갖고 그 Executor안의 스레드(task)의 수는 Spark Session이라는 객체를 만들 때 지정됩니다.
pyspark 정확하게는 Local Standalone Spark이 설치합니다.
Spark Session: SparkSession은 Spark 2.0부터 엔트리 포인트로 사용된다. SparkSession을 이용해 RDD, 데이터 프레임등을 만든다. SparkSession은 SparkSession.builder를 호출하여 생성하며 다양한 함수들을 통해 세부 설정이 가능하다
local[*] Spark이 하나의 JVM으로 동작하고 그 안에 컴퓨터의 코어 수 만큼의 스레드가 Executor로 동작한다
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.getOrCreate()
방금 우리가 만든 Local 모드의 Spark와 커뮤니케이션을 하기 위해서는, 정확하게는 Spark Cluster에게 명령을 내리기 위해서는 방금 설치한 pyspark.sql 밑의 SparkSession 모듈을 import하고 이 SparkSession이 Spark에게 명령을 내리는 엔트리 포인트입니다. SparkSession 객체(Object)를 통해 통신을 한다 생각하시면 됩니다.
SparkSession은 싱글톤 오브젝트라고 말씀드렸습니다. 딱 하나만 만들 수 있습니다.
만든 spark 변수를 살펴보겠습니다.
spark
SparkSession - in-memory
SparkContext
Spark UI 원래는 WebUI 접속이 가능한 링크이지만 Colab 환경이므로 현재는 막혀있습니다.
Version
v3.3.1
Master
local[*]
AppName
PySpark Tutorial
버전과 Spark의 Master가 사용 중인 리소스 매니저(local 머신)과 Application 이름 PySpark Tutorial을 알 수 있습니다.
현재 리눅스 서버에서 돌고 있기 때문에 컴퓨터 사양을 볼 수 있습니다.
!lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 46 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 2
On-line CPU(s) list: 0,1
Vendor ID: GenuineIntel
Model name: Intel(R) Xeon(R) CPU @ 2.20GHz
CPU family: 6
Model: 79
Thread(s) per core: 2
Core(s) per socket: 1
Socket(s): 1
Stepping: 0
BogoMIPS: 4399.99
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clf
lush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_
good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fm
a cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hyp
ervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd ibrs ibpb stibp fsgsb
ase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsa
veopt arat md_clear arch_capabilities
Virtualization features:
Hypervisor vendor: KVM
Virtualization type: full
Caches (sum of all):
L1d: 32 KiB (1 instance)
L1i: 32 KiB (1 instance)
L2: 256 KiB (1 instance)
L3: 55 MiB (1 instance)
NUMA:
NUMA node(s): 1
NUMA node0 CPU(s): 0,1
Vulnerabilities:
Gather data sampling: Not affected
Itlb multihit: Not affected
L1tf: Mitigation; PTE Inversion
Mds: Vulnerable; SMT Host state unknown
Meltdown: Vulnerable
Mmio stale data: Vulnerable
Retbleed: Vulnerable
Spec rstack overflow: Not affected
Spec store bypass: Vulnerable
Spectre v1: Vulnerable: __user pointer sanitization and usercopy barriers only; no swap
gs barriers
Spectre v2: Vulnerable, IBPB: disabled, STIBP: disabled, PBRSB-eIBRS: Not affected
Srbds: Not affected
Tsx async abort: Vulnerable
CPU가 2개임을 볼 수 있습니다. 이 Spark cluster 안의 Executor는 2개의 스레드(태스크)를 실행할 수 있게 세팅된다는 뜻입니다.
이 서버가 갖고 있는 메모리를 볼 수 있습니다.
!grep MemTotal /proc/meminfo
MemTotal: 13290472 kB
13GB를 사용할 수 있습니다.
프로그래밍
SparkSession을 만들었으니 Spark 프로그래밍을 할 수 있게 되었습니다.
Python 리스트를 RDD로 parallelize 함수를 이용해 로딩하고 RDD 결과를 collect 함수로 파이썬 쪽으로 가져오는 코딩을 한번 실습해보겠습니다.
또 RDD를 Spark상의 데이터 프레임으로 어떻게 바꿀 수 있는지 얘기했었는데 실제 메서드도 한번 살펴 보겠습니다.
Python ↔ RDD ↔ DataFrame
1. Python 리스트를 생성합니다.
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
리스트 안에 딕셔너리 자료가 있는 것 같지만 사실 '로 감싸진 문자열들입니다.
for n in name_list_json:
print(n)
{"name": "keeyong"}
{"name": "benjamin"}
{"name": "claire"}
2. 파이썬 리스트를 RDD로 변환
RDD로 변환되는 순간 Spark 클러스터의 서버들에 데이터가 나눠 저장됨 (파티션)
rdd = spark.sparkContext.parallelize(name_list_json)
rdd
ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274
단순히 parallelize만으로는 아무 일도 하지 않다가 쓰기, 읽기, show 등의 실제 데이터가 계산되어야 가능한 작업들을 진행할 때만 미뤄두었던 작업을 실행합니다.
rdd.count()
3
방금 만든 rdd에 몇개의 레코드가 있는지 카운트하는 count()를 실행해야 executing이라는 상태가 잠깐 뜨며 그제서야 실제로 name_list_json이라는 파이썬 리스트가 Spark cluster에 올라가는 작업이 실행됩니다.
람다 lambda
import json
parsed_rdd = rdd.map(lambda el:json.loads(el))
rdd는 람다 함수를 사용하기 편합니다.
문자열로 존재하던
{"name": "keeyong"}
{"name": "benjamin"}
{"name": "claire"}
데이터들을 실제로 json 형태로 바꿔주는 json.loads를 람다로 rdd 데이터들에게 적용하는 코드입니다.
rdd를 비롯한 Spark 데이터 구조(DataFrame, Dataset 등도 마찬가지)는 불변(Immutable)하므로 rdd를 변경할 수는 없고 새로운 parsed_rdd를 만듭니다.
parsed_rdd
PythonRDD[2] at RDD at PythonRDD.scala:53
위에서 한번 찍어 보았던 rdd와 마찬가지로 parsed_rdd는 현재 파이썬 프로그램이 돌고 있는 드라이버가 아닌 Spark cluster에 있기 때문에 Colab에서는 자세한 정보를 알 수 없습니다.
parsed_rdd.collect()
[{'name': 'keeyong'}, {'name': 'benjamin'}, {'name': 'claire'}]
collect() 함수를 사용하니 Spark 클러스터에서 rdd를 json 스트링에서 dict 형태로 람다 함수로 바꾼 결과를 가져옵니다.
한 번 더 나아가
parsed_name_rdd = rdd.map(lambda el:json.loads(el)["name"])
rdd의 데이터의 key인 "name"의 value를 json 형태로 파싱과 동시에 가져와 parsed_name_rdd로 변수에 담고
parsed_name_rdd.collect()
'keeyong', 'benjamin', 'claire']
collect() 함수로 가져오니 키 'name'에 해당하는 value들을 리스트로 가져왔습니다.
파이썬 리스트를 데이터프레임으로 변환하기
from pyspark.sql.types import StringType
df = spark.createDataFrame(name_list_json, StringType())
파이썬 리스트나 판다스 데이터 프레임워크나 RDD를 Spark DataFrame으로 변환하고 싶다면 SparkSession의 createDataFrame 함수를 사용하면 됩니다.
두 번째 인자로 스키마를 지정하는데 name_list_json은 문자열(string) 형태의 리스트이므로 일단 StringType()으로 지정합니다. 이 때 필드의 이름은 디폴트로 Value라고 지정됩니다.
Spark 데이터 타입들은 pyspark.sql.types 패키지 밑에 있고 이곳에 있는 다양한 데이터 타입들은 나중에 Spark DataFrame 스키마에 대해 이야기하는 시간에서 더 자세히 다루겠습니다.
df.count()
3
df에 있는 레코드의 수
df.printSchema()
root
|-- value: string (nullable = true)
DataFrame은 RDD와 다르게 데이터 스키마가 존재합니다. 그래서 printSchema라는 메서드를 지원하며 이를 통해 df에는 value라는 필드가 있고 value 필드의 타입은 string임을 확인했습니다.
df.select('*').collect()
[Row(value='{"name": "keeyong"}'),
Row(value='{"name": "benjamin"}'),
Row(value='{"name": "claire"}')]
Spark 클러스터의 데이터 프레임의 모든 데이터들을 collect()로 가져왔습니다.
이 떄 Row라는 타입으로 레코드들이 조회되는데 DataSet에서 DataFrame을 지원하기 위해 사용되는 타입입니다. 이는 뒤에 데이터 프레임 스키마를 설명할 때 더 자세하게 다루겠습니다.
일단 DataFrame을 파이썬으로 가져오면 레코드들이 Row타입으로 조회된다고 알아두시면 되겠습니다.
RDD를 DataFrame으로 변환해보는 예제: 앞서 parsed_rdd를 DataFrame으로 변환해보자
parsed_rdd.collect()
[{'name': 'keeyong'}, {'name': 'benjamin'}, {'name': 'claire'}]
df_parsed_rdd = parsed_rdd.toDF()
위에서 사용한 parsed_rdd를 toDF()를 이용해 DataFrame으로 바꿀 수 있습니다.
이미 parsed_rdd는 'name'이라는 필드가 이미 들어가 있으므로 별다른 문제 없이 바로 DF가 되었습니다.
df_parsed_rdd.printSchema()
root
|-- name: string (nullable = true)
스키마를 보니 name이라는 필드가 있고 타입은 string입니다.
df_parsed_rdd.select('name').collect()
[Row(name='keeyong'), Row(name='benjamin'), Row(name='claire')]
select로 필드를 선택할 수 있습니다. 어차피 df_parsed_rdd 데이터 프레임의 필드는 'name' 하나 뿐이므로 '*'대신 'name'으로 모든 레코드를 가져왔습니다.
Spark 데이터프레임으로 로드해보기
csv파일 하나를 Spark DataFrame으로 로딩하겠습니다.
!wget https://~~/name_gender.csv
대상이 되는 파일을 구글 Colab이 동작 중인 서버로 다운 받겠습니다.
df = spark.read.csv("name_gender.csv")
df.printSchema()
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
방금 다운 받은 csv 파일을 SparkSession(변수 spark)의 read의 csv라는 함수의 도움으로 읽어 스키마를 살펴보겠습니다.
그런데 name_gender csv파일은 헤더가 존재하는데도 불구하고 _c0와 _c1이라는 컬럼이고 String 타입으로 조회됩니다. 그 이유는 csv파일을 로딩할 때 헤더가 있다는 것을 알려줘야하는데 이를 해주지 않았기 때문입니다.
df = spark.read.option("header", True).csv("name_gender.csv")
df.printSchema()
root
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
이번에는 option메서드에 "header"와 True를 입력하니 실제로 스키마가 name, gender로 조회됩니다.
df.show()
show()로 살펴볼 수 있는데 기본적으로 20개를 보여줍니다.
df.head(5)
[Row(name='Adaleigh', gender='F'),
Row(name='Amryn', gender='Unisex'),
Row(name='Apurva', gender='Unisex'),
Row(name='Aryion', gender='M'),
Row(name='Alixia', gender='F')]
head는 Pandas와 똑같이 최상단의 데이터들을 일부 보여주는 기능을 합니다. 다만 show()와는 보여주는 포맷이 다릅니다.
DataFrame에서는 groupby를 사용할 수 있습니다.
df.groupby(["gender"]).count()
DataFrame[gender: string, count: bigint]
그런데 어떤 레코드도 반환하지 않습니다. 어떻게 된 일 일까요? 위의 코드는 단순히 Spark Cluster 상에서 벌어진 작업이기 때문입니다. 파이썬 드라이버에서 확인하고 싶다면 collect()함수가 필요합니다.
df.groupby(["gender"]).count().collect()
[Row(gender='F', count=65),
Row(gender='M', count=28),
Row(gender='Unisex', count=7)]
collect()를 사용하니 gender가 F인 경우는 65, M은 28, Unisex는 7으로 집계되었습니다.
df.rdd.getNumPartitions()
1
방금 우리가 만든 DataFrame의 파티션을 알아볼 수도 있습니다.
DataFrame도 결국은 RDD에서 돌아간다고 말씀드렸죠? (RDD는 상대적으로 로우 레벨, DataFrame은 상대적으로 하이 레벨) 그래서 df 아래 있는 rdd 프로퍼티(property)가 갖고 있는 getNumPartitions()를 호출하면 데이터 프레임이 몇개의 파티션으로 구성되는지 알 수 있습니다.
현재는 1개입니다. 데이터의 크기가 크지 않기 때문입니다.
데이터프레임을 테이블뷰로 만들어서 SparkSQL로 처리해보기
DataFrame에 대해 간단하게 살펴보았습니다.
다음 시간에 Spark API 중 SparkSQL을 살펴볼텐데 그 전에 한 번 간단하게 앞의 작업들을 SparkSQL로 변환하면 얼마나 더 쉬워지는지 보도록 하겠습니다.
바로 앞에서 만든 name_gender.csv로 만든 df라는 데이터프레임를 SparkSQL로 다루겠습니다.
SparkSQL을 사용한다는 것은 SQL을 사용한다는 것이고 테이블이 있어야 한다는 뜻입니다. 이 때 DataFrame이 테이블이 됩니다. 그래서 DataFrame별로 테이블 이름을 지정해야합니다.
df.createOrReplaceTempView("namegender")
그 때 사용되는 함수가 createOrReplaceTempView입니다. df라는 데이터프레임을 SparkSQL로 실제 테이블처럼 처리하고 싶고 이름을 "namegender"로 주었습니다.
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1")
gender를 중심으로 그룹핑하고 각 그룹에 속한 레코드를 카운트하는 쿼리를 입력합니다. 아까에 비해 쿼리를 직접 작성하고 눈으로 확인할 수 있기 때문에 훨씬 직관적입니다.
그리고 쿼리의 결과를 namegender_group_df 변수에 할당합니다.
namegender_group_df.collect()
[Row(gender='F', count(1)=65),
Row(gender='M', count(1)=28),
Row(gender='Unisex', count(1)=7)]
collect()로 결과를 보니 쿼리문 작성대로 결과가 조회됩니다.
그리고 현재 어떤 데이터프레임들이 테이블로 사용이 가능한지 확인할 수 있습니다.
spark.catalog.listTables()
[Table(name='namegender', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
namegender가 보이고 테이블 타입은 TEMPORARY입니다.
SparkSession을 만들 때 Hive를 사용하도록 세팅한다면 Hive가 사용하는 메타 데이터 스토어가 Spark과 연결이 되고 Hive의 테이블이 조회됩니다.
namegender_group_df.rdd.getNumPartitions()
1
방금 만든 DataFrame이 갖고 있는 파티션의 수를 알 수 있습니다.
two_namegender_group_df = namegender_group_df.repartition(2)
데이터 크기가 작기 때문에 굳이 파티션이 2개 이상일 필요는 없습니다만 repartition 메서드로 2로 나눌 수 있습니다. 별다른 해싱 방법을 지정하지 않았기 때문에 랜덤하게 레코드들을 2개로 나눌 것입니다.
둘로 나누어 새로운 DataFrame을 만들었고 그 이름을 two_namegender_group_df로 지었습니다.
two_namegender_group_df.rdd.getNumPartitions()
2
정말로 그 데이터프레임의 파티션이 2개인지 확인해보면 2로 나오는 것을 볼 수 있습니다.
'하둡과 Spark' 카테고리의 다른 글
Local Standalone REP 데모 - 윈도우(Windows) (0) | 2024.01.16 |
---|---|
Local Standalone REP 데모 - 맥(Mac) (0) | 2024.01.16 |
Spark 프로그램 구조 (0) | 2024.01.16 |
Spark 데이터 구조: RDD, DataFrame, Dataset (0) | 2024.01.16 |
Spark 데이터 처리 (0) | 2024.01.16 |