2.4.3 Spark Streaming

Spark Streaming API 를 이용해 사용자는 실시간 데이터 처리를 수행할 수 있습니다. 이번 챕터에서는 Spark Streaming 의 기본적인 개념과 사용법에 대해 알아봅니다.

아래는 Structured Streaming API 를 이용해 작성된 Word Count (단어 수 세기) 예제 입니다. 챕터들을 지나며 아래의 주제들을 하나씩 알아보겠습니다.

val stream = 

// Source
spark.readStream
    .format("kafka")
    .option("subscribe", "input-text")
    .load()

// Transformation
    .groupBy(col("value").cast("string").as("key"))
    .agg("count(*).as("value"))

// Sink
    .writeStream
    .format("kafka")
    .option("output-wordcount")

// Trigger, Output Mode
    .trigger("1 minute")
    .outputMode("update")

// Checkpoint
    .option("checkpointLocation", "s3://...")
    .start()

stream.awaitTermination()

Spark Streaming API

Spark 3.2.0 기준으로 현재 두 가지 종류의 API 를 지원합니다.

Spark Streaming (Low-Level API, DStream) 은 Low-level API 로서 다음과 같은 특징이 있습니다.

  • Micro Batch 모드로 동작합니다. 즉, 입력 데이터를 일정 시간동안 데이터를 모아 사용자가 처리할 수 있도록 RDD 로 제공합니다.

  • 사용자는 RDD 로 들어오는 데이터를 RDD API 를 이용해 처리합니다.

아래는 DStream API 를 이용한 코드 샘플입니다.

// Kafka 등으로부터 데이터를 읽어들이는 Stream 을 정의합니다. 아직 실행되지 않습니다.
streamUservent = ... 

streamUserEvent.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at executor
  }
}

ssc.start()             // 작업을 시작합니다. (ssc = Spark Streaming Context)
ssc.awaitTermination()  // 작업이 종료되길 대기합니다. 일반적으로 Streaming Application 의 종료는 SIGTERM (사용자 지정 종료) 또는 오류 등에 의해 발생할 수 있습니다.

Spark Structured Streaming 은 High-level API 로서 다음과 같은 특징이 있습니다.

  • RDD 를 직접 다루지 않고, Spark SQL 의 최적화 등 이점을 누리며 편리한 DataFrame / Dateset API 를 사용할 수 있습니다.

  • Micro Batch 이외에도 Continuous Processing 을 지원합니다 (Experimental)

아래는 Structured Streaming API 를 이용한 코드 샘플입니다.

case class UserProfile(userId: String, eventType: EventType, productId: String, productPrice: Double)

// Kafka Topic 을 읽어 DataFrame 으로 사용
val dfRaw = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker-01:9092,kafka-broker-02:9092,kafka-broker-03:9092")
  .option("subscribe", "user-activity")
  .load()

// Dataset 으로 변경
val dsConverted = dfRaw.as[UserProfile] 

// 가공 및 집계
val dfTransformed = dfConverted.map(...).filter(...).groupBy(...)

// 가공한 데이터를 다시 Kafka 에 Write
val dfWritten = dfTransformed
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

dfWritten.awaitTermination()

만약 Streaming 이 아니라 Batch (1회 적재 후 종료) 를 패턴으로 사용하고 싶다면 writeStream 대신 Generic Load / Save API 와 동일한 write 및 save 를 사용할 수 있습니다. Spark 는 Batch 와 Streaming 의 Sink 로 Kafka 를 지원합니다.

// Batch Source 로 부터 (S3 등) 데이터를 읽어 Kafka Sink 로 1회 적재 합니다.
val dfPersisted = df...
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

Spark Structured Streaming 을 이용하면 Batch / Stream 데이터 적재를 동일한 가공 API (DataFrame, Dataset) 을 이용해서 수행할 수 있습니다. API 의 통합이 사용자에게 언제 유용할까요?

다음 경우를 가정해 봅시다.

(A) Kafka Topic 의 Message 를 읽어 가공한 후 다른 Storage 로 적재합니다 (S3, Dynamo, Kafka 등)

(B) Processing 과정에서 실패한 Kafka Topic 내 Message 는 추후 다시 처리하기 위해 Dead Letter Queue (DLQ, 또 다른 Kafka Topic) 으로 보냅니다.

(C) DLQ Kafka Topic 내 Message 는 Retention 에 의해 삭제될 수 있기 때문에, DLQ 에 쌓인 메세지는 S3 에 적재합니다.

