源码解析:Apache RocketMQ重置消费位点

引入

reset offset,即重置消费进度,一般在以下场景中使用:

  1. 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。
  2. 消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置开始消费。

重置到最早、或者根据时间进行重置与消息补发的区别?
● 消息补发是将原先的消息由生产者重发一次,与区别的那边消息本质上不是同一条消息(除了消息体一样以外)。
● 重置操作是操作消费位点(offset),本质上还是消费生产者之前发送的那条消息。
源码解析

重置offset起始调用位置:

org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent

区别:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
● 用来根据给定的时间戳来重置消费者的偏移量。

这两个入口本质上都是resetOffset,没有本质上的区别,我们以resetOffsetNewConcurrent为例,具体流程如下图:
在这里插入图片描述

  • 首先是examineTopicRouteInfo:主要是获取topic的路由信息,如果路由信息不存在,则无法进行后续操作。
  • 再者是InvokeBrokerToResetOffset:根据上一步拿到的路由信息,遍历路由,一次向broker发起调用。
  • 请求到达服务端(Broker端),判断是否是Broker端侧处理 ○ Broker端处理:
    • 前置检查(look-ahead check):判断当前BrokerRole是否正确、检验当前Topic、ConsumerGroup是否存在,不满足任意条件,直接返回。
    • 将传递过来的offset或者根据timestamp查询到的offset统一放置到queueOffsetMap中
    • assignResetOffset:将上一步的queueOffsetMap的offset放到对应的resetOffsetTable和offsetTable中。
    • 最后prepare reset result并返回response。
  • Client端处理:
    • 先执行queryOffset:查询当前topic下的group下offsetTable中是是否存储了offset信息,有就返回对应的值,没有返回-1;
    • 前置检查(look-ahead check):检查上一步返回结果consumerOffset是否为-1,为-1表示当前group不存在;检查timeStampOffset是否满足条件;满足上述所有条件将timeStampOffset/consumerOffset中较为小的值放到offsetTable中,如果是C的客户端,直接将timestampOffset放入offsetTable中。
    • 请求到达客户端后,先将对应的consumer挂起(suspend),清除ProcessQueue中的消息,在sleep 10s。
    • 再执行updateConsumeOffset:更新consumerOffset。
    • 最后再resume,继续消费。

补充:
如果是服务端重置,重置之后的offset会写入resetOffsetTable中,在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset;如果queryThenEraseResetOffset中有返回值,将resetOffset作为GetMessageResult的nextBeginOffset,拉取操作用的offset。

public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
    if (null == map) {
        return null;
    } else {
        return map.remove(queueId);
    }
}

参考:
● https://rocketmq.apache.org/
● https://github.com/apache/rocketmq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/249729.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

订单管理系统开发经验的总结:优化流程、提升效率的关键实践

前言 一.订单管理系统的架构设计 二.订单系统的详细设计 1.拆分 2.换货 3.发货 4.拦截 5.取消 6.物流回传 三.订单系统的订单状态流转 初始状态 中间状态 异常状态 终态 四.订单系统的关键代码逻辑 五.结语 前言 两年来&#xff0c;整个订单管理系统经过大大小…

[MySQL]数据库概述

目录 1.什么是数据库 2.数据库分类 2.1关系型数据库 2.2非关系型数据库 1.什么是数据库 我们知道&#xff0c;存储数据可以使用文件来存储。那么为什么我们还要大费周章的去设计和使用数据库呢&#xff1f; 因为文件保存数据有以下几个缺点&#xff1a; 1.文件的安全性不…

Attention机制学习

写在前面 注意力机制是一个很不错的科研创新点方向&#xff0c;但是没有系统记录过学习过程&#xff0c;这里记录科研中遇到的各种注意力机制。 1. Attention机制解释 本质上来说用到attention的任务都有Query&#xff0c;Key&#xff0c;Value三个关键components&#xff0c;…

扩展学习|大数据挖掘与智能体ABM建模

出处&#xff1a;计算社会科学_中南大学_中国大学MOOC(慕课) (icourse163.org)https://www.icourse163.org/course/CSU-1466004186 ps&#xff1a;相关内容来自于本人笔记&#xff0c;若有需要请联系原作者&#xff01;个人学习留存&#xff0c;侵权必删&#xff01; 一…

可回收资源的环保螺旋盖葡萄酒

在酿酒师中&#xff0c;选择哪种瓶盖来保存一瓶葡萄酒主要取决于葡萄酒的种类和酿酒师自己的偏好。在20世纪70年代&#xff0c;澳洲朋友引进并推广了一种保存葡萄酒的新方法&#xff0c;这种新方法螺旋盖并在70年代获得专利&#xff0c;投入商业使用&#xff0c;澳大利亚的酿酒…

【STM32】STM32学习笔记-OLED显示屏(10)

00. 目录 文章目录 00. 目录01. OLED显示屏接线图02. OLED函数库03. OLED测试代码04. Keil调试05. 程序下载06. 附录 01. OLED显示屏接线图 02. OLED函数库 oled.h #ifndef __OLED_H #define __OLED_Hvoid OLED_Init(void); void OLED_Clear(void); void OLED_ShowChar(uint8…

图片变成动图如何操作?掌握这个办法就够了

生动有趣的gif动画图片是怎么制作的呢&#xff1f;其实&#xff0c;制作gif动图的方法很简单&#xff0c;无需下载任何软件&#xff0c;使用gif动图制作&#xff08;https://www.gif.cn/&#xff09;工具-GIF中文网。只需要上传jpg、png格式的图片&#xff0c;轻松一键就能在线…

