kafka系列之消费后不提交offset情况的分析总结

概述
每当我们调用Kafka的poll()方法或者使用@KafkaListener(其实底层也是poll()方法)时,它都会返回之前被写入Kafka的记录,即我们组中的消费者还没有读过的记录。 这意味着我们有一种方法可以跟踪该组消费者读取过的记录。 如前所述,Kafka的一个独特特征是它不会像许多JMS队列那样跟踪消费过的记录。 相反,它允许消费者使用Kafka跟踪每个分区中的位置(偏移)。

我们将更新分区中当前位置的操作称为提交(commits)。

那么消费者是如何提交偏移量(offset)的呢? 它向Kafka生成一条消息,指向一个特殊的 __consumer_offsets主题,包含每个分区需要提交的偏移量。 但是,如果消费者崩溃或新的消费者加入消费者群体,这将触发重新平衡(rebalance)。 在重新平衡之后,可以为每个消费者分配一组新的分区而不是之前处理的分区。 然后消费者将读取每个分区的已提交偏移量并从那里继续。

如果提交的偏移量小于客户端处理的最后一条消息的偏移量,那么最后处理的偏移量与提交的偏移量之间的消息将被处理两次,如下图:
在这里插入图片描述

如果提交的偏移量大于客户端实际处理的最后一条消息的偏移量,那么消费者组将忽略上次处理的偏移量与提交的偏移量之间的所有消息,如下图:
在这里插入图片描述
自动提交(Automatic Commit)
提交偏移量的最简单方法是允许消费者来完成。 如果配置 enable.auto.commit=true,则消费者每五秒钟将提交客户端从poll()收到的最大偏移量。 五秒间隔是默认值,可通过设置auto.commit.interval.ms来控制。 就像消费者中的其他机制一样,自动提交由poll loop驱动。 无论您何时轮询,消费者都会检查是否需要提交,如果是,它将提交它在上次轮询中返回的偏移量。
虽然这个选取很方便,但是它也有一定的不足。
请注意,默认情况下,自动提交每五秒钟发生一次。 假设我们在最近的提交之后三秒钟并且触发了重新平衡。 在重新平衡之后,所有消费者将从最后提交的偏移开始消费。 在这种情况下,偏移量是三秒钟之前偏移量,因此在这三秒内到达的所有事件将被处理两次。 可以将提交间隔配置为更频繁地提交并减少记录将被复制的窗口,但是不可能完全消除它们。
启用自动提交后,对poll的调用将始终提交上一轮询返回的最后一个偏移量。 它不知道实际处理了哪些事件,因此在再次调用poll()之前,始终处理完poll()返回的所有事件至关重要, 因为和poll()一样,close()方法也会自动提交偏移量。

自动提交很方便,但它们不能给开发人员足够的控制以避免重复的消息。

故最近在使用kafka的过程中遇到了一个疑问,在查阅了一些资料和相关blog之后,做一下总结和记录。

问题:消费者在消费消息的过程中,配置参数spring.kafka.listener .ackMode设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?

spring.kafka.listener.ackMode:指定消息确认模式,包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根据需求选择不同的确认模式,用于控制消息的确认方式。

ackMode是个枚举类型:

  • RECORD
    每处理一条commit一次
  • BATCH(默认)
    每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME
    每次间隔ackTime的时间去commit
  • COUNT
    累积达到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪个条件先满足,就commit
  • MANUAL
    处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。最终也是批量提交。
  • MANUAL_IMMEDIATE
    每次处理完业务,手动调用Acknowledgment.acknowledge()后立即提交

参考Kafka系列之SpringBoot集成Kafka

————————————————————————————————————————————————————————————

首先简单的介绍一下消费者对topic的订阅。客户端的消费者订阅了topic后,如果是单个消费者,那么消费者会顺序消费这些topic分区中的数据,如果是创建了消费组有多个消费者,那么kafak的服务端将这些topic平均分配给每个消费者。比如有2个topic,每个topic有2个分区,总共有4个分区,如果一个消费组开了四个消费者线程,那么每个消费者将被分配一个分区进行消费。一般建议是一个消费组里的消费者的个数与订阅的topic的总分区数相等,这样可以达到最高的吞吐量。如果消费者的个数大于订阅的topic的总分区,那么多出的消费者将分配不到topic的分区,等于是白白创建了一个消费者线程,浪费资源。

