practical-data-pipeline
  • Practical Data Pipeline
  • 시작 전에 드리는 당부의 말
  • 01 - 데이터 인프라
    • 1.1 데이터 파이프라인
    • 1.2 데이터 입수 (Ingestion)
    • 1.2 데이터 가공 (Processing)
    • 1.3 데이터 저장 (Storage)
    • 1.4 데이터 분석 (Analysis)
  • 02 - 데이터 처리
    • 2.1 데이터 처리
    • 2.2 배치 (Batch) 처리
      • 2.1.1 Spark Intro
      • 2.1.2 Spark Tutorial
      • 2.1.3 Spark Concept
      • 2.1.4 Spark Architecture
      • 2.1.5 Spark DataFrame
      • 2.1.6 Spark Persistence
      • 2.1.7 Spark Cache
      • 2.1.8 Spark SQL & Table
      • 2.1.9 Spark Join
      • 2.2.1 Spark Memory
      • 2.2.2 Spark Versions
    • 2.3 워크플로우 (Workflow) 관리
    • 2.4 스트림 (Stream) 처리
      • 2.4.1 Kafka Intro
      • 2.4.2 Kafka Advanced
      • 2.4.3 Spark Streaming
      • 2.4.4 Streaming Window
      • 2.4.5 Streaming State
      • 2.4.6 Streaming Sink
  • 04 - 데이터 스토리지
    • 4.1 Kafka
      • 4.1 Kafka Concept
      • 4.2 Kafka Advanced
      • 4.3 Kafka Versions
    • 4.2 Redis
    • 4.3 RDB (MySQL)
    • 4.4 ElasticSearch
    • 4.5 KV Storage (DynamoDB)
    • 4.6 Druid
  • 05 - 데이터 애플리케이션
    • 5.1 데이터 서비스
    • 5.2 통계 서비스
    • 5.3 추천 서비스
    • 5.4 A/B 테스팅
  • 08 - Case Study
    • Week 1 - Data Pipeline
    • Week 2 - EMR & Kubernetes
    • Week 3 - Metastore
    • Week 4 - KV & Delta Storage
    • Week 5 - Kafka Rebalancing
    • Week 6 - ML Pipeline
  • 09 - 설치 및 환경 구성
    • Spark 설치 및 환경 구성
      • Spark - Local Shell 환경
      • Spark - Local Jupyter 환경
      • Spark - Kubernetes 환경
      • Spark - EMR 환경
      • Spark - Databricks 환경 (SaaS)
    • Flink 설치 및 환경 구성
    • Kafka 설치 및 환경 구성
    • MySQL 설치 및 환경 구성
    • DynamoDB 사용을 위한 환경 구성
    • ElasticSearch 설치 및 환경 구성
    • Presto 설치 및 환경 구성
    • Druid 설치 및 환경 구성
Powered by GitBook
On this page
  • Output Mode
  • Output Sink
  • Trigger
  • Summary

Was this helpful?

  1. 02 - 데이터 처리
  2. 2.4 스트림 (Stream) 처리

2.4.6 Streaming Sink

Previous2.4.5 Streaming StateNext4.1 Kafka

Last updated 3 years ago

Was this helpful?

이번 챕터에서는 Spark Structured Streaming 의 Output Sink (저장소) 와 Output Mode 에 대해 알아봅니다.

Output Mode

3 가지의 Output Mode 가 존재합니다. Output Mode 는 Spark Structured Streaming 의 결과가 Micro-batch 로 만들어지는 '테이블' 형태라는 가정하에 이해하면 쉽습니다.

  • Append (Default) 모드는 지난번 Micro-batch 에는 없었던 신규 데이터인 경우에 Row 를 추가해 테이블을 만듭니다.

  • Update (2.1.1+) 모드는 지난번 Micro-batch 에서 수정되었거나 혹은 신규 데이터인 경우에만 Row 를 추가해 테이블을 만듭니다.

  • Complete 모드는 전체 결과를 가진 테이블을 만듭니다.

