# 2.4.3 Spark Streaming

Spark Streaming API 를 이용해 사용자는 실시간 데이터 처리를 수행할 수 있습니다. 이번 챕터에서는 Spark Streaming 의 기본적인 개념과 사용법에 대해 알아봅니다.

![Spark Streaming Concept (Link)](/files/QL3BsWNM607585ZyBpMl)

아래는 Structured Streaming API 를 이용해 작성된 Word Count (단어 수 세기) 예제 입니다. 챕터들을 지나며 아래의 주제들을 하나씩 알아보겠습니다.

```scala
val stream = 

// Source
spark.readStream
    .format("kafka")
    .option("subscribe", "input-text")
    .load()

// Transformation
    .groupBy(col("value").cast("string").as("key"))
    .agg("count(*).as("value"))

// Sink
    .writeStream
    .format("kafka")
    .option("output-wordcount")

// Trigger, Output Mode
    .trigger("1 minute")
    .outputMode("update")

// Checkpoint
    .option("checkpointLocation", "s3://...")
    .start()

stream.awaitTermination()
```

### Spark Streaming API&#x20;

![ APIs (Link)](/files/PaK8uG5Tps1CBLDxzbk6)

Spark 3.2.0 기준으로 현재 두 가지 종류의 API 를 지원합니다.

* [Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html) (Low-Level API, DStream)
* [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) (High-Level API)

Spark Streaming (Low-Level API, DStream) 은 Low-level API 로서 다음과 같은 특징이 있습니다.

* Micro Batch 모드로 동작합니다. 즉, 입력 데이터를 일정 시간동안 데이터를 모아 사용자가 처리할 수 있도록 RDD 로 제공합니다.
* 사용자는 RDD 로 들어오는 데이터를 RDD API 를 이용해 처리합니다.

아래는 DStream API 를 이용한 코드 샘플입니다.

```scala
// Kafka 등으로부터 데이터를 읽어들이는 Stream 을 정의합니다. 아직 실행되지 않습니다.
streamUservent = ... 

streamUserEvent.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at executor
  }
}

ssc.start()             // 작업을 시작합니다. (ssc = Spark Streaming Context)
ssc.awaitTermination()  // 작업이 종료되길 대기합니다. 일반적으로 Streaming Application 의 종료는 SIGTERM (사용자 지정 종료) 또는 오류 등에 의해 발생할 수 있습니다.
```

Spark Structured Streaming 은 High-level API 로서 다음과 같은 특징이 있습니다.

* RDD 를 직접 다루지 않고, Spark SQL 의 최적화 등 이점을 누리며 편리한 DataFrame / Dateset API 를 사용할 수 있습니다.
* Micro Batch 이외에도 [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing) 을 지원합니다 (Experimental)

아래는 Structured Streaming API 를 이용한 코드 샘플입니다.

```scala
case class UserProfile(userId: String, eventType: EventType, productId: String, productPrice: Double)

// Kafka Topic 을 읽어 DataFrame 으로 사용
val dfRaw = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker-01:9092,kafka-broker-02:9092,kafka-broker-03:9092")
  .option("subscribe", "user-activity")
  .load()

// Dataset 으로 변경
val dsConverted = dfRaw.as[UserProfile] 

// 가공 및 집계
val dfTransformed = dfConverted.map(...).filter(...).groupBy(...)

// 가공한 데이터를 다시 Kafka 에 Write
val dfWritten = dfTransformed
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

dfWritten.awaitTermination()
```

만약 Streaming 이 아니라 Batch (1회 적재 후 종료) 를 패턴으로 사용하고 싶다면 writeStream 대신 [Generic Load / Save API](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html) 와 동일한 write 및 save 를 사용할 수 있습니다. Spark 는 Batch 와 Streaming 의 Sink 로 Kafka 를 지원합니다.

```scala
// Batch Source 로 부터 (S3 등) 데이터를 읽어 Kafka Sink 로 1회 적재 합니다.
val dfPersisted = df...
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
```

![Dead Letter Queue (LInk)](/files/o5HgaIapBKJS8o1wtarc)

