2.4.6 Streaming Sink

이번 챕터에서는 Spark Structured Streaming 의 Output Sink (저장소) 와 Output Mode 에 대해 알아봅니다.

Output Mode

3 가지의 Output Mode 가 존재합니다. Output Mode 는 Spark Structured Streaming 의 결과가 Micro-batch 로 만들어지는 '테이블' 형태라는 가정하에 이해하면 쉽습니다.
  • Append (Default) 모드는 지난번 Micro-batch 에는 없었던 신규 데이터인 경우에 Row 를 추가해 테이블을 만듭니다.
  • Update (2.1.1+) 모드는 지난번 Micro-batch 에서 수정되었거나 혹은 신규 데이터인 경우에만 Row 를 추가해 테이블을 만듭니다.
  • Complete 모드는 전체 결과를 가진 테이블을 만듭니다.
그림으로 표현하면 아래와 같습니다.
Spark Output Mode (Link)
Output Mode 에 따라 지원되는 연산이 달라집니다. 아래 표는 Spark 3.2.0 기준으로 Output Mode 와 지원되는 연산을 보여줍니다.
Spark Structured Streaming Output Mode (Link)
사용하는 연산에 따라 지정할 수 있는 Output Mode 가 달라집니다.
  • Aggregation (Group By 등) 을 Event Time 기준의 Watermark 와 사용할 경우에는 Append / Update / Complete 전부 사용 가능합니다.
  • Aggregation 에 Watermark 가 없을 경우에는 Append 를 지원해야 하는데 기간 제한이 없어 Update 가 발생할 수 있으므로 이 경우에는 Append 모드가 불가능해집니다. 따라서 지원하지 않습니다.
  • mapGroupsWithState 는 Update 모드만을, flatMapGroupsWithState 는 Function 파라미터로 지정되는 모드에 따라 Append / Update 를 사용할 수 있고 이를 바탕으로 Aggregation 지원 여부를 결정합니다. (Update = Aggregation 가능)
  • Join 연산을 사용한다면 Append 모드만 사용 가능합니다.
코드에서는 Output 모드를 다음과 같이 지정할 수 있습니다. (Scala)
val streaming = dfConverted.writeStream
.queryName("...")
.format("...")
.outputMode("update") // "append", "complete"

Output Sink

Output Sink 를 통해 가공한 Streaming 데이터를 내보낼 수 있습니다. 아래는 Spark 3.2.0 기준으로 지원되는 Output Sink 입니다.
Kafka as Central Event Broker (Link)
Structured Streaming Sink Patterns (Link - Practical Spark)
주로 Kafka, Foreach, File (Parquet) Sink 가 사용됩니다.
  • Kafka Sink 를 통해 다른 Topic 으로 결과 데이터를 실시간으로 내보낼 때 사용할 수 있습니다.
  • File Sink 를 통해 CDC 등의 데이터를 S3 로 Parquet 등의 Format 으로 적재할 수 있습니다.
  • Foreach Sink 를 통해 Structured Streaming 이 지원하지 않는 저장소 (e.g., Dynamo) 등으로 전송할 수 있습니다.
Spark Structured Streaming Output Sink (Link)
foreach 와 foreachBatch Sink 의 경우 차이점은 다음과 같습니다.
  • foreach 는 단건씩 처리할 수 있는 반면
  • foreachBatch 는 하나의 Micro-batch 의 전체 결과 데이터를 DataFrame 으로 받아 사용할 수 있습니다.
foreach Sink 의 open 과 close 는 Micro-batch 마다 한번씩 호출됩니다. open 에서 true 를 리턴해야 하며 JDBC Connection 을 여는등 비싼 자원을 위한 연산을 여기서 수행할 수 있습니다.
streamingDatasetOfString.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// Open connection
}
def process(record: String): Unit = {
// Write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// Close the connection
}
}
).start()

Trigger

Trigger 를 이용해 얼마의 주기마다 Output Sink 로 데이터를 보낼지를 결정할 수 있습니다. Trigger 를 지정하지 않으면 Micro-batch 가 가장 빨리 수행될 수 있는 주기로 실행됩니다.
  • Trigger.ProcessingTime 를 사용하는 경우, 이전 작업이 끝나지 않으면 다음 Micro-batch 를 수행하지 않습니다. 이전 작업이 끝난 뒤에야 수행 됩니다. 만약 이전 작업이 일찍 끝났다면 Interval 이 만료 되기까지 대기한 후 실행합니다.
  • Trigger.ContinuousContinuous Processing Mode 를 지원하기 위한 Trigger 입니다. 실험 단계이며 일부 제약조건이 있을 수 있습니다.
// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
// One-time trigger
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
// Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()

Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.
  • Output Mode
    • Append / Update / Complete
  • Output Sink
    • File / Kafka / Foreach / Foreach Batch
  • Trigger (Micro-batch Interval)