practical-data-pipeline
  • Practical Data Pipeline
  • 시작 전에 드리는 당부의 말
  • 01 - 데이터 인프라
    • 1.1 데이터 파이프라인
    • 1.2 데이터 입수 (Ingestion)
    • 1.2 데이터 가공 (Processing)
    • 1.3 데이터 저장 (Storage)
    • 1.4 데이터 분석 (Analysis)
  • 02 - 데이터 처리
    • 2.1 데이터 처리
    • 2.2 배치 (Batch) 처리
      • 2.1.1 Spark Intro
      • 2.1.2 Spark Tutorial
      • 2.1.3 Spark Concept
      • 2.1.4 Spark Architecture
      • 2.1.5 Spark DataFrame
      • 2.1.6 Spark Persistence
      • 2.1.7 Spark Cache
      • 2.1.8 Spark SQL & Table
      • 2.1.9 Spark Join
      • 2.2.1 Spark Memory
      • 2.2.2 Spark Versions
    • 2.3 워크플로우 (Workflow) 관리
    • 2.4 스트림 (Stream) 처리
      • 2.4.1 Kafka Intro
      • 2.4.2 Kafka Advanced
      • 2.4.3 Spark Streaming
      • 2.4.4 Streaming Window
      • 2.4.5 Streaming State
      • 2.4.6 Streaming Sink
  • 04 - 데이터 스토리지
    • 4.1 Kafka
      • 4.1 Kafka Concept
      • 4.2 Kafka Advanced
      • 4.3 Kafka Versions
    • 4.2 Redis
    • 4.3 RDB (MySQL)
    • 4.4 ElasticSearch
    • 4.5 KV Storage (DynamoDB)
    • 4.6 Druid
  • 05 - 데이터 애플리케이션
    • 5.1 데이터 서비스
    • 5.2 통계 서비스
    • 5.3 추천 서비스
    • 5.4 A/B 테스팅
  • 08 - Case Study
    • Week 1 - Data Pipeline
    • Week 2 - EMR & Kubernetes
    • Week 3 - Metastore
    • Week 4 - KV & Delta Storage
    • Week 5 - Kafka Rebalancing
    • Week 6 - ML Pipeline
  • 09 - 설치 및 환경 구성
    • Spark 설치 및 환경 구성
      • Spark - Local Shell 환경
      • Spark - Local Jupyter 환경
      • Spark - Kubernetes 환경
      • Spark - EMR 환경
      • Spark - Databricks 환경 (SaaS)
    • Flink 설치 및 환경 구성
    • Kafka 설치 및 환경 구성
    • MySQL 설치 및 환경 구성
    • DynamoDB 사용을 위한 환경 구성
    • ElasticSearch 설치 및 환경 구성
    • Presto 설치 및 환경 구성
    • Druid 설치 및 환경 구성
Powered by GitBook
On this page
  • Window
  • Watermark
  • Summary

Was this helpful?

  1. 02 - 데이터 처리
  2. 2.4 스트림 (Stream) 처리

2.4.4 Streaming Window

Previous2.4.3 Spark StreamingNext2.4.5 Streaming State

Last updated 3 years ago

Was this helpful?

이번 챕터에서는 Streaming 시스템에서 사용되는 Window 의 개념과 사용법에 대해서 배웁니다. API 별 Window 관련 내용은 다음 Spark 공식 문서에서 살펴볼 수 있습니다.

Streaming Application 에서 데이터는 실시간으로 들어옵니다. 이 때 '시간' 은 두 가지로 분류됩니다.

  • Event Timestamp: 이벤트 (데이터) 에 남은 시간 값입니다. App / Web 의 경우 OS 에 세팅된 시간일 수 있습니다.

  • Processing Timestamp: Streaming Application (Spark) 이 해당 데이터를 처리하는 시간입니다. 아래 그림과 같이 Event Time 과 Processing Time 은 크게 차이날 수 있습니다.

이러한 개념은 Spark 뿐만 아니라 다른 프레임워크인 Flink 에서도 볼 수 있듯이 일반적입니다.

Client 에 남는 Event Timestamp 는 부정확하거나 적시에 데이터가 들어오지 않을 수 있습니다. 다음 경우를 생각해봅시다.

  • Client (Browser, App 등) 에서 시간 세팅이 제대로 되어있지 않을 경우 과거나 미래 데이터가 들어올 수 있을지

  • Client 의 종료나 Network 오류로 인해 어제 발생한 이벤트가 다음날 또는 몇분 뒤에 들어올 수 있을지,

  • 혹은 전송 과정에서 통신 망의 이슈로 특정 로그의 순서가 뒤바뀌어 올 수 있을지 (Click 이 먼저 들어오고 Impression 로그가 나중에 들어옴)

Window