시간이 지나 S3 에 쌓인 데이터를 (Batch Source) 읽어 문제를 해결한 후 (A) 에서 보내려고 했던 Storage 에 다시 보낼 수 있습니다. 이 때 두 가지 옵션이 있을 수 있습니다.

  1. S3 데이터를 Spark Batch Application 에서 읽어 최종 Storage 인 (A) 로 직접 적재합니다.

  2. 만약 (A) 에서 사용되는 Spark Streaming 에서 메모리에 State 등을 활용한다면, S3 데이터를 읽어 (A) Kafka Topic 으로 데이터를 흘려 보내고 Spark Streaming 이 이 메세지를 다시 처리하게끔 할 수 있습니다.

즉, (2) 번 옵션은 Spark Batch Application 이 S3 데이터를 읽어 원본 Kafka Topic 으로 다시 보내고, 이것을 기존의 Spark Steraming Application 이 처리합니다.

단, 원본 Kafka Topic 이 다른 Consumer 들에 의해 공유되는 경우 (Inventory 등) 별도 작업 복구를 위한 Kafka Topic 을 만들고 Spark Streaming Application 이 이것을 사용할 수 있습니다.

Processing Mode

Spark Streaming (DStream API) 또는 Structured Streaming 에서는 Micro-batch Processing 모드를 기본적으로 지원합니다. Structured Streaming 의 경우에는 Micro-batch Processing 모드 이외에도 Continuous Processing 을 실험적으로 (Experimental) 지원합니다. 두 가지 프로세싱 모드의 차이점은 다음과 같습니다.

  • Micro-batch Processing 모드의 경우 들어오는 데이터를 일정 기간 동안 모았다가 (Batch) 처리합니다.

  • Continuous Processing 모드의 경우 지연 없이 바로바로 처리합니다.

아래 그림을 통해 두 가지 Processing 모드의 차이점을 살펴볼 수 있습니다.

Micro-batch Processing 모드와 Continuous Processing 모드의 차이점은 무엇일까요?

  • 개별 Record 의 Latency 관점에서 생각해보고

  • 또 다른 Streaming Framework 인 Flink 는 어떤 Processing 모드를 지원하는지 찾아봅시다.

Streaming Patterns

Streaming Application 은 Kafka 로 부터 데이터를 받아 다양한 곳에 적재할 수 있습니다. 위 그림과 같이 S3 에 데이터를 적재할 수도 있고, 다시 Kafka 로 보내 다른 실시간 처리를 위해 사용할 수도 있습니다.

이 챕터에서는 크게 결과 저장소의 종류를 기준으로 4 가지 사례로 나누어 알아봅니다.

  1. S3 에 데이터를 적재합니다. 가장 기본적인 적재 방식으로서 Client / Server 의 이벤트 데이터나 CDC (Storage) 로 부터 온 데이터를 S3 에 내려 Query Engine 을 통해 조회하거나 추가적인 가공을 위해 보관합니다.

  2. 하나 이상의 Kafka Topic 에서 데이터를 읽어 (Join) 처리 후 다른 Kafka Topic 으로 전송합니다. 때로는 다른 Kafka Cluster 의 Topic 으로 데이터를 전송할 수도 있습니다. Druid 와 같은 스토리지에서 이 Topic 의 데이터를 읽어 SQL 형태로 사용할 수 있습니다. Druid 는 기본적인 가공은 가능하나 복잡한 Join 및 Unnesting 등은 수행하기 어려우므로 Streaming Application 에서 가공하고 이것을 결과 Topic 에 저장한 뒤 Druid 와 같은 Storage 에서 읽어 사용합니다.

  3. Kafka Topic 데이터를 읽어 External Key-Value 저장소에 지속적으로 '상태' 를 업데이트 합니다. (e.g., 상품의 재고 상태 등) 이 경우 '상태' 는 Stream Application 외부에 존재합니다. 경우에 따라 Stream (Event) 과 Batch (Meta) 데이터를 Join 할 수 있습니다. Batch 데이터는 1일 1회 갱신하며, 실시간으로 인입되는 Stream 데이터와 Join 해 결과를 만듭니다.

  4. Kafka Topic 데이터를 읽어 Stream Application 내에 In-memory '상태' (State) 를 만든뒤 이 상태 값을 바탕으로 연산을 수행하고 결과를 내보냅니다. Output 은 다른 Kafka Topic 일 수 있습니다. 단, In-memory State 는 '메모리' 에 저장하므로 사이즈 제한이 있고, Checkpoint 등에 저장되어 안전하게 보일지라도 외부에서 사용 가능한 데이터가 아니므로 단일 Streaming Application 내에서만 활용 가능해, 범용성이 크게 집니다.

