In the Spark Tutorial we covered the basics of DataFrame with a small dataset of about 2,000 rows. Let's now go through some useful functions with a more complex example of the kind that comes up often in practice.
The data used in this chapter is 2020-Jan.csv.
Save it as `ecommerce_event.csv` and load it as shown below. Since this dataset is delimited by `,` (comma) rather than `\t`, the `sep` option can be omitted.
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
df = spark.read.load("./ecommerce_event.csv",
format="csv", inferSchema="true", header="true")
df.count() # 4264752 rows, a file of about 450 MiB
df.printSchema()
root
|-- event_time: string (nullable = true)
|-- event_type: string (nullable = true)
|-- product_id: integer (nullable = true)
|-- category_id: long (nullable = true)
|-- category_code: string (nullable = true)
|-- brand: string (nullable = true)
|-- price: double (nullable = true)
|-- user_id: integer (nullable = true)
|-- user_session: string (nullable = true)
Basic & Aggregation
df.agg(max("event_time"), min("event_time")).show(truncate=False)
+-----------------------+-----------------------+
|max(event_time) |min(event_time) |
+-----------------------+-----------------------+
|2020-01-31 23:59:58 UTC|2020-01-01 00:00:00 UTC|
+-----------------------+-----------------------+
Let's look at the categories (category_id) for each brand. In SQL terms, this is just a Group By followed by Count.
df.groupBy(col("brand"), col("category_id")).agg(count("*")).show()
+---------+-------------------+--------+
| brand| category_id|count(1)|
+---------+-------------------+--------+
| runail|1487580007936033754| 4395|
| oniq|1487580005092295511| 3298|
| domix|1487580011970953351| 1292|
| bioaqua|1597770225539875791| 113|
| domix|1487580007256556476| 1133|
| ingarden|1487580011996119176| 900|
| markell|1783999067156644376| 205|
| null|1487580008145748965| 19054|
| null|1487580011677352062| 6008|
| null|1998040852064109417| 880|
|beautific|1487580008288355308| 37|
| naomi|1487580012524601496| 86|
| missha|1783999073758478650| 6|
| null|2145935122136826354| 1|
| runail|1487580009051717646| 2630|
| kiss|1487580013506068678| 237|
| pnb|1487580007457883075| 491|
|bespecial|1487580013338296510| 134|
| eunyul|1487580011585077370| 1860|
| matrix|1487580008263189483| 573|
+---------+-------------------+--------+
We can also count the number of distinct products per brand and category_code.
df\
.groupBy("brand", "category_code")\
.agg(countDistinct("product_id").alias("product_count"))\
.show(truncate=False)
+------------+--------------------------------------+-------------+
|brand |category_code |product_count|
+------------+--------------------------------------+-------------+
|beautix |null |316 |
|dr.gloderm |null |31 |
|farmona |null |39 |
|profhenna |null |56 |
|runail |appliances.environment.vacuum |3 |
|invisibobble|null |2 |
|macadamia |appliances.environment.air_conditioner|1 |
|riche |null |58 |
|nova |null |1 |
|oniq |null |590 |
|lebelage |null |45 |
|fancy |null |15 |
|vilenta |null |20 |
|siberina |null |181 |
|tertio |null |115 |
|jaguar |null |21 |
|nitrimax |apparel.glove |21 |
|jas |null |16 |
|rocknailstar|null |6 |
|koreatida |null |3 |
+------------+--------------------------------------+-------------+
As in SQL, you could use Group By to see the ENUM-like values of a single column, but Distinct works just as well.
df.select("event_type").distinct().show()
+----------------+
| event_type|
+----------------+
| purchase|
| view|
| cart|
|remove_from_cart|
+----------------+
Next, let's cast event_time to a date and count purchase events per day.
eventPurchase = "purchase"
df\
.selectExpr("CAST(event_time as DATE) as event_date", "event_type")\
.where(col("event_type") == lit(eventPurchase))\
.groupBy("event_date")\
.agg(count("*").alias("purchase_count"))\
.orderBy(asc("event_date"))\
.show(truncate=False)
+----------+--------------+
|event_date|purchase_count|
+----------+--------------+
|2020-01-01|3269 |
|2020-01-02|4875 |
|2020-01-03|6031 |
|2020-01-04|6602 |
|2020-01-05|7227 |
|2020-01-06|6368 |
|2020-01-07|7564 |
|2020-01-08|7602 |
|2020-01-09|8774 |
|2020-01-10|8575 |
|2020-01-11|7470 |
|2020-01-12|8147 |
|2020-01-13|10138 |
|2020-01-14|10316 |
|2020-01-15|9962 |
|2020-01-16|9354 |
|2020-01-17|9265 |
|2020-01-18|6197 |
|2020-01-19|8168 |
|2020-01-20|9014 |
+----------+--------------+
Now that we know the where function, we can also extract total sales per brand. Let's pull out the 10 brands with the highest sales over the whole period.
df\
.selectExpr("brand", "price")\
.where("brand IS NOT NULL")\
.groupBy("brand")\
.agg(sum("price").alias("sales_price"))\
.orderBy(desc("sales_price"))\
.limit(10)\
.show(truncate=False)
+--------+------------------+
|brand |sales_price |
+--------+------------------+
|strong |2651513.6799999983|
|jessnail|2297451.200000003 |
|runail |2108654.9900000114|
|irisk |1467889.08999999 |
|grattol |1055984.4699999897|
|marathon|789238.6300000004 |
|masura |693152.7399999842 |
|cnd |599136.3200000003 |
|uno |546019.0499999991 |
|estel |544366.7999999983 |
+--------+------------------+
Going a step further, we can divide total sales by the number of all users and by the number of purchasing users to get per-brand ARPU and ARPPU.
df\
.selectExpr("brand", "price", "user_id", "event_type")\
.where("brand IS NOT NULL")\
.groupBy("brand")\
.agg(
countDistinct("user_id").alias("user_count_all"),
countDistinct(when(col("event_type") == lit(eventPurchase), col("user_id"))).alias("user_count_purchase"),
sum("price").alias("sales_price"))\
.selectExpr(
"brand",
"sales_price",
"user_count_all",
"sales_price / user_count_all as ARPU",
"user_count_purchase",
"sales_price / user_count_purchase as ARPPU",
)\
.orderBy(desc("sales_price"))\
.limit(10)\
.show(truncate=False)
+--------+------------------+--------------+------------------+-------------------+------------------+
|brand |sales_price |user_count_all|ARPU |user_count_purchase|ARPPU |
+--------+------------------+--------------+------------------+-------------------+------------------+
|strong |2651513.6799999983|5683 |466.5693612528591 |186 |14255.449892473109|
|jessnail|2297451.200000003 |21039 |109.19963876610119|1119 |2053.1288650580905|
|runail |2108654.9900000114|59595 |35.38308566154898 |9096 |231.8222284520681 |
|irisk |1467889.08999999 |48028 |30.56319417839573 |7277 |201.71624158306858|
|grattol |1055984.4699999897|34884 |30.271312636165284|4732 |223.15817202028524|
|marathon|789238.6300000004 |2871 |274.9002542668061 |38 |20769.437631578956|
|masura |693152.7399999842 |22668 |30.578469207692965|3085 |224.6848427876772 |
|cnd |599136.3200000003 |9129 |65.63000547705118 |597 |1003.578425460637 |
|uno |546019.0499999991 |16370 |33.35485949908364 |2446 |223.2293744889612 |
|estel |544366.7999999983 |24438 |22.27542352074631 |2130 |255.571267605633 |
+--------+------------------+--------------+------------------+-------------------+------------------+
Window Function
Most data processing frameworks support Window Functions.
Let's solve the following four problems.
Over the entire period, which product category has the highest total sales for each brand?
What are the daily top 3 brands by total sales?
Over the entire period, what is each brand's total sales, shown together with the sales of the brands ranked one position above and below it?
If we accumulate the daily total sales across all brands, how does revenue change over time?
Looking closely, these are a bit trickier than a simple Group By. To aid understanding we will solve them with Spark SQL first, and also write the equivalent PySpark DataFrame code.
df.createOrReplaceTempView("PURCHASE")
spark.sql("""
SELECT *
FROM PURCHASE
LIMIT 10
""").show()
+--------------------+----------------+----------+-------------------+-------------+--------+------+---------+--------------------+
| event_time| event_type|product_id| category_id|category_code| brand| price| user_id| user_session|
+--------------------+----------------+----------+-------------------+-------------+--------+------+---------+--------------------+
|2020-01-01 00:00:...| view| 5809910|1602943681873052386| null| grattol| 5.24|595414620|4adb70bb-edbd-498...|
|2020-01-01 00:00:...| view| 5812943|1487580012121948301| null|kinetics| 3.97|595414640|c8c5205d-be43-4f1...|
|2020-01-01 00:00:...| view| 5798924|1783999068867920626| null| zinger| 3.97|595412617|46a5010f-bd69-4fb...|
|2020-01-01 00:00:...| view| 5793052|1487580005754995573| null| null| 4.92|420652863|546f6af3-a517-475...|
|2020-01-01 00:00:...| view| 5899926|2115334439910245200| null| null| 3.92|484071203|cff70ddf-529e-4b0...|
|2020-01-01 00:00:...| view| 5837111|1783999068867920626| null| staleks| 6.35|595412617|46a5010f-bd69-4fb...|
|2020-01-01 00:00:...| cart| 5850281|1487580006300255120| null|marathon|137.78|593016733|848f607c-1d14-474...|
|2020-01-01 00:00:...| view| 5802440|2151191070908613477| null| null| 2.16|595411904|74ca1cd5-5381-4ff...|
|2020-01-01 00:00:...| view| 5726464|1487580005268456287| null| null| 5.56|420652863|546f6af3-a517-475...|
|2020-01-01 00:01:...|remove_from_cart| 5850281|1487580006300255120| null|marathon|137.78|593016733|848f607c-1d14-474...|
+--------------------+----------------+----------+-------------------+-------------+--------+------+---------+--------------------+
Now let's solve the problems above with Window functions, in both SQL and DataFrame form.
1. Over the entire period, the product category with the highest total sales per brand
spark.sql("""
WITH CALCULATED (
SELECT
brand,
category_code,
sum(price) as price_sum
FROM PURCHASE
WHERE
brand IS NOT NULL
AND category_code IS NOT NULL
GROUP BY brand, category_code
),
RANKED (
SELECT
brand,
category_code,
price_sum,
rank() OVER (PARTITION BY brand ORDER BY price_sum DESC) as rank
FROM CALCULATED
)
SELECT *
FROM RANKED
WHERE rank = 1
ORDER BY price_sum DESC
""").show(truncate=False)
The following is the same query converted to PySpark DataFrame code.
dfCalculated = df\
.select(
col("brand"),
col("category_code"),
col("price"),
)\
.where(col("brand").isNotNull() & col("category_code").isNotNull())\
.groupBy("brand", "category_code")\
.agg(sum("price").alias("price_sum"))
dfRanked = dfCalculated\
.select(
col("brand"),
col("category_code"),
col("price_sum"),
rank().over(Window.partitionBy(col("brand")).orderBy(desc("price_sum"))).alias("rank")
)
dfRanked\
.where(col("rank") == lit(1))\
.orderBy(desc("price_sum"))\
.show(truncate=False)
The result is identical:
+---------+-------------------------------+------------------+----+
|brand |category_code |price_sum |rank|
+---------+-------------------------------+------------------+----+
|max |appliances.environment.vacuum |489141.04999999976|1 |
|polarus |appliances.environment.vacuum |418171.82000000007|1 |
|emil |appliances.environment.vacuum |296071.9100000001 |1 |
|jessnail |appliances.environment.vacuum |136883.68 |1 |
|runail |furniture.living_room.cabinet |125213.18000000004|1 |
|irisk |furniture.bathroom.bath |79211.96 |1 |
|vosev |accessories.bag |48592.50000000001 |1 |
|benovy |apparel.glove |43165.819999999985|1 |
|kosmekka |furniture.living_room.cabinet |42673.57000000006 |1 |
|italwax |stationery.cartrige |17865.97999999999 |1 |
|jaguar |appliances.personal.hair_cutter|16316.279999999988|1 |
|nitrimax |apparel.glove |14064.090000000006|1 |
|nitrile |apparel.glove |10399.849999999999|1 |
|kondor |appliances.personal.hair_cutter|9074.939999999997 |1 |
|domix |furniture.bathroom.bath |5248.419999999998 |1 |
|shik |accessories.cosmetic_bag |4122.72 |1 |
|depilflax|stationery.cartrige |3861.7799999999966|1 |
|gezatone |appliances.personal.massager |3240.320000000001 |1 |
|naturmed |furniture.bathroom.bath |2997.71 |1 |
|naomi |apparel.glove |2002.319999999998 |1 |
+---------+-------------------------------+------------------+----+
2. Daily top 3 brands by total sales
spark.sql("""
WITH CALCULATED (
SELECT
CAST(event_time AS DATE) as event_date,
brand,
sum(price) as price_sum
FROM PURCHASE
WHERE
brand IS NOT NULL
GROUP BY 1, 2
),
RANKED (
SELECT
event_date,
brand,
price_sum,
rank() OVER (PARTITION BY event_date ORDER BY price_sum DESC) as rank
FROM CALCULATED
)
SELECT *
FROM RANKED
WHERE rank <= 3
ORDER BY event_date ASC, rank ASC
""").show(truncate=False)
+----------+--------+------------------+----+
|event_date|brand |price_sum |rank|
+----------+--------+------------------+----+
|2020-01-01|jessnail|58125.96000000012 |1 |
|2020-01-01|strong |45510.60999999997 |2 |
|2020-01-01|runail |40028.73999999981 |3 |
|2020-01-02|jessnail|75567.9100000001 |1 |
|2020-01-02|strong |53850.130000000005|2 |
|2020-01-02|runail |46773.700000000055|3 |
|2020-01-03|jessnail|130618.19000000042|1 |
|2020-01-03|strong |67431.65 |2 |
|2020-01-03|runail |65798.47999999997 |3 |
|2020-01-04|jessnail|77021.25000000009 |1 |
|2020-01-04|strong |72237.20999999993 |2 |
|2020-01-04|runail |55250.03000000005 |3 |
|2020-01-05|jessnail|83268.23000000014 |1 |
|2020-01-05|strong |69062.75999999997 |2 |
|2020-01-05|runail |56429.79000000005 |3 |
|2020-01-06|strong |80781.73999999989 |1 |
|2020-01-06|jessnail|66437.16000000012 |2 |
|2020-01-06|runail |50932.03000000017 |3 |
|2020-01-07|jessnail|76882.80000000008 |1 |
|2020-01-07|strong |74643.85999999997 |2 |
+----------+--------+------------------+----+
3. Over the entire period, each brand's total sales, shown together with the sales of the brands ranked one position above and below
spark.sql("""
WITH CALCULATED (
SELECT
brand,
sum(price) as price_sum
FROM PURCHASE
WHERE
brand IS NOT NULL
GROUP BY brand
),
RANKED (
SELECT
brand,
lag(price_sum, 1) OVER (PARTITION BY 1 ORDER BY price_sum DESC) as price_sum_prev,
price_sum as price_sum_current,
lead(price_sum, 1) OVER (PARTITION BY 1 ORDER BY price_sum DESC) as price_sum_next,
dense_rank() OVER (PARTITION BY 1 ORDER BY price_sum DESC) as rank
FROM CALCULATED
)
SELECT *
FROM RANKED
ORDER BY rank ASC
""").show(truncate=False)
+---------+------------------+------------------+------------------+----+
|brand |price_sum_prev |price_sum_current |price_sum_next |rank|
+---------+------------------+------------------+------------------+----+
|strong |null |2651513.6799999983|2107905.480000001 |1 |
|jessnail |2651513.6799999983|2107905.480000001 |1877075.9400000153|2 |
|runail |2107905.480000001 |1877075.9400000153|1280128.789999989 |3 |
|irisk |1877075.9400000153|1280128.789999989 |1055984.4699999897|4 |
|grattol |1280128.789999989 |1055984.4699999897|789238.6300000004 |5 |
|marathon |1055984.4699999897|789238.6300000004 |693014.0199999842 |6 |
|masura |789238.6300000004 |693014.0199999842 |599136.3200000003 |7 |
|cnd |693014.0199999842 |599136.3200000003 |546019.0499999991 |8 |
|uno |599136.3200000003 |546019.0499999991 |544323.2499999983 |9 |
|estel |546019.0499999991 |544323.2499999983 |489141.04999999976|10 |
|max |544323.2499999983 |489141.04999999976|427986.93999999884|11 |
|ingarden |489141.04999999976|427986.93999999884|418171.82000000007|12 |
|polarus |427986.93999999884|418171.82000000007|365795.62000000005|13 |
|italwax |418171.82000000007|365795.62000000005|306945.5899999999 |14 |
|browxenna|365795.62000000005|306945.5899999999 |296071.9100000001 |15 |
|emil |306945.5899999999 |296071.9100000001 |293430.7299999994 |16 |
|kapous |296071.9100000001 |293430.7299999994 |293421.13 |17 |
|shik |293430.7299999994 |293421.13 |260097.03000000014|18 |
|jas |293421.13 |260097.03000000014|254294.83999999994|19 |
|kosmekka |260097.03000000014|254294.83999999994|245135.18000000008|20 |
+---------+------------------+------------------+------------------+----+
4. Across all brands, how revenue changes as we accumulate the daily total sales
spark.sql("""
WITH CALCULATED (
SELECT
CAST(event_time AS DATE) as event_date,
sum(price) as price_sum
FROM PURCHASE
WHERE
brand IS NOT NULL
GROUP BY 1
),
RANKED (
SELECT
event_date,
price_sum,
sum(price_sum) OVER (PARTITION BY 1 ORDER BY event_date RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as price_acc
FROM CALCULATED
)
SELECT *
FROM RANKED
ORDER BY event_date ASC
""").show(truncate=False)
+----------+------------------+--------------------+
|event_date|price_sum |price_acc |
+----------+------------------+--------------------+
|2020-01-01|478880.47999999445|478880.47999999445 |
|2020-01-02|591996.0299999958 |1070876.5099999902 |
|2020-01-03|764825.2799999974 |1835701.7899999875 |
|2020-01-04|672783.4599999917 |2508485.249999979 |
|2020-01-05|698293.9199999882 |3206779.1699999673 |
|2020-01-06|648607.0899999944 |3855386.2599999616 |
|2020-01-07|702788.3999999919 |4558174.659999954 |
|2020-01-08|710164.8099999978 |5268339.469999951 |
|2020-01-09|786942.4299999884 |6055281.89999994 |
|2020-01-10|769210.5999999931 |6824492.499999933 |
|2020-01-11|732801.5299999891 |7557294.029999922 |
|2020-01-12|818940.0799999872 |8376234.109999909 |
|2020-01-13|860498.5800000073 |9236732.689999916 |
|2020-01-14|867423.4899999918 |1.0104156179999907E7|
|2020-01-15|858057.1199999887 |1.0962213299999895E7|
|2020-01-16|788017.7599999914 |1.1750231059999887E7|
|2020-01-17|771007.1899999875 |1.2521238249999875E7|
|2020-01-18|714140.5399999954 |1.323537878999987E7 |
|2020-01-19|781291.109999992 |1.4016669899999863E7|
|2020-01-20|849404.3399999914 |1.4866074239999853E7|
+----------+------------------+--------------------+
only showing top 20 rows
Window Function - Attribution
Advertising and marketing have a concept called attribution. It is a way of measuring the return on the money paid for an ad or marketing campaign by determining whether the ad you paid for is what actually drove the desired action.
Let's assume the following scenario for the hypothetical Udon Market app.
User A searches for keyword K1 (fishcake udon) in the Udon Market app and then browses products P1, P2, and P3.
User A then searches with keyword K2 (green onion udon) and browses P3, P4, and P5.
After some time, user A makes up their mind and purchases product P3.
If we assume there is no data loss and that a keyword search event always arrives before the product click events that follow it, we can tie the events together as follows: when user A orders P3, the keywords browsed on the way to that order are K1 (non-last click, indirect attribution) and K2 (last click, direct attribution).
Couldn't we group events like this with a Window function and show that K1 and K2 contributed to user A's order of P3? Think about how you would build such attribution data with the functions Spark's Window API provides.
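As one possible direction, here is a minimal sketch. It uses a hypothetical event log (user_id, event_time, event_type, keyword, product_id) that mirrors the scenario above; the ecommerce_event data itself has no search events, so this only illustrates the window technique, reusing the spark session and imports from the top of the chapter.
# Hypothetical events for user A from the scenario above.
events = spark.createDataFrame(
    [
        ("A", "2020-01-01 10:00:00", "search",   "K1", None),
        ("A", "2020-01-01 10:01:00", "click",    None, "P1"),
        ("A", "2020-01-01 10:02:00", "click",    None, "P2"),
        ("A", "2020-01-01 10:03:00", "click",    None, "P3"),
        ("A", "2020-01-01 10:10:00", "search",   "K2", None),
        ("A", "2020-01-01 10:11:00", "click",    None, "P3"),
        ("A", "2020-01-01 11:00:00", "purchase", None, "P3"),
    ],
    ["user_id", "event_time", "event_type", "keyword", "product_id"],
)

# Per user, ordered by time, look only at rows up to the current one.
w = Window.partitionBy("user_id").orderBy("event_time")\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

attributed = events\
    .withColumn("last_keyword", last("keyword", ignorenulls=True).over(w))\
    .withColumn("all_keywords", collect_set("keyword").over(w))

# On the purchase row, last_keyword is the last-click keyword (K2)
# and all_keywords contains every keyword seen so far (K1 and K2).
attributed.where(col("event_type") == "purchase").show(truncate=False)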
Window Function - Session
The concept of a session is used all the time in data analysis.
Based on the idea that even the same user is interested in a particular thing at a particular point in time, a user's activity can be split by a given unit of time.
For example, Google Analytics (GA) splits time-based sessions so that after 30 minutes of inactivity a new session begins.
Suppose a user's last event was at 14:04; when another event occurs at 14:35, it receives a different session ID.
User A, Session 1: events up to 14:04
User A, Session 2: events from 14:35 onward
If we wanted to build sessions from the ecommerce_event dataset, how could Window functions be used? A couple of ideas worth considering for the implementation (a rough sketch follows below):
Among the Window functions, Lag attaches the previous row's value within the current window to the current row.
event_time - Lag(event_time) > Threshold
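Here is a rough sketch of that idea against this chapter's df. The 30-minute threshold follows the GA example above, the timestamp format string is an assumption based on the sample rows, and session_seq is our own label rather than the user_session column that already exists in the data.
# Parse event_time strings such as "2020-01-01 00:00:00 UTC" into timestamps.
sessionThresholdSec = 30 * 60
w = Window.partitionBy("user_id").orderBy("event_ts")

dfSession = df\
    .withColumn("event_ts", to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss 'UTC'"))\
    .withColumn("prev_ts", lag("event_ts", 1).over(w))\
    .withColumn("gap_sec", unix_timestamp("event_ts") - unix_timestamp("prev_ts"))\
    .withColumn("is_new_session",
                when(col("prev_ts").isNull() | (col("gap_sec") > sessionThresholdSec), 1).otherwise(0))\
    .withColumn("session_seq",
                # A running sum of the boundary flags numbers each user's sessions 1, 2, 3, ...
                sum("is_new_session").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

dfSession.select("user_id", "event_time", "gap_sec", "is_new_session", "session_seq").show()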
Array Type & Unnest
In SQL, functions usually apply to a column, as in to_date(event_time). In some cases, however, you have to deal with arrays.
In this section we will look at array operations using the ecommerce_event data.
The data is currently flat, so let's turn a few columns into array types. Collecting the product_id values sold by each brand into a single array column:
df\
.where(col("brand").isNotNull())\
.groupBy("brand")\
.agg(
collect_list("product_id").alias("product_id_list"),
size(collect_list("product_id")).alias("product_id_list_count"),
collect_set("product_id").alias("product_id_set"),
size(collect_set("product_id")).alias("product_id_set_count"),
)\
.show()
+------------+--------------------+---------------------+--------------------+--------------------+
| brand| product_id_list|product_id_list_count| product_id_set|product_id_set_count|
+------------+--------------------+---------------------+--------------------+--------------------+
| beautix|[5862589, 5722999...| 19861|[5885135, 5706318...| 316|
| dr.gloderm|[5875651, 5875661...| 578|[5875646, 5865200...| 31|
| farmona|[5685354, 5685382...| 1484|[5685353, 5685348...| 46|
| profhenna|[5853441, 5853441...| 1531|[5853440, 5853419...| 56|
|invisibobble| [5597144, 5712776]| 2| [5712776, 5597144]| 2|
| riche|[5842213, 5842221...| 758|[5922120, 5842233...| 58|
| nova| [5755588]| 1| [5755588]| 1|
| oniq|[5884364, 5884359...| 24010|[5884342, 5834400...| 592|
| lebelage|[5830804, 5830809...| 714|[5866297, 5830820...| 45|
| vilenta|[5840375, 5840374...| 851|[5840374, 5840410...| 20|
| fancy|[5810171, 5810177...| 287|[5810175, 5817137...| 15|
| jaguar|[5809118, 5809118...| 1991|[5712807, 5712808...| 22|
| siberina|[5870946, 5914135...| 2392|[5914135, 5869563...| 181|
| tertio|[5865103, 5780729...| 1179|[5780317, 5771070...| 115|
| koreatida|[5868794, 5868794...| 119|[5868794, 5868792...| 3|
| jas|[5862545, 5862544...| 5882|[5862544, 5618277...| 16|
|rocknailstar|[5683806, 5683806...| 11|[5629064, 5630048...| 6|
| depilflax|[24390, 24390, 24...| 5651|[36210, 82960, 24...| 64|
|protokeratin|[5856625, 5856574...| 579|[5856533, 5856483...| 70|
| relouis|[5916864, 5916864...| 6234|[5916494, 5916617...| 94|
+------------+--------------------+---------------------+--------------------+--------------------+
Spark DataFrame provides a wide range of array functions. Try to guess what each does from its name, then confirm by reading the Spark documentation. A small sketch of a few of them follows below.
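For reference, here is a small sketch that applies a few of those functions to a per-brand array. dfBrandProducts is just a local name for an aggregation like the one above, and the product ID passed to array_contains is only an example value.
# One array of distinct product IDs per brand.
dfBrandProducts = df\
    .where(col("brand").isNotNull())\
    .groupBy("brand")\
    .agg(collect_set("product_id").alias("product_id_set"))

dfBrandProducts.select(
    col("brand"),
    size(col("product_id_set")).alias("product_count"),                    # number of elements
    array_max(col("product_id_set")).alias("max_product_id"),              # largest element
    element_at(col("product_id_set"), 1).alias("first_product_id"),        # 1-based indexing
    array_contains(col("product_id_set"), 5911195).alias("sells_5911195"), # membership test
).show(truncate=False)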
Now let's collect the deduplicated IDs of the products sold each day, across all brands, into an array. For later analysis we will also bundle the category_id and category_code columns together:
df\
.selectExpr("CAST(event_time AS DATE) as event_date", "brand", "product_id", "ARRAY(category_code, category_id) as category")\
.where(
col("brand").isNotNull() &
((col("category_code").isNotNull()))
)\
.groupBy("event_date")\
.agg(
collect_set("product_id").alias("product_id_set"),
collect_set("category").alias("category_set")
)\
.orderBy(asc("event_date"))\
.show()
+----------+--------------------+--------------------+
|event_date| product_id_set| category_set|
+----------+--------------------+--------------------+
|2020-01-01|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-02|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-03|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-04|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-05|[5911195, 5901859...|[[appliances.envi...|
|2020-01-06|[5911195, 5901859...|[[appliances.envi...|
|2020-01-07|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-08|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-09|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-10|[5911195, 5901859...|[[appliances.envi...|
|2020-01-11|[5911195, 5901859...|[[appliances.envi...|
|2020-01-12|[5911195, 5901859...|[[appliances.envi...|
|2020-01-13|[5911195, 5807068...|[[appliances.envi...|
|2020-01-14|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-15|[5911195, 5901859...|[[furniture.bathr...|
|2020-01-16|[5911195, 5889693...|[[appliances.envi...|
|2020-01-17|[5911195, 5889693...|[[furniture.bathr...|
|2020-01-18|[5911195, 5889693...|[[furniture.bathr...|
|2020-01-19|[5911195, 5901859...|[[appliances.envi...|
|2020-01-20|[5911195, 5901859...|[[appliances.envi...|
+----------+--------------------+--------------------+
Expanding the first row in full:
# event_date
2020-01-01
# product_id
[5911195, 5901859, 5810081, 5914941, 5788783, 5889695, 5854574, 5767493, 5896431, 5665819, 5712700, 5885587, 24336, 5712694, 5901857, 5746974, 5835254, 5873430, 5820598, 5855509, 5873432, 5395, 5904212, 5856192, 5835931, 5877461, 5847851, 5749198, 5796100, 5753706, 5675830, 5809792, 5803579, 5915204, 24330, 5861762, 5884578, 5880245, 5825604, 5889696, 5767494, 5885596, 5824088, 5823936, 5861764, 5884580, 5854579, 5848272, 5885592, 5914105, 5810082, 5907080, 5901862, 5901854, 5809118, 5810084, 5746848, 5665820, 5855512, 5863222, 5820603, 5712695, 5830789, 5901858, 5914940, 5873431, 5788782, 5856191, 5712516, 5746846, 5855510, 24335, 5861767, 5835932, 55717, 5796101, 5824195, 5901869, 5885595, 5861763, 5823935, 5775813, 5884579, 5885291, 5732026, 5868559, 5885589, 5787829, 5901988, 5766983, 5901861, 82962, 5902675, 5810083, 5855511, 5856194, 5901855, 5665821, 5820604, 5766981, 5775822, 5855507, 5877459, 5788781, 5765551, 5907079, 24334, 5861766, 5771614, 5823938, 5856184, 5892336, 5830786, 5891197, 5796102, 5873428, 5743974, 5901870, 5775814, 5885588, 5901872, 5867188, 5885590, 5888124, 5861098, 82959, 5861760, 5753698, 5889694, 5767492, 5914107, 5885586, 5892330, 5913318, 5901987, 5889690, 5766980, 5835253, 24331, 5712697, 5855508, 5861765, 5854573, 5765552, 5901852, 5877462, 5847852, 5856193, 5830785, 5749199, 5848273, 5885593, 5861761, 5892337, 5884577, 5796103, 5877529, 5798933, 5825605, 5896187, 5759489, 5829359, 5823937, 5915788, 5884581, 5846774, 5753703, 5892335]
# [category_code, category_id]
[[furniture.bathroom.bath, 1487580011970953351], [furniture.living_room.cabinet, 2193074740619379535], [furniture.bathroom.bath, 2018287324474901238], [appliances.personal.hair_cutter, 1487580008070251489], [appliances.environment.vacuum, 1487580006350586771], [accessories.bag, 1487580010695884882], [accessories.cosmetic_bag, 1921723506584715388], [furniture.bathroom.bath, 2193074740686488401], [furniture.bathroom.bath, 1487580012147114126], [apparel.glove, 1487580012071616651], [apparel.glove, 2007399943458784057], [stationery.cartrige, 1487580013053083824]]
How would we use a Window function together with array_intersect to see, for beautix, how much overlap there is between the products sold on a given day and those sold the day before? Try writing it with SQL functions. Here are a few hints:
SELECT
CAST(event_time AS DATE) as event_date
collect_set(product_id) as products
array_intersect(products_current, products_prev) as products_common
spark.sql("""
WITH GROUPED as (
SELECT CAST(event_time AS DATE) as event_date, collect_set(product_id) as product_id_list
FROM PURCHASE
WHERE brand = "beautix"
GROUP BY event_date
),
WINDOWED as (
SELECT
event_date,
product_id_list as products_current,
lag(product_id_list, 1) OVER (PARTITION BY 1 ORDER BY event_date ASC) as products_prev
FROM GROUPED
),
CALCULATED as (
SELECT
event_date,
products_current,
products_prev,
array_intersect(products_current, products_prev) as products_common,
size(array_intersect(products_current, products_prev)) as products_common_size
FROM WINDOWED
)
SELECT *
FROM CALCULATED
ORDER BY event_date ASC
""").show()
+----------+--------------------+--------------------+--------------------+--------------------+
|event_date| products_current| products_prev| products_common|products_common_size|
+----------+--------------------+--------------------+--------------------+--------------------+
|2020-01-01|[5885135, 5842741...| null| null| -1|
|2020-01-02|[5842741, 5706333...|[5885135, 5842741...|[5842741, 5706333...| 63|
|2020-01-03|[5706333, 5733063...|[5842741, 5706333...|[5706333, 5733063...| 72|
|2020-01-04|[5842741, 5842712...|[5706333, 5733063...|[5706333, 5733063...| 73|
|2020-01-05|[5842712, 5862588...|[5842741, 5842712...|[5842712, 5862588...| 72|
|2020-01-06|[5706114, 5885135...|[5842712, 5862588...|[5842712, 5714043...| 88|
|2020-01-07|[5706114, 5842741...|[5706114, 5885135...|[5706114, 5733063...| 97|
|2020-01-08|[5842741, 5842706...|[5706114, 5842741...|[5842741, 5842706...| 84|
|2020-01-09|[5706114, 5885135...|[5842741, 5842706...|[5842706, 5912890...| 89|
|2020-01-10|[5706114, 5862588...|[5706114, 5885135...|[5706114, 5714043...| 106|
|2020-01-11|[5842741, 5862588...|[5706114, 5862588...|[5862588, 5714043...| 106|
|2020-01-12|[5885135, 5842741...|[5842741, 5862588...|[5842741, 5862588...| 102|
|2020-01-13|[5706114, 5842741...|[5885135, 5842741...|[5842741, 5862588...| 127|
|2020-01-14|[5733067, 5862588...|[5706114, 5842741...|[5733067, 5862588...| 117|
|2020-01-15|[5706114, 5842741...|[5733067, 5862588...|[5862588, 5714043...| 116|
|2020-01-16|[5862588, 5706333...|[5706114, 5842741...|[5862588, 5706333...| 123|
|2020-01-17|[5706114, 5862588...|[5862588, 5706333...|[5862588, 5842706...| 96|
|2020-01-18|[5706114, 5885135...|[5706114, 5862588...|[5706114, 5862588...| 86|
|2020-01-19|[5885135, 5842741...|[5706114, 5885135...|[5885135, 5842741...| 90|
|2020-01-20|[5885135, 5862588...|[5885135, 5842741...|[5885135, 5862588...| 112|
+----------+--------------------+--------------------+--------------------+--------------------+
Saving the daily product and category metadata we built earlier into a separate DataFrame looks like this:
dfRaw = df\
.selectExpr("CAST(event_time AS DATE) as event_date", "brand", "product_id", "ARRAY(category_code, category_id) as category")\
.where(
col("brand").isNotNull() &
((col("category_code").isNotNull()))
)\
.groupBy("event_date")\
.agg(
collect_set("product_id").alias("product_id_set"),
collect_set("category").alias("category_set")
)
dfRaw.printSchema()
root
|-- event_date: date (nullable = true)
|-- product_id_set: array (nullable = false)
| |-- element: integer (containsNull = false)
|-- category_set: array (nullable = false)
| |-- element: array (containsNull = false)
| | |-- element: string (containsNull = true)
From this, let's extract product_id, category_id, and category_code to build a product information table.
dfRaw\
.withColumn("product_id", explode(col("product_id_set")))\
.withColumn("category_element", explode(col("category_set")))\
.withColumn("category_code", element_at(col("category_element"), 1))\
.withColumn("category_id", element_at(col("category_element"), 2))\
.select("product_id", "category_code", "category_id")\
.show(truncate=False)
+----------+-------------------------------+-------------------+
|product_id|category_code |category_id |
+----------+-------------------------------+-------------------+
|5911195 |furniture.bathroom.bath |1487580011970953351|
|5911195 |furniture.living_room.cabinet |2193074740619379535|
|5911195 |furniture.bathroom.bath |2018287324474901238|
|5911195 |accessories.cosmetic_bag |1921723506584715388|
|5911195 |appliances.environment.vacuum |1487580006350586771|
|5911195 |accessories.bag |1487580010695884882|
|5911195 |appliances.personal.hair_cutter|1487580008070251489|
|5911195 |furniture.bathroom.bath |2193074740686488401|
|5911195 |furniture.bathroom.bath |1487580012147114126|
|5911195 |apparel.glove |1487580012071616651|
|5911195 |apparel.glove |2007399943458784057|
|5911195 |stationery.cartrige |1487580013053083824|
|5901859 |furniture.bathroom.bath |1487580011970953351|
|5901859 |furniture.living_room.cabinet |2193074740619379535|
|5901859 |furniture.bathroom.bath |2018287324474901238|
|5901859 |accessories.cosmetic_bag |1921723506584715388|
|5901859 |appliances.environment.vacuum |1487580006350586771|
|5901859 |accessories.bag |1487580010695884882|
|5901859 |appliances.personal.hair_cutter|1487580008070251489|
|5901859 |furniture.bathroom.bath |2193074740686488401|
+----------+-------------------------------+-------------------+
Blowing one row up into many rows like this is called Unnest or Explode. The exact name differs a little between query engines, but most data processing engines support it.
JSON Function & Struct Type
When you work with a database you mostly deal with tables, so the data is largely well structured. RDBs do support a JSON column type these days, but services rarely use it in practice, so you seldom run into JSON while working with DB data.
However, client (app / web) or server events sent through Kafka or Kinesis are quite often JSON.
Because of limits in the receiving queue system such as Kinesis, or simply for ease of use, data may be sent as JSON at the cost of size.
Even when the original format is managed as ProtoBuf or Avro, data may be converted to JSON when relayed to another broker because it is easier to manipulate.
Of course, keeping a format such as ProtoBuf or Avro buys consumers strong typing, message compatibility, and many other benefits at some cost (CPU and so on). In this section, however, we assume the data arrives as JSON and process it with Spark DataFrame's JSON functions.
The appeal of JSON is that producers can write complex structured values with little effort, and consumers can read them right away with little effort. The problem is that even if today's data looks a certain way, the shape can change freely, so there is no guarantee it will stay that way.
Let's process some data with the JSON functions.
First, let's look at the schema of our data again, and then use to_json to build a JSON column.
df.printSchema()
root
|-- event_time: string (nullable = true)
|-- event_type: string (nullable = true)
|-- product_id: integer (nullable = true)
|-- category_id: long (nullable = true)
|-- category_code: string (nullable = true)
|-- brand: string (nullable = true)
|-- price: double (nullable = true)
|-- user_id: integer (nullable = true)
|-- user_session: string (nullable = true)
First, deduplicate the product metadata columns into a separate DataFrame:
dfPrepared = df\
.select("brand", "product_id", "category_code", "category_id")\
.dropDuplicates()
dfPrepared.show()
+---------+----------+-------------+-------------------+
| brand|product_id|category_code| category_id|
+---------+----------+-------------+-------------------+
| null| 5847345| null|1487580005092295511|
| null| 5700094| null|1487580011652186237|
| null| 5608703| null|1487580005553668971|
| null| 5561084| null|1487580005595612013|
| null| 5683960| null|1487580005268456287|
| null| 5886759| null|1487580006317032337|
| runail| 4958| null|1487580009471148064|
| nagaraku| 5847810| null|1487580013522845895|
| irisk| 5729898| null|1487580008145748965|
| staleks| 5899158| null|1487580009286598681|
| bluesky| 5804328| null|1487580005461394279|
Counting the rows confirms that the duplicates were removed.
df.count() # 4264752
dfPrepared.count() # 45720
dfJson = dfPrepared\
.where(col("category_code").isNotNull())\
.withColumn("category", to_json(struct(col("category_id"), col("category_code"))))
dfJson.printSchema()
dfJson.show(truncate=False)
root
|-- brand: string (nullable = true)
|-- product_id: integer (nullable = true)
|-- category_code: string (nullable = true)
|-- category_id: long (nullable = true)
|-- category: string (nullable = true)
+--------+----------+-------------------------------+-------------------+-------------------------------------------------------------------------------------+
|brand |product_id|category_code |category_id |category |
+--------+----------+-------------------------------+-------------------+-------------------------------------------------------------------------------------+
|null |5902332 |apparel.glove |2007399943458784057|{"category_id":2007399943458784057,"category_code":"apparel.glove"} |
|null |5571162 |furniture.bathroom.bath |1487580012147114126|{"category_id":1487580012147114126,"category_code":"furniture.bathroom.bath"} |
|benovy |5911195 |apparel.glove |2007399943458784057|{"category_id":2007399943458784057,"category_code":"apparel.glove"} |
|null |5756536 |apparel.glove |2007399943458784057|{"category_id":2007399943458784057,"category_code":"apparel.glove"} |
|emil |5861764 |appliances.environment.vacuum |1487580006350586771|{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |
|shik |5914940 |accessories.cosmetic_bag |1921723506584715388|{"category_id":1921723506584715388,"category_code":"accessories.cosmetic_bag"} |
|italwax |5775813 |stationery.cartrige |1487580013053083824|{"category_id":1487580013053083824,"category_code":"stationery.cartrige"} |
|null |5923808 |appliances.personal.hair_cutter|1487580008070251489|{"category_id":1487580008070251489,"category_code":"appliances.personal.hair_cutter"}|
|null |5856308 |furniture.living_room.chair |2022622168218599898|{"category_id":2022622168218599898,"category_code":"furniture.living_room.chair"} |
|max |5855509 |appliances.environment.vacuum |1487580006350586771|{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |
|null |5813075 |furniture.bathroom.bath |1487580011970953351|{"category_id":1487580011970953351,"category_code":"furniture.bathroom.bath"} |
|benovy |5901872 |apparel.glove |2007399943458784057|{"category_id":2007399943458784057,"category_code":"apparel.glove"} |
|null |5912860 |furniture.bathroom.bath |2018287324474901238|{"category_id":2018287324474901238,"category_code":"furniture.bathroom.bath"} |
|nitrile |5889686 |apparel.glove |2007399943458784057|{"category_id":2007399943458784057,"category_code":"apparel.glove"} |
|null |5913116 |furniture.bathroom.bath |2018287324474901238|{"category_id":2018287324474901238,"category_code":"furniture.bathroom.bath"} |
|null |5713291 |appliances.personal.hair_cutter|1487580008070251489|{"category_id":1487580008070251489,"category_code":"appliances.personal.hair_cutter"}|
|null |5657896 |stationery.cartrige |1487580013053083824|{"category_id":1487580013053083824,"category_code":"stationery.cartrige"} |
|jessnail|5877460 |appliances.environment.vacuum |1487580006350586771|{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |
|vosev |5904212 |accessories.bag |1487580010695884882|{"category_id":1487580010695884882,"category_code":"accessories.bag"} |
|kinetics|5829359 |furniture.bathroom.bath |1487580011970953351|{"category_id":1487580011970953351,"category_code":"furniture.bathroom.bath"} |
+--------+----------+-------------------------------+-------------------+-------------------------------------------------------------------------------------+
As dfJson.printSchema() shows, the category column is a plain String, but its value is formatted as JSON.
Now let's drop the category_code and category_id columns from dfJson and recreate the category_code and category_id values from the category column using the JSON functions.
dfCategory = dfJson.select("brand", "product_id", "category")
dfCategory.show(truncate=False)
+--------+----------+-------------------------------------------------------------------------------------+
|brand |product_id|category |
+--------+----------+-------------------------------------------------------------------------------------+
|null |5902332 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |
|null |5571162 |{"category_id":1487580012147114126,"category_code":"furniture.bathroom.bath"} |
|benovy |5911195 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |
|null |5756536 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |
|emil |5861764 |{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |
|shik |5914940 |{"category_id":1921723506584715388,"category_code":"accessories.cosmetic_bag"} |
|italwax |5775813 |{"category_id":1487580013053083824,"category_code":"stationery.cartrige"} |
dfRecovered = dfCategory\
.withColumn("category_id", get_json_object(col("category"), "$.category_id"))\
.withColumn("category_code", get_json_object(col("category"), "$.category_code"))
dfRecovered.printSchema()
dfRecovered.show(truncate=False)
root
|-- brand: string (nullable = true)
|-- product_id: integer (nullable = true)
|-- category: string (nullable = true)
|-- category_id: string (nullable = true)
|-- category_code: string (nullable = true)
+--------+----------+-------------------------------------------------------------------------------------+-------------------+-------------------------------+
|brand |product_id|category |category_id |category_code |
+--------+----------+-------------------------------------------------------------------------------------+-------------------+-------------------------------+
|null |5902332 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |2007399943458784057|apparel.glove |
|null |5571162 |{"category_id":1487580012147114126,"category_code":"furniture.bathroom.bath"} |1487580012147114126|furniture.bathroom.bath |
|benovy |5911195 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |2007399943458784057|apparel.glove |
|null |5756536 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |2007399943458784057|apparel.glove |
|emil |5861764 |{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |1487580006350586771|appliances.environment.vacuum |
|shik |5914940 |{"category_id":1921723506584715388,"category_code":"accessories.cosmetic_bag"} |1921723506584715388|accessories.cosmetic_bag |
|italwax |5775813 |{"category_id":1487580013053083824,"category_code":"stationery.cartrige"} |1487580013053083824|stationery.cartrige |
|null |5923808 |{"category_id":1487580008070251489,"category_code":"appliances.personal.hair_cutter"}|1487580008070251489|appliances.personal.hair_cutter|
|null |5856308 |{"category_id":2022622168218599898,"category_code":"furniture.living_room.chair"} |2022622168218599898|furniture.living_room.chair |
|max |5855509 |{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |1487580006350586771|appliances.environment.vacuum |
|null |5813075 |{"category_id":1487580011970953351,"category_code":"furniture.bathroom.bath"} |1487580011970953351|furniture.bathroom.bath |
|benovy |5901872 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |2007399943458784057|apparel.glove |
|null |5912860 |{"category_id":2018287324474901238,"category_code":"furniture.bathroom.bath"} |2018287324474901238|furniture.bathroom.bath |
|nitrile |5889686 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |2007399943458784057|apparel.glove |
|null |5913116 |{"category_id":2018287324474901238,"category_code":"furniture.bathroom.bath"} |2018287324474901238|furniture.bathroom.bath |
|null |5713291 |{"category_id":1487580008070251489,"category_code":"appliances.personal.hair_cutter"}|1487580008070251489|appliances.personal.hair_cutter|
|null |5657896 |{"category_id":1487580013053083824,"category_code":"stationery.cartrige"} |1487580013053083824|stationery.cartrige |
|jessnail|5877460 |{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |1487580006350586771|appliances.environment.vacuum |
|vosev |5904212 |{"category_id":1487580010695884882,"category_code":"accessories.bag"} |1487580010695884882|accessories.bag |
|kinetics|5829359 |{"category_id":1487580011970953351,"category_code":"furniture.bathroom.bath"} |1487580011970953351|furniture.bathroom.bath |
+--------+----------+-------------------------------------------------------------------------------------+-------------------+-------------------------------+
As the schema above shows, get_json_object returns strings, so category_id came back as a string. We can cast it back to a long:
dfRecovered = dfCategory\
.withColumn("category_id", get_json_object(col("category"), "$.category_id").cast(LongType()))\
.withColumn("category_code", get_json_object(col("category"), "$.category_code"))
dfRecovered.printSchema()
root
|-- brand: string (nullable = true)
|-- product_id: integer (nullable = true)
|-- category: string (nullable = true)
|-- category_id: long (nullable = true)
|-- category_code: string (nullable = true)
If the type is fixed, you can formalize it with a StructType, much like defining a class.
structCategory = StructType(
[
StructField("category_id", LongType(), True),
StructField("category_code", StringType(), True),
]
)
dfStructed = dfCategory\
.withColumn("category_parsed", from_json(col("category"), structCategory))
dfStructed.printSchema()
dfStructed.show(truncate=False)
root
|-- brand: string (nullable = true)
|-- product_id: integer (nullable = true)
|-- category: string (nullable = true)
|-- category_parsed: struct (nullable = true)
| |-- category_id: long (nullable = true)
| |-- category_code: string (nullable = true)
+--------+----------+-------------------------------------------------------------------------------------+------------------------------------------------------+
|brand |product_id|category |category_parsed |
+--------+----------+-------------------------------------------------------------------------------------+------------------------------------------------------+
|null |5902332 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |{2007399943458784057, apparel.glove} |
|null |5571162 |{"category_id":1487580012147114126,"category_code":"furniture.bathroom.bath"} |{1487580012147114126, furniture.bathroom.bath} |
|benovy |5911195 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |{2007399943458784057, apparel.glove} |
|null |5756536 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |{2007399943458784057, apparel.glove} |
|emil |5861764 |{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |{1487580006350586771, appliances.environment.vacuum} |
|shik |5914940 |{"category_id":1921723506584715388,"category_code":"accessories.cosmetic_bag"} |{1921723506584715388, accessories.cosmetic_bag} |
|italwax |5775813 |{"category_id":1487580013053083824,"category_code":"stationery.cartrige"} |{1487580013053083824, stationery.cartrige} |
|null |5923808 |{"category_id":1487580008070251489,"category_code":"appliances.personal.hair_cutter"}|{1487580008070251489, appliances.personal.hair_cutter}|
|null |5856308 |{"category_id":2022622168218599898,"category_code":"furniture.living_room.chair"} |{2022622168218599898, furniture.living_room.chair} |
|max |5855509 |{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |{1487580006350586771, appliances.environment.vacuum} |
|null |5813075 |{"category_id":1487580011970953351,"category_code":"furniture.bathroom.bath"} |{1487580011970953351, furniture.bathroom.bath} |
|benovy |5901872 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |{2007399943458784057, apparel.glove} |
|null |5912860 |{"category_id":2018287324474901238,"category_code":"furniture.bathroom.bath"} |{2018287324474901238, furniture.bathroom.bath} |
|nitrile |5889686 |{"category_id":2007399943458784057,"category_code":"apparel.glove"} |{2007399943458784057, apparel.glove} |
|null |5913116 |{"category_id":2018287324474901238,"category_code":"furniture.bathroom.bath"} |{2018287324474901238, furniture.bathroom.bath} |
|null |5713291 |{"category_id":1487580008070251489,"category_code":"appliances.personal.hair_cutter"}|{1487580008070251489, appliances.personal.hair_cutter}|
|null |5657896 |{"category_id":1487580013053083824,"category_code":"stationery.cartrige"} |{1487580013053083824, stationery.cartrige} |
|jessnail|5877460 |{"category_id":1487580006350586771,"category_code":"appliances.environment.vacuum"} |{1487580006350586771, appliances.environment.vacuum} |
|vosev |5904212 |{"category_id":1487580010695884882,"category_code":"accessories.bag"} |{1487580010695884882, accessories.bag} |
|kinetics|5829359 |{"category_id":1487580011970953351,"category_code":"furniture.bathroom.bath"} |{1487580011970953351, furniture.bathroom.bath} |
+--------+----------+-------------------------------------------------------------------------------------+------------------------------------------------------+
You can see that the category_parsed column is a struct type.
Practice
Here are the practice exercises.
Solve the Window Function - Attribution problem from this chapter using the DataFrame API.
Solve the Window Function - Session problem from this chapter using the DataFrame API.
Unnest the dfRaw DataFrame from this chapter with LATERAL VIEW instead of the DataFrame API's explode(). To register dfRaw as a table, you can use `createOrReplaceTempView`, which we will cover in the next chapter.
dfRaw.createOrReplaceTempView("RAW")
spark.sql("""
SELECT ...
LATERAL VIEW ...
""")
Summary
Below are the key keywords covered in this chapter.
Window Function (rank, dense_rank, lag, lead), collect_list() / collect_set(), explode(), Unnest, LATERAL VIEW, to_json() / from_json() / get_json_object(), StructType