RocketMQ源码分析(四) 延迟消息源码分析

0.前文

RocketMQ源码分析(三) 消费者
RocketMQ源码分析(二) 生产者
RocketMQ源码分析(一)broker启动&remoting抽象

1. 概述

RocketMQ的延迟消息是指消息发送到Broker后,不会立即被消费者消费,而是要等待指定的时间后才能被消费。本文将从源码层面分析延迟消息的实现原理。

2. 核心类图

在这里插入图片描述

主要类及其关系:

  1. ScheduleMessageService

    • Broker端延迟消息处理服务
    • 管理延迟级别的定时任务
    • 转换延迟消息到真实主题
  2. DelayCombineMessageStore

    • 延迟消息存储实现
    • 管理延迟消息的commitlog
    • 提供延迟消息的读写能力
  3. ScheduleMessageTask

    • 延迟消息调度任务
    • 处理到期的延迟消息
    • 将消息投递到真实主题

3. 延迟消息实现原理

3.1 延迟级别设计

RocketMQ默认支持18个延迟级别:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

实现原理:

  1. 每个延迟级别对应一个定时任务
  2. 每个延迟级别对应一个内部主题
  3. 延迟消息先存储到对应级别的主题
  4. 到期后投递到真实主题

3.2 消息发送流程

在这里插入图片描述

发送流程:

  1. Producer设置消息延迟级别
  2. Broker接收到消息后:
    • 计算延迟时间戳
    • 将消息存储到延迟主题
    • 返回发送结果

示例代码:

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 设置延迟级别为3级(10秒)
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);

3.3 消息调度流程

在这里插入图片描述

调度流程:

  1. ScheduleMessageService启动定时任务
  2. 扫描延迟队列中到期的消息
  3. 将消息投递到目标主题
  4. 更新消费进度

核心代码:

public class ScheduleMessageService extends ConfigManager {
    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>();
    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>();
    
    public void executeOnTimeup() {
        // 处理到期的延迟消息
        ConsumeQueue cq = DefaultMessageStore.this.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
        // 读取消息
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        // 投递到真实主题
        MessageExtBrokerInner msgInner = MessageDecoder.decode(bufferMsg);
        msgInner.setTopic(messageExt.getTopic());
        // 更新消费进度
        PutMessageResult putMessageResult = DefaultMessageStore.this.putMessage(msgInner);
    }
}

3.4 消息消费流程

在这里插入图片描述

消费流程:

  1. 消息到期后投递到真实主题
  2. Consumer正常消费目标主题
  3. 对Consumer透明,无需特殊处理

4. 实现细节

4.1 延迟主题设计

  • 使用SCHEDULE_TOPIC_XXXX作为延迟主题
  • 每个延迟级别对应一个队列ID
  • 队列ID = 延迟级别 - 1

4.2 定时调度机制

  • 基于DelayQueue实现定时功能
  • 每个延迟级别一个调度任务
  • 任务执行时间由延迟级别决定
  • 支持动态调整调度参数

4.3 消息转换机制

延迟消息转换过程:

  1. 保存原始消息主题
  2. 临时存储到延迟主题
  3. 到期后恢复原始主题
  4. 投递到目标队列

5. 最佳实践

5.1 使用建议

  1. 合理选择延迟级别
  2. 避免设置过多的延迟消息
  3. 注意延迟消息的顺序性
  4. 考虑消息可靠性要求

5.2 常见问题

  1. 延迟精度问题

    • 依赖定时任务调度
    • 存在一定误差范围
    • 不适合精确定时要求
  2. 顺序性问题

    • 同一延迟级别基本有序
    • 不同级别无法保证顺序
    • 需要业务层面考虑
  3. 性能问题

    • 会占用额外存储空间
    • 增加系统调度开销
    • 需要合理规划容量

6. 总结

RocketMQ延迟消息的特点:

  1. 支持多个延迟级别
  2. 实现机制简单高效
  3. 对用户基本透明
  4. 可靠性有保证

通过定时调度和主题转换的方式,实现了灵活可靠的延迟消息功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/939363.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式单片机中对应GPIO外设详解实现

