dfListingSales.toPandas() 또는 dfListingSales.show() 를 호출 후 Spark UI SQL 탭에서 Plan 을 보면 다음과 같습니다.
# dfListingSales.show()+----------+--------------------+-----------+-----------+|listing_id| listing_name|price_order|count_order|+----------+--------------------+-----------+-----------+|192430|TREETOP VIEW ROOM...|10171.0|304||360|LoHi Secret garde...|5161.0|40||98014|Beautiful single ...|9240.0|154||590|Comfortable - an...|13642.0|249||74125|Spacious Cap Hill...|47635.0|365||39405|LoHi Secret garde...|7314.0|66||172196|Luxury Wash Park ...|8900.0|25||236207|Denver Penthouse ...|52325.0|221||59631|VICTORIAN TOWNHOM...|2682.0|18||283162|2000 SF-Lodo Club...|0.0|3||81540|Affordable S. Den...|8532.0|126||592| private|8360.0|190||217996| Highland Snug|3292.0|32||90307|Comfy King Size R...|17100.0|190||56185|charming home for...|7400.0|24||184529|HIP SUITE IN WES...|6965.0|199||98008|Beautiful sun fil...|10780.0|154||1940|Baker Studio Clos...|23507.0|270||31503|Highland Park Gue...|2044.0|24||142683|Historic Denver C...|38750.0|293|+----------+--------------------+-----------+-----------+
재밌는 부분은 explain("FORMATTED") 에서는 Sort-merge Join 전략을 사용한다고 나왔었는데, 실제로는 Broadcast Hash Join 전략을 사용해습니다. 이는 Spark 3 의 AQE (Adaptive Query Execution) 가 동작하면서 비용이 더 저렴한 Broadcast Hash Join 전략을 선택하게 된 것인데, 추후 아래의 Join Strategy 및 AQE 섹션에서 살펴 보겠습니다.
2. dfListingMeta 를 기준으로 월별로, 리뷰가 남은 업체에 대해서만 리뷰 숫자를 집계하기
두번째 문제는 쉽습니다. dfListingMeta, dfListingReview 를 이용하되 INNER JOIN 을 사용하면 됩니다.
dfListingReviewStat = dfListingReview\.groupBy(col("listing_id"))\.agg(count("*").alias("count_review"))dfListingReview = dfListingMeta\.select(col("id").alias("listing_id"), col("name").alias("listing_name"))\.alias("LISTING_META")\.join( other = dfListingReviewStat.alias("LISTING_REVIEW"), on =col("LISTING_META.listing_id") ==col("LISTING_REVIEW.listing_id"), how ="inner" )\.select(col("LISTING_META.listing_id"), col("LISTING_META.listing_name"), coalesce(col("LISTING_REVIEW.count_review"), lit(0)).alias("count_review") )
# dfListingReview.show()+----------+--------------------+------------+|listing_id| listing_name|count_review|+----------+--------------------+------------+|1143676|Modern Sizzle-New...|35||9237825|Remodeled, hip, p...|68||13204273|Cozy N. Park Hill...|188||15989734|Expansive Washing...|56||18879542|SWEET Master Suit...|5||20022219|Queen Irene's B&B...| 23||20690944|Capital Hill Cond...|102||24809940| Cute and Close!|5||26366956|Private Modern St...|92||28014823|Renovated, pictur...|35||28392632| Lowry getaway|89||28514634|Three Bedroom, S....|60||29132612|Cosy walkout Base...|21||32985397|Beautiful Histori...|25||33491696|Single bedroom an...|2||34616832|Larimer Luxury Do...|12||36283016|Old schoolhouse 2...|2||38409657|★Amazing Home on ...|6||40141597|Spacious 2 Story ...|1||8066392|Historic Home Nea...|102|+----------+--------------------+------------+
dfListingMeta 와 dfListingReview (INNER JOIN 집계 결과) 를 보면, dfListingReview 가 더 적음을 볼 수 있습니다. 이는 INNER JOIN 과정에서 리뷰가 없는 숙소를 제거했기 때문입니다.
이제 dfListingCalendar 내에서 일별로 예약건이 있었던 "업체 숫자" 를 구해봅시다. 다만 사전에 한 가지 변경해야 할 것은, dfListingCalendar 의 date 컬럼은 timestamp 타입이고 직접 만든 dfCalendarMeta 내의 date 컬럼은 date 타입니다. (예제의 단순함을 위해 날짜는 UTC 기준이라 가정)
양쪽 테이블에 모두 똑같은 Join Hint 가 지정된다면 (Broadcast 및 Shuffle Hash) Spark 는 테이블의 사이즈를 고려해 작은쪽을 기반 테이블로 사용합니다.
SQL 힌트는 다음처럼 제공할 수 있습니다.
-- Join Hints for broadcast joinSELECT/*+ BROADCAST(t1) */*FROM t1 INNER JOIN t2 ON t1.key = t2.key;SELECT/*+ BROADCASTJOIN (t1) */*FROM t1 left JOIN t2 ON t1.key = t2.key;SELECT/*+ MAPJOIN(t2) */*FROM t1 right JOIN t2 ON t1.key = t2.key;-- Join Hints for shuffle sort merge joinSELECT/*+ SHUFFLE_MERGE(t1) */*FROM t1 INNER JOIN t2 ON t1.key = t2.key;SELECT/*+ MERGEJOIN(t2) */*FROM t1 INNER JOIN t2 ON t1.key = t2.key;SELECT/*+ MERGE(t1) */*FROM t1 INNER JOIN t2 ON t1.key = t2.key;-- Join Hints for shuffle hash joinSELECT/*+ SHUFFLE_HASH(t1) */*FROM t1 INNER JOIN t2 ON t1.key = t2.key;-- Join Hints for shuffle-and-replicate nested loop joinSELECT/*+ SHUFFLE_REPLICATE_NL(t1) */*FROM t1 INNER JOIN t2 ON t1.key = t2.key;-- When different join strategy hints are specified on both sides of a join, Spark-- prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint-- over the SHUFFLE_REPLICATE_NL hint.-- Spark will issue Warning in the following example-- org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)-- is overridden by another hint and will not take effect.SELECT/*+ BROADCAST(t1), MERGE(t1, t2) */*FROM t1 INNER JOIN t2 ON t1.key = t2.key;
Spark SQL 을 이용해 다음 집계를 수행해봅시다
각 업체에 대해 2020년 1월부터 2020년 12월까지 빈 월이 없도록 축을 만들고 (CROSS JOIN)
리뷰의 월별 누적 집계를 구해봅시다. (Window Function)
Join Strategy
이제 각 조인 전략의 동작 방식에 대해 알아보겠습니다. Spark 에서는 사용자가 직접 Hint 로 지정하지 않는 경우 Join 시 양 테이블의 데이터 사이즈를 고려해 최적화된 Join 전략을 사용합니다.
= 를 이용한 동등 조인 (Equi) 일 경우와 아닐 경우에 따라 사용할 수 있는 Join 전략이 다릅니다.
Equi Join 의 경우 Broadcast, Shuffle Hash, Sort Merge, Shuffle Replicate NL 전략이 사용 가능합니다
Non Equi Join 의 경우 Broadcast, Shuffle Replicate NL 전략만 사용이 가능합니다.
우선 Join 과정에서 가장 먼저 생각해야 할 것은, "데이터의 이동" 입니다.
즉 데이터를 어떻게 이동시킬것이냐에 따라 Broadcast / Shuffle 방식이 구분됩니다.
그리고 그 이후에 Hash, Sort-merge, Nested Loop 등 구체적인 Join 알고리즘이 구분이 됩니다.
Broadcast 방식은 작은 사이즈의 테이블 (DataFrame) 을 Driver 를 거쳐 각 Worker 노드에 전부 뿌린 뒤 Shuffle 없이 Join 을 진행합니다.
따라서 Broadcast 되는 대상 테이블이 크다면 Driver 의 메모리가 터질 수 있습니다
Shuffle 이 없으므로 네트워크를 통한 데이터 이동이 없어 Join 속도가 매우 빠릅니다
반면 Shuffle 을 이용한 Join 은 데이터를 각 노드마다 이동 시키므로 네트워크 비용이 들어 속도가 느린 반면 큰 데이터까지 Join 할 수 있습니다.
Broadcast Nested Loop Join 은 선택할수 있는 전략이 없을 경우 마지막으로 선택되는 Join 전략입니다. Broadcast Nested Loop 는 작은 데이터셋을 Broadcast 한 후 이중 반복문을 돌며 하나씩 데이터를 비교해 Join 하는 방식입니다.
정렬이 되어있지 않은 Nested Loop 방식과 정렬이 되어있는 Sort Merge 방식은 어떤 차이점이 있을까요?
5. Cartesian Join
Cross Join 은 키가 지정되지 않을 경우 사용되며, Cross Join 사용을 위해서는 spark.sql.crossJoin.enabled=true 옵션이 활성화 되어야있어야 합니다.
Adaptive Query Execution
Spark 의 Cost-based Optimizer (CBO) 는 Data 에 Row 등 각종 통계를 바탕으로 더 나은 실행 계획을 결정하도록 돕습니다. 예를 들어 Broadcast Hash Join 대신 Sort Merge Join 을 사용하거나, Hash Join 일 경우 Build Side (메모리에 해싱되어 올라갈 쪽) 을 정할 수 있습니다.
그러나 항상 모든 시점에 최신값의 데이터에 대한 통계를 이용할 수 있는것은 아니며 이런 부정확한 통계값들은 최적의 실행계획을 만드는데 도움이 되진 못했습니다.
Spark 3.0 부터는 Adaptive Query Execution (이하 AQE) 추가 되었고, 이 기능으로 인해 더 나은 실행 계획을 사용자의 간섭 없이 Spark 가 자동적으로 만들 수 있습니다.
Spark Stage 가 실행되면서 중간중간 AQE 는 종료된 Stage 를 바탕으로 런타임 통계를 업데이트하고, 이 통계 정보를 바탕으로 다시 최적화를 수행합니다. Spark 3.0 에서는 AQE 의 3가지 기능이 포함되었습니다.
Dynamic Coalescing Shuffle Partitions
Dynamically Switching Joni Strategies
Dynamically Optimizing Skew Joins
AQE 사용을 위해서는 Spark 3.0+ 에서 spark.sql.adaptive.enabled 활성화 되어 있어야 합니다. (Default = true)
Aggregation 과정에서 발생하는 Shuffle 은 네트워크 데이터 전송 및 Disk 쓰기 포함하기 때문에 굉장히 비싼 연산입니다. spark.sql.shuffle.partitions 는 기본값 200 으로 작은 데이터셋에 대해서는 매우 큰 숫자인데, AQE 를 통해 파티션을 병합하고 성능을 높일 수 있습니다.
위의 그림에서는 Mapping Stage 에서 더 적은 수의 Partition 을 만들어 내며 그로인해 Reducer 숫자가 줄어든 것을 볼 수 있습니다.
If there are too few partitions, then the data size of each partition may be very large, and the tasks to process these large partitions may need to spill data to disk (e.g., when sort or aggregate is involved) and, as a result, slow down the query.
If there are too many partitions, then the data size of each partition may be very small, and there will be a lot of small network data fetches to read the shuffle blocks, which can also slow down the query because of the inefficient I/O pattern. Having a large number of tasks also puts more burden on the Spark task scheduler.
2. Dynamically Switching Join Strategies
앞서 언급했던 것처럼, Build Side 의 사이즈가 작다면 일반적으로는 Broadcast Hash Join 이 가장 성능이 좋습니다. Spark 는 Broadcast Hash Join 을 위해 Threshold (Size) 설정을 사용하는데, 추정된 테이블 사이즈가 실제로는 더 작은 경우가 종종 있습니다.
AQE 는 정확한 Relation (테이블) Size 를 바탕으로 Broadcast Hash Join 이 가능한지 체크하고, 가능할 경우 Sort Merge Join 을 Broadcast Hash Join 으로 변경합니다.
위 그림에서는 Stage 2 를 실행 후 판별해보니 Broadcast 가 더 나을것 같아 이미 실행된 결과가 있음에도 (Map-side Shuffle Write) Broadcast Join 으로 변경하는 것을 보여줍니다.
AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the adaptive broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it’s better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic(if spark.sql.adaptive.localShuffleReader.enabled is true)
3. Dynamically Optimizing Skew Joins
AQE 는 런타임에 Shuffle 파일의 통계 정보를 바탕으로 Skew 를 일으키는 큰 파티션이 있을 경우 분할합니다. 위 사진에서는 A0 파티션이 상대적으로 커 Skew 를 일으킬 수가 있고, 이것이 AQE 에 의해 A0-1, A0-2 로 분할 된 것을 볼 수 있습니다.