概述
每当我们调用Kafka的poll()方法或者使用Spring的@KafkaListener(其实底层也是poll()方法)注解消费Kafka消息时,它都会返回之前被写入Kafka的记录,即我们组中的消费者还没有读过的记录。
这意味着我们有一种方法可以跟踪该组消费者读取过的记录。 如前所述,Kafka的一个独特特征是它不会像许多JMS队列那样跟踪消费过的记录。相反,它允许消费者使用Kafka跟踪每个分区中的位置(位移,也称偏移量)。
我们将更新分区中当前位置的操作称为提交位移(commits offset)。
偏移量 offset,也称位移
- 每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset),它是消费者消费进度的指示器。
- 看上去Offset 就是一个数值而已,其实对于 Consumer Group 而言,它是一组 KV 对,Key 是分区,V 对应 Consumer 消费该分区的最新位移 TopicPartition->long
- 不过切记的是消费者位移是下一条消息的位移,而不是目前最新消费消息的位移。
- 提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
那么消费者是如何提交偏移量(offset)的呢? 它向Kafka生成一条消息,指向一个特殊的 __consumer_offsets主题,俗称位移主题。 __consumer_offsets主题包含每个分区需要提交的偏移量。 但是,如果消费者组的消费者崩溃或新的消费者加入消费者组,这将触发重新平衡(rebalance),即消费者组内的消费者负责的分区会发生变化。 在重新平衡之后,可以为每个消费者分配一组新的分区而不是之前处理的分区。 然后消费者将读取每个分区的已提交偏移量并从那里继续。
位移的提交
- Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
- 位移提交的方式
- 从用户的角度来说,位移提交分为自动提交和手动提交;
- 从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
- 大数据组件都关闭了自动提交,采取了手动提交。
原文链接:https://blog.csdn.net/king14bhhb/article/details/114657998
重复消费、消息丢失
重复消费
如果提交的偏移量小于客户端处理的最后一条消息的偏移量,那么最后处理的偏移量与提交的偏移量之间的消息将被处理两次。
如下图:
消息丢失
如果提交的偏移量大于客户端实际处理的最后一条消息的偏移量,那么消费者组将忽略上次处理的偏移量与提交的偏移量之间的所有消息。如下图:
位移提交方案
自动提交(Automatic Commit)
提交偏移量的最简单方法是允许消费者来完成。 如果配置 enable.auto.commit=true
,则消费者每五秒钟将提交客户端从poll()收到的最大偏移量。 五秒间隔是默认值,可通过设置auto.commit.interval.ms
来控制。 就像消费者中的其他机制一样,自动提交由poll loop驱动。 无论您何时轮询,消费者都会检查是否需要提交,如果是,它将提交它在上次轮询中返回的偏移量。
它实际保证的是位移至少要隔一段时间才会提交,如果你是单线程处理消息,那么只有处理完消息后才会提交位移,可能远比你设置的间隔长,因为你的处理逻辑可能需要一定的时间。
提交的时机
- Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。
- 从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
- 但自动提交位移的一个问题在于,它可能会出现重复消费,如果处理失败了下次开始的时候就会从上一次提交的offset 开始处理
存在的问题
数据重复写入
假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。
位移提交和rebalance
虽然自动提交很方便,但是它也有一定的不足。
请注意,默认情况下,自动提交每五秒钟发生一次。 假设我们在最近的提交之后三秒钟并且触发了重新平衡。 在重新平衡之后,所有消费者将从最后提交的偏移开始消费。 在这种情况下,偏移量是三秒钟之前的偏移量,因此在这三秒内到达的所有事件将被处理两次。 可以将提交间隔配置为更频繁地提交并减少记录将被复制的窗口,但是不可能完全消除它们。这是自动提交机制的一个缺陷(其实就是重复消费的问题)。
启用自动提交后,对poll的调用将始终提交上一轮询返回的最后一个偏移量。 它不知道实际处理了哪些事件,因此在再次调用poll()之前,始终处理完poll()返回的所有事件至关重要, 因为和poll()一样,close()方法也会自动提交偏移量。
其实仔细思考,手动提交也存在这个问题,因为rebalance会先让所以的消费者停止消费,因为在kafka的角度来看,消息消费的那一刻,消费已经完成,所以停止消费的时候,你的逻辑很可能没有完成,那么你的offset 也很可能没有提交。在rebalance后分区重新分配的消费者会重新从服务端获取分区的offset值,此时可能是消费端提交前的offset,也会产生重复消费问题。
自动提交很方便,但它们不能给开发人员足够的控制以避免重复的消息。
手动提交
- 很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false
- Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。
同步手动提交
实现方案
- 设置
enable.auto.commit
为 false - 代码中手动提交
public static void main(String[] args) {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
// 模拟消息的处理逻辑
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
try {
//处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
}
}
}
存在的问题
从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。
commitSync()的问题在于,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束,需要注意的是同步提交会在提交失败之后进行重试。
在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS
异步手动提交
实现方案
- 设置
enable.auto.commit
为 false - 代码中异步提交
下面都是三个测试用例都是异步提交,不同之处在于有没有去实现回调函数。建议生产环境中一定要实现,至少记录下日志。
@Test
public void asynCommit1(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync();
}
}
@Test
public void asynCommit2(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
// 异步回调机制
consumer.commitAsync(new OffsetCommitCallback(){
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception!=null){
System.out.println(String.format("提交失败:%s", offsets.toString()));
}
}
});
}
}
@Test
public void asynCommit3(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync((offsets, exception) ->{
if (exception!=null){
System.out.println(String.format("提交失败:%s", offsets.toString()));
}
});
}
}
从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。
存在的问题
commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的,所以只要在程序停止前最后一次提交成功即可。
这里提供一个解决方案,那就是不论成功还是失败我们都将offsets信息记录下来,如果最后一次提交成功那就忽略,如果最后一次没有提交成功,我们可以在下次重启的时候手动指定offset。
综合异步和同步来提交
try {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.out.println(String.format("提交失败:%s", e.toString()));
} finally {
consumer.commitSync();
}
同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。
精细化提交(分批提交)
设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。
对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
for (ConsumerRecord<String, String> record : records) {
// 数据的处理逻辑
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
// 记录下offset 信息
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
if (count % 100 == 0) {
// 回调处理逻辑是null
consumer.commitAsync(offsets, null);
}
count++;
}
try {
//处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
consumer.commitSync(offsets);
} catch (CommitFailedException e) {
e.printStackTrace();
}
}
CommitFailedException 异常处理
产生原因
从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。因为KafkaConsumer.commitSync()有重试机制,所以一般的网络原因可以排除,发生这个异常的原因主要就是超时了,但是这个超时不是说提交本身超时了,而是消息的处理时间超长,导致发生了Rebalance,已经将要提交位移的分区分配给了另一个消费者实例。
熟悉的错误:
Exception in thread “main” org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
解决方案
缩短单条消息处理的时间
比如之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
增加 Consumer 端允许下游系统消费一批消息的最大时长
这取决于 Consumer 端参数 max.poll.interval.ms
的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。
值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms
参数的值。不幸的是,session.timeout.ms
参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms
参数,将这部分含义从 session.timeout.ms
中剥离出来的原因之一。
减少下游系统一次性消费的消息总数
这取决于 Consumer 端参数 max.poll.records
的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。
可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。
如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
下游系统使用多线程来加速消费
这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。
之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。
如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。
事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsu
消费端消费后不提交offset情况的分析总结
故最近在使用kafka的过程中遇到了一个疑问,在查阅了一些资料和相关blog之后,做一下总结和记录。
问题:消费者在消费消息的过程中,配置参数
spring.kafka.listener .ackMode
设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?
spring.kafka.listener.ackMode
:指定消息确认模式,包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根据需求选择不同的确认模式,用于控制消息的确认方式。
ackMode是个枚举类型:
- RECORD
每处理一条commit一次 - BATCH(默认)
每次poll的时候批量提交一次,频率取决于每次poll的调用频率 - TIME
每次间隔ackTime的时间去commit - COUNT
累积达到ackCount次的ack去commit - COUNT_TIME
ackTime或ackCount哪个条件先满足,就commit - MANUAL
处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。最终也是批量提交。 - MANUAL_IMMEDIATE
每次处理完业务,手动调用Acknowledgment.acknowledge()后立即提交
参考Kafka系列之SpringBoot集成Kafka
————————————————————————————————————————————————————————————
首先简单的介绍一下消费者对topic的订阅。
- 客户端的消费者订阅了topic后,如果是单个消费者,那么消费者会顺序消费这些topic分区中的数据;
- 如果是创建了消费组有多个消费者,那么kafak的服务端将这些topic平均分配给每个消费者。比如有2个topic,每个topic有2个分区,总共有4个分区,如果一个消费组开了四个消费者线程,那么每个消费者将被分配一个分区进行消费。一般建议是一个消费组里的消费者的个数与订阅的topic的总分区数相等,这样可以达到最高的吞吐量。如果消费者的个数大于订阅的topic的总分区,那么多出的消费者将分配不到topic的分区,等于是白白创建了一个消费者线程,浪费资源。
我们进入正题,对开头提出的问题的总结如些:
注意:以下情况均基于kafka的消费者关闭自动提交offset的条件下。亦是基于同一个消费者组的情况,因为不同的消费者组之间,他们彼此的offset偏移量是完全独立的。
-
如果消费端在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
-
如果在消费的过程中有几条或者一批数据数据没有提交offset(比如异常情况程序没有走到手动提交的代码),后面其他的消息消费后正常提交offset至服务端,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序或者rebalance也不会重新消费。
-
消费端如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你没有提交offset的消息时你新增或者减少消费端,此时会发生rebalance现象,即可再次消费到这个未提交offset的数据,产生重复消费问题。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在发生rebalance现象之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。