![Dead Letter Queue (LInk)](/files/YOgtf5eydmv4HcU90nK8)

{% hint style="info" %}
Spark Structured Streaming 을 이용하면 Batch / Stream 데이터 적재를 동일한 가공 API (DataFrame, Dataset) 을 이용해서 수행할 수 있습니다. API 의 통합이 사용자에게 언제 유용할까요?

다음 경우를 가정해 봅시다.

(A) Kafka Topic 의 Message 를 읽어 가공한 후 다른 Storage 로 적재합니다 (S3, Dynamo, Kafka 등)

(B) Processing 과정에서 실패한 Kafka Topic 내 Message 는 추후 다시 처리하기 위해 Dead Letter Queue (DLQ, 또 다른 Kafka Topic) 으로 보냅니다.

(C) DLQ Kafka Topic 내 Message 는 Retention 에 의해 삭제될 수 있기 때문에, DLQ 에 쌓인 메세지는 S3 에 적재합니다.

\
시간이 지나 S3 에 쌓인 데이터를 (Batch Source) 읽어 문제를 해결한 후 (A) 에서 보내려고 했던 Storage 에 다시 보낼 수 있습니다. 이 때 두 가지 옵션이 있을 수 있습니다.

1. S3 데이터를 Spark Batch Application 에서 읽어 최종 Storage 인 (A) 로 직접 적재합니다.
2. 만약 (A) 에서 사용되는 Spark Streaming 에서 메모리에 State 등을 활용한다면, S3 데이터를 읽어 (A) Kafka Topic 으로 데이터를 흘려 보내고 Spark Streaming 이 이 메세지를 다시 처리하게끔 할 수 있습니다.

즉, (2) 번 옵션은 Spark Batch Application 이 S3 데이터를 읽어 원본 Kafka Topic 으로 다시 보내고, 이것을 기존의 Spark Steraming Application 이 처리합니다.

단, 원본 Kafka Topic 이 다른 Consumer 들에 의해 공유되는 경우 (Inventory 등) 별도 작업 복구를 위한 Kafka Topic 을 만들고 Spark Streaming Application 이 이것을 사용할 수 있습니다.
{% endhint %}

### Processing Mode

![Spark Processing Modes (Link)](/files/rIzFWGVAqdR85t3AR5Kn)

Spark Streaming (DStream API) 또는 Structured Streaming 에서는 Micro-batch Processing 모드를 기본적으로 지원합니다. Structured Streaming 의 경우에는 Micro-batch Processing 모드 이외에도 Continuous Processing 을 실험적으로 (Experimental) 지원합니다.\
두 가지 프로세싱 모드의 차이점은 다음과 같습니다.

* Micro-batch Processing 모드의 경우 들어오는 데이터를 일정 기간 동안 모았다가 (Batch) 처리합니다.
* Continuous Processing 모드의 경우 지연 없이 바로바로 처리합니다.

\
아래 그림을 통해 두 가지 Processing 모드의 차이점을 살펴볼 수 있습니다.

![Micro-batch Processing Mode (Link)](/files/4eCYEBz6FNtwOY8iUEdQ)

![Continuous Processing Mode (Link)](/files/OAWTIkE6dDQ10xNHFlu4)

![Micro-batch Processing Mode (Link)](/files/5TqEluh1pClCpIiSm7KL)

![Continuous Processing Mode (Link)](/files/CXp2GUd1x4yBy8OU6e4q)

{% hint style="info" %}
Micro-batch Processing 모드와 Continuous Processing 모드의 차이점은 무엇일까요?

* 개별 Record 의 Latency 관점에서 생각해보고
* 또 다른 Streaming Framework 인 [Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/) 는 어떤 Processing 모드를 지원하는지 찾아봅시다.
  {% endhint %}

### Streaming Patterns

![Spark Streaming Patterns (Link)](/files/OWomKX4hKlFbWvN8EcsE)

Streaming Application 은 Kafka 로 부터 데이터를 받아 다양한 곳에 적재할 수 있습니다. 위 그림과 같이 S3 에 데이터를 적재할 수도 있고, 다시 Kafka 로 보내 다른 실시간 처리를 위해 사용할 수도 있습니다.

