이해를 돕기 위해 Spark 의 구조에 대해 "간략화" 하여 설명합니다. 각각의 주제에 대한 자세한 설명은 본 문서의 개별 챕터 또는 공식 문서를 통해 확인할 수 있습니다.
이번 챕터에서 사용할 데이터셋은 입니다.
Hello Spark
Spark 를 활용해 사용자는 데이터를 읽고 가공 할 수 있습니다. 예를 들어, 아래의 코드는 파일 시스템에 있는 작은 CSV 파일을을 읽어 몇 가지 함수를 이용해 가공하는 샘플입니다.
환경은 편의에 따라 Databricks Notebook / 로컬 내 PySpark Shell 등 자유롭게 사용할 수 있습니다. 이 문서에서는 코드를 간략히 표현하기 위해 PySpark 3 와 DataFrame API 를 사용했습니다.
from pyspark.sql.functions import *
from pyspark.sql.types import *
# 현재 디렉토리에 CSV 파일을 다운받은 후 아래 코드를 실행합니다.
# 해당 파일의 확장자는 `.csv` 로 되어있으나, 실제로 데이터의 구분자는 `\t` (탭) 입니다
# DataBricks 로 실습한다면 경로를 "/FileStore/tables/marketing_campaign.csv" 로 변경합니다
df = spark.read.load("./marketing_campaign.csv",
format="csv",
sep="\t",
inferSchema="true",
header="true")
spark.read.load 를 사용해 데이터를 로딩할 때, inferSchema="true", header="true" 옵션을 사용했으므로 Spark 가 CSV 파일의 첫 줄을 헤더로 인식해 스키마에서 Column 명을 지정하고, 데이터를 읽으며 컬럼의 타입값을 자동으로 결정합니다. (integer, string 등)
데이터를 보거나 Row (행) 의 숫자를 세기 위해서는 df, 즉 DataFrame 의 다양한 함수를 사용할 수 있습니다.
df.count() # 로딩한 데이터의 숫자를 센 후 출력합니다
df.show() # 데이터를 일부 콘솔에 출력합니다.
df.toPandas() # PySpark 에서 사용할 수 있는 함수로, Jupyter 에서 데이터를 편하게 볼 수 있습니다.
컬럼 이름을 변경한 결과를 dfSelected DataFrame 에 저장했기 때문에, 최초의 df DataFrame 은 그대로 존재합니다. 따라서 df 와 dfSelected 는 다른 DataFrame 입니다.
그러나 DataFrame 은 논리적인 테이블이므로 실제 물리적으로 데이터가 복사되어 df, dfSelect 두 벌이 되는 것은 아닙니다.
# df.rdd.id() 실행 결과
<bound method RDD.id of MapPartitionsRDD[25] at javaToPython at NativeMethodAccessorImpl.java:0>
# dfSelected.rdd.id() 실행 결과
<bound method RDD.id of MapPartitionsRDD[31] at javaToPython at NativeMethodAccessorImpl.java:0>
이제 데이터를 로딩해서, 원하는 컬럼만 추출했으니 데이터를 일부 가공해보겠습니다.
count_kid 는 고객의 유아 자녀, count_teen 은 고객의 초등학생 이상 자녀를 나타내는 컬럼인데, 이 값을 더한 count_children 컬럼을 만들어 보겠습니다. 다만 이 때, 양쪽 또는 한쪽의 값이 NULL 일 수 있으므로 덧셈에 유의해야 합니다.
education 컬럼은 졸업 학위를 나타냅니다. 이 때 2n Cycle 값을 허용하지 않는다는 정책이 세워져, 대신 문자열 NONE 값을 넣도록 하겠습니다. 여기서는 IF ELSE 와 유사한 SQL 구문인 CASE WHEN 을 사용해 보겠습니다.
date_customer 는 최초 가입일 컬럼인데 팀원의 실수로 데이터가 잘못되었다고 가정하고 7년을 더해보겠습니다. 예를 들어 2013-01-01 이면 변경 후에는 2020-01-01 이 되어야 합니다.
Spark DataFrame API 는 SQL 에 대응되는 함수가 대부분 존재하므로 어떻게 다룰지 모르더라도 SQL 함수를 기반으로 생각해보고 구글링을 통해 해결할 수 있습니다.
우선 시작 전에 통계 정보와 스키마를 다시 살펴 보면,
dfSelected.printSchema() # 스키마를 확인합니다.
dfSelected.describe().show() # 통계 정보를 확인합니다. PySpark 에서는 `show` 대신 `toPandas` 를 활용할 수 있습니다.
# printSchema() 의 출력 결과
root
|-- id: integer (nullable = true)
|-- year_birth: integer (nullable = true)
|-- education: string (nullable = true)
|-- count_kid: integer (nullable = true)
|-- count_teen: integer (nullable = true)
|-- date_customer: string (nullable = true)
|-- days_last_login: integer (nullable = true)
# describe().show() 의 출력 결과
+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
|summary| id| year_birth|education| count_kid| count_kid|date_customer| days_last_login|
+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
| count| 2240| 2240| 2240| 2240| 2240| 2240| 2240|
| mean| 5592.159821428571|1968.8058035714287| null|0.44419642857142855| 0.50625| null| 49.109375|
| stddev|3246.6621975643416|11.984069456885827| null| 0.5383980977345935|0.5445382307698761| null|28.96245280837821|
| min| 0| 1893| 2n Cycle| 0| 0| 01-01-2013| 0|
| max| 11191| 1996| PhD| 2| 2| 31-12-2013| 99|
+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
count_kid 또는 count_teen 컬럼에 미래에는 NULL 값이 들어올 수도 있으므로 coalesce("count_kid", lit(0)) 와 같이 기본값을 0 으로 세팅합니다. 그리고 두 컬럼의 값을 더해 count_children 컬럼을 만듭니다.
이후에는 결과 DataFrame 인 dfConverted1 에서 보기 쉽게 원하는 컬럼만 선택해서 5개만 추출해 데이터를 확인해봅니다. 아까 언급했던 바와 같이 Spark DataFrame 은 Immutable 하므로 (더 엄밀히는 DataFrame 을 구성하는 RDD) dfConverted1.select 구문은 그 위에서 만든 dfConverted1에 영향을 미치지 않습니다.
그 다음으로는 education 컬럼을 살펴보고 변경해보겠습니다.
# 이 작업에서는 컬럼 이름을 가공하지 않으므로, `select` 내에서 `col` 함수를 사용하지 않았습니다.
# Spark 는 이와 같이 API 에서 다양한 형태로 사용자의 편의성을 지원합니다.
dfConverted1\
.select("education")\
.distinct()\
.show()
# `show()` 출력 결과
+----------+
| education|
+----------+
| 2n Cycle|
| PhD|
| Master|
|Graduation|
| Basic|
+----------+
앞서 언급한 것과 같이 2n Cycle 값은 CASE WHEN 구문을 사용해 NONE 으로 변경하도록 하겠습니다.
educationInvalid = '2n Cycle'
educationDefault = 'NONE'
# 다음 SQL 구문과 동일합니다.
#
# SELECT CASE WHEN education = '2n Cycle' THEN 'NONE' ELSE education as education
#
dfConverted2 = dfConverted1.withColumn(
"education",
when(col("education") == lit(educationInvalid), educationDefault).otherwise(col("education"))
)
dfConverted2.select("education").distinct().show()
# `show()` 의 출력 결과
+----------+
| education|
+----------+
| PhD|
| Master|
|Graduation|
| Basic|
| NONE|
+----------+
오늘 작업한 education 컬럼에는 NULL 값이 없었지만, 미래에는 들어올지 모릅니다. 만약 정책적으로 지정한 값 이외에 허용되지 않는 데이터를 전부 NONE 으로 세팅하려면 코드를 어떻게 변경해야 할까요?
PySpark 의 API 문서에서 isin 함수를 살펴보고, 이것을 통해 문제를 해결할 수 있을지 고민해 봅니다.
Spark DataFrame API 는 selectExpr 이란 함수를 제공합니다. SQL 문법을 사용할 수 있습니다. 예를 들어, 위와 동일한 작업을 할 때 다음처럼 코드를 작성할 수 있습니다.
dfConverted3 = dfConverted1\
.selectExpr("*",
f"CASE WHEN education == '{educationInvalid}' THEN '{educationDefault}' ELSE education END as education"
)
# 기존 date_customer 컬럼의 값과 비교를 위해 `date_joined` 라는 다른 이름으로 컬럼 값 변환 결과를 저장합니다
# 1. 이 과정에서 `to_date` 함수를 사용해 타입을 변경하고
# 2. `add_months` 함수를 통해 72개월 (= 6년) 을 기존 값에 추가했습니다.
dfWithJoined = dfConverted2.withColumn("date_joined", add_months(to_date(col("date_customer"), "d-M-yyyy"), 72))
dfWithJoined.select("date_customer", "date_joined").limit(5).show()
dfWithJoined.printSchema()
# `show()` 출력 결과
+-------------+-----------+
|date_customer|date_joined|
+-------------+-----------+
| 04-09-2012| 2018-09-04|
| 08-03-2014| 2020-03-08|
| 21-08-2013| 2019-08-21|
| 10-02-2014| 2020-02-10|
| 19-01-2014| 2020-01-19|
+-------------+-----------+
# `printSchema()` 출력 결과
root
|-- id: integer (nullable = true)
|-- year_birth: integer (nullable = true)
|-- education: string (nullable = true)
|-- count_kid: integer (nullable = true)
|-- count_teen: integer (nullable = true)
|-- date_customer: string (nullable = true)
|-- days_last_login: integer (nullable = true)
|-- count_children: integer (nullable = false)
|-- date_joined: date (nullable = true)
이쯤에서 분산처리는 어디서 하고 실시간 처리는 어떻게 하냐는 질문이 생기실 수 있습니다.
Spark 는 사용자가 데이터를 처리하는 비즈니스 로직과, 분산처리 및 로딩 가공 등을 담당하는 인프라 로직을 분리해 사용할 수 있도록 아주 높은 수준에서 추상화를 제공합니다. 이로인해 사용자는
분산 처리 / 데이터의 로딩 및 저장 등은 비즈니스 로직과 분리되므로 사용자는 데이터를 하나의 테이블처럼 취급해 가공에만 집중할 수 있습니다.
위에서 다루었던 예제에서는 1개의 CSV 파일 내에 2240 개의 Row 밖에 없었지만, 데이터가 매우 크다고 가정하면 하나의 파일로 만들 수 없어, 일반적으로 파일을 분할해 S3 나 HDFS 등에 보관하게 됩니다. Spark 는 데이터 가공시에 여러 파일들을 메모리에 읽어 처리할 수 있습니다.
DataFrame, Dataset and SQL
Spark 는 Scala, Java, R, Python, SQL 등 다양한 언어를 제공합니다. 이번 섹션에서는 Python 언어, 즉 PySpark 를 사용해보았으며 데이터를 다루기 위해 DataFrame API 를 사용해 보았습니다.
Spark 초기 버전은 RDD 만 지원했으나, DataFrame / Dataset 과 같은 고수준의, 사용자가 다루기 쉬운 API 들이 추가되었습니다. Spark 2.0 부터는 DataFrame 은 Dataset[Row] 타입이 됨으로써 DataFrame 과 Dataset 이 통합되었습니다.
아래는 Scala Spark 를 이용해 Case Class 생성후 DataSet 을 이용하는 코드 샘플입니다.
Dataset API 는 Type 지원되는 언어인 Scala / Java 를 지원합니다. Python 및 R 은 DataFrame API 를 이용할 수 있습니다
어떤 API 가 '우월하거나' 그렇지는 않습니다. 생산성과 타입 안전성을 고려해 취사 선택할 수 있습니다.
데이터를 탐색하거나, 빠르게 (생산성) 가공하고 싶다면 SQL API 를 선택할 수 있습니다
조금 더 타입을 갖추고 싶다면, DataFrame API 를 이용하고 UDF 등을 위한 Unit Test 를 작성할 수 있습니다
복잡한 Spark Streaming Application 을 작성하고 Kafka Avro 데이터를 읽어 가공한다면 Scala 로 Dataset 의 함수형 API 를 이용해 가공할 수 있습니다
낮은 수준의 API 인 RDD 를 이용한다면 여러분이 더 많은 컨트롤을 가질 수 있습니다. 많은 제어권 = 더 많은 책임인 경우가 대다수이므로, 고도의 촤적화 등 특별한 경우가 아니라면 사용하지 않는 편이 낫습니다.
그리고 일정 규모에서는 머신을 더 부어 넣는게, 고도의 최적화를 유지보수하는 것 보다 저렴합니다. Cloud 시대에는 기계가 사람보다 구하기 쉽기 때문입니다.
예를 들어, DataFrame 의 경우에는 Spark 가 컬럼의 타입을 알고 있으므로 단순히 Row 로 표현되는 RDD 에 비해 Serialization / Deserializaiton 을 효율적으로 수행할 수 있습니다. Dataset 을 사용한다면 Encoder 를 이용해 JVM 에 최적화된 형태로 객체를 Serialization / Deserialization 을 수행할 수 있습니다.
Hello Dataset
위에서 다루었던 DataFrame 은 Dataset[Row] 입니다. 즉 Row 라는 타입의 Dataset 이 DataFrame 인데, Dataset 은 사용자가 지정한 타입도 당연히 사용할 수 있습니다.
예를 들어 MarketingUser 라는 클래스가 있을 때 Dataset[MarketingUser] 처럼 사용할 수 있습니다.
Dataset 은 Scala / Java 언어로만 사용할 수 있습니다. RDD 와 같이 혹은 List 를 다루듯이 map, flatmap, filter 등 함수형 Collection API 의 일부를 사용할 수 있고 DataFrame 과 같이 SQL Optimizer 에 의해 최적화 됩니다.
일반적으로 Streaming 이나 Batch Application 만들때 주로 사용합니다. 데이터 탐색 / 프로토타이핑시에는 생산성을 이유로 PySpark 가 자주 사용됩니다.
이제 데이터를 읽어 Dataset API 를 사용해보겠습니다.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// 만약 Databricks 노트북을 사용한다면 경로를
// `/FileStore/tables/marketing_campaign.csv"` 로 변경할 수 있습니다.
val df = spark.read
.format("csv")
.option("format", "csv")
.option("sep", "\t")
.option("inferSchema", "true")
.option("header", "true")
.load("marketing_campaign.csv")
DataFrame 에서는 select, withColumn 등을 이용해 컬럼을 가공했지만 Dataset 은 List 내 객체를 다루듯이 map, flatMap 등을 이용해 변경이 가능합니다.
val dsUserFiltered = dsMarketingUser.filter(x => x.education == "Master")
그런데 석사 학위 소지 사용자만 추출한 dsUserFiltered 를 보기 위해 show() 함수를 호출하면 다음처럼 오류가 발생하는 것을 볼 수 있습니다.
# dsUserFiltered.show()
Caused by: NullPointerException: Null value appeared in non-nullable field:
- field (class: "scala.Int", name: "income")
- root class: "$line9228218dd17046b3aa8b6bbe593dc28a520.$read.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.MarketingUser"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
이는 NULL 값을 가지기 위해서 Scala 에서는 Int 가 아니라 Option[Int] 를 사용해야 하기 때문입니다.
따라서 오류에 나온대로 income 을 Option[Int] 로 변경해보겠습니다.
case class MarketingUserRefined(userId: Int,
yearBirth: Int,
education: String,
income: Option[Int],
kidhome: Option[Int],
teenhome: Option[Int],
dtCustomer: String,
recency: Option[Int])
val dsUserRefined = dfSelected
.withColumnRenamed("id", "userId")
.as[MarketingUserRefined]
val dsUserFiltered = dsUserRefined.filter(x => x.education == "Master")
dsUserFiltered.show()
Map 함수도 이용해보겠습니다. 다만 Case Class 는 불변이기 때문에 한 필드를 생성하면 객체가 새로 생성됩니다. Copy 를 이용해 원하는 필드를 변경할 수 있습니다.
이 결과물을 (CSV, JSON) 마케팅 플랫폼 팀에 전달하거나 관리자 도구에 업로드 할 경우 자동화된 형태로 Push 메세지를 전송할 수 있습니다.
Summary
이번 챕터에서는 Spark 의 기본적인 개념과 사용법에 대해 알아보았습니다. 아래는 이번 챕터에서 다룬 핵심 키워드입니다. 시간이 된다면 구글링을 통해 찾아봅시다.
Partition
RDD
DataFrame
Dataset
Catalyst Optimizer
Encoder
Serialization / Deserialization
Spark UI
Stage
다음 챕터부터는 이러한 Spark 의 다양한 API 및 컨셉등을 하나씩 익혀보겠습니다.
df 라는 변수에 데이터를 로딩한 결과를 저장했으므로, df 변수를 통해 읽은 데이터를 조작해볼 수 있습니다.
물리적인 데이터는 여전히 Disk 에 CSV 파일로 존재하고, count()toPandas() 와 같은 을 수행할 때 데이터를 메모리로 읽어 처리 하게됩니다. RDD 와 액션에 대해서는 추후 다른 챕터에서 더 자세히 설명하겠습니다.
위 스크린샷은 에서 확인할 수 있는 Stage 정보로, toPandas() 를 호출하기 까지 실행되는 Spark 연산입니다.
이제 마지막으로 date_customer 컬럼을 날짜 타입으로 변경 후 6년을 더해 보겠습니다. 만약 월 (Month) 가 아니라 일 (Day) 기준으로 변경하고 싶다면 함수를 사용할 수 있습니다.
Spark Batch / 구분없이 데이터를 가공할 수 있습니다. 비즈니스 로직을, 즉 코드를 재활용하는 것이 가능해 Stream 의 State (상태) 를 다시 복구하기 위한 배치 작업을 쉽게 만들 수 있습니다.
여기에도 물론 예외는 있습니다. 공식 등 일부 커넥터는 DataFrame 이 아니라 RDD 형태로 데이터를 가공해야 할 수 있습니다. 또한 Stream 의 경우 State (상태) 와 (단위 간격) 을 다루므로 추가적인 API 가 존재할 수 있습니다.
이 과정에서 Spark 를 Local 모드 (단일 머신) 모드로 사용한다면 하나의 머신에서 처리가 되고, 여러 머신에서 분산처리를 진행하려면 를 선택할 수 있습니다. 모드와 환경 부분은 Spark Architecture 부분에서 설명 드리겠습니다.
아래 그림은 RDD 가 아니라 Spark SQL 또는 DataFrame (Dataset) API 를 사용할 경우 Spark 의 를 이용해 최적화 되는 과정을 보여줍니다.
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. ()