一、GPIO外设详解 大家可以看到,函数库开发的时候外设的使用流程都是一样的,接下来就讲解一下细节。 l定义一个外设的结构体变量 变量命名规则 PPP_InitTypeDef PPP_InitStructure; 每个外设都有对应的结构体,结构体的定义一般都是存放在每个外设的头文件内,比如GPIO外…

C# OpenCvSharp DNN 实现百度网盘AI大赛-表格检测第2名方案第三部分-表格方向识别

目录 说明 效果 模型 项目 ​编辑 代码 参考 下载 其他 说明 百度网盘AI大赛-表格检测的第2名方案。 该算法包含表格边界框检测、表格分割和表格方向识别三个部分&#xff0c;首先&#xff0c;ppyoloe-plus-x 对边界框进行预测&#xff0c;并对置信度较高的表格边界…

智源研究院与腾讯达成战略合作,推动大模型技术前沿探索和应用落地

2024 年 12 月 18日&#xff0c; 智源研究院与腾讯签署战略合作协议&#xff0c;双方将在大模型研发、人工智能技术前沿探索及开源生态建设等领域展开深入合作。智源研究院院长王仲远、副院长兼总工程师林咏华&#xff0c;腾讯集团高级执行副总裁、云与智慧产业事业群总裁汤道生…

C++特殊类设计(单例模式等)

目录 引言 1.请设计一个类&#xff0c;不能被拷贝 2. 请设计一个类&#xff0c;只能在堆上创建对象 为什么设置实例的方法为静态成员呢 3. 请设计一个类&#xff0c;只能在栈上创建对象 4. 请设计一个类&#xff0c;不能被继承 5. 请设计一个类&#xff0c;只能创建一个对…

[SAP ABAP] ALV报表练习1

销售订单明细查询报表 业务目的&#xff1a;根据选择屏幕的筛选条件&#xff0c;使用 ALV 报表&#xff0c;显示销售订单详情 效果展示 用户的输入条件界面 用户的查询结果界面(部分截图) 完整代码如下所示 主程序(zsd001_437) *&----------------------------------…

Docker日志与监控

一、引言 随着容器技术在生产环境中被广泛应用&#xff0c;Docker容器的日志管理与监控变得尤为重要。在现代应用程序中&#xff0c;容器化的应用通常是由多个容器组成的服务&#xff0c;而容器中的日志与监控则是确保服务健康运行、诊断问题和优化性能的关键。通过日志和监控…

信号槽【QT】

文章目录 对象树字符集信号槽QT坐标系信号与槽connect自定义槽自定义信号disconnect 对象树 #ifndef MYLABEL_H #define MYLABEL_H#include<QLabel> class MyLabel : public QLabel { public:// 构造函数使用带 QWidget* 版本的.// 确保对象能够加到对象树上MyLabel(QWi…

3.zabbix中文设置

1、zabbix中文设置 2、中文乱码的原因 zabbix使用DejaVuSan.ttf字体&#xff0c;不支持中文&#xff0c;导致中文出现乱码。解决方法很简单&#xff0c;把我们电脑里面字体文件传到zabbix服务器上。 3、解决zabbix乱码方法 3.1、从Window服务器找到相应的字休复制到zabbix S…

电脑连接不上手机热点 找不到到服务器的ip地址

手机热点连接不上 找不到到服务器的ip地址 emmm希望不会有人不会吧 解决方法&#xff1a; 1.点击右上角图标进入设置 2.点击更改所有wifi网络的DNS设置 3.查看自己的IP分配和DNS分配是不是DHCP自动分配&#xff0c;不是的话就不对了&#xff0c;需要点击编辑手动改一下 4.改完…

计算机网络之王道考研读书笔记-2

第 2 章 物理层 2.1 通信基础 2.1.1 基本概念 1.数据、信号与码元 通信的目的是传输信息。数据是指传送信息的实体。信号则是数据的电气或电磁表现&#xff0c;是数据在传输过程中的存在形式。码元是数字通信中数字信号的计量单位&#xff0c;这个时长内的信号称为 k 进制码…

MySQL数据库04|内置函数、存储过程、视图、事务、索引

