# 2.4.6 Streaming Sink

이번 챕터에서는 Spark Structured Streaming 의 Output Sink (저장소) 와 Output Mode 에 대해 알아봅니다.

* [Structured Streaming: Output Mode](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes)
* [Structured Steaming: Output Sink](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks)

### Output Mode

3 가지의 **Output Mode** 가 존재합니다. **Output Mode** 는 Spark Structured Streaming 의 결과가 Micro-batch 로 만들어지는 '테이블' 형태라는 가정하에 이해하면 쉽습니다.

* Append (Default) 모드는 지난번 Micro-batch 에는 없었던 신규 데이터인 경우에 Row 를 추가해 테이블을 만듭니다.
* Update (2.1.1+) 모드는 지난번 Micro-batch 에서 수정되었거나 혹은 신규 데이터인 경우에만 Row 를 추가해 테이블을 만듭니다.
* **Complete** 모드는 전체 결과를 가진 테이블을 만듭니다.

\
그림으로 표현하면 아래와 같습니다.

![Spark Output Mode (Link)](/files/bsN9uRuKHXZUtGOZfxi1)

Output Mode 에 따라 지원되는 연산이 달라집니다. 아래 표는 Spark 3.2.0 기준으로 Output Mode 와 지원되는 연산을 보여줍니다.

![Spark Structured Streaming Output Mode (Link)](/files/gw8kQLPHo1oAdXeoojZy)

사용하는 연산에 따라 지정할 수 있는 Output Mode 가 달라집니다.

* Aggregation (Group By 등) 을 Event Time 기준의 Watermark 와 사용할 경우에는 **Append / Update / Complete** 전부 사용 가능합니다.
* Aggregation 에 Watermark 가 없을 경우에는 Append 를 지원해야 하는데 기간 제한이 없어 Update 가 발생할 수 있으므로 이 경우에는 Append 모드가 불가능해집니다. 따라서 지원하지 않습니다.
* mapGroupsWithState 는 Update 모드만을, flatMapGroupsWithState 는 Function 파라미터로 지정되는 모드에 따라 Append / Update 를 사용할 수 있고 이를 바탕으로 Aggregation 지원 여부를 결정합니다. (Update = Aggregation 가능)
* **Join 연산을 사용한다면 Append 모드만 사용 가능합니다.**

코드에서는 Output 모드를 다음과 같이 지정할 수 있습니다. (Scala)

```scala
val streaming = dfConverted.writeStream
  .queryName("...")
  .format("...")
  .outputMode("update")  // "append", "complete"
```

### Output Sink

Output Sink 를 통해 가공한 Streaming 데이터를 내보낼 수 있습니다. 아래는 Spark 3.2.0 기준으로 지원되는 Output Sink 입니다.

![Kafka as Central Event Broker (Link)](/files/5g5ltAYQIwy5AoReymDO)

![Structured Streaming Sink Patterns (Link - Practical Spark)](/files/UEsjb8eBfMmXLRLhvQtT)

주로 Kafka, Foreach, File (Parquet) Sink 가 사용됩니다.

* **Kafka Sink** 를 통해 다른 Topic 으로 결과 데이터를 실시간으로 내보낼 때 사용할 수 있습니다.
* **File Sink** 를 통해 CDC 등의 데이터를 S3 로 Parquet 등의 Format 으로 적재할 수 있습니다.
* **Foreach Sink** 를 통해 Structured Streaming 이 지원하지 않는 저장소 (e.g., Dynamo) 등으로 전송할 수 있습니다.

![Spark Structured Streaming Output Sink (Link)](/files/TWrQq6Hao1VVsyCKpWxS)

foreach 와 foreachBatch Sink 의 경우 차이점은 다음과 같습니다.

* foreach 는 단건씩 처리할 수 있는 반면
* foreachBatch 는 하나의 Micro-batch 의 전체 결과 데이터를 DataFrame 으로 받아 사용할 수 있습니다.

foreach Sink 의 open 과 close 는 Micro-batch 마다 한번씩 호출됩니다. open 에서 true 를 리턴해야 하며 JDBC Connection 을 여는등 비싼 자원을 위한 연산을 여기서 수행할 수 있습니다.

```scala
streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()
```

### Trigger

**Trigger** 를 이용해 얼마의 주기마다 Output Sink 로 데이터를 보낼지를 결정할 수 있습니다. Trigger 를 지정하지 않으면 Micro-batch 가 가장 빨리 수행될 수 있는 주기로 실행됩니다.

* `Trigger.ProcessingTime` 를 사용하는 경우, 이전 작업이 끝나지 않으면 다음 Micro-batch 를 수행하지 않습니다. 이전 작업이 끝난 뒤에야 수행 됩니다. 만약 이전 작업이 일찍 끝났다면 Interval 이 만료 되기까지 대기한 후 실행합니다.
* `Trigger.Continuous` 는 **Continuous Processing Mode** 를 지원하기 위한 Trigger 입니다. 실험 단계이며 일부 제약조건이 있을 수 있습니다.

```scala
// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```

### Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

* Output Mode
  * Append / Update / Complete
* Output Sink
  * File / Kafka / Foreach / Foreach Batch
* Trigger (Micro-batch Interval)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://1ambda.gitbook.io/practical-data-pipeline/02-processing/2.4-stream/2.4.6-streaming-sink.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
