2.4.6 Streaming Sink

이번 챕터에서는 Spark Structured Streaming 의 Output Sink (저장소) 와 Output Mode 에 대해 알아봅니다.

Output Mode

3 가지의 Output Mode 가 존재합니다. Output Mode 는 Spark Structured Streaming 의 결과가 Micro-batch 로 만들어지는 '테이블' 형태라는 가정하에 이해하면 쉽습니다.

  • Append (Default) 모드는 지난번 Micro-batch 에는 없었던 신규 데이터인 경우에 Row 를 추가해 테이블을 만듭니다.

  • Update (2.1.1+) 모드는 지난번 Micro-batch 에서 수정되었거나 혹은 신규 데이터인 경우에만 Row 를 추가해 테이블을 만듭니다.

  • Complete 모드는 전체 결과를 가진 테이블을 만듭니다.

그림으로 표현하면 아래와 같습니다.

Output Mode 에 따라 지원되는 연산이 달라집니다. 아래 표는 Spark 3.2.0 기준으로 Output Mode 와 지원되는 연산을 보여줍니다.

사용하는 연산에 따라 지정할 수 있는 Output Mode 가 달라집니다.

  • Aggregation (Group By 등) 을 Event Time 기준의 Watermark 와 사용할 경우에는 Append / Update / Complete 전부 사용 가능합니다.

  • Aggregation 에 Watermark 가 없을 경우에는 Append 를 지원해야 하는데 기간 제한이 없어 Update 가 발생할 수 있으므로 이 경우에는 Append 모드가 불가능해집니다. 따라서 지원하지 않습니다.

  • mapGroupsWithState 는 Update 모드만을, flatMapGroupsWithState 는 Function 파라미터로 지정되는 모드에 따라 Append / Update 를 사용할 수 있고 이를 바탕으로 Aggregation 지원 여부를 결정합니다. (Update = Aggregation 가능)

  • Join 연산을 사용한다면 Append 모드만 사용 가능합니다.

코드에서는 Output 모드를 다음과 같이 지정할 수 있습니다. (Scala)

val streaming = dfConverted.writeStream
  .queryName("...")
  .format("...")
  .outputMode("update")  // "append", "complete"

Output Sink

Output Sink 를 통해 가공한 Streaming 데이터를 내보낼 수 있습니다. 아래는 Spark 3.2.0 기준으로 지원되는 Output Sink 입니다.

주로 Kafka, Foreach, File (Parquet) Sink 가 사용됩니다.

  • Kafka Sink 를 통해 다른 Topic 으로 결과 데이터를 실시간으로 내보낼 때 사용할 수 있습니다.

  • File Sink 를 통해 CDC 등의 데이터를 S3 로 Parquet 등의 Format 으로 적재할 수 있습니다.

  • Foreach Sink 를 통해 Structured Streaming 이 지원하지 않는 저장소 (e.g., Dynamo) 등으로 전송할 수 있습니다.

foreach 와 foreachBatch Sink 의 경우 차이점은 다음과 같습니다.

  • foreach 는 단건씩 처리할 수 있는 반면

  • foreachBatch 는 하나의 Micro-batch 의 전체 결과 데이터를 DataFrame 으로 받아 사용할 수 있습니다.

foreach Sink 의 open 과 close 는 Micro-batch 마다 한번씩 호출됩니다. open 에서 true 를 리턴해야 하며 JDBC Connection 을 여는등 비싼 자원을 위한 연산을 여기서 수행할 수 있습니다.

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

Trigger

Trigger 를 이용해 얼마의 주기마다 Output Sink 로 데이터를 보낼지를 결정할 수 있습니다. Trigger 를 지정하지 않으면 Micro-batch 가 가장 빨리 수행될 수 있는 주기로 실행됩니다.

  • Trigger.ProcessingTime 를 사용하는 경우, 이전 작업이 끝나지 않으면 다음 Micro-batch 를 수행하지 않습니다. 이전 작업이 끝난 뒤에야 수행 됩니다. 만약 이전 작업이 일찍 끝났다면 Interval 이 만료 되기까지 대기한 후 실행합니다.

  • Trigger.ContinuousContinuous Processing Mode 를 지원하기 위한 Trigger 입니다. 실험 단계이며 일부 제약조건이 있을 수 있습니다.

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

  • Output Mode

    • Append / Update / Complete

  • Output Sink

    • File / Kafka / Foreach / Foreach Batch

  • Trigger (Micro-batch Interval)

Last updated