目录 十三、MySQL常用内置函数 1、字符串函数 1️⃣拼接字符串&#xff1a;concat(str1,str2,…) 2️⃣包含字符个数&#xff1a;length(str) 3️⃣截取字符串&#xff1a;left(str,len)、right(str,len)、substring(str,pos,len) 4️⃣去除空格&#xff1a;ltrim(str)、r…

【Unity3D】实现可视化链式结构数据(节点数据)

关键词&#xff1a;UnityEditor、可视化节点编辑、Unity编辑器自定义窗口工具 使用Newtonsoft.Json、UnityEditor相关接口实现 主要代码&#xff1a; Handles.DrawBezier(起点&#xff0c;终点&#xff0c;起点切线向量&#xff0c;终点切线向量&#xff0c;颜色&#xff0c;n…

Group FLUX - Beta Sprint Essay2

文章目录 I. SCRUMAchievements from yesterday’s stand-up meeting to the present Commit recordFrontend-CommitsBackend-Commits PM ReportBurnup mapRunning image of our current program I. SCRUM Achievements from yesterday’s stand-up meeting to the present Zh…

硬盘清洁器 -一个功能出色的的文件与使用纪录清理工具,不仅可以将磁盘中不必要的暂存盘一次扫除,供大家学习研究参考

【核心功能】 1.硬件性能检测。 2.清理日常垃圾信息。 3.永久性删除文件。不可恢复擦除可用空间。 4.系统恢复和还原。 5.磁盘管理。 6.重复文件删除。坏链清除&#xff0c;删除非必要文件。 7.恢复删除文件。含电子照片、PDF、视频等。 8.批量重命名。 下载&#xff1a;https:…

[Linux] 进程信号概念 | 信号产生

&#x1fa90;&#x1fa90;&#x1fa90;欢迎来到程序员餐厅&#x1f4ab;&#x1f4ab;&#x1f4ab; 主厨&#xff1a;邪王真眼 主厨的主页&#xff1a;Chef‘s blog 所属专栏&#xff1a;青果大战linux 总有光环在陨落&#xff0c;总有新星在闪烁 为什么我的课设这么难…

流程引擎Activiti性能优化方案

流程引擎Activiti性能优化方案 Activiti工作流引擎架构概述 Activiti工作流引擎架构大致分为6层。从上到下依次为工作流引擎层、部署层、业务接口层、命令拦截层、命令层和行为层。 基于关系型数据库层面优化 MySQL建表语句优化 Activiti在MySQL中创建默认字符集为utf8&…

51c视觉~合集36

我自己的原文哦~ https://blog.51cto.com/whaosoft/12275223 #无监督盲超分算法MLMC 即插即用的解决方案 本文介绍了一种新的无监督盲超分辨率算法MLMC&#xff0c;该算法结合了元学习和马尔可夫链蒙特卡罗核估计&#xff0c;无需监督预训练或参数先验&#xff0c;即可实现…

Firecrawl教程①:自动化抓取与数据转化,赋能AI应用

Firecrawl教程①:自动化抓取与数据转化,赋能AI应用 前言一、功能特点1. 支持 LLM 可处理的数据格式2. 全面抓取网站3. 强大的操作支持4. 灵活的定制选项5. 支持多种编程语言 SDK二、如何开始使用 Firecrawl第一步:获取 API 密钥第二步:官网在线工具使用第三步:安装 Firecr…

关于目标检测YOLO 各版本区别v1-v11/X/R/P

概述 YOLO&#xff08;You Only Look Once&#xff0c;你只看一次&#xff09;是一系列开创性的实时目标检测模型&#xff0c;它们彻底改变了计算机视觉领域。由Joseph Redmon开发&#xff0c;后续版本由不同研究人员迭代&#xff0c;YOLO模型以其在图像中检测对象的高速度和准…

SpringBoot3整合FastJSON2如何配置configureMessageConverters

在 Spring Boot 3 中整合 FastJSON 2 主要涉及到以下几个步骤&#xff0c;包括添加依赖、配置 FastJSON 作为 JSON 处理器等。下面是详细的步骤&#xff1a; 1. 添加依赖 首先&#xff0c;你需要在你的 pom.xml 文件中添加 FastJSON 2 的依赖。以下是 Maven 依赖的示例&#…