Streaming Application 는 Window 통해 데이터를 어느 기간동안 모아서 처리할지 지정합니다. 일반적으로 3 종류의 Window 로 나눕니다.

  • Fixed Window (Tumbling Wnidow) 는 겹치는 기간 없이 지정된 기간동안의 데이터만 처리합니다.

  • Sliding Window : 현재 기간과 과거 기간의 일부를 겹쳐서 처리합니다. 과거 기간의 일부를 포함해 상태를 만들거나 판별할 때 사용할 수 있습니다.

  • Session Window: 특정 Key (주로 사용자 ID 등) 를 중심으로 이벤트의 발생 유무에 따라 기간을 처리합니다. 즉 이벤트가 연속적으로 (지정된 Gap 이하의 시간동안) 들어오면 Window 기간이 늘어날 수 있습니다.

    • Spark 3.2.0+ 부터 Structured Streaming 에서 지원됩니다.

이제 Window 타입을 하나씩 살펴보겠습니다.

Tumbling / Fixed Window

window(
  timeColumn: Column,
  windowDuration: String): Column  (1)
  
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column   (2)
  
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column       (3)

아래는 groupBy 함수 내에서 Tumbling Window 를 사용하는 예제입니다.

val streamUserActivity = spark.readStream(...)

val = streamWindowed
    .groupBy(window(col("timestamp"), "5 minutes"))
    ...

Sliding Window

Sliding Window 는 Window Duration (Triggering 시점) 기준으로 좌우로 Sliding Duration 만큼의 Event Time 을 가진 데이터를 처리합니다.

val streamWindowed = streamUserActivity
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"))
    ...

Session Window

Windowing 을 사용하면 Window 값을 가진 컬럼이 추가됩니다. (아래 예제에서는 session 이라는 이름으로 변경)

따라서 Windowding 을 단순히 컬럼을 추가하고 해당 컬럼에 맞는 이벤트만 뽑아내 Group By 하는것으로 단순화 해 생각할 수 있습니다.

val streamUserActivity = spark.readStream(...)

val streamWindowed = streamUserActivity
    .groupBy(
        session_window(col("timestamp"), "5 minutes").as("session"),
        col("userId").as("user")
    )
    ...

위 그림에서는 5분 단위 Gap 을 가진 사용자 ID 기준의 Session Window 를 사용했고, 이로 인해 10:12 데이터가 포함된 윈도우에는 기존 Window 에서 집계된 Count 가 없음을 볼 수 있습니다.

Structured Streaming 에서 Session Window 는 3.2.0+ 부터 지원되었습니다.

그 이전에는 사용자 Session 을 어떻게 구현했을까요? 다음 Github Code 와 Article 읽어보며 논의해 봅시다.

 val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

          // If timed out, then remove session and send final update
          if (state.hasTimedOut) {
            val finalUpdate =
              SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Update start and end timestamps in session
            val timestamps = events.map(_.timestamp.getTime).toSeq
            val updatedSession = if (state.exists) {
              val oldSession = state.get
              SessionInfo(
                oldSession.numEvents + timestamps.size,
                oldSession.startTimestampMs,
                math.max(oldSession.endTimestampMs, timestamps.max))
            } else {
              SessionInfo(timestamps.size, timestamps.min, timestamps.max)
            }
            state.update(updatedSession)

            // Set timeout such that the session will be expired if no data received for 10 seconds
            state.setTimeoutDuration("10 seconds")
            SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
          }
      }

이제 Window 를 Key 별로 시각화 해서 정리해 보면 아래와 같이 표현할 수 있습니다.

Watermark

Watermark 는 Streaming Framework 에서 "늦은" 데이터를 어떻게 처리할지를 (얼마나 늦게 들어온 이벤트를 Drop 할지) 지정할 수 있는 기능입니다. Processing Timestamp 를 사용하면 데이터의 '시간' 이 처리 시점이 되어서 늦는 경우가 없지만, 만약 Event Timestamp 를 사용할 경우 위에서 논의한 바와 같이 '늦게' 들어오는 이벤트가 생길 수 있습니다.

  • window 함수를 사용한 groupBy 집계 연산 (주의! groupByKey 가 아님)

위 그림에서 분홍색의 케이스의 경우 (동일 상품 및 동일 사용자)

  • Click 이벤트는 12:06 시각에 발생했으나 (Event Time 기준)

  • Impr 이벤트가 12:09 에 발생했습니다 (Event Time 기준)

  • 일반적으로는 Impr (상품 노출) 이벤트가 Click (상품 클릭) 이벤트 보다 먼저 발생하기 때문에 순서가 뒤집어 진 경우임을 볼 수 있습니다.

만약 (Processing Timestamp 기준으로) 현재 시간이 12:10 이라고 하고 두 이벤트가 지금 Kafka 에서 Streaming Application 으로 들어왔다고 할 때

  • Click 이벤트는 12:06 시간에 발생했으나 12:10 에 처리가 되고

  • Impr 이벤트는 1209 시간에 발생했으나 12:10 에 처리가 되는 상황입니다.

