国科大学习生活(期末复习资料、课程大作业解析、大厂实习经验心得等): 文章专栏(点击跳转)
大数据开发学习文档(分布式文件系统的实现,大数据生态圈学习文档等): 文章专栏(点击跳转)
【消息队列】Kafka从入门到面试学习总结
- 1. 什么是Kafka?
- 1.1 Kafka定义
- 1.2 Kafka主要应用场景
- 1.3 Kafka基础架构
- 2. Kafka 生产者
- 2.1 生产者消息发送流程
- 2.2 生产者分区
- 2.2.1 分区的原因
- 2.2.2 分区策略
- 2.3 生产者如何提高吞吐量
- 2.4 生产经验
- 2.4.1 数据可靠性
- 2.4.2 幂等性和事务
- 2.4.3 数据顺序乱序问题
- 3. Kafka Broker
- 3.1 Kafka Broker工作流程
- 3.2 生产经验
- 3.2.1 服役退役节点
- 3.2.2 Leader 和 Follower 故障处理细节
- 3.3 Kafka文件存储
- 3.3.1 文件存储机制
- 3.3.2 文件清理策略
- 3.4 高效读写数据
- 4. Kafka 消费者
- 4.1 Kafka 消费者工作流程
- 4.2 消费者组初始化流程
- 4.3 消费者组详细消费流程
- 4.4 分区分配及再平衡策略
- 4.5 offset 位移
- 4.5.1 offset提交策略
- 4.5.2 漏消费和重复消费
- 4.6 生产经验
- 4.6.1 消费者事务
- 4.6.2 数据积压(消费者如何提高吞吐量)
- 5. Kafka-Eagle监控
- 6. Kafka-Kraft模式
- 6.1 基于ZK与基于KRaft的Kafka
- 6.2 新旧模式对比
- 7. Kafka外部系统集成
- 参考文献
- 项目地址
前言:Kafka最新版本已经可以不依赖ZooKeeper来实现,但经典永不过时,再怎么变其原理都是一致的。故本文基于依赖ZooKeeper的Kafka3.x展开。
1. 什么是Kafka?
1.1 Kafka定义
Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。
- 发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming
Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
1.2 Kafka主要应用场景
缓冲/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 例如:双十一期间,上有产生数据的速度为10亿/s,但是下游系统消费数据的速度为1千万/s,此时就需要一个消息队列来起到缓冲消峰的作用。
解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 例如:如果遇到上游有多个生产者数据源、下游有多个消费者时,相互之间获取数据的时候就需要遵守各自的接口规范,导致系统之间的耦合度较高。此时就需要消息队列来对他们进行解耦,生产者与消费者仅需和Kafka打交道而无需关注多变的接口约束。
异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。
- 例如:用户注册某账号后需要发送注册成功的短信,如果注册成功的消息需等待短信发送成功后再返回则会等待较长时间,此时可以使用Kafka来做异步通信:用户注册成功后直接返回注册成功,并将发送短信的请求放入Kafka中,由下游发送短信接口顺序消费即可。
1.3 Kafka基础架构
- Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
- Broker:一台 Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker 可以容纳多个topic。
- Producer:消息生产者,就是向Kafka broker发消息的客户端。
- Consumer:消息消费者,向Kafka broker取消息的客户端。
- Consumer Group(CG): 消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition(物理上的概念),每个partition是一个有序的队列。
- Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。
- Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
- Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
注:消息队列的两种模式:
点对点模式:仅有一个生产者一个消费者,消费者主动拉取数据,收到数据后立刻删除消息队列中数据。
发布/订阅模式:适用于多个生产者多个消费者的场景;消费者消费数据后不立即删除数据、每个消费者相互独立消费数据。
2. Kafka 生产者
2.1 生产者消息发送流程
流程图详解:
Interceptors:拦截器,Kafka通常不使用拦截器对数据进行处理。通常是通过上游数据源数据采集工具进行处理(如Flume)
Serializer:序列化器(因为要进行网络通信)
Partitioner:分区器,因为Kafka是以分布式存储的方式存储数据
batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。
ACK应答:Kafka集群收到消息后的应答级别
- 0:生产者发送过来的数据,不需要等数据落盘应答。
- 1:生产者发送过来的数据,Leader收到数据后应答。
- -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。
总体流程:
在消息发送的过程中,涉及到了两个线程——Main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender 线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
2.2 生产者分区
2.2.1 分区的原因
- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
- 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
2.2.2 分区策略
- 指明partition的情况下,直接将指明的值作为partition值;
- 例如:partition=0,所有数据写入分区0。
- 没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
- 例如:key1的hash值=5,key2的hash值=6 ,topic的partition数=2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。
- 既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
- 例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到,Kafka再随机一个分区进行使用(如果还是0会继续随机)。
2.3 生产者如何提高吞吐量
从上述生产者Producer发送数据到Kafka Broker的流程图可知,要想提高生产者吞吐量:
// batch.size:批次大小,默认16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
// compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
2.4 生产经验
2.4.1 数据可靠性
- 至少一次(AtLeastOnce)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2;可以保证数据不丢失,但是不能保证数据不重复。
- 最多一次(AtMostOnce)= ACK级别设置为0;可以保证数据不重复,但是不能保证数据不丢失。
需求:精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。 Kafka 0.11版本以后,引入了一项重大特性。
2.4.2 幂等性和事务
-
幂等性原理
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。 精确一次(ExactlyOnce) = 幂等性 + 至少一次(ack = -1 + 分区副本数 >= 2 + ISR最小副本数量 >= 2)。
重复数据的判断标准:具有<PID, Partition, SeqNumber>
相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。 所以幂等性只能保证的是在单分区单会话内不重复。
-
Kafka生产者事务原理
说明:开启事务,必须开启幂等性。总体流程:
首先Producer向Kafka集群请求PID(实现幂等性所需),成功获取到PID后发送消息到Kafka集群Broker中的Topic中,此时若Producer发起Commit请求,Kafka集群的对应Broker中的Transaction Coordinator(事务协调器)会持久化该Commit请求到__transaction_state Topic(存储事务信息的特殊主题)中;此时Broker后台发起Commit请求,成功后会将成功信息持久化到__transaction_state中。- __transaction_state Topic:默认有50个分区,每个分区负责一部分事务,事务划分是根据Transaction.id的hashcode值%50计算出该事务属于哪一个分区。该分区Leader副本所在的Broker节点即为这个transaction.id对应的Transaction Coordinator节点。
Kafka生产者事务一共有如下5个API:
// 1初始化事务
void initTransactions();
// 2开启事务
void beginTransaction() throws ProducerFencedException;
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;
// 4提交事务
void commitTransaction() throws ProducerFencedException;
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
2.4.3 数据顺序乱序问题
由生产者Producer发送数据流程图:
放大Sender线程部分:
kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection = 1
(不需要考虑是否开启幂等性)。即:将上图中的缓存请求数改为1个。
kafka在1.x及以后版本保证数据单分区有序,条件如下:
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。同样是将上图中的缓存请求数改为1个。
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。 故无论如何,都可以保证最近5个request的数据都是有序的。因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,
3. Kafka Broker
3.1 Kafka Broker工作流程
在zookeeper的服务端存储的Kafka相关信息:
- /kafka/brokers/ids [0,1,2] 记录有哪些服务器
- /kafka/brokers/topics/first/partitions/0/state{“leader”:1 ,“isr”:[1,0,2] } 记录谁是Leader,有哪些服务器可用
- /kafka/controller {“brokerid”:0} 辅助选举Leader
总体流程:
- Broker们启动后向ZK注册节点信息并抢占注册Controller;
- Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有topic的分区副本分配和Leader选举等工作。
- 注册成功Controller后开始监听ZK中的brokers/ids节点信息变化
- Controller决定Leader选举,原则:在ISR中存活,并在AR中排在前面的优先成为Leader。
- 例如:AR[1,0,2], ISR[1,0,2],那么leader就会按照1,0,2的顺序轮询。
- Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后Follower找Leader进行同步数据。
- 选举成功后Controller上节点信息到ZK,brokers/ topics/ first /partitions /0 / state
- 其他broker节点从ZK同步集群节点信息
- 假设Broker1中的Leader挂了,上述Controller会监听到节点变化,会从ZK获取ISR信息,并按照上述原则选举新Leader,最后更新Leader及ISR
AR:Kafka分区中的所有副本统称为AR(Assigned Repllicas)
ISR:表示和Leader保持同步的 Follower集合。
OSR:表示Follower与Leader副本同步时,延迟过多的副本。
AR = ISR + OSR
3.2 生产经验
3.2.1 服役退役节点
- 编写topics-to-move.json创建要负载均衡的主题
- 生成执行计划:–topics-to-move-json-file topics-to-move.json --broker-list “0,1,2,3” –generate
- 执行计划:–reassignment-json-file increase-replication-factor.json –execute
3.2.2 Leader 和 Follower 故障处理细节
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。
-
Follower故障
o Follower发生故障后会被临时踢出ISR(Leader/Follower存活节点)
o 这个期间Leader和Follower继续接收数据
o 待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
o 等该Follower的LEO >= 该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。 -
Leader故障
o Leader发生故障之后,会从ISR中选出一个新的Leader
o 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。(即:会丢数据)
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
3.3 Kafka文件存储
3.3.1 文件存储机制
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端(顺序写),为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。
稀疏索引
.index文件为稀疏索引。大约每往.log文件中写入4kb数据(默认,可以使用og.index.interval.bytes来调整),就会往.index文件写入一条索引。
.index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。
查找数据过程
首先根据目标offset定位到segment文件;然后找到小于等于目标offset的最大offset所对应的索引项;定位到log文件;向下遍历找到目标Record。
3.3.2 文件清理策略
Kafka 中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。
最低优先级小时,默认7天。 log.retention.hours,
分钟。log.retention.minutes,
最高优先级毫秒。log.retention.ms,
负责设置检查周期,默认5分钟。 log.retention.check.interval.ms,
当日志超过了设置的保存时间,会触发清理策略:delete(删除)或compact(压缩):
- delete日志删除:将过期数据删除
- log.cleanup.policy = delete 所有数据启用删除策略
(1)基于时间:默认打开。以 segment中所有记录中的最大时间戳作为该文件时间戳。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。
log.retention.bytes,默认等于-1,表示无穷大。
- log.cleanup.policy = delete 所有数据启用删除策略
- compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
log.cleanup.policy = compact 所有数据启用压缩策略
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
3.4 高效读写数据
- Kafka本身是分布式集群,可以采用分区技术,并行度高
- 读数据采用稀疏索引,可以快速定位要消费的数据
- 顺序写磁盘:官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
- 页缓存 + 零拷贝技术
- PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。
- 零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。KafkaBroker应用层不关心存储的数据,所以就不用走应用层,传输效率高。
以读消费数据过程为例:
- 页缓存+非零拷贝技术:
生产者将数据写到页缓存以及磁盘中,下游消费者发起读数据请求(消费数据),Kafka将数据从页缓存或者磁盘读出来后会交由应用层,然后再发往内核空间中的Socket Cache,最后在通过网卡发送给消费者。 - 页缓存+零拷贝技术:
下游消费者发起读数据请求(消费数据),Kafka将数据从页缓存或者磁盘读出来后直接交由网卡发送到消费者。减少了大量无用的数据传输,提高了读写效率。
4. Kafka 消费者
4.1 Kafka 消费者工作流程
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
消费者组内每个消费者负责消费不同分区的数据(可能是多个Topic),一个分区只能由一个组内消费者消费。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
如果向消费组中添加更多的消费者,超过Topic分区数量,则有一部分消费者就会闲置,不会接收任何消息。
4.2 消费者组初始化流程
Coordinator:辅助实现消费者组的初始化和分区的分配。
Coordinator节点选择 = Groupid的Hashcode值% 50( __consumer_offsets的分区数量)
例如:Groupid的Hashcode值 = 1,1 % 50 = 1,那么__consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的Coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
总体流程:
当Consumer启动后会向Coordinator发起JoinGroup请求,此时Coordinator会选出一个Consumer作为Leader,并将要消费的Topic情况发送给Leader消费者。Leader消费者收到Topic信息后会制定消费方案并将消费方案发给Coordinator。此时Coordinator会将该消费方案广播到各个Follower消费者。然后各消费者开始消费。
- 每个消费者都会和coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms 5分钟),也会触发再平衡。
4.3 消费者组详细消费流程
总体流程:
消费者组中的消费者发起消费数据请求sendFetches,Kafka集群中的数据分区将数据发送到completedFetches队列中,然后消费者通过FetchedRecordes从队列中拉取数据,然后依次经过反序列化器、拦截器后得到数据。
Fetch.min.bytes每批次最小抓取大小,默认1字节
fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms
Fetch.max.bytes每批次最大抓取大小,默认50m
FetchedRecords请求从队列中抓取数据
Max.poll.records一次拉取数据返回消息的 最大条数,默认500条
独立消费者消费数据方式:(订阅主题、(订阅分区
消费者组消费数据方式:同一个主题的分区数据,只能由一个消费者组中的一个消费。
4.4 分区分配及再平衡策略
一个consumer group中有多个consumer组成,一个topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据?
Kafka有四种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky。
可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。
分区分配策略之 Range
分区分配策略
Range是对每个Topic而言的。
- 首先对同一个topic中的所有分区按分区号排序,并对消费者按照字母顺序排序。
- 通过Pattitons数 / consumer数 来决定每个消费者应该消费几个分区。
- 如果除不尽,那么前面几个消费者就会多消费一个分区。
举例:当前有七个分区,三个消费者,排序后的分区为0.1.2.3.4.5.6.7;消费者排序完之后为C1 C2 C3.那么8/3 = 2 余 2,那么C1消费0.1.2三个分区、C2消费3.4.5三个分区,C3消费6.7两个分区。
注意:此策略是针对单个Topic而言的,如果集群中有N多个topic,那么消费者C1会比其他消费者多消费大量分区,容易造成数据倾斜。
再平衡策略
- 停止掉1号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,时间到了45s后,判断它真的退出就会把任务 整体 分配给其他broker执行。 - 再次重新发送消息观看结果(45s以后)。
消费者0已经被踢出消费者组,所以重新按照range方式分配。
分区分配策略之 RoundRobin
分区分配策略
RoundRobin是针对集群内所有Topic而言的。
RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来;
然后按照hashcode进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
再平衡策略
- 停止掉1号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1 号消费者的任务会按照RoundRobin的方式,把数据轮询分成 0 、6和 3号分区数据,分别由1号消费者或者2号消费者消费。 - 再次重新发送消息观看结果(45s以后)
消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。
分区分配策略之 Sticky
分区分配策略
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面;在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
再平衡策略
- 停止掉1号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0和 1号分区数据,分别由1号消费者或者2号消费者消费。 - 再次重新发送消息观看结果(45s以后)
消费者0已经被踢出消费者组,消费者0已经被踢出消费者组,所以重新按照粘性方式分配。
4.5 offset 位移
Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中;从0.9版本开始,consumer默认将offset保存在Kafka 一个内置的topic中,该topic为__consumer_offsets
。
__consumer_offsets 主题里面采用 key 和 value的方式存储数据。key是 group.id+topic+分区号
,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行Compact,也就是每个group.id + topic + 分区号就保留最新数据。
4.5.1 offset提交策略
- 自动提交offset:为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
- 手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了。
4.5.2 漏消费和重复消费
- 指定Offset消费:当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量 时(例如该数据已被删除),该怎么办?
- 指定时间消费:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。 例如要求按照时间消费前一天的数据,怎么处理?
重复消费:已经消费了数据,但是offset没提交。
漏消费:先提交offset后消费,有可能会造成数据的漏消费。
怎么能做到既不漏消费也不重复消费呢?详看消费者事务。
4.6 生产经验
4.6.1 消费者事务
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。这部分知识会在后续项目部分涉及。
4.6.2 数据积压(消费者如何提高吞吐量)
- 如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。
- 如果是下游的数据处理不及时:提高每批次拉取的数量。因为批次拉取数据过少(拉取数据/处理时间 < 生产速度), 使处理的数据小于生产的数据,也会造成数据积压。
5. Kafka-Eagle监控
Kafka-Eagle框架可以监控Kafka集群的整体运行情况,在生产环境中经常使用。
核心原理和功能:
- 实时监控:Kafka-Eagle 提供了实时监控 Kafka 集群的功能,包括关键性能指标、主题和分区的状态、消费者组的活动等。
- 历史数据分析:除了实时监控,Kafka-Eagle 还支持历史数据的分析和查询,帮助用户了解 Kafka 集群的趋势和性能变化。
- 告警系统:Kafka-Eagle 内置了强大的告警系统,允许用户定义各种告警规则,以便在出现问题时及时采取行动。
- 可定制性:Kafka-Eagle 具有高度可定制性,可以根据您的需求进行配置和扩展。您可以自定义监控面板、图表、报告以及数据源。
- JMX 监控:为了使 Kafka-Eagle 能够收集 Kafka 的性能指标,需要在 Kafka 服务器上开启 JMX 端口,这样 Kafka-Eagle 才能通过 JMX 接口获取 Kafka 的运行时数据。
- 多集群支持:Kafka-Eagle 支持管理多个 Kafka 集群,并且可以管理 Kafka 的 Topic(查看、删除、创建等),也可以对消费者状态进行监控。
- 用户界面:Kafka-Eagle 提供了一个用户友好的 Web 界面,用于实时显示 Kafka 集群的状态和性能指标,以及配置告警规则。
- Kafka SQL:Kafka-Eagle 支持使用 Kafka SQL 实时消费,允许用户通过 SQL 查询 Kafka 中的数据。
- 数据保留策略:Kafka-Eagle 允许用户设置数据的默认保留时间,确保监控数据的及时清理和存储效率。
- 告警通知方式:Kafka-Eagle 支持多种告警通知方式,包括邮件、短信、Webhook 等,确保用户能够及时收到告警信息。
思考:如果不使用Kafka-Eagle如何实现监控Kafka集群功能?
Interceptor拦截器!可以通过在此处设置检查点,同样可以实现监控Kafka集群的功能。
6. Kafka-Kraft模式
6.1 基于ZK与基于KRaft的Kafka
Zookeeper 提供了配置服务、分布式同步、命名服务、Leader 选举和集群管理等功能,在大数据时代的开始很多开源产品都依赖 Zookeeper 来构建,Apache Kafka 也不例外。但是随着 Kafka 功能的演进和应用的场景越来越多:
- 基于 Zookeeper 的协作模式,使得 Kafka 的集群一致性维护越来越复杂;
- 受到 Zookeeper 性能的限制,使得 Kafka 无法支撑更大的集群规模;
- 并且 Zookeeper 自身带来的运维复杂性和产品稳定性,也同样将复杂度和风险负担传递到 Kafka 运维人员;
因此作为 Zookeeper 的替代,Kafka 3.3.1 提供了 KRaft 元数据管理组件。下图来自于 KIP-500 [1]提案,左右分别是 Zookeeper 模式和 KRaft 模式的部署架构图。
6.2 新旧模式对比
- 在 Zookeeper (后面简称为 ZK)模式下:
- 运维部署:3 个 ZK 节点;2…N 个 Broker 节点,其中一个 Broker 承担 Controller 的角色。除了拉起一套最小生产的 Kafka 集群需要至少 3 + N 的资源外,Kafka 的运维人员要同时掌握 ZK 和 Kafka Broker 两套完全不同的系统的运维方式。
- 通信协调:ZK 节点之间通过 ZAB 协议进行一致性协调;Broker 会通过 ZK 来选出一个 Controller 负责全局的协调,同时也会直接修改 ZK 里的数据;Controller 也会监听和修改 ZK 里的数据,并调用 Broker 来完成集群的协调。虽然 ZK 之间的一致性由 ZAB 来保障了,但是 ZK 与 Controller 之间和 Controller 与 Broker 之间的一致性是相对比较脆弱的。
- 在 KRaft 模式下:
- 运维部署:3 个 Controller 节点;0…N 个 Broker 节点。Kafka 节点可以同时承担 Controller 和 Broker 两个角色,因此一套最小生产集群只需要 3 个节点。在测试环境更可以只以 1 节点模式就可以轻量地拉起一个 Kafka 集群。
- 通信协调:Controller 节点底层通过 Raft 协议达成一致,Controller 的内存状态通过 #replay Raft Log 来构建,因此 Controller 之间的内存状态都是一致的;Broker 订阅 KRaft Log 维护和 Controller 一致的内存状态,并且通过事件驱动的方式执行 Partition Reassignment 之类的操作来实现集群最终一致性协调。整个集群的状态维护和一致性协调都是基于 KRaft 中的事件。
ZAB协议以及Raft协议介绍可以参考我另外一篇文章:
分布式一致性算法Paxos、Raft 及 Zookeeper ZAB 蹒跚者_Stan_-CSDN博客
7. Kafka外部系统集成
集成Flume、Spark、Flink
Flume、Spark、Flink是在大数据开发领域中非常常用的组件。他们都既可以用于Kafka的生产者,也可以用于 Kafka的消费者。
具体实现参考我的其他文章:
待补充
参考文献
Kafka 如何基于 KRaft 实现集群最终一致性协调 - SegmentFault 思否
Kafka3.x教程(从入门到调优,深入全面)_哔哩哔哩_bilibili
KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum - Apache Kafka - Apache
Software Foundation
项目地址
BigDataDev: 大数据核心框架学习pro (gitee.com)
欢迎大家参考!
14点08分 2024年10月13日
消息队列 Kafka内容学习整理,如有错误,欢迎评论区交流指出。
不积跬步无以至千里!