일반적으로 Streaming 의 결과물이 다른 팀에 의해서 소비된다면 Latency 가 매우 중요하므로 (@torreswoo) Streaming Application 의 Output 은 Storage 가 아니라 Kafka Topic 인 경우가 많습니다.

Stream Processing 은 실시간 데이터를 처리할 수 있지만 매우 물리 / 노동 비용 관점에서 고비용입니다. 다음 측면을 고려해 봅시다.

  • 지속적으로 Streaming Appilcation 이 떠있어야 합니다. 특성상 Streaming Application 은 트래픽이 튀는 상황에서도 '안전성' 을 보장해야 하므로 일정량 이상의 여분 리소스가 필요합니다. (경우에 따라 Auto Scaling 을 사용할 수도 있습니다.)

  • 지속적으로 Streaming Application 이 떠 있다는 말은 새벽에 문제가 생길 경우에도 담당자가 대응을 해야한다는 의미입니다. 자동 복구 등의 프로세스를 갖출 수 있겠지만, 예외적인 경우에는 사람의 간섭이 필요한 복구가 존재할 수 있습니다. (정책적 결정이 필요한 경우 등) Streaming Application 의 숫자가 늘어날수록 노동 인력에게 가해지는 부하가 늘어납니다.

  • Streaming Application 을 위한 기반 데이터 인프라가 필요합니다. Kafka / Zookeeper 등이 필요하고 Spark / Flink Streaming 을 위해 Cluster Manager 가 필요합니다.

  • 단일 Streaming Application 을 위해 복구용 Topic 과 Batch Application, Dead Letter 큐를 S3 에 내리기 위한 보존용 Application 등 여러 Application 필요할 수 있습니다.

따라서 Streaming Application 은 일반적으로 '실시간 데이터' 를 사용해 효과가 혹은 혜택이 있는 경우에만 사용하는게 바람직합니다.

이제 각 패턴을 하나씩 살펴봅시다.

Pattern 1 ) Kafka Topic Relay

Kafka Topic 에서 데이터를 읽어 어디론가 저장하는 단순한 패턴입니다.

  • Kafka Topic 을 Spark 읽어 S3 (HDFS, GCS) 로 저장할 수 있습니다.

  • Kafka Topic 을 다른 Kafka Broker 로 Relay 하거나 Kinesis To Kafka Relay 일 수 있습니다.

Client Event 에 Session ID 발급 / Event 전처리 등 추가적인 가공이 들어갈 수 있기 때문에 Kafka Connect 처럼 단순 Relay 를 제공하는 프레임워크보다는 가공 로직을 사내 기준에 맞추어 자유롭게 변경할 수 있는 Spark / Flink 프레임워크가 선호됩니다.

val dfRaw = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker-01:9092,kafka-broker-02:9092,kafka-broker-03:9092")
  .option("subscribe", "client-event")
  .load()


val dfWithSessionId = dfRaw.withColumn(...)

val dfWritten = dfWithSessionId
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

dfWritten.awaitTermination()

Pattern 2 ) Kafka Topic Converting

Kafka Topic 을 용도에 맞추어 가공해 사용하는 패턴입니다.

  • 여러 Topic 을 Union / Join 할 수 있습니다. 예를 들어 Client Web / Client App 이벤트의 포맷이 서로 다르며, 각기 다른 Kafka Topic 으로 전송될 때 두개의 Kafka Stream 을 공통 포맷으로 변환한 뒤 Spark DataFrame 에서 Join 할 수 있습니다.

  • 이렇게 가공된 통합 Client Event 는 Kafka 로 보내져 Druid 등에서 소비되거나 직접 Storage 로 적재될 수 있습니다

val dfWebRaw = spark.
  .readStream
  .format("kafka")
  ...
  .option("subscribe", "client-event-web")
  .load()

val dfWebRaw = spark
  .readStream
  .format("kafka")
  ...
  .option("subscribe", "client-event-app")
  .load()

// 최초에 공용 Client Format 으로 Event 로그를 정의할 수 있습니다. 
// 이 예제에서는 Union 을 샘플로 보여주기 위해 아래와 같이 이벤트 포맷이 플랫폼별로 다르다는 가정을 가지고 있습니다.
case class UserEventWeb(userId: String, eventType: EventType, browser: String, ...)
case class UserEventApp(userId: String, eventType: EventType, platform: String, ...)