Reinfocement Learning 学习笔记PartⅡ

文章目录 Reinfocement Learning六、随机近似与随机梯度下降&#xff08;Stochastic Approximation & Stochastic Gradient Descent&#xff09;6.1 Robbins-Monro Algorithm6.2 随机梯度下降 七、时序差分方法&#xff08;Temporal-Difference Learning&#xff09;7.1 TD…

OpenAI 承认 ChatGPT 最近的懒惰:由于用户体验到响应缓慢和无用的输出,调查正在进行中

文章目录 一. ChatGPT 指令遵循能力下降引发用户投诉1.1 用户抱怨回应速度慢、敷衍回答、拒绝回答和中断会话 二. OpenAI 官方确认 ChatGPT 存在问题&#xff0c;展开调查三. OpenAI 解释模型行为差异&#xff0c;回应用户质疑四. GPT-4 模型变更受人事动荡和延期影响 一. Chat…

【设计模式--行为型--中介者模式】

设计模式--行为型--中介者模式 中介者模式定义结构案例实现优缺点使用场景 中介者模式 定义 又叫调停模式&#xff0c;定义一个中介角色来封装一系列对象之间的交互&#xff0c;使原有对象之间的耦合松散&#xff0c;且可以独立的改变它们之间的交互。 结构 抽象中介者角色…

IntelliJ IDEA 运行 若依分离版后端

一、本地运行 一、选择打开IntelliJ IDEA项目 二、选择若依项目 如&#xff1a;java123 三、等待右下角的准备工作&#xff08;有进度条的&#xff09;完成 四、修改MySQL 五、修改资源上传目录 六、修改redis 七、然后点击运行 八、成功图 九、测试访问 二、部署服务器运行 …

策略+工厂完成支付方式选择(微信/支付宝),简单实现

需求 传参String payType wechat 使用微信支付传参String payType ali 使用支付宝支付代码不允许出现if-else 思路 把支付当作一个行为&#xff0c;代码中当作一个接口&#xff0c;payService。2个实现类&#xff0c;分别是微信支付实现类WeChatPayServiceImpl&#xff0c…

【数据结构】单链表的定义和操作

目录 1.单链表的定义 2.单链表的创建和初始化 3.单链表的插入节点操作 4.单链表的删除节点操作 5.单链表的查找节点操作 6.单链表的更新节点操作 7.完整代码 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很高兴与大家相识&#xff0c;希望我的博客能对你有所帮助…

汽车充电协议OpenV2G的平替cbexigen!!

纵所周知&#xff0c;开源欧规协议 CCS 的 OpenV2G 协议不支持 ISO15118-20:2022 协议&#xff0c;并且软件维护者已经明确不在进行该软件的维护。 前几天在 Github 上冲浪发现了一个宝藏开源项目&#xff0c;完美的实现的 OpenV2G 的 Exidizer 工具的功能&#xff1a;cbexigen…

Goldstein枝切法对存在间断相位缺陷的解缠研究

摘要: Goldstein枝切法作为相位解缠中路径积分法的重要算法之一&#xff0c;其解缠结果易受到噪声或间断相位缺陷所引起的残差点影响。为了研究相位间断缺陷对解缠算法的影响&#xff0c;模拟了具有间断相位缺陷的数据&#xff0c;采用 Gold-stein枝切法进行了系统的解缠研究。…

嵌入式C语言变量、数组、指针初始化的多种操作

在敲代码的时候&#xff0c;我们会给变量一个初始值&#xff0c;以防止因为编译器的原因造成变量初始值的不确定性。 对于数值类型的变量往往初始化为0&#xff0c;但对于其他类型的变量&#xff0c;如字符型、指针型等变量等该如何初始化呢&#xff1f; 数值类变量初始化 整…

巴贝拉葡萄酒是单一品种还是混合品种制成的?

大多数巴贝拉葡萄酒都是由单一的巴贝拉葡萄品种制成的&#xff0c;许多意大利葡萄酒商开始尝试在巴贝拉葡萄酒中加入其它葡萄品种&#xff0c;其中两个最受欢迎的意大利品种是皮埃蒙特的巴贝拉德阿尔巴和达斯蒂。和朋友在一家意大利餐厅吃饭&#xff0c;被酒单吓到了&#xff1…

MySQL数据库 入门

目录 一、MySQL概述 二、MySQL安装 安装数据库 配置数据库 启动停止数据库 客户端连接数据库 三、数据模型 四、SQL 一、MySQL概述 先来讲解三个概念&#xff1a;数据库、数据库管理系统、 SQL 。 而目前主流的关系型数据库管理系统的市场占有率排名如下&#xff1a; …

Android--Jetpack--数据库Room详解一

人生何须万种愁&#xff0c;千里云烟一笑收 一&#xff0c;定义 Room也是一个ORM框架&#xff0c;它在SQLite上提供了一个抽象层&#xff0c;屏蔽了部分底层的细节&#xff0c;使用对象对数据库进行操作&#xff0c;进行CRUD就像对象调用方法一样的简单。 二&#xff0c;角色介…

​【EI会议征稿中】#先投稿,先送审#第三届网络安全、人工智能与数字经济国际学术会议(CSAIDE 2024)​

#先投稿&#xff0c;先送审# 第三届网络安全、人工智能与数字经济国际学术会议&#xff08;CSAIDE 2024&#xff09; 2024 3rd International Conference on Cyber Security, Artificial Intelligence and Digital Economy 2024年3月1日-3日 | 中国南京 会议官网&#xff1a…