이제 DataFrame API 를 이용해 Join 을 해보겠습니다. 다음 두 케이스들을 중심으로 코드를 작성해보겠습니다
dfListingMeta 를 기준으로 전체 기간 내 예약 수 / 판매 금액 합을 구합니다 (available = 'f' 인 경우를 예약이 되었다고 가정합니다.) 단 NULL 값이 예약 수와 금액 합에 없어야 합니다. NULL 일 경우 0 으로 변경합니다.
dfListingMeta 를 기준으로 월별로, 리뷰가 남은 업체에 대해서만 리뷰 숫자를 집계합니다.
dfListingCalendar 내의 12월 일별 기간을 기준으로 (단, 빈 날짜가 없어야 함) 일별 예약이 있었던 업체 숫자를 집계합니다.
dfListingMeta 를 축으로 사용할 경우, 컬럼은 listing_id, listing_name 만 사용합니다.
1. Listing 기준으로 업체 예약 수 및 판매 금액 합 구하기
이제 dfListingMeta 를 축으로 삼아 LEFT JOIN 을 통해 붙여보겠습니다.
단, 이 때 NULL 이 허용되지 않으므로 판매 금액이나 예약 숫자가 없는 경우는 0 으로 처리하기 위해 coleasce() 를 사용합니다.
특정 Join 타입에서 테이블 (DataFrame) Alias 를 지정하지 않을 경우 사용자의 의도와는 다른 결과가 나올 수 있습니다. 따라서 DataFrame 및 컬럼 사용시 alias 를 지정하는 편이 좋습니다.
만약 여러분이 위에서 나온 예제처럼이 아니라 DataFrame 의 이름을 이용하고 싶다면
dfListingMeta("listing_id") 처럼 특정 DataFrame 내 컬럼을 명시적으로 지정할 수 있습니다.
이제 explain 을 해보면 다음과 같습니다. Sort Merge Join 전략이 선택되어 LEFT (OUTER) 을 하는걸 볼 수 있습니다.
dfListingSales.toPandas() 또는 dfListingSales.show() 를 호출 후 Spark UI SQL 탭에서 Plan 을 보면 다음과 같습니다.
Spark UI - Broadcast Join 2 (LEFT)
재밌는 부분은 explain("FORMATTED") 에서는 Sort-merge Join 전략을 사용한다고 나왔었는데, 실제로는 Broadcast Hash Join 전략을 사용해습니다. 이는 Spark 3 의 AQE (Adaptive Query Execution) 가 동작하면서 비용이 더 저렴한 Broadcast Hash Join 전략을 선택하게 된 것인데, 추후 아래의 Join Strategy 및 AQE 섹션에서 살펴 보겠습니다.
2. dfListingMeta 를 기준으로 월별로, 리뷰가 남은 업체에 대해서만 리뷰 숫자를 집계하기
두번째 문제는 쉽습니다. dfListingMeta, dfListingReview 를 이용하되 INNER JOIN 을 사용하면 됩니다.
dfListingMeta 와 dfListingReview (INNER JOIN 집계 결과) 를 보면, dfListingReview 가 더 적음을 볼 수 있습니다. 이는 INNER JOIN 과정에서 리뷰가 없는 숙소를 제거했기 때문입니다.
일반적으로는 데이터가 없더라도 축이되는 Row 를 제거하지 않는 편이 좋습니다.
0 일 경우는 데이터가 없다고 이해할 수 있지만
NULL 일 경우는 데이터가 없어서인지, 아니면 집계 과정에서 코드 실수로 인해 NULL 이 나온건지 판별하기 어렵기 때문입니다.
3. `dfListingCalendar` 내의 12월 일별 기간을 기준으로 (단, 빈 날짜가 없어야 함) 일별 예약이 있었던 업체 숫자를 집계하기
dfListingCalender 내에 있는 데이터가, 만약 연속적이지 않다면 어떻게 빈 날짜를 메꿀 수 있을까요?
sequence() 함수를 이용해봅시다. 단, spark.createDataFrame 을 사용해 만드는건 번잡하니 SQL 을 이용해 봅시다.
만약 기간을 월단위로 잡고 싶다면 MONTH Interval 을 사용할 수 있습니다.
이제 dfListingCalendar 내에서 일별로 예약건이 있었던 "업체 숫자" 를 구해봅시다. 다만 사전에 한 가지 변경해야 할 것은, dfListingCalendar 의 date 컬럼은 timestamp 타입이고 직접 만든 dfCalendarMeta 내의 date 컬럼은 date 타입니다. (예제의 단순함을 위해 날짜는 UTC 기준이라 가정)
이제 아래와 타입을 변경하고 2019. 12 월 이벤트만 필터링 해서 집계를 해보겠습니다.
위의 예제에서 dateStart, dateEnd 처럼 Driver 내 변수를 Executor 에서 사용한다면 어떤 과정을 거칠까요? Serialization, Deserialization 관점에서 이야기 해보고 문제가 될 수 있을지도 생각해 봅시다.
Broadcast Nested Loop Join 은 선택할수 있는 전략이 없을 경우 마지막으로 선택되는 Join 전략입니다. Broadcast Nested Loop 는 작은 데이터셋을 Broadcast 한 후 이중 반복문을 돌며 하나씩 데이터를 비교해 Join 하는 방식입니다.
정렬이 되어있지 않은 Nested Loop 방식과 정렬이 되어있는 Sort Merge 방식은 어떤 차이점이 있을까요?
5. Cartesian Join
Spark UI - Cartesian Join
Cross Join 은 키가 지정되지 않을 경우 사용되며, Cross Join 사용을 위해서는 spark.sql.crossJoin.enabled=true 옵션이 활성화 되어야있어야 합니다.
Adaptive Query Execution
Spark 의 Cost-based Optimizer (CBO) 는 Data 에 Row 등 각종 통계를 바탕으로 더 나은 실행 계획을 결정하도록 돕습니다. 예를 들어 Broadcast Hash Join 대신 Sort Merge Join 을 사용하거나, Hash Join 일 경우 Build Side (메모리에 해싱되어 올라갈 쪽) 을 정할 수 있습니다.
그러나 항상 모든 시점에 최신값의 데이터에 대한 통계를 이용할 수 있는것은 아니며 이런 부정확한 통계값들은 최적의 실행계획을 만드는데 도움이 되진 못했습니다.
Spark 3.0 부터는 Adaptive Query Execution (이하 AQE) 추가 되었고, 이 기능으로 인해 더 나은 실행 계획을 사용자의 간섭 없이 Spark 가 자동적으로 만들 수 있습니다.
Aggregation 과정에서 발생하는 Shuffle 은 네트워크 데이터 전송 및 Disk 쓰기 포함하기 때문에 굉장히 비싼 연산입니다. spark.sql.shuffle.partitions 는 기본값 200 으로 작은 데이터셋에 대해서는 매우 큰 숫자인데, AQE 를 통해 파티션을 병합하고 성능을 높일 수 있습니다.
위의 그림에서는 Mapping Stage 에서 더 적은 수의 Partition 을 만들어 내며 그로인해 Reducer 숫자가 줄어든 것을 볼 수 있습니다.
If there are too few partitions, then the data size of each partition may be very large, and the tasks to process these large partitions may need to spill data to disk (e.g., when sort or aggregate is involved) and, as a result, slow down the query.
If there are too many partitions, then the data size of each partition may be very small, and there will be a lot of small network data fetches to read the shuffle blocks, which can also slow down the query because of the inefficient I/O pattern. Having a large number of tasks also puts more burden on the Spark task scheduler.
앞서 언급했던 것처럼, Build Side 의 사이즈가 작다면 일반적으로는 Broadcast Hash Join 이 가장 성능이 좋습니다. Spark 는 Broadcast Hash Join 을 위해 Threshold (Size) 설정을 사용하는데, 추정된 테이블 사이즈가 실제로는 더 작은 경우가 종종 있습니다.
AQE 는 정확한 Relation (테이블) Size 를 바탕으로 Broadcast Hash Join 이 가능한지 체크하고, 가능할 경우 Sort Merge Join 을 Broadcast Hash Join 으로 변경합니다.
위 그림에서는 Stage 2 를 실행 후 판별해보니 Broadcast 가 더 나을것 같아 이미 실행된 결과가 있음에도 (Map-side Shuffle Write) Broadcast Join 으로 변경하는 것을 보여줍니다.
AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the adaptive broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it’s better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic(if spark.sql.adaptive.localShuffleReader.enabled is true)
AQE 는 런타임에 Shuffle 파일의 통계 정보를 바탕으로 Skew 를 일으키는 큰 파티션이 있을 경우 분할합니다. 위 사진에서는 A0 파티션이 상대적으로 커 Skew 를 일으킬 수가 있고, 이것이 AQE 에 의해 A0-1, A0-2 로 분할 된 것을 볼 수 있습니다.
-- Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- When different join strategy hints are specified on both sides of a join, Spark
-- prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
-- over the SHUFFLE_REPLICATE_NL hint.
-- Spark will issue Warning in the following example
-- org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
-- is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;