我们进入正题,对开头提出的问题的总结如些:
  
注意:以下情况均基于kafka的消费者关闭自动提交offset的条件下。亦是基于同一个消费者组的情况,因为不同的消费者组之间,他们彼此的offset偏移量是完全独立的。

  1. 如果消费端在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。

  2. 如果在消费的过程中有几条或者一批数据数据没有提交offset(比如异常情况程序没有走到手动提交的代码),后面其他的消息消费后正常提交offset至服务端,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序或者rebalance也不会重新消费。

  3. 消费端如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你没有提交offset的消息时你新增或者减少消费端,此时会发生rebalance现象,即可再次消费到这个未提交offset的数据,产生重复消费问题。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在发生rebalance现象之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/777770.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一种一维时间序列信号变化/事件/异常检测方法(MATLAB)

随着工业物联网、大数据和人工智能的发展,传统工业正在向数字化和智能化升级,从而创造了大量的时间序列数据。通过分析这些数据,可以提供准确可靠的信息服务和决策依据,促进制造业的转型升级。工业物联网在传统工业向“工业 4.0”…

CASS如何输入距离和坐标绘制图形

1、软件版本 这里使用CASS7.0软件进行操作。如下: 2、 操作 输入:500,45【表示距离500米,方向45度】。 点击回车,完成绘制。

V3.76 ViVaCut高级版本!已经开启永久订阅!一款专业视频剪辑软件

在短视频和社交媒体盛行的今天,视频内容的创作和分享已经成为人们日常生活的一部分。为了满足广大视频创作者的需求,一款专业视频剪辑软件应运而生,以其强大的功能和用户友好的界面,为用户提供了全新的视频编辑体验。他提供了众多…

JWT(Json Web Token)在.NET Core中的使用

登录成功时生成JWT字符串目录 JWT是什么? JWT的优点: JWT在.NET Core 中的使用 JWT是什么? JWT把登录信息(也称作令牌)保存在客户端为了防止客户端的数据造假,保存在客户端的令牌经过了签名处理&#xf…

TikTok马来西亚直播网络怎么配置?

TikTok是一款全球流行的社交媒体应用,在东南亚地区拥有大量用户。在马来西亚这个多元化的国家,配置高效稳定的直播网络对TikTok的运营至关重要。 配置马来西亚直播网络的必要性 广泛的地理覆盖:马来西亚包括大片陆地和众多岛屿,网…

加入运动健康数据开放平台,共赢鸿蒙未来

HarmonyOS SDK运动健康服务(Health Service Kit)是为华为生态应用打造的基于华为帐号和用户授权的运动健康数据开放平台。在获取用户授权后,开发者可以使用运动健康服务提供的开放能力获取运动健康数据,基于多种类型数据构建运动健…

大数据Spark 面经

1: Spark 整体架构 Spark 是新一代的大数据处理引擎,支持批处理和流处理,也还支持各种机器学习和图计算,它就是一个Master-worker 架构,所以整个的架构就如下所示: 2: Spark 任务提交命令 一般我们使用shell 命令提…

理解MySQL核心技术:存储过程与函数的强大功能

在大型应用程序和复杂的数据库操作中,存储过程与函数扮演着至关重要的角色。它们不仅可以提高代码的可维护性,还能加强数据库的安全性和性能。本篇文章将深入探讨MySQL存储过程与函数的基础知识、创建、管理及其在实际应用中的优势。 什么是存储过程和函…

set的应用(C++)

set的使用 【基本用法】 大家可以敲一下这段代码体会一下set的基本初始化和使用 #include <iostream> #include <set> #include <vector> using namespace std;int main() {set<int> st1; // 空的set// 使用迭代器构造string str("abcdef"…

uniapp实现一个键盘功能