val dsEventWeb = dfWebRaw.as[UserEventWeb]
val dsEventApp = dfWebRaw.as[UserEventApp]

val dsEventCommonWeb = dsEventWeb.map(e => e.convertToEventCommon())
val dsEventCommonApp = dsEventApp.map(e => e.convertToEventCommon())

val dsEventCommonAll = dsEventCommonWeb.union(dsEventCommonApp)

// write into kafka

또 다른 가공 패턴으로는 메타 테이블 + 이벤트 스트림을 Join 하는 케이스입니다. (Stream-Static Join)

  • 상품의 메타를 Batch 를 통해 읽어오고

  • 상품의 사용자 이벤트를 Stream 을 통해 읽어 Join 해 사용하는 방식입니다.

다만 상품의 메타 데이터는 일별로 갱신될 수 있으므로 매번 즉시 쿼리하기 보다는 캐싱 후 어떻게 읽어온 배치 데이터를 갱신할것인지 주의해야 합니다.

Stream-Stream Join 을 이용하면 다음과 실시간 Attribution 집계와 같은 작업을 해낼 수 있습니다.

val dfImprRaw = spark.readStream(...)
val dfClickRaw = spark.readStream(...)

val dfImprWatermarked = dfImprRaw
  .selectExpr("product_id as pid_impr", "adid AS adid_impr", "ts_impr")
  .withWatermark("ts_impr", "10 seconds ")

val dfClickWatermarked = dfClickRaw \
  .selectExpr("product_id as pid_click", "adid AS adid_impr", "ts_click")
  .withWatermark("ts_click", "20 seconds")

# inner join with time range conditions
dfImprWatermarked.join(  
  dfClickWatermarked,
  expr(""" 
    adid_click = adid_impr AND
    pid_click = pid_impr AND 
    ts_click >= ts_impr AND 
    ts_click <= ts_impr + interval 5 minutes    
    """
  )
)

Parttern 3) 가공 후 Dynamo, HBase, Redis 등 API 에서 조회가 가능한 Storage 에 적재

Kafka Topic 에서 데이터를 Streaming Application 읽어 원하는 형태로 가공 후 원하는 Write / Read Capacity 와 Latency 를 제공하는 Storage 를 골라 데이터를 적재할 수 있습니다.

  • (User Profile) 사용자 마다의 최근 이벤트 N 개 (검색, 상품 뷰, 주문 등) 를 시간순으로 정렬해 State 로 만들어 최신의 State 를 Dynamo 등에 Write 할 수 있습니다.

  • (Product Profile) 상품의 재고 변동이나 상품 기준의 집계 메트릭 (주문, 조회 수, 범위 기간당 노출 대비 클릭률 등) 을 계산해 Storage 에 적재할 수 있습니다.

  • 사용하는 Storage 의 Latency 및 Transaction, Upsert 등 구문 지원 여부에 따라 Streaming State 를 사용하지 않고 외부 저장소를 State 로 쓸 수 있습니다. (Get 후 Upsert)

아래 그림은 mapGroupsWithState 또는 flatMapGroupsWithState 를 사용해 Structured Streaming 에서 State 를 만드는 것을 도식화 한 그림입니다.

val dsUserEvent = dfUserEvent.as[UserEvent]
  
def stateFunc(userId:String,
              inputs: Iterator[UserEvent],
              oldState: GroupState[UserState]): UserState = {
...
}

val dsGrouped = dsUserEvent
    .groupByKey(u => u.userId)
    .mapGroupsWithState(timeoutSpec)(stateFunc)

Pattern 4) In-memory State

Spark Application 은 분산 처리를 수행하며 Executor 의 경우 N 개가 될 수 있으므로, 이 N 개 Executor 의 Memory 를 활용해 State 를 만들어 계산에 활용할 수 있습니다.

  • 즉 Spark Executor 내 Memory 를 State 저장소로 활용해 외부 저장소 조회 없이 빠른 집계를 수행할 수 있습니다.

  • 빠른 집계를 위해 In-memory State 를 쓰는 만큼 주된 Output 은 Kafka Topic 입니다.

다만 Memory 사이즈에 제한이 있으므로 Key 는 TTL 등을 통해 무한히 늘어나지 않도록 지정해야 합니다.

Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

  • Micro-batch Processing Mode

  • Continuous Processing Mode

  • Dead Letter Queue

  • Flink (Streaming Framework)

  • AWS Kinesis

  • Key Value Storage (Dynamo, HBase, ...)

Last updated