2.4.4 Streaming Window

이번 챕터에서는 Streaming 시스템에서 사용되는 Window 의 개념과 사용법에 대해서 배웁니다. API 별 Window 관련 내용은 다음 Spark 공식 문서에서 살펴볼 수 있습니다.

Streaming Application 에서 데이터는 실시간으로 들어옵니다. 이 때 '시간' 은 두 가지로 분류됩니다.

  • Event Timestamp: 이벤트 (데이터) 에 남은 시간 값입니다. App / Web 의 경우 OS 에 세팅된 시간일 수 있습니다.

  • Processing Timestamp: Streaming Application (Spark) 이 해당 데이터를 처리하는 시간입니다. 아래 그림과 같이 Event Time 과 Processing Time 은 크게 차이날 수 있습니다.

이러한 개념은 Spark 뿐만 아니라 다른 프레임워크인 Flink 에서도 볼 수 있듯이 일반적입니다.

Client 에 남는 Event Timestamp 는 부정확하거나 적시에 데이터가 들어오지 않을 수 있습니다. 다음 경우를 생각해봅시다.

  • Client (Browser, App 등) 에서 시간 세팅이 제대로 되어있지 않을 경우 과거나 미래 데이터가 들어올 수 있을지

  • Client 의 종료나 Network 오류로 인해 어제 발생한 이벤트가 다음날 또는 몇분 뒤에 들어올 수 있을지,

  • 혹은 전송 과정에서 통신 망의 이슈로 특정 로그의 순서가 뒤바뀌어 올 수 있을지 (Click 이 먼저 들어오고 Impression 로그가 나중에 들어옴)

Window

Streaming Application 는 Window 통해 데이터를 어느 기간동안 모아서 처리할지 지정합니다. 일반적으로 3 종류의 Window 로 나눕니다.

  • Fixed Window (Tumbling Wnidow) 는 겹치는 기간 없이 지정된 기간동안의 데이터만 처리합니다.

  • Sliding Window : 현재 기간과 과거 기간의 일부를 겹쳐서 처리합니다. 과거 기간의 일부를 포함해 상태를 만들거나 판별할 때 사용할 수 있습니다.

  • Session Window: 특정 Key (주로 사용자 ID 등) 를 중심으로 이벤트의 발생 유무에 따라 기간을 처리합니다. 즉 이벤트가 연속적으로 (지정된 Gap 이하의 시간동안) 들어오면 Window 기간이 늘어날 수 있습니다.

    • Spark 3.2.0+ 부터 Structured Streaming 에서 지원됩니다.

이제 Window 타입을 하나씩 살펴보겠습니다.

Tumbling / Fixed Window

Tumbling Window 는 지정된 간격 동안의 데이터를 처리합니다. window 함수의 파라미터중 slideDuration 을 주지 않을 경우 Tumbling Window 가 됩니다.

window(
  timeColumn: Column,
  windowDuration: String): Column  (1)
  
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column   (2)
  
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column       (3)

아래는 groupBy 함수 내에서 Tumbling Window 를 사용하는 예제입니다.

val streamUserActivity = spark.readStream(...)

val = streamWindowed
    .groupBy(window(col("timestamp"), "5 minutes"))
    ...

Sliding Window

Sliding Window 는 Window Duration (Triggering 시점) 기준으로 좌우로 Sliding Duration 만큼의 Event Time 을 가진 데이터를 처리합니다.

val streamWindowed = streamUserActivity
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"))
    ...

Session Window

Windowing 을 사용하면 Window 값을 가진 컬럼이 추가됩니다. (아래 예제에서는 session 이라는 이름으로 변경)

따라서 Windowding 을 단순히 컬럼을 추가하고 해당 컬럼에 맞는 이벤트만 뽑아내 Group By 하는것으로 단순화 해 생각할 수 있습니다.

val streamUserActivity = spark.readStream(...)

val streamWindowed = streamUserActivity
    .groupBy(
        session_window(col("timestamp"), "5 minutes").as("session"),
        col("userId").as("user")
    )
    ...

위 그림에서는 5분 단위 Gap 을 가진 사용자 ID 기준의 Session Window 를 사용했고, 이로 인해 10:12 데이터가 포함된 윈도우에는 기존 Window 에서 집계된 Count 가 없음을 볼 수 있습니다.

Structured Streaming 에서 Session Window 는 3.2.0+ 부터 지원되었습니다.

그 이전에는 사용자 Session 을 어떻게 구현했을까요? 다음 Github Code 와 Article 읽어보며 논의해 봅시다.

 val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

          // If timed out, then remove session and send final update
          if (state.hasTimedOut) {
            val finalUpdate =
              SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Update start and end timestamps in session
            val timestamps = events.map(_.timestamp.getTime).toSeq
            val updatedSession = if (state.exists) {
              val oldSession = state.get
              SessionInfo(
                oldSession.numEvents + timestamps.size,
                oldSession.startTimestampMs,
                math.max(oldSession.endTimestampMs, timestamps.max))
            } else {
              SessionInfo(timestamps.size, timestamps.min, timestamps.max)
            }
            state.update(updatedSession)

            // Set timeout such that the session will be expired if no data received for 10 seconds
            state.setTimeoutDuration("10 seconds")
            SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
          }
      }

