# 2.4.4 Streaming Window

이번 챕터에서는 Streaming 시스템에서 사용되는 Window 의 개념과 사용법에 대해서 배웁니다. API 별 Window 관련 내용은 다음 Spark 공식 문서에서 살펴볼 수 있습니다.

* [Spark Direct Streaming: Window](https://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations)
* [Spark Structured Streaming: Window](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time)

Streaming Application 에서 데이터는 실시간으로 들어옵니다. 이 때 '시간' 은 두 가지로 분류됩니다.

* Event Timestamp: 이벤트 (데이터) 에 남은 시간 값입니다. App / Web 의 경우 OS 에 세팅된 시간일 수 있습니다.
* Processing Timestamp: Streaming Application (Spark) 이 해당 데이터를 처리하는 시간입니다. 아래 그림과 같이 Event Time 과 Processing Time 은 크게 차이날 수 있습니다.

![Processing Time vs Event Time (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FDPA4Vmc7oC3ORTNUrhsN%2Fimage.png?alt=media\&token=85bf8a23-1fe0-4942-ae20-6f260bd631c8)

![Event Time vs Processing Time (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FZUuCGcsPyyI2aIfkrHXf%2Fimage.png?alt=media\&token=bf890ea7-10ca-4761-920f-b23e1951d16a)

이러한 개념은 Spark 뿐만 아니라 다른 프레임워크인 Flink 에서도 볼 수 있듯이 일반적입니다.

* [Flink: Event Timestamp vs Processing Timestamp](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/)

{% hint style="info" %}
Client 에 남는 Event Timestamp 는 부정확하거나 적시에 데이터가 들어오지 않을 수 있습니다.\
다음 경우를 생각해봅시다.

* Client (Browser, App 등) 에서 시간 세팅이 제대로 되어있지 않을 경우 과거나 미래 데이터가 들어올 수 있을지
* Client 의 종료나 Network 오류로 인해 어제 발생한 이벤트가 다음날 또는 몇분 뒤에 들어올 수 있을지,
* 혹은 전송 과정에서 통신 망의 이슈로 특정 로그의 순서가 뒤바뀌어 올 수 있을지 (Click 이 먼저 들어오고 Impression 로그가 나중에 들어옴)
  {% endhint %}

### Window

![Window Types (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2Flo0Ij6lpsHWJSgDKbhlI%2Fimage.png?alt=media\&token=e337a760-6c35-444c-b4ea-bfc83b789c40)

Streaming Application 는 **Window** 통해 데이터를 어느 기간동안 모아서 처리할지 지정합니다. 일반적으로 3 종류의 Window 로 나눕니다.

* Fixed Window (Tumbling Wnidow) 는 겹치는 기간 없이 지정된 기간동안의 데이터만 처리합니다.
* Sliding Window : 현재 기간과 과거 기간의 일부를 겹쳐서 처리합니다.  과거 기간의 일부를 포함해 상태를 만들거나 판별할 때 사용할 수 있습니다.
* Session Window: 특정 Key (주로 사용자 ID 등) 를 중심으로 이벤트의 발생 유무에 따라 기간을 처리합니다. 즉 이벤트가 연속적으로 (지정된 Gap 이하의 시간동안) 들어오면 Window 기간이 늘어날 수 있습니다.
  * Spark 3.2.0+ 부터 Structured Streaming 에서 지원됩니다.

이제 Window 타입을 하나씩 살펴보겠습니다.

#### Tumbling / Fixed Window

![](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FBuebPz8Gc2iT2TgqoVTR%2Fimage.png?alt=media\&token=d31ad8bd-a364-4ccf-9e48-ebfd13f0794f)

Tumbling Window 는 지정된 간격 동안의 데이터를 처리합니다. [window](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-window.html) 함수의 파라미터중 slideDuration 을 주지 않을 경우 Tumbling Window 가 됩니다.

```scala
window(
  timeColumn: Column,
  windowDuration: String): Column  (1)
  
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column   (2)
  
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column       (3)
```

아래는 groupBy 함수 내에서 Tumbling Window 를 사용하는 예제입니다.

```scala
val streamUserActivity = spark.readStream(...)

val = streamWindowed
    .groupBy(window(col("timestamp"), "5 minutes"))
    ...
```

#### Sliding Window

![Sliding  Window  (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2F9yFCzd2g1lwSHBkWpGdQ%2Fimage.png?alt=media\&token=0858e996-9787-4a33-8e67-2343e1d08def)

Sliding Window 는 Window Duration (Triggering 시점) 기준으로 좌우로 Sliding Duration 만큼의 Event Time 을 가진 데이터를 처리합니다.

```scala
val streamWindowed = streamUserActivity
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"))
    ...
```

#### Session Window

![User Event](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2F5LqKVQV49tie9mB468Zu%2Fimage.png?alt=media\&token=a90ca5bb-19e2-4046-9d65-a7c3a3ca240d)

![Tumbling Window vs Session Window (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FiA72BOokXhlvXkml2pYV%2Fimage.png?alt=media\&token=83a79b9a-8552-4e4d-9a98-3b259c827ea1)

![Tumbling Window vs Session Window (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FSvgsEzp6ngjbBDDxRhCn%2Fimage.png?alt=media\&token=2fe6aada-1244-47ee-b59c-05769fa8bbb6)

Windowing 을 사용하면 Window 값을 가진 컬럼이 추가됩니다. (아래 예제에서는 session 이라는 이름으로 변경)

***따라서 Windowding 을 단순히 컬럼을 추가하고 해당 컬럼에 맞는 이벤트만 뽑아내 Group By 하는것으로 단순화 해 생각할 수 있습니다.***

```objectpascal
val streamUserActivity = spark.readStream(...)

val streamWindowed = streamUserActivity
    .groupBy(
        session_window(col("timestamp"), "5 minutes").as("session"),
        col("userId").as("user")
    )
    ...
```

![Session Window Details (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FppeVuC3IWqWsuoO0PAUL%2Fimage.png?alt=media\&token=8b81491c-fdbc-4ca8-a6ad-0cf847e49cc5)

위 그림에서는 5분 단위 Gap 을 가진 사용자 ID 기준의 Session Window 를 사용했고, 이로 인해 10:12 데이터가 포함된 윈도우에는 기존 Window 에서 집계된 Count 가 없음을 볼 수 있습니다.

{% hint style="info" %}
Structured Streaming 에서 Session Window 는 3.2.0+ 부터 지원되었습니다.

그 이전에는 사용자 Session 을 어떻게 구현했을까요? 다음 Github Code 와 Article 읽어보며 논의해 봅시다.

* [Spark 3.1 Example: StructuredSessionization.scala](https://github.com/apache/spark/blob/branch-3.1/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)
* [Arbitrary Stateful Processing in Apache Spark’s Structured Streaming](https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html)

```
 val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

          // If timed out, then remove session and send final update
          if (state.hasTimedOut) {
            val finalUpdate =
              SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Update start and end timestamps in session
            val timestamps = events.map(_.timestamp.getTime).toSeq
            val updatedSession = if (state.exists) {
              val oldSession = state.get
              SessionInfo(
                oldSession.numEvents + timestamps.size,
                oldSession.startTimestampMs,
                math.max(oldSession.endTimestampMs, timestamps.max))
            } else {
              SessionInfo(timestamps.size, timestamps.min, timestamps.max)
            }
            state.update(updatedSession)

            // Set timeout such that the session will be expired if no data received for 10 seconds
            state.setTimeoutDuration("10 seconds")
            SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
          }
      }
```

{% endhint %}

이제 Window 를 Key 별로 시각화 해서 정리해 보면 아래와 같이 표현할 수 있습니다.

![Window Strategies (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2Fd6HQI5DLzFRzHUduyynK%2Fimage.png?alt=media\&token=986022fc-1b49-4351-a5f9-f64329a46411)

### Watermark

![Spark Structured Streaming: Handling Late Event (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FWI70lGNdzKFSYZTqCAik%2Fimage.png?alt=media\&token=5732616b-e69c-42bf-b06a-f5217b7e0e6c)

Watermark 는 Streaming Framework 에서 "늦은" 데이터를 어떻게 처리할지를  (얼마나 늦게 들어온 이벤트를 Drop 할지) 지정할 수 있는 기능입니다. Processing Timestamp 를 사용하면 데이터의 '시간' 이 처리 시점이 되어서 늦는 경우가 없지만, 만약 Event Timestamp 를 사용할 경우 위에서 논의한 바와 같이 '늦게' 들어오는 이벤트가 생길 수 있습니다.

* [Spark Structured Streaming: Watermark](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking)
* [Flink Streaming: Watermark](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#event-time-and-watermarks)

> Streaming Watermark of a stateful streaming query is **how long to wait for late and possibly out-of-order events** until a streaming state can be considered final and not to change. Streaming watermark is used to mark events (modeled as a row in the streaming Dataset) that are older than the threshold as "too late", and not "interesting" to update partial non-final streaming state. ([Streaming Watermark: Spark Internals](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-watermark.html))

[withWatermark](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-Dataset-withWatermark.html) 를 통한 Watermark 지정은 Structured Streaming 에서 다음 두 가지 경우에 지원됩니다.

* window 함수를 사용한 `groupBy` 집계 연산  **(주의! groupByKey 가 아님)**
* [mapGroupsWithState](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-KeyValueGroupedDataset-mapGroupsWithState.html) / [flatMapGroupsWithState](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-KeyValueGroupedDataset-flatMapGroupsWithState.html) 를 사용한 사용자 관리 State 연산 (일반적으로 `groupByKey` 와 같이 사용

![Late Event Handling w/ Wartermark (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FpH8w4aOFVum5AtyqwTAY%2Fimage.png?alt=media\&token=ce8115a0-a2b3-4d35-865b-8eb17383c698)

위 그림에서 분홍색의 케이스의 경우 (동일 상품 및 동일 사용자)

* Click 이벤트는 12:06 시각에 발생했으나 (Event Time 기준)
* Impr 이벤트가 12:09 에 발생했습니다 (Event Time 기준)
* 일반적으로는 Impr (상품 노출) 이벤트가 Click (상품 클릭) 이벤트 보다 먼저 발생하기 때문에 순서가 뒤집어 진 경우임을 볼 수 있습니다.

\
만약 (Processing Timestamp 기준으로) 현재 시간이 12:10 이라고 하고 두 이벤트가 지금 Kafka 에서 Streaming Application 으로 들어왔다고 할 때

* Click 이벤트는 12:06 시간에 발생했으나 12:10 에 처리가 되고
* Impr 이벤트는 1209 시간에 발생했으나 12:10 에 처리가 되는 상황입니다.

\
이 때 아래와 같이 withWatermark 함수를 사용하면 두 이벤트 모두 버려지게 됩니다.

* (Tumbling Window 를 사용한다 가정했을때) Click 이벤트는 Window 시간 대비 Event Time 이 최대 2분이 지나면 버려집니다.
* (Tumbling Window 를 사용한다 가정했을때) Impr 이벤트는 Window 시간 대비 Event Time 이 최대 3분이 지나면 버려집니다.

```scala
val dfImprRaw = spark.readStream(...)
val dfClickRaw = spark.readStream(...)

val dfImprWatermarked = dfImprRaw
  ...
  .withWatermark("ts_impr", "2 minutes")
  

val dfClickWatermarked = dfClickRaw \
  ...
  .withWatermark("ts_click", "3 minutes")
```

{% hint style="info" %}
Sliding Window 와 Watermark 는 어떤 관점에서 다른걸까요?&#x20;

Sliding Window 를 사용하면 윈도우 기간내에 이전 Window 기간을 일부 포함할 수 있으므로 Late Event 를 처리할 수 있는 것처럼 보입니다.&#x20;

\
다음 경우를 고려해봅시다.

* Sliding Window 만 사용하고 Watermark 를 사용하지 않는 경우
* Sliding Window 와 Watermark 를 모두 사용하는 경우
  {% endhint %}

아래 그림은 시각적으로 Processing / Event Timestamp 에 대해 어떤 이벤트가 Drop 되는지를 보여줍니다.

![Spark Structured Streaming: Handling Late Event (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FHlJkTHGnM25u2kfPjFKd%2Fimage.png?alt=media\&token=82fc8e5d-c6fb-49e7-93a7-ce7f02259abb)

위 그림에서 사용한 코드는 아래와 같습니다.

* withWatermark 함수를 통해 최대 10 분까지 늦은 Event Time 을 가진 데이터까지만 사용할 것을 지정하고
* groupBy 내에서 10분 간격으로 좌우 5분의 Sliding Window 간격을 지정합니다.
* groupBy 내에 Window 뿐만 아니라 word 컬럼이 있으므로 이 기준으로 Executor 에서 분산처리 되어 집계를 수행합니다.

```pascaligo
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes") -- 최대 10분까지 늦은 Event Time 을 가진 데이터 허용
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),  -- 5분마다 10분 Sliding
        col("word")
     )
    .count()
```

[Output Mode](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) 에 따라 결과 (Result) 테이블의 업데이트 시점이 달라지는데, 이 부분은 Streaming Sink 챕터에서 다루겠습니다.

![Spark Structured Streaming State (Link)](https://530470138-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lmdl6Kr60tPfvS6xNHH%2Fuploads%2FSQXVHxVZJ1NWqgPfKbgq%2Fimage.png?alt=media\&token=a4d7fc87-deeb-4ab1-a8ef-460140bd0745)

{% hint style="info" %}
왜 Watermark 가 필요할까요? 메모리 관점에서 생각해 봅시다.&#x20;

* Spark Structured Streaming 은 Micro-batch 로 동작하고 매 배치마다 Window 는 State 가 됩니다.
* 늦게 들어온 데이터를 무한히 처리해야 한다면, State 는 영원히 커질 수 있습니다 (OOM)
* Watermark 를 통해 늦게 들어오는 데이터의 시간을 제한해 State 가 업데이트 되지 않도록 합니다.
  {% endhint %}

### Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

* Event Timestamp
* Processing Timestamp
* Tumbling Window (Fixed Window)
* Sliding Window (Slide Duration)
* Session Window
* Watermark