![](/files/byDqJoaA68OkcXJ4vV4G)

이 챕터에서는 크게 결과 저장소의 종류를 기준으로 4 가지 사례로 나누어 알아봅니다.

1. S3 에 데이터를 적재합니다. 가장 기본적인 적재 방식으로서 Client / Server 의 이벤트 데이터나 CDC (Storage) 로 부터 온 데이터를 S3 에 내려 Query Engine 을 통해 조회하거나 추가적인 가공을 위해 보관합니다.
2. 하나 이상의 Kafka Topic 에서 데이터를 읽어 (Join) 처리 후 다른 Kafka Topic 으로 전송합니다.  때로는 다른 Kafka Cluster 의 Topic 으로 데이터를 전송할 수도 있습니다. Druid 와 같은 스토리지에서 이 Topic 의 데이터를 읽어 SQL 형태로 사용할 수 있습니다. Druid 는 기본적인 가공은 가능하나 복잡한 Join 및 Unnesting 등은 수행하기 어려우므로 Streaming Application 에서 가공하고 이것을 결과 Topic 에 저장한 뒤 Druid 와 같은 Storage 에서 읽어 사용합니다.
3. Kafka Topic 데이터를 읽어 External Key-Value 저장소에 지속적으로 '상태' 를 업데이트 합니다. (e.g., 상품의 재고 상태 등) 이 경우 '상태' 는 Stream Application 외부에 존재합니다. 경우에 따라 Stream (Event) 과 Batch (Meta) 데이터를 Join 할 수 있습니다. Batch 데이터는 1일 1회 갱신하며, 실시간으로 인입되는 Stream 데이터와 Join 해 결과를 만듭니다.
4. Kafka Topic 데이터를 읽어 Stream Application 내에 In-memory '상태' (State) 를 만든뒤 이 상태 값을 바탕으로 연산을 수행하고 결과를 내보냅니다. Output 은 다른 Kafka Topic 일 수 있습니다. 단, In-memory State 는 '메모리' 에 저장하므로 사이즈 제한이 있고, Checkpoint 등에 저장되어 안전하게 보일지라도 외부에서 사용 가능한 데이터가 아니므로 단일 Streaming Application 내에서만 활용 가능해, 범용성이 크게 집니다.

