一、Kraft 模式与 ZooKeeper 模式简介
在Kafka 2.8 之前,Kafka 重度依赖 ZooKeeper 集群做元数据管理、Controller 的选举等(统称为共识服务);当ZooKeeper 集群性能发生抖动时,Kafka 的性能也会受到很大的影响。如下图所示:
在 Kafka 2.8 之后,引入了基于 Raft 协议的 Kraft 模式,支持取消对 ZooKeeper 的依赖。在2022 年 10月 3日发布 Kafka 3.3.1 版本之后,将名为 KRaft 的新元数据管理方案标记为生产环境可用。
在此模式下,一部分 Kafka Broker 被指定为 Controller, 另一部分则为 Broker。这些 Controller 的作用就是以前由 ZooKeeper 提供的共识服务,并且所有的元数据都将存储在 Kafka 主题中并在内部进行管理。
Kraft 模式相比 ZooKeeper 模式的主要优势如下:
- 运维简化:只需要部署 Kafka, 不再依赖 ZooKeeper。
- 横向扩展能力提升:Kafka 集群能支持的 Partition 数量是衡量其横向扩展能力的重要指标。此前这个值受 ZooKeeper 与 Controller 之间传递元数据的限制,只能到十万量级,而 Kraft 模式不需要这种传递,因此可以提升到百万量级。
- 元数据传播提效:元数据通过 Kafka 的 Topic 管理,并利用 Topic 的生产消费传播,集成性更好的同时也提升了一些底层实现的性能。
二、基于 Kraft 模式的 Kafka 集群部署
1.主机规划
主机名 |
IP 地址 |
角色 |
node id |
10.8.3.35 |
Broker,Controller |
0 |
|
10.8.3.36 |
Broker,Controller |
1 |
|
10.8.3.37 |
Broker,Controller |
2 |
2.部署
(1)编辑 10.8.3.35 上的server.properties 配置文件
[root@localhost kraft]# vi server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # This configuration file is intended for use in KRaft mode, where # Apache ZooKeeper is not present. # ########################### Server Basics ########################### # The role of this server. Setting this puts us in KRaft mode process.roles=broker,controller # The node id associated with this instance's roles node.id=1 # The connect string for the controller quorum controller.quorum.voters=1@10.8.3.35:9093,2@10.8.3.36:9093,3@10.8.3.37:9093 ########################Socket Server Settings ######################## # The address the socket server listens on. # Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum. # If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(), # with PLAINTEXT listener name, and port 9092. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://10.8.3.35:9092,CONTROLLER://10.8.3.35:9093 # Name of listener used for communication between brokers. inter.broker.listener.name=PLAINTEXT # Listener name, hostname and port the broker will advertise to clients. # If not set, it uses the value for "listeners". advertised.listeners=PLAINTEXT://10.8.3.35:9092 # A comma-separated list of the names of the listeners used by the controller. # If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol # This is required if running in KRaft mode. controller.listener.names=CONTROLLER # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ########################### # A comma separated list of directories under which to store log files log.dirs=/usr/local/kafka/logs/kraft-combined-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ######################## Internal Topic Settings ###################### # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ########################### Log Flush Policy ######################### # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk #log.flush.interval |