이제 dfListingCalendar 내에서 일별로 예약건이 있었던 "업체 숫자" 를 구해봅시다. 다만 사전에 한 가지 변경해야 할 것은, dfListingCalendar 의 date 컬럼은 timestamp 타입이고 직접 만든 dfCalendarMeta 내의 date 컬럼은 date 타입니다. (예제의 단순함을 위해 날짜는 UTC 기준이라 가정)
양쪽 테이블에 모두 똑같은 Join Hint 가 지정된다면 (Broadcast 및 Shuffle Hash) Spark 는 테이블의 사이즈를 고려해 작은쪽을 기반 테이블로 사용합니다.
SQL 힌트는 다음처럼 제공할 수 있습니다.
-- Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- When different join strategy hints are specified on both sides of a join, Spark
-- prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
-- over the SHUFFLE_REPLICATE_NL hint.
-- Spark will issue Warning in the following example
-- org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
-- is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
Spark SQL 을 이용해 다음 집계를 수행해봅시다
각 업체에 대해 2020년 1월부터 2020년 12월까지 빈 월이 없도록 축을 만들고 (CROSS JOIN)
리뷰의 월별 누적 집계를 구해봅시다. (Window Function)
Join Strategy
이제 각 조인 전략의 동작 방식에 대해 알아보겠습니다. Spark 에서는 사용자가 직접 Hint 로 지정하지 않는 경우 Join 시 양 테이블의 데이터 사이즈를 고려해 최적화된 Join 전략을 사용합니다.
= 를 이용한 동등 조인 (Equi) 일 경우와 아닐 경우에 따라 사용할 수 있는 Join 전략이 다릅니다.
Equi Join 의 경우 Broadcast, Shuffle Hash, Sort Merge, Shuffle Replicate NL 전략이 사용 가능합니다
Non Equi Join 의 경우 Broadcast, Shuffle Replicate NL 전략만 사용이 가능합니다.
우선 Join 과정에서 가장 먼저 생각해야 할 것은, "데이터의 이동" 입니다.
즉 데이터를 어떻게 이동시킬것이냐에 따라 Broadcast / Shuffle 방식이 구분됩니다.
그리고 그 이후에 Hash, Sort-merge, Nested Loop 등 구체적인 Join 알고리즘이 구분이 됩니다.
Broadcast 방식은 작은 사이즈의 테이블 (DataFrame) 을 Driver 를 거쳐 각 Worker 노드에 전부 뿌린 뒤 Shuffle 없이 Join 을 진행합니다.
따라서 Broadcast 되는 대상 테이블이 크다면 Driver 의 메모리가 터질 수 있습니다
Shuffle 이 없으므로 네트워크를 통한 데이터 이동이 없어 Join 속도가 매우 빠릅니다
반면 Shuffle 을 이용한 Join 은 데이터를 각 노드마다 이동 시키므로 네트워크 비용이 들어 속도가 느린 반면 큰 데이터까지 Join 할 수 있습니다.
Join Type
Broadcast Hash Join
Equi Join
All Join Types (Full Outer Join 제외)
Shuffle Hash Join
Equi Join
All Join Types
Shuffle Sort Merge Join
Equi Join, Sortable Join Key
All Join Types
Broadcast nested Loop Join
Equi, Non-equi
All Join Types
Shuffle Replicate Nested Loop Join
(Cartesian Product Join)
Equi, Non-equi
Inner Like Joins only
Join 전략에 따라서 지원되는 Join 종류와 (Left, Full Outer 등) Join 키가 다 다릅니다. 그리고 특정 옵션의 여부도 Join 전략 사용에 영향을 줍니다.
spark.sql.autoBroadcastJoinThreshold 값은 기본으로 10 MiB 입니다. 만약 이 값을 -1 로 세팅하면 Brodacast Join 을 비활성화 합니다.
spark.sql.broadcastTimeout = 300 (초) 는 Broadcast Timeout 입니다
spark.sql.shuffle.partitions 옵션은 Join 또는 Aggregation 을 위한 Shuffle 발생시 사용할 파티션 숫자를 지정합니다.
이러한 Join 전략들은 일반적인 RDB 에서도 일부 지원됩니다. 다만 다루는 데이터의 크기와 Latency 가 다르므로 지원되는 전략이 다를 수 있습니다.
MySQL 의 경우에는 8.0.18+ 부터 Nested Loop 뿐만 아니라 Hash Join 을 지원합니다.
Hash Join 을 조금 더 알아보겠습니다.
작은 데이터 셋의 키를 조회하면서 자주 접근될 메모리에 Hash 셋을 만들고, 큰 테이블을 순회하면서 Hash 셋을 조회하면서 조인합니다.
1. Broadcast Hash Join
Broadcast Hash Join 은 작은 데이터셋을 Broadcast 한 후에 Executor 에서 Hash Join 을 수행합니다.
Broadcast 될 대상 테이블이 클 경우 Driver 를 거쳐서 Broadcasting 이 발생하므로 Driver OOM 또는 해시 테이블로 인한 Executor OOM 이 발생할 수 있습니다.
그러나 한쪽 데이터의 사이즈가 매우 작다면 Shuffle 이 발생하지 않고 상대적으로 적은 양의 데이터만 Executor 로 이동하면 되므로 네트워크 비용이 매우 저렴합니다.
2. Shuffle Hash Join
Shuffle Hash Join 은 Shuffle 을 발생시켜 데이터를 이동한 뒤 Hash Join 을 수행합니다.
작은 쪽, 즉 메모리에 Hash 셋을 만들 테이블은 spark.sql.autoBroadcastJoinThreshold (10 MiB, Default ) * spark.sql.shuffle.partitions (200, Default) = 2 GiB 보다 작아야 합니다.
Spark 2.3 부터는 Shuffle Hash Join 대신 Shuffle Sort Merge Join 이 기본 전략으로 세팅되어 있습니다. (spark.sql.join.preferSortMergeJoin = true, Default)
Shuffle Hash Join 은 큰 데이터셋을 다룰 수 있으나 Hash 셋을 빌드할때 메모리에 올려야 하므로 너무 크다면 Executor OOM 이 발생할 수 있습니다.
Shuffle Hash Join 과정에서 OOM 이 발생할 경우 어떻게 하면 OOM 을 방지할 수 있을까요?
spark.sql.shuffle.partitions 옵션을 바탕으로 생각해봅시다.
3. Shuffle Sort Merge Join
Shuffle Sort Merge Join 은 데이터를 Shuffle 시켜 정렬한 후 (Sort) Join 을 수행합니다.
Spark 2.3 부터 spark.sql.join.preferSortMergeJoin = true활성화 되어있어, 큰 데이터셋에 대해 주로 사용됩니다
다만 이름에서 알 수 있듯이 Join Key 가 정렬 가능해야 합니다
4. Broadcast Nested Loop Join
Broadcast Nested Loop Join 은 선택할수 있는 전략이 없을 경우 마지막으로 선택되는 Join 전략입니다. Broadcast Nested Loop 는 작은 데이터셋을 Broadcast 한 후 이중 반복문을 돌며 하나씩 데이터를 비교해 Join 하는 방식입니다.
정렬이 되어있지 않은 Nested Loop 방식과 정렬이 되어있는 Sort Merge 방식은 어떤 차이점이 있을까요?
5. Cartesian Join
Cross Join 은 키가 지정되지 않을 경우 사용되며, Cross Join 사용을 위해서는 spark.sql.crossJoin.enabled=true 옵션이 활성화 되어야있어야 합니다.
Adaptive Query Execution
Spark 의 Cost-based Optimizer (CBO) 는 Data 에 Row 등 각종 통계를 바탕으로 더 나은 실행 계획을 결정하도록 돕습니다. 예를 들어 Broadcast Hash Join 대신 Sort Merge Join 을 사용하거나, Hash Join 일 경우 Build Side (메모리에 해싱되어 올라갈 쪽) 을 정할 수 있습니다.
그러나 항상 모든 시점에 최신값의 데이터에 대한 통계를 이용할 수 있는것은 아니며 이런 부정확한 통계값들은 최적의 실행계획을 만드는데 도움이 되진 못했습니다.
Spark 3.0 부터는 Adaptive Query Execution (이하 AQE) 추가 되었고, 이 기능으로 인해 더 나은 실행 계획을 사용자의 간섭 없이 Spark 가 자동적으로 만들 수 있습니다.
Spark Stage 가 실행되면서 중간중간 AQE 는 종료된 Stage 를 바탕으로 런타임 통계를 업데이트하고, 이 통계 정보를 바탕으로 다시 최적화를 수행합니다. Spark 3.0 에서는 AQE 의 3가지 기능이 포함되었습니다.
Dynamic Coalescing Shuffle Partitions
Dynamically Switching Joni Strategies
Dynamically Optimizing Skew Joins
AQE 사용을 위해서는 Spark 3.0+ 에서 spark.sql.adaptive.enabled 활성화 되어 있어야 합니다. (Default = true)
1. Dynamic Coalescing Shuffle Partitions
Aggregation 과정에서 발생하는 Shuffle 은 네트워크 데이터 전송 및 Disk 쓰기 포함하기 때문에 굉장히 비싼 연산입니다. spark.sql.shuffle.partitions 는 기본값 200 으로 작은 데이터셋에 대해서는 매우 큰 숫자인데, AQE 를 통해 파티션을 병합하고 성능을 높일 수 있습니다.
위의 그림에서는 Mapping Stage 에서 더 적은 수의 Partition 을 만들어 내며 그로인해 Reducer 숫자가 줄어든 것을 볼 수 있습니다.
If there are too few partitions, then the data size of each partition may be very large, and the tasks to process these large partitions may need to spill data to disk (e.g., when sort or aggregate is involved) and, as a result, slow down the query.
If there are too many partitions, then the data size of each partition may be very small, and there will be a lot of small network data fetches to read the shuffle blocks, which can also slow down the query because of the inefficient I/O pattern. Having a large number of tasks also puts more burden on the Spark task scheduler.
2. Dynamically Switching Join Strategies
앞서 언급했던 것처럼, Build Side 의 사이즈가 작다면 일반적으로는 Broadcast Hash Join 이 가장 성능이 좋습니다. Spark 는 Broadcast Hash Join 을 위해 Threshold (Size) 설정을 사용하는데, 추정된 테이블 사이즈가 실제로는 더 작은 경우가 종종 있습니다.
AQE 는 정확한 Relation (테이블) Size 를 바탕으로 Broadcast Hash Join 이 가능한지 체크하고, 가능할 경우 Sort Merge Join 을 Broadcast Hash Join 으로 변경합니다.
위 그림에서는 Stage 2 를 실행 후 판별해보니 Broadcast 가 더 나을것 같아 이미 실행된 결과가 있음에도 (Map-side Shuffle Write) Broadcast Join 으로 변경하는 것을 보여줍니다.
AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the adaptive broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it’s better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic(if spark.sql.adaptive.localShuffleReader.enabled is true)
3. Dynamically Optimizing Skew Joins
AQE 는 런타임에 Shuffle 파일의 통계 정보를 바탕으로 Skew 를 일으키는 큰 파티션이 있을 경우 분할합니다. 위 사진에서는 A0 파티션이 상대적으로 커 Skew 를 일으킬 수가 있고, 이것이 AQE 에 의해 A0-1, A0-2 로 분할 된 것을 볼 수 있습니다.
Summary
아래는 이번 시간에 다룬 핵심 키워드들 입니다.
Broadcast Hash Join
Broadcast Nested Loop Join
Shuffle Hash Join
Shuffle Sort Merge Join
Cartesian Product Join
AQE
Spark 는 분산 처리 컴퓨팅 엔진이므로 일반적인 RDMS 보다는 조금 더 폭넓게 Join Strategy 를 지원합니다. 아래는 Spark 에서 사용 가능한 Join 과 Strategy 입니다. ()
Spark Join Strategies ()
단, 이 때 NULL 이 허용되지 않으므로 판매 금액이나 예약 숫자가 없는 경우는 0 으로 처리하기 위해 를 사용합니다.
재밌는 부분은 explain("FORMATTED") 에서는 Sort-merge Join 전략을 사용한다고 나왔었는데, 실제로는 Broadcast Hash Join 전략을 사용해습니다. 이는 Spark 3 의 AQE () 가 동작하면서 비용이 더 저렴한 Broadcast Hash Join 전략을 선택하게 된 것인데, 추후 아래의 Join Strategy 및 AQE 섹션에서 살펴 보겠습니다.
함수를 이용해봅시다. 단, spark.createDataFrame 을 사용해 만드는건 번잡하니 SQL 을 이용해 봅시다.
지난 챕터에서의 샘플과 에서 볼 수 있듯이 SQL 로 Join 이 가능합니다. Spark 버전이 3 가 되면서 SQL Join 부분에 변화가 있다면, Broadcast Join 이외에도 다른 타입의 힌트가 지원이 된다는 점 입니다.
Spark 3.2.0 의 파일을 살펴보면, Spark 는 5 가지의 Join 을 지원함을 알 수 있습니다.
예를 들어 Spark 3 부터는 Cross Join (Cartesian Product) 를 사용하기 위한 옵션인 spark.sql.crossJoin.enabled 이 자동으로 활성화 되어 있습니다. ()
Broadcast Hash Join 은 사용자가 를 지정하거나 지정하지 않았더라도 한쪽 테이블의 사이즈가 spark.sql.autoBroadcastJoinThreshold = 10 MiB (Default) 보다 작으면 실행됩니다
Spark 1.6 에서 () 제거되었으나 Spark 2.0 에서 () 다시 추가되었습니다.
Join 과정에서 Shuffle Hash Join 과 달리 Memory 가 아닌 Disk 를 이용할 수 있기 때문에 OOM 이 발생하지 않습니다 (, )
문서에서 AQE 관련 설정을 볼 수 있습니다.
Partition 의 숫자는 Shuffle 에 어떤 영향을 미칠까요? 을 보면서 이야기를 나누어 봅시다.