그림으로 표현하면 아래와 같습니다.

Output Mode 에 따라 지원되는 연산이 달라집니다. 아래 표는 Spark 3.2.0 기준으로 Output Mode 와 지원되는 연산을 보여줍니다.

사용하는 연산에 따라 지정할 수 있는 Output Mode 가 달라집니다.

  • Aggregation (Group By 등) 을 Event Time 기준의 Watermark 와 사용할 경우에는 Append / Update / Complete 전부 사용 가능합니다.

  • Aggregation 에 Watermark 가 없을 경우에는 Append 를 지원해야 하는데 기간 제한이 없어 Update 가 발생할 수 있으므로 이 경우에는 Append 모드가 불가능해집니다. 따라서 지원하지 않습니다.

  • mapGroupsWithState 는 Update 모드만을, flatMapGroupsWithState 는 Function 파라미터로 지정되는 모드에 따라 Append / Update 를 사용할 수 있고 이를 바탕으로 Aggregation 지원 여부를 결정합니다. (Update = Aggregation 가능)

  • Join 연산을 사용한다면 Append 모드만 사용 가능합니다.

코드에서는 Output 모드를 다음과 같이 지정할 수 있습니다. (Scala)

val streaming = dfConverted.writeStream
  .queryName("...")
  .format("...")
  .outputMode("update")  // "append", "complete"

Output Sink

Output Sink 를 통해 가공한 Streaming 데이터를 내보낼 수 있습니다. 아래는 Spark 3.2.0 기준으로 지원되는 Output Sink 입니다.

주로 Kafka, Foreach, File (Parquet) Sink 가 사용됩니다.

  • Kafka Sink 를 통해 다른 Topic 으로 결과 데이터를 실시간으로 내보낼 때 사용할 수 있습니다.

  • File Sink 를 통해 CDC 등의 데이터를 S3 로 Parquet 등의 Format 으로 적재할 수 있습니다.

  • Foreach Sink 를 통해 Structured Streaming 이 지원하지 않는 저장소 (e.g., Dynamo) 등으로 전송할 수 있습니다.

foreach 와 foreachBatch Sink 의 경우 차이점은 다음과 같습니다.

  • foreach 는 단건씩 처리할 수 있는 반면

  • foreachBatch 는 하나의 Micro-batch 의 전체 결과 데이터를 DataFrame 으로 받아 사용할 수 있습니다.

foreach Sink 의 open 과 close 는 Micro-batch 마다 한번씩 호출됩니다. open 에서 true 를 리턴해야 하며 JDBC Connection 을 여는등 비싼 자원을 위한 연산을 여기서 수행할 수 있습니다.

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

Trigger

Trigger 를 이용해 얼마의 주기마다 Output Sink 로 데이터를 보낼지를 결정할 수 있습니다. Trigger 를 지정하지 않으면 Micro-batch 가 가장 빨리 수행될 수 있는 주기로 실행됩니다.

  • Trigger.ProcessingTime 를 사용하는 경우, 이전 작업이 끝나지 않으면 다음 Micro-batch 를 수행하지 않습니다. 이전 작업이 끝난 뒤에야 수행 됩니다. 만약 이전 작업이 일찍 끝났다면 Interval 이 만료 되기까지 대기한 후 실행합니다.

  • Trigger.Continuous 는 Continuous Processing Mode 를 지원하기 위한 Trigger 입니다. 실험 단계이며 일부 제약조건이 있을 수 있습니다.

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

Summary

이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.

  • Output Mode

    • Append / Update / Complete

  • Output Sink

    • File / Kafka / Foreach / Foreach Batch

  • Trigger (Micro-batch Interval)

Structured Streaming: Output Mode
Structured Steaming: Output Sink
Spark Output Mode ()
Spark Structured Streaming Output Mode ()
Kafka as Central Event Broker ()
Structured Streaming Sink Patterns ()
Spark Structured Streaming Output Sink ()
Link
Link
Link
Link - Practical Spark
Link