2.4.2 Kafka Advanced
Last updated
Last updated
이번 챕터에서는 Kafka 의 동작에 대해 자세히 알아봅니다. Kafka 의 동작 방식에 대해 더 자세히 알아봄으로써 발생할 수 있는 문제를 피하고 올바르게 Application 을 설정하고 작성하는 방법에 대해 익힙니다.
Consumer Group 내 어떤 Consumer 가 어떤 Partition 을 가져갈지 결정하는 것을 Partition Assignment 라고 부르며 여러가지 할당 전략을 사용할 수 있습니다.
Consumer 설정: partition.assignment.strategy
RangeAssignor (default)
RoundRobinAssignor
StickyAssignor
CooperativeStickyAssignor (2.4+)
위 그림에서는 기본으로 사용되는 Ranger Assigment 를 이용해 할당하는 과정을 보여줍니다.
Consumer Group 내 3 개의 Consumer 가 존재하고
Topic A, Topic B 두개의 Topic 을 읽고 있습니다.
Topic 당 파티션은 2개 이므로
Topic A 의 2개 파티션이 Consumer 2 개에 할당되고
Topic B 의 2개 파티션이 Consumer 2 개에 할당되는 것을 볼 수 있습니다.
만약 다른 전략을 사용한다면 아래와 그림과 같이 Consumer C3 도 파티션을 할당 받을 수 있습니다.
Consumer 를 위한 Partition 할당 전략이 왜 다양한 걸까요? RangeAssignor 와 RoundRobinAssignor 를 비교해 보며 장 단점을 논의해 봅시다.
RangerAssignor 를 이용했을 경우 Consumer 의 리소스를 잘 활용할 수 있는지
RoundRobinAssignor 를 사용했을 경우 Consumer 가 죽으면 어떤 문제가 발생할지
Kafka Consumer 는 Topic Partition 을 할당 받아 처리할 수 있습니다. 이 때, 다양한 이유로 Topic Partition 의 재할당 (Rebalancing) 이 발생할 수 있습니다.
Consumer Group 내 새로운 Consumer 가 추가될 경우
Consumer Group 내 기존 Consumer 가 Shutdown 될 경우
Consumer Group 내 기존 Consumer 가 정상적이지 않은 것으로 판단 될 경우 (heartbeat.interval.ms, session.timeout.ms, max.poll.interval.ms 등)
Topic 내 Partition 이 증가될 경우 (참고. Partition 은 줄어들 수 없습니다)
Kafka Consumer 가 강제로 죽는 경우가 아니라, 의도적으로 언제 추가되고 제거될 수 있을까요? 다음 두 가지 경우를 생각해봅시다.
Kafka Consumer Application 을 Kubernetes Deployment (N 개의 Pod 으로 관리) 하면서 Rolling Upgrade 할 때
Kafka Consumer Application 을 Kubernetes 에 HPA 를 세팅해 리소스 활용률에 따라 Scale-out / Down 이 발생할 때
Kubernetes 에서는 다양한 이유로 Application Pod 이 의도적으로 삭제되거나 추가될 수 있습니다.
Cordon: Node 내 Pod 방출
HPA 등 Trigger 에 의해 Autoscaling 으로 신규 추가 및 Scaling-in 으로 축소
Pod 배포시 Rolling Upgrade 로 1+ 개씩 생성 후 기존 Pod 삭제
AWS Spot 이용시 Spot Rebalancing 등의 알림을 통해 AWS Node Termination Handler 가 Node 를 자동으로 Drain (Managed Node 에서는 기본 설치)
Rebalancing 은 특별한 설정이 없다면 Eager Rebalancing Protocol 을 따라 동작합니다. 그림으로 표현하면 아래와 같습니다.
Rebalancing 은 Kafka Consumer 측면에서 큰 부담입니다. 기존의 Eager Rebalancing Protocol 을 기준으로 설명해보면,
Rebalancing 이 발생하면 Consumer Group 내 전체 Consumer 가 데이터 읽기를 멈추고
Consumer 들은 자신이 할당 받았던 Partition 들을 반환합니다.
Group Coordinator (Broker) 와 Consumer Leader 를 통해 파티션을 할당 합니다.
Group Coordinator 는 1 개의 Consumer Group 을 담당하는 Broker 입니다
Consumer Leader 는 Consumer Group 내 첫 번째로 Join 한 Consumer 입니다
Consumer 들은 할당받은 Partition 들을 바탕으로 다시 작업을 시작합니다.
아래 그림은 Group Coordinator Broker 와 Consumer Group Leader 를 나타냅니다.
Rebalancing 은 모든 Consumer 가 읽기 작업을 멈추어야 하므로 Kafka 는 2.3, 2.4 에 다양한 Rebalancing 개선 기능을 추가했습니다.
Static Membership 은 특정 기간동안 동일한 group.instance.id Consumer 가 삭제 후 새롭게 추가되어도 Rebalancing 을 진행하지 않습니다.
Incremental Rebalancing Protocol 은 파티션 재분배가 필요한 Consumer 의 파티션만 재배치 하고, 나머지 Consumer 들은 작업을 지속합니다.
아래 그림에서 각각 Static Membership 과 Incremental Rebalancing Protocol 을 살펴볼 수 있습니다.
Static Membership 을 사용하면, 동일한 group.instance.id 를 가진 Consumer 는 정해진 (session.timeout.ms) 기간 동안 다시 Join 해도 Rebalancing 이 일어나지 않고, 과거의 Consumer 로 간주헤 과거에 할당받았던 파티션을 그대로 할당합니다. 따라서 전체 Consumer 의 읽기 중단이 발생하지 않습니다.
Kubernetes 에서 Consumer 를 Stateful Set 으로 관리하는 와중에, 특정 노드의 문제가 반복적으로 발생해 Consumer Pod 이 영향을 받을 경우
Consumer Pod 들은 Stateful Set ID 를 이용해 instance.group.id 를 지정할 수 있으므로 (1, 2, 3, 4, 5) 지정된 기간동안 Consumer Group 에 Join 한다면 리밸런싱이 발생하지 않을 수 있습니다.
Incremental Rebalancing 은 현재 Consumer 들의 Partition 할당을 고려해, 필요한 Partition 만 회수합니다. 따라서 전체 리밸런싱이 발생하지 않으며 다른 Consumer 들은 Topic Partition 읽기 작업을 지속적으로 진행할 수 있습니다. 이전 대상으로 선택된 Partition 만 회수 후 재 할당합니다.
Group 내의 Kafka Consumer 는 할당 받은 Topic 내 Partition 마다 어느 메세지까지 읽었는지를 관리합니다. Offset 을 어디까지 처리했는지 기록하는 행위를 Commit 이라 부릅니다. Offset 을 기록하는 방법은 자동 또는 수동이 있으며 Offset 을 잘못 관리할 경우 Consumer 가 강제종료 (SIGKILL, OOM) 혹은 단순 종료 (Rebalancing) 에도 유실 또는 중복이 발생할 수 있습니다.
Consumer 는 __consumer_offsets 라는 토픽에 해당 Consumer Group 이 읽는 Topic 내 Partition 의 현재 Offset 정보를 저장합니다.
과거에는 (0.9 미만) Consumer 가 Zookeeper 에 Offset 을 저장 했습니다.
0.9 이상 버전에서는 Consumer 는 Zookeeper 를 사용하지 않으며 Consumer 의 Offset 은 Kafka Broker 토픽에 저장됩니다.
위에서 언급한 자동 커밋 (Auto Commit) 은 아래 두개의 옵션으로 활성화 할 수 있습니다. 활성화시 Consumer 가 메세지를 Polling 할 때 interval.ms 를 초과하면 Commit 을 수행합니다.
enable.auto.commit = true
auto.commit.interval.ms = 5000
다만 Auto Commit 기능은 중복 또는 유실이 발생하기 쉽습니다. (librdkafka, enable.auto.offset.store = true) 아래 그림은 Auto Commit 사용시 중복 또는 유실이 발생하는 케이스를 각각 보여줍니다.
위 그림에서 볼 수 있듯이, 현재 메시지를 가져왔고 Auto Commit 으로 인해 가져온 1000 Offset 에서 Commit 이 되었으나 실제 처리는 200 Offset 을 수행하고 있습니다. 이 때 강제 종료가 발생하면 Consumer 재시작 후에 1001 Offset 부터 메세지를 읽어옵니다 (= 유실)
한 가지 주의해야 할 점은 Java 기반의 Kafka Consumer 와 librdkafka 라이브러리를 (C/C++ 구현체로 Golang, Python 등 Consumer 에서 사용) 사용하는 Kafka Consumer 의 Auto Commit 동작 방식이 다릅니다.
Java 기반의 Consumer 는 Poll 과 (메시지 Fetching) Commit 이 같은 스레드에서 일어나고 현재 메시지의 Commit 이 다음 메세지를 위한 Poll 에서 발생하므로 '중복' 만 발생합니다.
librdkafka 기반의 Consumer 는 Poll / Commit 이 별도 스레드에서 동작하며, enable.auto.offset.store = true 일 경우 방금 가져온 메세지의 Offset 까지 Commit 하므로 유실이 발생할 수 있습니다.
더 자세한 내용은 다음 문서를 참조할 수 있습니다.
(Java Client) By default, the consumer is configured to auto-commit offsets. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.
위 그림 에서는 현재 메시지를 Poll() 을 통해 가져왔고 Auto Commit 이 활성화 되어 있으나, 아직 interval.ms 에 도달하지 않아 Commit 이 발생하지 않았습니다.
이 때, 메세지를 처리하다가 중간인 400 정도에서 Consumer 가 죽고 다시 재실행되면, 200 Offset 부터 데이터를 다시 읽어와 처리하게 됩니다 (= 중복)
Auto-commit 이 아니라 수동으로 Commit 할 경우 commitSync (동기) 또는 commitAsync (비동기) 함수를 Consumer API 에서 사용할 수 있습니다.
지금까지는 Consumer 의 중복 '읽기' 를 다루었지만 Producer 또한 중복으로 데이터를 Write 할 수 있습니다.
Kafka Producer 는 Broker 의 Ack (데이터를 잘 받았는지) 응답을 확인하고 다음 메세지를 전송하게 됩니다.
만약 Broker 의 Ack 가 네트워크에서 유실되거나, Broker 가 문제상황으로 일정 기간동안 Ack 를 보내지 않으면 어떻게될까요?
Kafka 0.11+ 에서 도입된 Idempotent Producer 와 Kafka Transaction 기능을 이용하면 중복 문제를 일부 해결할 수 있습니다.
enable.Idempotence=true
세팅을 통해 Idempotent Producer 를 이용할 수 있습니다.
각각의 Producer 는 PID 와 Seq (Producer Life Cycle 마다 0 부터 시작되는 값) 을 Broker 에 전송하고
Broker 는 파티션별로 PID 별로 가장 높은 Seq 를 기억해, 중복된 메세지가 들어오면 거절할 수 있습니다
단, 주의해야 할 점은 SEQ 값이 Producer 가 생성되면 0부터 시작하므로 (새로운 PID를 할당 받아 Seq 값 리셋) Producer Shutdown 으로 인한 중복은 enable.Idempotence=true
세팅으로 피할 수 없습니다.
Kafka Transaction 을 이용하면 Producer 가 죽고 다시 생성되어도 (Producer Sessions) 중복을 피할 수 있습니다. 다만 Broker 가 추가적인 처리를 진행해야해 Write Throughput 이 떨어질 수 있습니다.
Kafka Broker 는 Producer 가 죽고 다시 살아나도 같은 Producer 임을 인식하기 위해 Transaction ID 를 (TID) 관리합니다. 즉 Producer Restart 마다 PID 가 갱신되지만 TID 를 통해 과거와 동일한 Producer 세션임을 알 수 있습니다.
Idempotent Producer 를 이용할 때와는 달리 Producer 는 Transaction Coordinator (Partition Leader Broker 내 위치 하는 모듈) 과 통신하며 Transaction 을 진행합니다.
코드로 살펴보면 다음과 같습니다.
위 그림에서 Idempotent Producer 와 Transaction Producer 의 차이를 한 눈에 볼 수 있습니다.
Consumer 또한 Transaction 을 이용할 수 있는데, Consumer 에서 isolation.level = read_committed 옵션을 지정할 경우 Producer 가 성공적으로 Commit 한 데이터만 읽어옵니다.
Spring Kafka 를 이용해서 Kafka Producer / Consumer 를 직접 작성하는 경우가 많은데, 이 경우 transaction.id.prefix 값을 세팅하게 됩니다.
1 Application = 1 Producer 이고, N 개의 Application (= N 개의 Kubernetes Pod) 을 실행한다면 transaction.id.prefix 를 다르게 주어 병렬성을 높이고
transaction.id.prefix = udon-producer-01
transaction.id.prefix = udon-producer-02
transaction.id.prefix = udon-producer-03 (e.g., Stateful Set)
1 Producer 가 죽어도, 다시 살아났을때 PID 를 다르게 부여받아도 Transaction ID 가 같으므로 문제 없이 과거 중단된 Transaction 을 밀어내고 (Fencing out) 다시 진행합니다.
Consumer 의 경우에는 N 개의 Application (= N 개의 Kubernetes Pod) 을 실행해도 transaction.id.prefix 값이 같아야 합니다.
transaction.id.prefix = udon-consumer-common
Spring Kafka 내부적으로 Consumer 는 <prefix><group>.<topic>.<partition 와 같이 할당됩니다.
Kafka Transaction 을 이용하면 Consumer 에서 발생하는 '중복된 메시지 처리' 현상을 제거할 수 있을까요? 즉, Exactly Once 를 달성할수 있을지 논의해 봅시다.
There are a few limitations or potential misunderstandings of transactions that are worth noting. Firstly, they only work in situations where both the input and the output goes through Kafka. If you are calling an external service (e.g. via HTTP), updating a database, writing to stdout, or anything other than writing to and from the Kafka broker, transactional guarantees won’t apply and calls can be duplicated. So much like using a transactional database, transactions only work when you are using Kafka. (Chain Services with Exactly-Once Guarantees)
Kafka Transaction 을 이용하면 Consumer 에서 Output 이 다른 Topic 으로의 Message 전송 (= Producing) 일때 중복 문제를 해결할 수 있습니다.
Commit (Broker 에 Offset 전송) 과 Produce (Broker 에 Message 전송) 을 atomic 하게 수행
다시 말해 다른 저장소 (S3, HDFS, Dynamo) 로 데이터를 적재하거나 API 를 호출하는 등 Kafka 이외의 곳에 데이터를 작업하는 것은 Kafka Transaction 으로 Exactly-once 를 보장하지 않습니다. 외부 저장소 저장 / 외부 API 호출 등에 대한 Exactly-once 처리는 Kafka Producer / Consumer 에서는 일반적으로 불가능하며, 저장소 수준에서 De-duplication 을 수행하는것이 가장 비용이 저렴합니다.
지금까지는 Kafka 의 Producer, Consumer 관점에서 데이터의 Write / Read 를 살펴봤습니다. 이제는 Broker 가 데이터를 어떻게 관리하는지에 대해 살펴봅니다.
Kafka 에서는 데이터를 파일로 저장합니다. Kafka Broker 설정의 log.dir 경로에 데이터 파일이 저장되며 Topic 내 Partition 을 Segemnt 라는 단위로 나누어 관리합니다.
실습을 위해 Practical Data Pipeline Git Repository 내 Kafka Docker Compose 파일을 이용할 수 있습니다.
Kafka 의 log.dir 디렉토리를 살펴보면 다음과 같습니다. Confluent Kafka 이미지 기준으로 /etc/kafka 내에 설정이 있으므로 log.dir 경로를 확인해보면, /var/lib/kafka/data 에 데이터가 남음을 알 수 있습니다.
이제 해당 디렉토리로 이동해 내용물을 살펴보면 다음과 같습니다.
여기서 해볼 수 있는 질문은 다음과 같습니다.
__consumer_offsets 는 토픽 이름인데 왜 0, 49 까지 50개의 디렉토리가 존재할까요? 반면 _schema 는 _schema-0 하나의 디렉토리 뿐입니다.
__consumer_offsets 파일 내 {숫자}.index 파일과 {숫자}.log 파일은 무엇일까요?
한 가지 해볼 수 있는 가정은 __consumer_offset 토픽의 파티션 숫자가 50개가 아닐까 하는 생각입니다. 그리고 파티션에 따라 디렉토리가 만들어지는 것입니다. (위 그림과 같이) 이 가설을 확인해보기 위해 kafka-topics 스크립트를 실행해 살펴보면 다음과 같이 파티션 숫자가 50개인 것을 알 수 있습니다.
이제 {숫자}.index, {숫자}.log 파일에 대해 알아봅시다. 하나의 Topic 은 여러개의 파티션으로 나누어질 수 있고, 하나의 파티션은 디렉토리내 여러개의 세그먼트 파일로 나뉘어 질 수 있습니다. 테스트를 위해서 데이터를 조금 생성해보겠습니다. 생성후에 log.dir 디렉토리를 살펴보면 test-topic (1개의 파티션) 을 위한 디렉토리가 생성되었음을 알 수 있습니다.
여기서 {숫자}.index, {숫자}.log, {숫자.timestamp} 파일의 한 세트를 Segment 라고 부릅니다. 만약 데이터가 많아져 Broker 설정 값 이상을 넘기게 되면 여러개의 세그먼트가 생성될 수 있습니다. 예를 들어 log.segment.bytes 나 log.roll.ms, hours 에 의해 지정된 사이즈나 시간이 도달했을 때 새로운 세그먼트가 만들어 질 수 있습니다.
log 파일은 실제 데이터를 담고 있으며
index 파일은 log 파일의 Offset 과 (메시지 순서) Position (파일 내 데이터의 위치) 를 담고 있습니다.
아래 커맨드를 통해 index 파일을 읽어볼 수 있습니다.
이름에서 눈치 채셨겠지만, 파일 이름은 해당 세그먼트의 최초 Offset 입니다. 이 Segment 파일들은 설정값에 의해 오래될 경우 디스크에서 제거되거나 새로운 세그먼트가 생성될 수 있습니다.
(Broker) log.retention.{ms,hours}: 시간을 기준으로 세그먼트 내 log 파일을 얼마나 유지할지를 지정합니다.
(Broker) log.retention.bytes: 사이즈를 기준으로 세그먼트 내 오래된 log 파일을 삭제할지를 결정 합니다. 이 값은 파티션 당 적용됩니다.
(Broker) log.roll.{ms,hours}: 시간을 기준으로 이 값을 초과하면 새로운 세그먼트를 생성합니다.
(Broker) log.segment.bytes: 사이즈를 기준으로 파티션의 크기가 이 값을 넘어서면 새로운 세그먼트를 생성합니다.
리텐션 기간 (log.retention.{ms,hours}) 는 어떤 값이 적정할까요?
log.retention.hours 는 Broker 의 설정 값입니다. 즉 Kafka Cluster 전체에 영향을 미치므로 일반적으로 충분히 넉넉한 값을 잡아둡니다. (e.g, 7일 = 168 시간)
Topic 마다 데이터 사이즈가 다를 수 있으므로 추후 Topic 을 사용하며 Topic 의 retention.ms 값을 조절할 수 있습니다.
운이 나쁜 경우에는 주말에 장애가 발생하고 담당자가 처리하지 못할 수 있으므로 최소 3일 (72시간) 이상을 잡아두는 편이 낫습니다.
log.retention.bytes 를 기반으로한 삭제 정책과 log,retention.hours 를 기반으로 한 삭제 정책에 관해서는 다음 아티클에서 자세히 설명합니다.
Kafka 는 데이터를 Disk 에 저장합니다. Disk 에 저장하는데도 어떻게 성능이 좋을 수 있을까요? 아래 문서와 키워드로부터 그 이유를 찾아봅시다.
Zero Copy, Page Cache, Sequantial IO
아래 그림은 지금까지 나왔던 Segment 에 대한 내용들을 요약한 그림입니다. (Link)
대부분의 스토리지는 Retention 과 Compaction 컨셉을 가지고 있습니다. 분산 시스템이라 하더라도 Disk 가 무한하지 않으므로
Retention 을 통해 데이터 유지 기간을 정하고
Compaction 을 통해 데이터 사이즈를 압축합니다.
아래는 Druid 에서의 Retention 과 Compaction 관련 문서입니다.
다음 사례들은 Kafka Producer, Consumer 에서 메세지 중복과 유실이 언제 발생할 수 있는지를 설명합니다.
Message Duplication by Producer 그림은 Broker 가 보낸 Ack 를 잃어버려 Producer 가 재전송을 해 중복이 발생하는 경우를 보여줍니다.
Message Duplication by Producer 그림은 Producer 가 배치로 메세지를 묶어 보내다 중간에 Shutdown 등이 발생해, 다시 처음부터 묶어 보낼때 중복이 발생하는 경우를 보여줍니다.
Message Lost by Producer 그림은 특정 Broker 가 일시적으로 해당 메세지만 처리를 못하는 상황에서 Producer 의 retries = 0 일 경우, 다시 보내지 않아 유실이 발생하는 경우를 보여줍니다.
Message Re-processing by Consumer 그림은 Auto Commit = False 인 상황에서도 메세지를 처리하다가 Commit 이전에 Shutdown 이 발생할 경우 동일한 메세지를 재 처리하는 것을 보여줍니다.
다음 옵션을 인터넷에서 찾아보며, 어떨 때 해당 옵션을 쓸 수 있는지 논의해 봅시다.
retries (Producer)
acks (Producer)
max.in.flight.requests.per.connection (Producer)
enable.idempotence (Producer)
transactional.id (Producer)
isolation.level (Consumer)
중복이나 유실 이외에도 Message 의 순서도 중요한 요소입니다. Kafka 에서 메세지의 순서는 어떻게 정해질까요?
하나의 Topic 내 여러 파티션 내의 메시지 순서는 어떻게 정해질까요?
Producer 가 1개일때와 N 개일때를 고민해 봅시다.
Producer 가 1개이더라도 Partition 이 1개이거나 N 개이면 어떻게 될까요? Consumer 에서는 메세지를 순서대로 읽을 수 있을까요?
Producer 가 1개일때도 메세지 순서가 뒤바뀔 수 있다면, 어떻게 메세지 순서를 보장할 수 있을까요? max.in.flight.requests.per.connection
옵션을 찾아보며 논의해 봅시다.
이번 챕터에서 나온 단어입니다.
Partition Assigment
Rebalancing
Static Membership
Incremental Rebalancing
Idempotent Producer
Transaction ID / Producer ID / Producer Seq
Kafka Transaction
Transaction Coordinator / Transaction Log
Segment
Tranasction