2.4.1 Kafka Intro
Last updated
Last updated
이번 챕터에서는 Streaming 컴퓨팅의 데이터 소스로 활용되는 Kafka 에 대해 알아보겠습니다. 실시간 데이터는 주로 Kafka / Kinesis 등의 스토리지에 저장되어 있으며, Spark 에서는 Kafka 가 Built-in 데이터 소스로 지원됩니다.
Kafka 는 Clustering 이 가능한 Message Broker 입니다.
Clustering 이 가능하므로 Broker 를 더 붙여 확장 (Scaling) 이 가능합니다.
Clustering 을 한다는 의미는 데이터를 여러벌로 복제하거나 (Replication)
다양한 Broker 에서 나누어 읽고 쓸 수 있다는 뜻입니다. (Distributed)
Clustering 을 위한 추가적인 컴포넌트인 Zookeeper (ZK) 를 사용합니다.
Kafka 2.8+ 부터 Zookeeper 없이 테스트 모드로 실행해 볼 수 있습니다.
그러나 2021년 기준으로 아직 대부분의 기업에서는 Production 환경에서 Zookeeper 를 같이 사용합니다.
Kafka 를 통해 여러 Application 들은 메세지를 주고 받을 수 있습니다.
데이터를 전송하는 Application 을 Kafka Procucer
데이터를 받아서 처리하는 Application 을 Kafka Consumer 라 부릅니다.
Kafka 가 제공하는 API 를 이용해 다양한 언어 / 프레임워크의 Producer / Consumer 가 존재합니다.
Kafka Golang Library (confluent-kafka-go)
Spark SQL Kafka Integration (github/spark/external/kafka-0-10-sql)
Spark Streaming Kafka Integration (github/spark/external/kafka-0-10)
Kafka 에서 Producer 와 Consumer 는 데이터를 구분하기 위해 'Topic' 을 지정합니다. 다른 종류의 메세지는 다른 Topic 으로 보낼 수 있습니다.
하나의 Topic 을 나누어 Partition 을 여러개 만들 수 있습니다. Spark DataFrame 의 Partition 과 유사하게, 데이터를 나누어 처리하는 단위입니다.
Spark 에서 Partition 수를 조절해 Executor 에서 분산 처리 할 수 있고
Spark 에서 Partition 수를 조절해 Executor 내 메모리에 캐싱할 수 있습니다.
Kafka 에서 Topic 의 Partition 수를 조절해 Broker 내에서 나누어 저장할 수 있고
Kafka 에서 Topic 의 Partition 수를 조절해 Producer / Consumer 에서 병렬성을 조절할 수 있습니다.
파티션을 복제 (Replication) 하는 것도 가능합니다. 이를 통해 Broker 2 가 다운되어 (Topic B - Partition 0) 와 (Topic B - Partition 1) 를 잃어버려도 다른 Broker 에 복제되어 있으므로 복구가 가능합니다.
Leader: 복제된 동일한 여러 Partition 소유한 Broker 중 하나의 Broker 가 Leader 로 선택됩니다. Leader 는 모든 Write / Read 에 대한 요청을 처리합니다.
Follower: Leader 의 Partition 에 Write 되는 데이터를 지속적으로 복제하며 따라갑니다. 만약 Leader 에 문제가 생겨 사용할 수 없는 상태가 되면, 여러 Follower 중 하나가 선택되어 Leader 가 됩니다. 이 과정을 Leader Election 이라 부릅니다.
분산 스토리지에서는 대부분 복제 (Replication) 라는 컨셉을 가지고 있습니다.
HDFS 에서는 dfs.replication
을 통해 복제본 숫자를 지정할 수 있습니다.
Kafka 에서는 replication.factor 라는 이름의 옵션으로 파티션이 복제됩니다. 3 일 경우 원본을 포함해 3개입니다.
ElaticSerach 에서는 데이터의 분할 단위가 Shard 이며 number_of_replicas
옵션을 통해 Shard 를 복제할 수 있습니다. 특이하게도 replica 3 일 경우 원본과 3개의 복제본을 만듭니다.
안전성과 처리량 관점에서 다음을 생각해 봅시다.
만약 Replication Factor (복제본 숫자) = 3 일때
Producer 가 메세지를 Partition Leader 에게 보내고, Partition Follower 가 복제하기 전에 다음 메세지를 보낸다면 처리량은 어떨까요?
Producer 가 메세지를 Partition Leader 에게 보낸 후 Partition Follow 가 전부 복제한 후에야 다음 메세지를 보낼 수 있다면 안전성은 어떨까요?
아래의 옵션들을 찾아보며 각 설정에 따라 어떻게 동작할지 논의해 봅시다.
(Producer) acks = 0, 1, all
(Broker) min.insync.replicas
위 그림에서는 Broker 1, 3 가 이 죽을경우 각 파티션별 Leader 가 Broker 2 번이 되는 경우를 보여줍니다. 이 경우 Broker 2번이 죽으면 전체 Partition 에 문제가 생길텐데, Partitoin Leader 를 변경할 수 있는 방법이 있을까요?
Kafka 에서 제공하는 도구인 kafka-reassign-partitions 를 살펴봅시다.
사내에서 UI 도구 (Kafka Manager 등) 을 제공한다면 편리하게 파티션 재할당 을 수행할 수 있습니다.
언제 파티션을 다시 Re-assign 할 수 있을까요? Partition 이 늘어나는 경우도 고려해봅시다.
Topic 은 여러개 Partition 으로 나뉘어 여러 Broker 에서 분산으로 Write 되고 Read 될 수 있습니다.
이때 하나의 Partition 은 메세지가 증가할 때 마다 Offset 값을 늘려가며 메세지를 저장합니다.
그리고 Consumer 는 Offset 을 따라가며 메세지를 처리하고, 자신이 어디까지 처리했는지를 보관합니다.
Offset 을 일종의 '메세지 번호' 로 생각할 수 있습니다.
하나의 Consumer Group = app-db-updates-consumer
내에서 보면 위 그림과 같습니다.
Consumer A, B 모두 동일한 Topic 을 처리합니다.
Partition 숫자가 4개이므로 Consumer A, B 가 파티션을 나누어 할당 받을 수 있습니다.
Broker 숫자는 나와있지 않으나 Broker 는 1개일수도, 여러개일 수도 있습니다. 이에 따라 Read / Write 요청이 여러 Broker 에 분산될 수 있습니다.
Java 기준으로 그림상 1개의 Consumer 는 1개의 Thread 라고 보아도 좋습니다. Partition Read 1 Thread, Processing N Thread 처럼 Partition 당 N 개의 스레드를 가져갈 수 있으나 여기서는 논의하지 않습니다. (Multi-Threaded Message Consumption with the Apache Kafka Consumer)
위 그림에서는 동일한 Topic / Partition 을 2 개의 Consumer A, B 가 나누어 읽는 것을 보여줍니다. 이는 Consumer Group 이 다르기 때문입니다.
하나의 서비스를 위해 Consumer A 가 Topic 1 내 Partition 1 번을 할당 받고 (consumer group = X)
또 다른 서비스를 위해 Consumer B 가 Topic 1 내 Partition 1 번을 다시 다른 용도로 할당 받을 수 있습니다 (consumer group Y)
Consumer Group 별로 Partition 들의 Offset 이 관리됩니다
Consumer Group 내 Consumer 숫자가 추가되면 무슨 일이 발생할까요?
예를 들어
Partition 10개의 Topic A 에 대해
Consumer Group G1 에서 5개의 Consumer 를 이용해 Partition 10개를 할당 받아 처리하다가
Consumer Group G1 에 5개의 새로운 Consumer 를 추가합니다 (e.g, 별도 프로세스 추가 등)
다음은 Confluent 문서 내 Kafka Consumer 에 대한 설명 중 일부입니다. 이 부분을 읽어보며 생각해 봅시다.
A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group.
Topic 과 Producet, Consumer 관점에서 살펴보면
하나의 Kafka Topic 기준으로 여러 Producer 가 나누어 메세지를 보낼 수 있습니다.
하나의 Kafka Topic 을 기준으로 한 Consumer Group 내 여러 Consumer 가 메세지를 읽어 처리할 수 있습니다.
하나의 Kafka Topic 을 기준으로 두개 이상의 Consumer Group 이 동일한 메세지를 읽어 처리할 수 있습니다.
위 그림에는 나와있지 않으나, 필요하다면 Consumer Group 내에서 두개 이상의 Topic 을 읽어 처리할 수 있습니다.
한 Topic 내에서 Partition 숫자는 병렬성을 조절하는 역할을 합니다.
Topic 내 Partition 숫자를 높이면 데이터가 여러 Broker 에 분산될 수 있습니다.
그리고 나누어진 Partition 을 여러 Producer / Consumer 에서 메세지를 보내고, 읽어 처리할 수 있습니다.
위 그림에서는 3개의 Partition 이 2개의 Consumer 에 할당되는 것을 볼 수 있습니다.
Partition 별로 하나의 Consumer Group 내 Consumer 를 최대 한개까지 할당 할 수 있습니다. 그림으로 보면 아래와 같습니다. (Kafka Partitions and Consumers)
Consumer 와 Partition 숫자에 대해 논의해 봅시다.
Consumer 숫자보다 Partition 숫자가 많으면 어떤 일이 발생할까요?
Partition 숫자보다 Consumer 숫자가 많으면 어떤 일이 발생할까요?
또한 Consumer 를 Process / Thread 관점에서도 생각해 봅시다.
JVM 언어 기준으로 1 Consumer = 1 Thread 로 이해해도 괜찮다는 이야기를 위에서 언급 했습니다. (1 Read Consumer Thread / N Processing Thread 전략 제외)
그렇다면 Partition = 12 인 Topic 을 기준으로 인프라 리소스 할당 및 운영 관점에서 어떤 차이가 있을까요?
1 개의 JVM Appliaction Process 내에서 12개의 Consumer Thread 를 실행하는 것
4 개의 JVM Application Process 내에서 각 3개의 Consumer Thread 를 실행하는 것
실습 과제입니다.
Practical Spark 클래스에서는 Spark 를 이용해 Kafka 를 사용하게 됩니다. 그런데 Spark Executor 는 JVM Process 입니다. Kafka Consumer 와는 어떤 관계가 있을까요?
다음 문서를 통해 살펴봅시다.
(Spark Streaming, DStream) https://spark.apache.org/docs/3.2.0/streaming-kafka-0-10-integration.html#consumerstrategies
(Spark Structured Streaming) https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-kafka-data-source.htm
Broker
Topic
Partition
Offset
Consumer
Producer
Consumer Group