前言 因为公司需要&#xff0c;所以我.... 演示 代码 键盘组件代码 <template><view class"keyboard_container"><view class"li" v-for"(item, index) in arr" :key"index" click"changArr(item)" :sty…

【GIt】变基(rebase)

目录 变基(rebase)是什么为什么有变基变基后的时间线变基前的时间线 变基原理怎么变基同一个分支变基不同分支变基 参考文章 变基(rebase)是什么 Git 变基&#xff08;rebase&#xff09;是一种用于整合分支的方法&#xff0c;它的工作原理是将一系列提交&#xff08;或分支合…

Pycharm远程连接GPU(内容:下载安装Pycharm、GPU租借、配置SSH、将代码同步到镜像、命令行操控远程镜像、配置远程GPU解释器)

目录 windows下载安装pycharmGPU租借网站AutoDlfeaturize好易智算 GPU租借GPU选择选择镜像充值 然后创建镜像创建成功 复制SSH登录信息 远程进入镜像 在Pycharm中进行ssh连接新建SFTP配置SSH复制ssh根据复制的信息填写ssh配置测试连接 将代码同步到远程镜像上设置mappings将本地…

XAML 框架横向对比

多年来&#xff0c;基于 XAML 的 UI 框架有了很大的发展。下面的图表很好地证明了这个观点。XAML UI 框架的三大巨头&#xff1a;Avalonia UI、Uno Platform 和 .NET MAUI 都支持跨平台的应用。事实上&#xff0c;除了 Avalonia UI&#xff0c;对跨平台 XAML 的需求是它们发展的…

Mysql部署MHA高可用

部署前准备&#xff1a; mysql-8.0.27下载地址&#xff1a;https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-8.0.27-1.el7.x86_64.rpm-bundle.tar mha-manager下载地址&#xff1a;https://github.com/yoshinorim/mha4mysql-manager/releases/download/v0.58/mha4mysql-mana…

三丰云评测:免费虚拟主机与免费云服务器的全面对比

三丰云是一家知名的互联网服务提供商&#xff0c;专注于虚拟主机和云服务器的服务。在互联网技术日新月异的今天&#xff0c;选择一个优质的云服务提供商至关重要。本次评测将重点对比三丰云的免费虚拟主机和免费云服务器&#xff0c;帮助用户更好地选择适合自己需求的服务。首…

Java基础-接口与实现

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 Java 接口 什么是接口&#xff1f; 声明接口 实现接口 继承接口 接口的多继承 标记接口 Java 接口 …

【海贼王的数据航海】ST表——RMQ问题

目录 1 -> RMQ问题 1.1 -> 定义 1.2 -> 解决策略 2 -> ST表 2.1 -> 定义 2.2 什么是可重复贡献问题 2.3 -> 预处理ST表 2.4 -> 处理查询 2.5 -> 实际问题 1 -> RMQ问题 1.1 -> 定义 RMQ (Range Minimum/Maximum Query)即区间最值查询…

Qwen1.5-1.8b部署

仿照ChatGLM3部署&#xff0c;参考了Qwen模型的文档&#xff0c;模型地址https://modelscope.cn/models/qwen/Qwen1.5-1.8B-Chat/summary http接口 服务端代码api.py from fastapi import FastAPI, Request from transformers import AutoTokenizer, AutoModelForCausalLM, …

BitWidget,自定义bit控件

由于QBitArray并不满足我做界面是的需求&#xff0c;所以参照QBitArray简单的写了个控件&#xff0c;如下所示&#xff0c;源码及实例在我上传的资源包中 实例 帮助文档如图所示&#xff08;部分&#xff09; 帮助文档&#xff08;在资源包中&#xff09; 1.html文档 2.chm文…

操作系统期末复习真题练习二

选择题 1.在操作系统中,处于就绪状态和等待状态的进程都没有占用处理机,当处理机空闲时()。 A.就绪状态的进程和等待状态的进程都可以转换成运行状态 B.只有就绪状态的进程可以转换成运行状态 C.只有等待状态的进程可以转换成运行状态 D.就绪状态的进程和等待状态的进程都不能转…