이제 Window 를 Key 별로 시각화 해서 정리해 보면 아래와 같이 표현할 수 있습니다.

Watermark

Watermark 는 Streaming Framework 에서 "늦은" 데이터를 어떻게 처리할지를 (얼마나 늦게 들어온 이벤트를 Drop 할지) 지정할 수 있는 기능입니다. Processing Timestamp 를 사용하면 데이터의 '시간' 이 처리 시점이 되어서 늦는 경우가 없지만, 만약 Event Timestamp 를 사용할 경우 위에서 논의한 바와 같이 '늦게' 들어오는 이벤트가 생길 수 있습니다.

Streaming Watermark of a stateful streaming query is how long to wait for late and possibly out-of-order events until a streaming state can be considered final and not to change. Streaming watermark is used to mark events (modeled as a row in the streaming Dataset) that are older than the threshold as "too late", and not "interesting" to update partial non-final streaming state. (Streaming Watermark: Spark Internals)

withWatermark 를 통한 Watermark 지정은 Structured Streaming 에서 다음 두 가지 경우에 지원됩니다.

  • window 함수를 사용한 groupBy 집계 연산 (주의! groupByKey 가 아님)

  • mapGroupsWithState / flatMapGroupsWithState 를 사용한 사용자 관리 State 연산 (일반적으로 groupByKey 와 같이 사용

위 그림에서 분홍색의 케이스의 경우 (동일 상품 및 동일 사용자)

  • Click 이벤트는 12:06 시각에 발생했으나 (Event Time 기준)

  • Impr 이벤트가 12:09 에 발생했습니다 (Event Time 기준)

  • 일반적으로는 Impr (상품 노출) 이벤트가 Click (상품 클릭) 이벤트 보다 먼저 발생하기 때문에 순서가 뒤집어 진 경우임을 볼 수 있습니다.

만약 (Processing Timestamp 기준으로) 현재 시간이 12:10 이라고 하고 두 이벤트가 지금 Kafka 에서 Streaming Application 으로 들어왔다고 할 때

  • Click 이벤트는 12:06 시간에 발생했으나 12:10 에 처리가 되고

  • Impr 이벤트는 1209 시간에 발생했으나 12:10 에 처리가 되는 상황입니다.

이 때 아래와 같이 withWatermark 함수를 사용하면 두 이벤트 모두 버려지게 됩니다.

  • (Tumbling Window 를 사용한다 가정했을때) Click 이벤트는 Window 시간 대비 Event Time 이 최대 2분이 지나면 버려집니다.

  • (Tumbling Window 를 사용한다 가정했을때) Impr 이벤트는 Window 시간 대비 Event Time 이 최대 3분이 지나면 버려집니다.

val dfImprRaw = spark.readStream(...)
val dfClickRaw = spark.readStream(...)

val dfImprWatermarked = dfImprRaw
  ...
  .withWatermark("ts_impr", "2 minutes")
  

val dfClickWatermarked = dfClickRaw \
  ...
  .withWatermark("ts_click", "3 minutes")

Sliding Window 와 Watermark 는 어떤 관점에서 다른걸까요?

Sliding Window 를 사용하면 윈도우 기간내에 이전 Window 기간을 일부 포함할 수 있으므로 Late Event 를 처리할 수 있는 것처럼 보입니다.

다음 경우를 고려해봅시다.

  • Sliding Window 만 사용하고 Watermark 를 사용하지 않는 경우

  • Sliding Window 와 Watermark 를 모두 사용하는 경우

아래 그림은 시각적으로 Processing / Event Timestamp 에 대해 어떤 이벤트가 Drop 되는지를 보여줍니다.

위 그림에서 사용한 코드는 아래와 같습니다.

  • withWatermark 함수를 통해 최대 10 분까지 늦은 Event Time 을 가진 데이터까지만 사용할 것을 지정하고

  • groupBy 내에서 10분 간격으로 좌우 5분의 Sliding Window 간격을 지정합니다.

  • groupBy 내에 Window 뿐만 아니라 word 컬럼이 있으므로 이 기준으로 Executor 에서 분산처리 되어 집계를 수행합니다.

val windowedCounts = words
    .withWatermark("timestamp", "10 minutes") -- 최대 10분까지 늦은 Event Time 을 가진 데이터 허용
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),  -- 5분마다 10분 Sliding
        col("word")
     )
    .count()

Output Mode 에 따라 결과 (Result) 테이블의 업데이트 시점이 달라지는데, 이 부분은 Streaming Sink 챕터에서 다루겠습니다.

왜 Watermark 가 필요할까요? 메모리 관점에서 생각해 봅시다.

  • Spark Structured Streaming 은 Micro-batch 로 동작하고 매 배치마다 Window 는 State 가 됩니다.

  • 늦게 들어온 데이터를 무한히 처리해야 한다면, State 는 영원히 커질 수 있습니다 (OOM)

  • Watermark 를 통해 늦게 들어오는 데이터의 시간을 제한해 State 가 업데이트 되지 않도록 합니다.

Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

  • Event Timestamp

  • Processing Timestamp

  • Tumbling Window (Fixed Window)

  • Sliding Window (Slide Duration)

  • Session Window

  • Watermark

Last updated