> 일반적으로 Streaming 의 결과물이 다른 팀에 의해서 소비된다면 Latency 가 매우 중요하므로 ([@torreswoo](https://github.com/torreswoo)) Streaming Application 의 Output 은 Storage 가 아니라 Kafka Topic 인 경우가 많습니다.

{% hint style="info" %}
Stream Processing 은 실시간 데이터를 처리할 수 있지만 매우 물리 / 노동 비용 관점에서 고비용입니다. 다음 측면을 고려해 봅시다.

* 지속적으로 Streaming Appilcation 이 떠있어야 합니다. 특성상 Streaming Application 은 트래픽이 튀는 상황에서도 '안전성' 을 보장해야 하므로 일정량 이상의 여분 리소스가 필요합니다. (경우에 따라 Auto Scaling 을 사용할 수도 있습니다.)
* 지속적으로 Streaming Application 이 떠 있다는 말은 새벽에 문제가 생길 경우에도 담당자가 대응을 해야한다는 의미입니다. 자동 복구 등의 프로세스를 갖출 수 있겠지만, 예외적인 경우에는 사람의 간섭이 필요한 복구가 존재할 수 있습니다. (정책적 결정이 필요한 경우 등) Streaming Application 의 숫자가 늘어날수록 노동 인력에게 가해지는 부하가 늘어납니다.
* Streaming Application 을 위한 기반 데이터 인프라가 필요합니다. Kafka / Zookeeper 등이 필요하고 Spark / Flink Streaming 을 위해 Cluster Manager 가 필요합니다.
* 단일 Streaming Application 을 위해 복구용 Topic 과 Batch Application, Dead Letter 큐를 S3 에 내리기 위한 보존용 Application 등 여러 Application 필요할 수 있습니다.

\
따라서 Streaming Application 은 일반적으로 '실시간 데이터' 를 사용해 효과가 혹은 혜택이 있는 경우에만 사용하는게 바람직합니다.
{% endhint %}

이제 각 패턴을 하나씩 살펴봅시다.

![](/files/UeK6sLqsRvMdFnjHKlK1)

#### Pattern 1 ) Kafka Topic Relay

Kafka Topic 에서 데이터를 읽어 어디론가 저장하는 단순한 패턴입니다.

* Kafka Topic 을 Spark 읽어 S3 (HDFS, GCS) 로 저장할 수 있습니다.
* Kafka Topic 을 다른 Kafka Broker 로 Relay 하거나 Kinesis To Kafka Relay 일 수 있습니다.

\
Client Event 에 Session ID 발급 / Event 전처리 등 추가적인 가공이 들어갈 수 있기 때문에 Kafka Connect 처럼 단순 Relay 를 제공하는 프레임워크보다는 가공 로직을 사내 기준에 맞추어 자유롭게 변경할 수 있는 Spark / Flink 프레임워크가 선호됩니다.

```scala
val dfRaw = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker-01:9092,kafka-broker-02:9092,kafka-broker-03:9092")
  .option("subscribe", "client-event")
  .load()


val dfWithSessionId = dfRaw.withColumn(...)

val dfWritten = dfWithSessionId
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

dfWritten.awaitTermination()
```

![](/files/gWW38eft3We9a7RloPj4)

#### **Pattern 2 ) Kafka Topic Converting**

Kafka Topic 을 용도에 맞추어 가공해 사용하는 패턴입니다.

* 여러 Topic 을 Union / Join 할 수 있습니다. 예를 들어 Client Web / Client App 이벤트의 포맷이 서로 다르며, 각기 다른 Kafka Topic 으로 전송될 때 두개의 Kafka Stream 을 공통 포맷으로 변환한 뒤 Spark DataFrame 에서 Join 할 수 있습니다.
* 이렇게 가공된 통합 Client Event 는 Kafka 로 보내져 Druid 등에서 소비되거나 직접 Storage 로 적재될 수 있습니다

```scala
val dfWebRaw = spark.
  .readStream
  .format("kafka")
  ...
  .option("subscribe", "client-event-web")
  .load()

val dfWebRaw = spark
  .readStream
  .format("kafka")
  ...
  .option("subscribe", "client-event-app")
  .load()

// 최초에 공용 Client Format 으로 Event 로그를 정의할 수 있습니다. 
// 이 예제에서는 Union 을 샘플로 보여주기 위해 아래와 같이 이벤트 포맷이 플랫폼별로 다르다는 가정을 가지고 있습니다.
case class UserEventWeb(userId: String, eventType: EventType, browser: String, ...)
case class UserEventApp(userId: String, eventType: EventType, platform: String, ...)

val dsEventWeb = dfWebRaw.as[UserEventWeb]
val dsEventApp = dfWebRaw.as[UserEventApp]

val dsEventCommonWeb = dsEventWeb.map(e => e.convertToEventCommon())
val dsEventCommonApp = dsEventApp.map(e => e.convertToEventCommon())

val dsEventCommonAll = dsEventCommonWeb.union(dsEventCommonApp)

// write into kafka
```

또 다른 가공 패턴으로는 메타 테이블 + 이벤트 스트림을 Join 하는 케이스입니다. (**Stream-Static Join**)

* 상품의 메타를 Batch 를 통해 읽어오고
* 상품의 사용자 이벤트를 Stream 을 통해 읽어 Join 해 사용하는 방식입니다.

다만 상품의 메타 데이터는 일별로 갱신될 수 있으므로 매번 즉시 쿼리하기 보다는 캐싱 후 어떻게 읽어온 배치 데이터를 갱신할것인지 주의해야 합니다.

[Stream-Stream Join](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins) 을 이용하면 다음과 실시간 Attribution 집계와 같은 작업을 해낼 수 있습니다.

![Stream - Stream Join Case (Link)](/files/FUYryDtEXrcUPXg8s1s6)

![Stream - Stream Join Case (Link)](/files/sUxh5rf7kTgJw6ViSkFG)

```scala
val dfImprRaw = spark.readStream(...)
val dfClickRaw = spark.readStream(...)

val dfImprWatermarked = dfImprRaw
  .selectExpr("product_id as pid_impr", "adid AS adid_impr", "ts_impr")
  .withWatermark("ts_impr", "10 seconds ")

val dfClickWatermarked = dfClickRaw \
  .selectExpr("product_id as pid_click", "adid AS adid_impr", "ts_click")
  .withWatermark("ts_click", "20 seconds")

# inner join with time range conditions
dfImprWatermarked.join(  
  dfClickWatermarked,
  expr(""" 
    adid_click = adid_impr AND
    pid_click = pid_impr AND 
    ts_click >= ts_impr AND 
    ts_click <= ts_impr + interval 5 minutes    
    """
  )
)
```

&#x20;

![](/files/EHhLx12VrVfByPj3EdMQ)

#### Parttern 3) 가공 후 Dynamo, HBase, Redis 등 API 에서 조회가 가능한 Storage 에 적재

Kafka Topic 에서 데이터를 Streaming Application 읽어 원하는 형태로 가공 후 원하는 Write / Read Capacity 와 Latency 를 제공하는 Storage 를 골라 데이터를 적재할 수 있습니다.

* (User Profile) 사용자 마다의 최근 이벤트 N 개 (검색, 상품 뷰, 주문 등) 를 시간순으로 정렬해 State 로 만들어 최신의 State 를 Dynamo 등에 Write 할 수 있습니다.
* (Product Profile) 상품의 재고 변동이나 상품 기준의 집계 메트릭 (주문, 조회 수, 범위 기간당 노출 대비 클릭률 등) 을 계산해 Storage 에 적재할 수 있습니다.
* 사용하는 Storage 의 Latency 및 Transaction, Upsert 등 구문 지원 여부에 따라 Streaming State 를 사용하지 않고 외부 저장소를 State 로 쓸 수 있습니다. (Get 후 Upsert)

아래 그림은 [mapGroupsWithState](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-KeyValueGroupedDataset-mapGroupsWithState.html) 또는 [flatMapGroupsWithState](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-KeyValueGroupedDataset-flatMapGroupsWithState.html) 를 사용해 Structured Streaming 에서 State 를 만드는 것을 도식화 한 그림입니다.

![Arbitrary Stateful Processing (Link](/files/OaOwYW33n9PMlmoESZj0)

```scala
val dsUserEvent = dfUserEvent.as[UserEvent]
  
def stateFunc(userId:String,
              inputs: Iterator[UserEvent],
              oldState: GroupState[UserState]): UserState = {
...
}

val dsGrouped = dsUserEvent
    .groupByKey(u => u.userId)
    .mapGroupsWithState(timeoutSpec)(stateFunc)
```

![](/files/ZkiLP4FM71D1ePZAdq1H)

#### Pattern 4) In-memory State

Spark Application 은 분산 처리를 수행하며 Executor 의 경우 N 개가 될 수 있으므로, 이 N 개 Executor 의 Memory 를 활용해 State 를 만들어 계산에 활용할 수 있습니다.

* 즉 Spark Executor 내 Memory 를 State 저장소로 활용해 외부 저장소 조회 없이 빠른 집계를 수행할 수 있습니다.
* 빠른 집계를 위해 In-memory State 를 쓰는 만큼 주된 Output 은 Kafka Topic 입니다.

다만 Memory 사이즈에 제한이 있으므로 Key 는 TTL 등을 통해 무한히 늘어나지 않도록 지정해야 합니다.

### Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

* Micro-batch Processing Mode
* Continuous Processing Mode
* Dead Letter Queue
* Flink (Streaming Framework)
* AWS Kinesis
* Key Value Storage (Dynamo, HBase, ...)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://1ambda.gitbook.io/practical-data-pipeline/02-processing/2.4-stream/2.4.3-spark-streaming.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
