> For the complete documentation index, see [llms.txt](https://1ambda.gitbook.io/practical-data-pipeline/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://1ambda.gitbook.io/practical-data-pipeline/02-processing/2.4-stream/2.4.4-streaming-window.md).

# 2.4.4 Streaming Window

이번 챕터에서는 Streaming 시스템에서 사용되는 Window 의 개념과 사용법에 대해서 배웁니다. API 별 Window 관련 내용은 다음 Spark 공식 문서에서 살펴볼 수 있습니다.

* [Spark Direct Streaming: Window](https://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations)
* [Spark Structured Streaming: Window](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time)

Streaming Application 에서 데이터는 실시간으로 들어옵니다. 이 때 '시간' 은 두 가지로 분류됩니다.

* Event Timestamp: 이벤트 (데이터) 에 남은 시간 값입니다. App / Web 의 경우 OS 에 세팅된 시간일 수 있습니다.
* Processing Timestamp: Streaming Application (Spark) 이 해당 데이터를 처리하는 시간입니다. 아래 그림과 같이 Event Time 과 Processing Time 은 크게 차이날 수 있습니다.

![Processing Time vs Event Time (Link)](/files/SUDkQWoDBEnWg9qrvcst)

![Event Time vs Processing Time (Link)](/files/jEGYY06M9x6SQSXNztBd)

이러한 개념은 Spark 뿐만 아니라 다른 프레임워크인 Flink 에서도 볼 수 있듯이 일반적입니다.

* [Flink: Event Timestamp vs Processing Timestamp](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/)

{% hint style="info" %}
Client 에 남는 Event Timestamp 는 부정확하거나 적시에 데이터가 들어오지 않을 수 있습니다.\
다음 경우를 생각해봅시다.

* Client (Browser, App 등) 에서 시간 세팅이 제대로 되어있지 않을 경우 과거나 미래 데이터가 들어올 수 있을지
* Client 의 종료나 Network 오류로 인해 어제 발생한 이벤트가 다음날 또는 몇분 뒤에 들어올 수 있을지,
* 혹은 전송 과정에서 통신 망의 이슈로 특정 로그의 순서가 뒤바뀌어 올 수 있을지 (Click 이 먼저 들어오고 Impression 로그가 나중에 들어옴)
  {% endhint %}

### Window

![Window Types (Link)](/files/ExXWPKlou7vJiCCTyDRu)

Streaming Application 는 **Window** 통해 데이터를 어느 기간동안 모아서 처리할지 지정합니다. 일반적으로 3 종류의 Window 로 나눕니다.

* Fixed Window (Tumbling Wnidow) 는 겹치는 기간 없이 지정된 기간동안의 데이터만 처리합니다.
* Sliding Window : 현재 기간과 과거 기간의 일부를 겹쳐서 처리합니다.  과거 기간의 일부를 포함해 상태를 만들거나 판별할 때 사용할 수 있습니다.
* Session Window: 특정 Key (주로 사용자 ID 등) 를 중심으로 이벤트의 발생 유무에 따라 기간을 처리합니다. 즉 이벤트가 연속적으로 (지정된 Gap 이하의 시간동안) 들어오면 Window 기간이 늘어날 수 있습니다.
  * Spark 3.2.0+ 부터 Structured Streaming 에서 지원됩니다.

이제 Window 타입을 하나씩 살펴보겠습니다.

#### Tumbling / Fixed Window

![](/files/DYpWkwaPZzTYyOA4W6ET)

Tumbling Window 는 지정된 간격 동안의 데이터를 처리합니다. [window](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-window.html) 함수의 파라미터중 slideDuration 을 주지 않을 경우 Tumbling Window 가 됩니다.

```scala
window(
  timeColumn: Column,
  windowDuration: String): Column  (1)
  
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column   (2)
  
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column       (3)
```

아래는 groupBy 함수 내에서 Tumbling Window 를 사용하는 예제입니다.

```scala
val streamUserActivity = spark.readStream(...)

val = streamWindowed
    .groupBy(window(col("timestamp"), "5 minutes"))
    ...
```

#### Sliding Window

![Sliding  Window  (Link)](/files/HKWPKz1ijgXgafGKywPS)

Sliding Window 는 Window Duration (Triggering 시점) 기준으로 좌우로 Sliding Duration 만큼의 Event Time 을 가진 데이터를 처리합니다.

```scala
val streamWindowed = streamUserActivity
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"))
    ...
```

#### Session Window

![User Event](/files/VT0he5j2DCWQo6Yawewu)

![Tumbling Window vs Session Window (Link)](/files/JkUighkoVuqPPw3kLB45)

![Tumbling Window vs Session Window (Link)](/files/cBYenRJOUedbge51lcWG)

Windowing 을 사용하면 Window 값을 가진 컬럼이 추가됩니다. (아래 예제에서는 session 이라는 이름으로 변경)

***따라서 Windowding 을 단순히 컬럼을 추가하고 해당 컬럼에 맞는 이벤트만 뽑아내 Group By 하는것으로 단순화 해 생각할 수 있습니다.***

```objectpascal
val streamUserActivity = spark.readStream(...)

val streamWindowed = streamUserActivity
    .groupBy(
        session_window(col("timestamp"), "5 minutes").as("session"),
        col("userId").as("user")
    )
    ...
```

![Session Window Details (Link)](/files/V0nySfrQMWuU7FzqfjXV)

위 그림에서는 5분 단위 Gap 을 가진 사용자 ID 기준의 Session Window 를 사용했고, 이로 인해 10:12 데이터가 포함된 윈도우에는 기존 Window 에서 집계된 Count 가 없음을 볼 수 있습니다.

{% hint style="info" %}
Structured Streaming 에서 Session Window 는 3.2.0+ 부터 지원되었습니다.

그 이전에는 사용자 Session 을 어떻게 구현했을까요? 다음 Github Code 와 Article 읽어보며 논의해 봅시다.

* [Spark 3.1 Example: StructuredSessionization.scala](https://github.com/apache/spark/blob/branch-3.1/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)
* [Arbitrary Stateful Processing in Apache Spark’s Structured Streaming](https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html)

```
 val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

          // If timed out, then remove session and send final update
          if (state.hasTimedOut) {
            val finalUpdate =
              SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Update start and end timestamps in session
            val timestamps = events.map(_.timestamp.getTime).toSeq
            val updatedSession = if (state.exists) {
              val oldSession = state.get
              SessionInfo(
                oldSession.numEvents + timestamps.size,
                oldSession.startTimestampMs,
                math.max(oldSession.endTimestampMs, timestamps.max))
            } else {
              SessionInfo(timestamps.size, timestamps.min, timestamps.max)
            }
            state.update(updatedSession)

            // Set timeout such that the session will be expired if no data received for 10 seconds
            state.setTimeoutDuration("10 seconds")
            SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
          }
      }
```

{% endhint %}

이제 Window 를 Key 별로 시각화 해서 정리해 보면 아래와 같이 표현할 수 있습니다.

![Window Strategies (Link)](/files/DLKLBZdB8SqYKoliAdmd)

### Watermark

![Spark Structured Streaming: Handling Late Event (Link)](/files/03ipgIixMd77dKNhSEPS)

Watermark 는 Streaming Framework 에서 "늦은" 데이터를 어떻게 처리할지를  (얼마나 늦게 들어온 이벤트를 Drop 할지) 지정할 수 있는 기능입니다. Processing Timestamp 를 사용하면 데이터의 '시간' 이 처리 시점이 되어서 늦는 경우가 없지만, 만약 Event Timestamp 를 사용할 경우 위에서 논의한 바와 같이 '늦게' 들어오는 이벤트가 생길 수 있습니다.

* [Spark Structured Streaming: Watermark](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking)
* [Flink Streaming: Watermark](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#event-time-and-watermarks)

> Streaming Watermark of a stateful streaming query is **how long to wait for late and possibly out-of-order events** until a streaming state can be considered final and not to change. Streaming watermark is used to mark events (modeled as a row in the streaming Dataset) that are older than the threshold as "too late", and not "interesting" to update partial non-final streaming state. ([Streaming Watermark: Spark Internals](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-watermark.html))

[withWatermark](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-Dataset-withWatermark.html) 를 통한 Watermark 지정은 Structured Streaming 에서 다음 두 가지 경우에 지원됩니다.

* window 함수를 사용한 `groupBy` 집계 연산  **(주의! groupByKey 가 아님)**
* [mapGroupsWithState](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-KeyValueGroupedDataset-mapGroupsWithState.html) / [flatMapGroupsWithState](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-KeyValueGroupedDataset-flatMapGroupsWithState.html) 를 사용한 사용자 관리 State 연산 (일반적으로 `groupByKey` 와 같이 사용

![Late Event Handling w/ Wartermark (Link)](/files/fN3Mfq9WksAaOixBrLyc)

위 그림에서 분홍색의 케이스의 경우 (동일 상품 및 동일 사용자)

* Click 이벤트는 12:06 시각에 발생했으나 (Event Time 기준)
* Impr 이벤트가 12:09 에 발생했습니다 (Event Time 기준)
* 일반적으로는 Impr (상품 노출) 이벤트가 Click (상품 클릭) 이벤트 보다 먼저 발생하기 때문에 순서가 뒤집어 진 경우임을 볼 수 있습니다.

\
만약 (Processing Timestamp 기준으로) 현재 시간이 12:10 이라고 하고 두 이벤트가 지금 Kafka 에서 Streaming Application 으로 들어왔다고 할 때

* Click 이벤트는 12:06 시간에 발생했으나 12:10 에 처리가 되고
* Impr 이벤트는 1209 시간에 발생했으나 12:10 에 처리가 되는 상황입니다.

\
이 때 아래와 같이 withWatermark 함수를 사용하면 두 이벤트 모두 버려지게 됩니다.

* (Tumbling Window 를 사용한다 가정했을때) Click 이벤트는 Window 시간 대비 Event Time 이 최대 2분이 지나면 버려집니다.
* (Tumbling Window 를 사용한다 가정했을때) Impr 이벤트는 Window 시간 대비 Event Time 이 최대 3분이 지나면 버려집니다.

```scala
val dfImprRaw = spark.readStream(...)
val dfClickRaw = spark.readStream(...)

val dfImprWatermarked = dfImprRaw
  ...
  .withWatermark("ts_impr", "2 minutes")
  

val dfClickWatermarked = dfClickRaw \
  ...
  .withWatermark("ts_click", "3 minutes")
```

{% hint style="info" %}
Sliding Window 와 Watermark 는 어떤 관점에서 다른걸까요?&#x20;

Sliding Window 를 사용하면 윈도우 기간내에 이전 Window 기간을 일부 포함할 수 있으므로 Late Event 를 처리할 수 있는 것처럼 보입니다.&#x20;

\
다음 경우를 고려해봅시다.

* Sliding Window 만 사용하고 Watermark 를 사용하지 않는 경우
* Sliding Window 와 Watermark 를 모두 사용하는 경우
  {% endhint %}

아래 그림은 시각적으로 Processing / Event Timestamp 에 대해 어떤 이벤트가 Drop 되는지를 보여줍니다.

![Spark Structured Streaming: Handling Late Event (Link)](/files/4sxZrRaA3KvGlGbK1fpB)

위 그림에서 사용한 코드는 아래와 같습니다.

* withWatermark 함수를 통해 최대 10 분까지 늦은 Event Time 을 가진 데이터까지만 사용할 것을 지정하고
* groupBy 내에서 10분 간격으로 좌우 5분의 Sliding Window 간격을 지정합니다.
* groupBy 내에 Window 뿐만 아니라 word 컬럼이 있으므로 이 기준으로 Executor 에서 분산처리 되어 집계를 수행합니다.

```pascaligo
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes") -- 최대 10분까지 늦은 Event Time 을 가진 데이터 허용
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),  -- 5분마다 10분 Sliding
        col("word")
     )
    .count()
```

[Output Mode](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) 에 따라 결과 (Result) 테이블의 업데이트 시점이 달라지는데, 이 부분은 Streaming Sink 챕터에서 다루겠습니다.

![Spark Structured Streaming State (Link)](/files/qpqUzwiEKhy7CHb1SQem)

{% hint style="info" %}
왜 Watermark 가 필요할까요? 메모리 관점에서 생각해 봅시다.&#x20;

* Spark Structured Streaming 은 Micro-batch 로 동작하고 매 배치마다 Window 는 State 가 됩니다.
* 늦게 들어온 데이터를 무한히 처리해야 한다면, State 는 영원히 커질 수 있습니다 (OOM)
* Watermark 를 통해 늦게 들어오는 데이터의 시간을 제한해 State 가 업데이트 되지 않도록 합니다.
  {% endhint %}

### Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

* Event Timestamp
* Processing Timestamp
* Tumbling Window (Fixed Window)
* Sliding Window (Slide Duration)
* Session Window
* Watermark


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://1ambda.gitbook.io/practical-data-pipeline/02-processing/2.4-stream/2.4.4-streaming-window.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