이 때 아래와 같이 withWatermark 함수를 사용하면 두 이벤트 모두 버려지게 됩니다.

  • (Tumbling Window 를 사용한다 가정했을때) Click 이벤트는 Window 시간 대비 Event Time 이 최대 2분이 지나면 버려집니다.

  • (Tumbling Window 를 사용한다 가정했을때) Impr 이벤트는 Window 시간 대비 Event Time 이 최대 3분이 지나면 버려집니다.

val dfImprRaw = spark.readStream(...)
val dfClickRaw = spark.readStream(...)

val dfImprWatermarked = dfImprRaw
  ...
  .withWatermark("ts_impr", "2 minutes")
  

val dfClickWatermarked = dfClickRaw \
  ...
  .withWatermark("ts_click", "3 minutes")

Sliding Window 와 Watermark 는 어떤 관점에서 다른걸까요?

Sliding Window 를 사용하면 윈도우 기간내에 이전 Window 기간을 일부 포함할 수 있으므로 Late Event 를 처리할 수 있는 것처럼 보입니다.

다음 경우를 고려해봅시다.

  • Sliding Window 만 사용하고 Watermark 를 사용하지 않는 경우

  • Sliding Window 와 Watermark 를 모두 사용하는 경우

아래 그림은 시각적으로 Processing / Event Timestamp 에 대해 어떤 이벤트가 Drop 되는지를 보여줍니다.

위 그림에서 사용한 코드는 아래와 같습니다.

  • withWatermark 함수를 통해 최대 10 분까지 늦은 Event Time 을 가진 데이터까지만 사용할 것을 지정하고

  • groupBy 내에서 10분 간격으로 좌우 5분의 Sliding Window 간격을 지정합니다.

  • groupBy 내에 Window 뿐만 아니라 word 컬럼이 있으므로 이 기준으로 Executor 에서 분산처리 되어 집계를 수행합니다.

val windowedCounts = words
    .withWatermark("timestamp", "10 minutes") -- 최대 10분까지 늦은 Event Time 을 가진 데이터 허용
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),  -- 5분마다 10분 Sliding
        col("word")
     )
    .count()

왜 Watermark 가 필요할까요? 메모리 관점에서 생각해 봅시다.

  • Spark Structured Streaming 은 Micro-batch 로 동작하고 매 배치마다 Window 는 State 가 됩니다.

  • 늦게 들어온 데이터를 무한히 처리해야 한다면, State 는 영원히 커질 수 있습니다 (OOM)

  • Watermark 를 통해 늦게 들어오는 데이터의 시간을 제한해 State 가 업데이트 되지 않도록 합니다.

Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

  • Event Timestamp

  • Processing Timestamp

  • Tumbling Window (Fixed Window)

  • Sliding Window (Slide Duration)

  • Session Window

  • Watermark

Tumbling Window 는 지정된 간격 동안의 데이터를 처리합니다. 함수의 파라미터중 slideDuration 을 주지 않을 경우 Tumbling Window 가 됩니다.

Streaming Watermark of a stateful streaming query is how long to wait for late and possibly out-of-order events until a streaming state can be considered final and not to change. Streaming watermark is used to mark events (modeled as a row in the streaming Dataset) that are older than the threshold as "too late", and not "interesting" to update partial non-final streaming state. ()

를 통한 Watermark 지정은 Structured Streaming 에서 다음 두 가지 경우에 지원됩니다.

/ 를 사용한 사용자 관리 State 연산 (일반적으로 groupByKey 와 같이 사용

에 따라 결과 (Result) 테이블의 업데이트 시점이 달라지는데, 이 부분은 Streaming Sink 챕터에서 다루겠습니다.

Flink: Event Timestamp vs Processing Timestamp
window
Spark 3.1 Example: StructuredSessionization.scala
Arbitrary Stateful Processing in Apache Spark’s Structured Streaming
Spark Structured Streaming: Watermark
Flink Streaming: Watermark
Streaming Watermark: Spark Internals
withWatermark
mapGroupsWithState
flatMapGroupsWithState
Output Mode
Spark Direct Streaming: Window
Spark Structured Streaming: Window
Processing Time vs Event Time ()
Event Time vs Processing Time ()
Window Types ()
Sliding Window ()
User Event
Tumbling Window vs Session Window ()
Tumbling Window vs Session Window ()
Session Window Details (Link)
Window Strategies ()
Spark Structured Streaming: Handling Late Event ()
Late Event Handling w/ Wartermark ()
Spark Structured Streaming: Handling Late Event ()
Spark Structured Streaming State ()
Link
Link
Link
Link
Link
Link
Link
Link
Link
Link
Link