2.4.5 Streaming State
Last updated
Was this helpful?
Last updated
Was this helpful?
이번 챕터에서는 State 를 어떻게 만들고, State 가 Spark Streaming 에서 관리되는지를 알아봅니다.
State 는 Spark 가 Executor 메모리에 유지하는 '상태' 를 말합니다. 다음 연산들을 사용할 경우에 State 가 만들어집니다.
Join
Aggregation
Window
Arbitrary Stateful Operation
Join, Aggregation (Group By), Window 의 경우에는 사용자가 Spark 의 함수를 사용해서 State 를 비명시적으로 만들게 됩니다.
Arbitrary Stateful Operation 이란 사용자가 명시적으로 'State' 를 만드는 방법입니다. Direct Streaming / Structured Streaming 각각 다음의 API 를 이용할 수 있습니다.
(Direct Streaming) ,
(Structured Streaming)
사용자가 State 가 없는 연산 (단순 Filter, Map 등) 을 수행하는 것이 아니라 Join, Aggtegation, Window 를 통해 비명시적으로 혹은 mapGroupsWithState
와 같은 함수를 통해 명시적으로 State 를 사용할때 State 는 Executor 의 메모리에 존재합니다.
만약 이 때 Executor 가 죽는다면 State 는 날라가서 복구되지 못하는 걸까요?
또한 State 가 과하면 OOM 이 발생하지 않을지도 같이 이야기 해 봅시다.
Checkpoint 게임에서의 'Save Point' 와 비슷하게 Spark Streaming Application 이 특정 시점의 데이터를 기록해놓는 것입니다.
Spark Structured Streaming 에서는 checkpointLocation
옵션을 통해 checkpoint 경로를 지정할 수 있습니다.
이 때 Checkpoint 에 기록되는 데이터는 크게 2가지로 나눌 수 있습니다.
Data (State)
Metadata (Kafka Offset 등)
Spark Streaming Application 은 기본적으로 Micro Batch 로 동작하기 때문에 이전 Batch 의 Data(State) 와 Metadata (Kafka Offset 등) 을 Checkpoint 에 기록합니다. 이를 통해
다음 Micro-batch 에서 이전 State 를 읽어 현재 Micro-batch 에서의 데이터와 더해 State 를 갱신하고
Executor Failure 와 같은 문제 상황 발생시 State 를 복구할 수 있도록 설계되었습니다.
사용자가 만든 State 에 사용자 ID 와 상품 방문 숫자가 저장된다면, Micro-batch 간의 다음과 같이 도식화 할 수 있습니다.
spark.readStream.format("rate")
를 이용하면 시간과 증가하는 정수값을 자동으로 생성하는 스트림 소스를 만들 수 있습니다. 아래 코드는 GroupBy 사용시 StateStoreSave 가 Plan 에 포함되는 것을 보여줍니다.
Spark Structured Streaming 은 Spark Batch 에서 사용하는 Dataset, DataFrame 의 API 를 거의 동일하게 제공하고 마찬가지로 Optimizer 를 통해 최적화되므로 위와 같이 explain() 을 이용해 실행 계획을 볼 수 있습니다.
Spark Batch 와 동일하게 실행 계획이 만들어지며, 차이점은 결과물이 단일 실행이 아니라 시간에 따라 연속적으로 실행되는 Micro-batch 가 된다는 점입니다.
이전에 만들었던 숫자를 Group By 하는 쿼리를 아래 커맨드를 통해 Spark Shell 에서 실행해볼 수 있습니다.
터미널 내에서 만들어진 Checkpoint 디렉토리를 살펴보면 다음과 같습니다.
위 내용을 보면 State 가 저장되는 디렉토리와 그 파일 구조를 확인할 수 있습니다.
state/{operatorId=0}/{partitionId=0}/{version}.delta
형식으로 기록됩니다.
실제로 spark.sql.shuffle.partitions 의 기본값은 200 이고, 이로 인해 200 개의 파티션의 각각 State 가 디렉토리별로 생성된 것을 볼 수 있습니다.
이제 State 이외의 다른 어떤 내용들이 Checkpoint 에 저장되는지 한번 살펴 보겠습니다.
아까 위에서 만들어진 Checkpoint 내 디렉토리 목록을 다시 살펴보면 다음과 같습니다.
위 그림에서 말하는 Metadata 를 Checkpoint 내 디렉토리로 보면 commits , offsets 가 해당됩니다.
Spark Structured Streaming 은 각 Micro-batch 마다 ID 가 존재합니다. (batchId = 0 부터 시작)
N 번째 배치에서는 먼저 offsets/N
파일이 만들어지고 여기에 필요한 메타데이터를 남깁니다. (WAL, Write-ahead LOG)
Kafka Source 를 사용한다면, 해당 Task 를 위한 Kafka Offset 이 {partitionId: offset}
구조로 남습니다.
N 번째 배치를 수행하기 위해 state/{operatorId}/{partitionId}/N
상태 파일을 읽습니다.
상태 파일은 1 부터 시작하므로 N 번째 상태 파일은 N-1 번의 batchId 에서 실행된 결과입니다.
그리고 처리가 완료되면 commits/N
디렉토리에 처리한 결과에 대한 메타데이터를 남깁니다.
그림으로 표현하면 다음과 같습니다.
6번의 batchID 를 가진 Micro-batch 를 수행할 경우
먼저 Offset 파일을 만들고 (offsets/6
)
State 파일을 읽어 처리하고 (state/6
)
처리가 완료되면 Commit 파일을 만듭니다. (commits/6
)
따라서 commits 디렉토리 내 batchId 가 존재하면, 해당 Micro-batch 는 작업이 완료된 것입니다.
지금까지 설명한 Checkpoint 에 대한 내용을 하나의 그림으로 표현해보면 아래와 같이 시각화 할 수 있습니다.
체크포인트는 Executor 복구시에 사용할 수 있어야 하므로 기본적으로는 HDFS / S3 등 분산 저장소에 기록해 놓습니다. 다만 위에서와 같이 로컬 디버깅 / 테스트 목적등을 위해 머신 로컬 파일 내에 기록할 수도 있습니다.
그러나 HDFSBackedStateStore 의 경우에는 큰 State 를 사용할 경우 읽었다 내리면서 과도한 GC 로 인해 Micro-batch 처리 시간에 영향을 많이 주게 됩니다. 이는 HDFSBackedStateStore 는 Executor 의 JVM 메모리 내에 State 를 유지하기 때문입니다.
State 를 Checkpoint 에 저장할 경우 복구 및 재시작시 Application Code 를 State 호환성이 없도록 변경할 경우 기존 State 를 사용할 수 없으므로 사용에 매우 주의하여야 합니다.
매우 중요하니 전체 섹션을 읽어볼 것을 권장합니다.
예를 들어, 다음의 변경 사항은 기존의 Checkpoint 를 쓸 수 없도록 만듭니다.
Source (readStream) 에서 Kafka Topic 을 변경이 불가능합니다. (not allowed as the results are unpredictable)
Sink (writeStream) 에서 Kafka 를 foreach 로 변경이 불가능합니다.
Filter (where, filter 함수) 를 추가하거나 삭제하는 것은 가능합니다.
Projeciton 은 동일한 Output Schema 일 경우에만 가능합니다. (selectExpr(x as a) => selectExpr(y as b))
Group By 에 새로운 키를 추가하는 것은 불가능합니다. (groupBy("a", "b"))
Streaming 에서 dropDuplicates 사용시 키를 추가하는 것은 불가능합니다.
mapGroupsWithState / flatMapGroupsWithState 사용시 (Arbitrary Stateful Operation) 사용자가 지정한 State Schema (Case Class) 는 변경이 불가능합니다.
이 섹션에서는 Structured Streaming 에서 Kafka 를 사용할 경우 Offset 이 어떻게 Checkpoint 에서 관리되는지 살펴봅니다.
Spark Streaming 과 같은 분산 처리 스트리밍 프레임워크는 문제 상황시 자체 복구나 Exactly-once Processing 을 위해 일반적으로 Kafka Offset 을 직접 관리합니다. (Spark, Druid, Flink 등) 즉, 일반적인 Kafka Consumer 와는 다르게 Kafka Offset 을 __consumer_offset Topic 으로 보내지 않으며 Checkpoint 파일 내에 Offset 을 기록합니다.
Structured Streaming 에서 Kafka Offset 을 소비하는 방식은 다음과 같습니다.
최초 실행시 Spark Driver 가 Kafka Broker 에서 startingOffsets
(earliest, latest) 을 시작점으로 잡아 Offset 을 가져옵니다.
이후부터는 Checkpoint 내 offsets
디렉토리를 바탕으로 판별합니다. startingOffsets
를 사용하지 않습니다.
Spark Driver 는 Kafka 로 부터 지정된 Offset 을 시작점으로 잡아 전체 Kafka Topic Partition 에서 maxOffsetsPerTrigger
만큼 가져옵니다. (default = 미지정)
Spark Driver 는 Kafka Topic 의 Partition Offset 값들을 (Kafka Partition, minOffset, maxOffset) 과 같이 만들어 Executor 에게 최대한 균등하게 뿌립니다.
만약 이번 Micro-batch 에서 새롭게 생성된 Kafka Topic 내 Partition 을 발견한다면 earliest Offset 을 사용합니다.
Spark Executor 들은 Driver 로 부터 받은 (Kafka Partition, minOffset, maxOffset) 값을 바탕으로 Kafka Broker 로 부터 데이터를 가져옵니다.
Spark Structured Streaming 설정에서 minPartitions
값을 따로 지정하지 않는다면, Kafka Topic Partition 만큼의 최초 Spark Partition 세팅 됩니다. (Kafka Partition 1 = Spark Partition 1)
이후 Group By 등 집계 시에는 spark.sql.shuffle.partitions
설정 값이 사용됩니다.
minPartition >= Kafka Topic Partition 숫자 일 경우, 1개의 Kafka Partition 이 1+ 개의 Spark Partition (= Task) 에 최대한 균등하게 나누어집니다.
Spark Executor 는 각 Micro-batch 마다 Kafka Consumer 를 매번 생성하는것이 아니라, Caching 해서 사용합니다. (default = 64개, spark.kafka.consumer.cache.capacity
)
Structured Streaming 이 아니라 Direct Streaming 을 이용할 경우에는 아래와 같이 Kafka 에 직접 Offset 을 저장할 수 있는 옵션을 제공합니다.
Checkpoint 를 사용할 경우 Spark Version Upgrade 시 문제가 될 수 있으므로 제공하는 옵션입니다. (Structured Streaming 에서는 제공하지 않습니다.)
checkpoint 내 파일들을 통해 Spark Structured Streaming 이 Offset 을 어떻게 관리하는지 한번 살펴보겠습니다.
Source 로 사용하는 Kafka Topic 'airbnb-listing-test' 의 Partition 은 3개 입니다.
최초 1개의 메세지를 전송하고, 그 다음으로 2개의 메세지를 전송했습니다. Micro-batch 는 2번 수행됩니다 (batchId = 0, 1)
Commit 을 Rewind (다시 Offset 을 돌려 Re-consume) 을 수행하기 위해 아래의 파일들을 삭제 후 재실행하면 다시 Consume 할 수 있습니다.
Spark Structured Streaming API 를 이용해 Join, Group By 등의 Aggregation 이나 Window 함수를 사용하면 자연스럽게 State 가 사용됩니다.
Spark Streaming 에서는 사용자가 직접 명시적으로 상태를 커스텀 클래스로 만들어 사용할 수 있는 API 도 제공합니다.
이 챕터에서는 Structured Streaming API 를 기반으로 살펴 보겠습니다.
State 를 만드는 Function
State 나타내는 Case Class
사용자는 mapGroupsWithState
선언 전까지 정의한 Transformation 으로 가공된 데이터를 상태 업데이트 함수 내에서 받아 상태를 업데이트 하는 방식으로 사용하게 됩니다. 예를 들어,
단 withWatermark 와 같이 사용해야 합니다.
flatMapGroupsWithState 와 위에서 살펴본 mapGroupsWithState 의 차이는 다음과 같습니다.
함수형 프로그래밍에서 flatMap
과 동일하게 flatMapGroupsWithState
는 여러 Row 를 리턴하거나 아니면 리턴하지 않을 수 (Empty List) 있습니다.
Append 모드에서는 Aggregation 이 가능합니다.
이번 챕터에서 나온 개념 또는 용어 대한 정리입니다.
Checkpoint
commits, offsets, state Directory
Stream State
Kafka Source Options
startingOffsets
minPartitions
Arbitrary Stateful Operation
State 는 단순히 Key-Values 의 나열일 뿐입니다. Spark Streaming 내부적으로는 라는 개념을 통해 State 의 관리를 지원합니다.
사용자가 명시적으로 ( 등) 생성하거나 암시적으로 (Join, Window, Deduplication) 생성하는 State 는 이곳에 저장되며 증분 방식으로 Checkpoint 를 남길 수 있습니다.
즉 는 해당 파티션 내 State 의 6번째 버전입니다.
를 살펴보면 아래와 같이 디렉토리 경로가 결정됨을 볼 수 있는데, operatorId / partitionId 를 통해 경로가 정해집니다.
State 또한 기본적으로는 를 사용합니다. (S3-compatible)
Spark 3.2.0 부터는 Built-in 으로 를 지원합니다. RocksDB StateStore 는 Native Memory (JVM 영억 외부 = Off-heap) 와 Executor 의 Local Disk 를 사용해 State 를 관리합니다. 이로 인해 GC 의 영향을 받지 않습니다.
If you really want to support state schema changes, then you can explicitly encode/decode your complex state data structures into bytes using an encoding/decoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query restarts as the binary state will always be restored successfully. ()
(Direct Streaming) ,
(Structured Streaming)
를 이용해 상태를 만드려면 2 가지가 필요합니다.
만약 상태의 Key 가 무한정 늘어나게 되면, 메모리도 무한정 사용할 수 있습니다. 따라서 오래된 상태를 제거하기 위해 의 파라미터로 Timeout 을 줄 수 있습니다. 위의 예제에서는 ProcessingTimeTimeout
을 사용했으나, 만약 Event Timestamp 를 이용해 오래된 상태를 제거한다면 아래와 같이 파라미터를 줄 수 있습니다.
위 두 함수는 사용할 수 있는 에서도 차이가 있습니다. (Output Mode 는 Sink 챕터에서 더 자세히 다루겠습니다.)
는 Update 모드만 사용할 수 있습니다.
는 Update / Append 모드를 사용할 수 있습니다.
mapGroupsWithState Update Mode 또는 flatMapGroupsWithState 의 Append 모드에서는 Global Watermark 로 인해 일부 Row 가 버려질 수 있습니다. 자세한 내용은 문서를 참조할 수 있습니다.
(Direct Streaming) ,
(Structured Streaming)