kafka介绍
Apache Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用。
基于kraft部署集群
集群节点规划
主机名 | IP 地址 | 角色 | node-id | OS |
---|---|---|---|---|
kafka01 | 192.168.72.31 | broker/controller | 1 | Ubuntu22.04 |
kafka02 | 192.168.72.32 | broker/controller | 2 | Ubuntu22.04 |
kafka03 | 192.168.72.33 | broker/controller | 3 | Ubuntu22.04 |
准备3个节点,每个节点同时作为broker和controller运行。
集群架构如下:
基础环境配置
以下操作在所有节点执行。
配置主机名
hostnamectl set-hostname kafka01
hostnamectl set-hostname kafka02
hostnamectl set-hostname kafka03
配置hosts解析,提前部署zookeeper集群:
cat > /etc/hosts <<EOF
192.168.72.31 kafka01
192.168.72.32 kafka02
192.168.72.33 kafka03
EOF
安装java环境
apt update -y
apt install -y openjdk-21-jdk
创建Kafka用户
sudo groupadd kafka
sudo useradd -m -s /bin/bash -g kafka kafka
sudo passwd kafka
安装kafka
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -zxvf kafka_2.13-3.9.0.tgz -C /opt
ln -s /opt/kafka_2.13-3.9.0 /opt/kafka
配置环境变量
cat > /etc/profile.d/kafka.sh <<'EOF'
export KAFKA_HOME=/opt/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF
source /etc/profile
修改配置文件
为broker设置角色,以便他们也可以成为控制器。使用配置属性文件应用broker配置,包括角色设置。broker配置根据角色有所不同。KRaft 提供了三个示例broker配置属性文件。
/opt/kafka/config/kraft/broker.properties
有一个broker角色的示例配置,如果正在配置broker节点,那么需要选择该配置文件/opt/kafka/config/kraft/controller.properties
有一个控制器角色的示例配置,如果正在配置控制器节点,那么需要选择该配置文件/opt/kafka/config/kraft/server.properties
有一个组合角色的示例配置,如果正在配置broker和控制器节点,那么需要选择该配置文件
可以根据这些示例属性文件来配置broker,本示例使用 server.properties
配置。
修改kafka01
节点配置文件
cp /opt/kafka/config/kraft/server.properties{,.bak}
cat <<EOF > /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=1
num.partitions=6
default.replication.factor=2
min.insync.replicas=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka01:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
log.dirs=/data/kafka/kraft-combined-logs
log.retention.hours=168
EOF
修改kafka02
节点配置文件
cp /opt/kafka/config/kraft/server.properties{,.bak}
cat <<EOF > /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=2
num.partitions=6
default.replication.factor=2
min.insync.replicas=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka02:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
log.dirs=/data/kafka/kraft-combined-logs
log.retention.hours=168
EOF
修改kafka03
节点配置文件
cp /opt/kafka/config/kraft/server.properties{,.bak}
cat <<EOF > /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=3
num.partitions=6
default.replication.factor=2
min.insync.replicas=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka03:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
log.dirs=/data/kafka/kraft-combined-logs
log.retention.hours=168
EOF
参数说明
process.roles
: 一个节点可以充当 代理 或 控制器 或 两者。本示例指明这个节点可以同时是一个 kafka 代理和一个 kraft 控制器节点。node.id
:作为集群中的节点 ID,识别这是哪个代理以及哪个 Kraft 控制器节点。num.partitions
:创建topic时默认分区数,建议broker节点的倍数default.replication.factor
:分区默认副本数,controller与broker 3节点混合部署时,由于只能故障一个节点,建议设为2,设为3依然只能故障一个broker。min.insync.replicas
:控制写入操作必须有至少多少个副本处于同步状态才能成功执行,通常建议设置为副本数 - 1。offsets.topic.replication.factor
:用于设置 Kafka 内部主题 __consumer_offsets 的副本因子。transaction.state.log.replication.factor
:决定 Kafka 内部主题 __transaction_state 的副本因子transaction.state.log.min.isr
:定义 __transaction_state 主题的最小同步副本数(ISR,In-Sync Replicas)。controller.quorum.voters
: 用于指示所有可用的 kraft 控制器。这里指明将有 3 个 kraft 控制器节点在端口 9093上运行。listeners
:在这里我们指明代理将使用 9092 端口,而 kraft 控制器将使用 19092 端口advertised.listeners
:用于配置 Kafka Broker 如何向客户端(生产者或消费者)暴露其地址和端口。controller.listener.names
:这里的控制器监听器名称设置为 CONTROLLERlistener.security.protocol.map
:在这里添加连接安全详细信息log.dirs
:这是 Kafka 存储数据的日志目录log.retention.hours
:用于控制主题分区日志数据保留时间的配置参数,默认168小时(7天)
建立集群 ID
需要为新 Kafka 版本形成一个集群 ID,集群 ID 在集群中的所有节点之间相同,在第一个节点执行
/opt/kafka/bin/kafka-storage.sh random-uuid > /opt/kafka/config/kraft/cluster.id
scp /opt/kafka/config/kraft/cluster.id kafka02:/opt/kafka/config/kraft/
scp /opt/kafka/config/kraft/cluster.id kafka03:/opt/kafka/config/kraft/
使用集群 ID 建立存储,在所有节点执行
export kafka_cluster_id=$(cat /opt/kafka/config/kraft/cluster.id)
kafka-storage.sh format -t $kafka_cluster_id -c /opt/kafka/config/kraft/server.properties
修改目录权限
mkdir -p /opt/kafka /data/kafka
sudo chown -R kafka:kafka /opt/kafka*
sudo chown -R kafka:kafka /data/kafka
启动kafka服务
建立 Kafka 服务启动定义
cat <<EOF > /etc/systemd/system/kafka.service
[Unit]
Description=Kafka
After=network.target
[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
LimitNOFILE=10000000
LimitCORE=infinity
LimitNPROC=infinity
LimitMEMLOCK=infinity
[Install]
WantedBy=multi-user.target
EOF
启动kafka服务
systemctl daemon-reload
systemctl enable --now kafka.service
systemctl status kafka.service
查看集群状态
描述运行时状态
Kafka 提供工具来帮助您调试在 KRaft 模式下运行的集群。
可以使用kafka-metadata-quorum 工具描述集群元数据分区的运行时状态,并指定一个带有 --bootstrap-server
选项的 Kafka 代理或一个带有 --bootstrap-controller
选项的 KRaft 控制器。
例如,以下命令指定一个代理并显示元数据法定人数的摘要:
kafka-metadata-quorum.sh --bootstrap-server kafka01:9092 describe --status
示例输出如下
root@kafka01:~# kafka-metadata-quorum.sh --bootstrap-server kafka01:9092 describe --status
ClusterId: ZgI_5pGNQcOgbMNT1Vqqxw
LeaderId: 2
LeaderEpoch: 77
HighWatermark: 1677
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 350
CurrentVoters: [{"id": 1, "directoryId": null, "endpoints": ["CONTROLLER://kafka01:9093"]}, {"id": 2, "directoryId": null, "endpoints": ["CONTROLLER://kafka02:9093"]}, {"id": 3, "directoryId": null, "endpoints": ["CONTROLLER://kafka03:9093"]}]
CurrentObservers: []
该脚本列出了集群状态的基本信息。在显示的输出中,您可以看到节点 2
被选为领导者,所有三个节点 ([1,2,3]
) 都在投票池中并同意该决定。
您可以使用 --bootstrap-controller
选项指定一个控制器。这在代理不可访问时非常有用。
kafka-metadata-quorum.sh --bootstrap-controller kafka01:9093 describe --status
调试日志段
kafka-dump-log 工具可用于调试集群元数据目录中的日志段和快照。该工具将扫描提供的文件并解码元数据记录。例如,以下命令解码并打印第一个日志段中的记录:
kafka-dump-log.sh --cluster-metadata-decoder --files \
/data/kafka/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log
创建topic主题
通过运行创建一个名为 first-topic
的主题,将复制因子设置为 2
确保该主题将在至少两个节点上可用。
kafka-topics.sh --create --topic first-topic \
--bootstrap-server kafka01:9092 --replication-factor 2
然后,运行 kafka-topics.sh
脚本以查看分区在节点上的排列情况:
kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic
示例输出如下
root@kafka01:~# kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic
Topic: first-topic TopicId: g_CJm0fRR_-ztyoLeNlg2g PartitionCount: 6 ReplicationFactor: 2 Configs: min.insync.replicas=1
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 Elr: LastKnownElr:
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: LastKnownElr:
Topic: first-topic Partition: 3 Leader: 3 Replicas: 3,2 Isr: 3,2 Elr: LastKnownElr:
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr:
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,3 Isr: 1,3 Elr: LastKnownElr:
root@kafka01:~#
可以看到每个分区都有其领导者、两个副本和两个同步副本集(ISR)。分区领导者是一个代理节点,负责将分区数据提供给客户端,而副本仅保留副本。如果副本节点在过去十秒内与领导者保持同步,则默认情况下被视为 ISR。此时间间隔可以根据每个主题进行配置。
生产消息
现在已经创建了一个主题,使用 kafka-console-producer.sh
脚本生成其消息。运行以下命令以启动生产者:
kafka-console-producer.sh --topic first-topic --bootstrap-server kafka01:9092
broker正在等待输入文本消息。输入 Hello World!
并按 ENTER
。提示将如下所示:
>Hello World!
>
生产者现在正在等待下一条消息,这意味着之前的消息已成功传递给 Kafka。可以输入任意数量的消息进行测试。要退出生产者,请按 CTRL+C
。
消费消息
需要一个消费者来从主题中读取消息。Kafka 提供了一个简单的消费者,名为 kafka-console-consumer.sh
。通过运行以下命令来执行它:
kafka-console-consumer.sh --topic first-topic --from-beginning \
--bootstrap-server kafka01:9092
您将看到从主题中读取的消息:
Hello World!
...
模拟节点故障
在第三个 Kafka 节点上,通过运行以下命令停止服务:
sudo systemctl stop kafka
然后,通过运行来描述主题:
kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic
输出将类似于此:
root@kafka01:~# kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic
Topic: first-topic TopicId: g_CJm0fRR_-ztyoLeNlg2g PartitionCount: 6 ReplicationFactor: 2 Configs: min.insync.replicas=1
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1 Elr: LastKnownElr:
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2 Elr: LastKnownElr:
Topic: first-topic Partition: 3 Leader: 2 Replicas: 3,2 Isr: 2 Elr: LastKnownElr:
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr:
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,3 Isr: 1 Elr: LastKnownElr:
root@kafka01:~#
之前所有在节点3上为leader角色的分区,已经将其他节点的follower分区切换为leader分区。尽管节点 3
被列为副本,但由于不可用,它在 ISR 集合中缺失。
一旦它重新加入集群,它将与其他节点同步并尝试恢复其之前的位置。
再试着阅读来自 first-topic
的消息:
kafka-console-consumer.sh --topic first-topic --from-beginning \
--bootstrap-server kafka01:9092
你会看到它们像往常一样可访问:
Hello World!
...
由于副本的存在,前两个节点接管并为消费者提供服务。现在可以在第三台服务器上启动 Kafka:
sudo systemctl start kafka
在这一步中,已经看到 Kafka 如何减轻集群中节点不可用的情况。
在节点之间迁移数据
在这一步中,您将学习如何在 Kafka 集群中迁移主题。当向现有集群添加带有主题的节点时,Kafka 不会自动将任何分区转移到该节点,这可能是您想要做的。这对于移除节点也很有用,因为现有的分区不会自动移动到剩余的节点。
Kafka 提供了一个名为kafka-reassign-partitions.sh
的脚本,可以生成、执行和验证迁移计划。使用它来创建一个将first-topic
的分区移动到前两个节点的计划。
首先,您需要定义哪些主题应该被移动。脚本接受一个包含主题定义的 JSON 文件,因此请创建并打开它以进行编辑:
vi topics-to-move.json
添加以下行:
topics-to-move.json
{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}
在topics
下,您定义一个引用first-topic
的对象。完成后,保存并关闭文件。
运行以下命令生成迁移计划,将 kafka01
替换为指向其中一个 Kafka 节点的域名:
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 \
--topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
您将 "1,2"
传递给 --broker-list
,表示目标经纪人的 ID。
输出将类似于此:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}
该脚本总共生成了两个计划,分别描述了当前和提议的分区布局。如果您需要稍后恢复更改,将提供第一个计划。请注意第二个计划,将其存储在一个名为 migration-plan.json
的单独文件中。创建并打开该文件进行编辑:
vi migration-plan.json
添加第二个执行计划:
migration-plan.json
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
保存并关闭文件。然后,运行以下命令来执行它:
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 \
--reassignment-json-file migration-plan.json --execute
输出将是:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5
脚本指出迁移已经开始。要查看迁移的进度,请传入 --verify
。
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 \
--reassignment-json-file migration-plan.json --verify
经过一段时间,输出将类似于此:
Status of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic
现在可以描述 first-topic
以验证代理 3
上没有分区:
kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic
输出将如下所示:
Topic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1
只有代理 1
和 2
作为副本和 ISR 存在,这意味着迁移成功。