009 rocketmq延时消息

文章目录

  • 延迟消息
    • 第⼀步:修改消息Topic名称和队列信息
    • 第⼆步:转发消息到延迟主题的CosumeQueue中
    • 第三步:延迟服务消费SCHEDULE_TOPIC_XXXX消息
    • 第四步:将信息重新存储到CommitLog中
    • 第五步:将消息投递到目标Topic中
    • 第六步:消费者消费目标topic中的数据
    • ScheduledMessageProducer.java
    • ScheduledMessageConsumer.java

延迟消息

两个关键要素:临时存储 延时服务
内置队列,RocketMQ包含多个内置队列。
特点:对消费者不可见,消费者不能消费
SCHEDULE_TOPIC_XXXX对应的队列是一种内置队列

注意:RocketMQ不⽀持任意时间的延时,只⽀持以下⼏个固定的延时等级

String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ的延迟等级可以进⾏修改,以满⾜⾃⼰的业务需求,可以修改/添加新的level。
例如:你想⽀持1天的延迟,修改最后⼀个level的值为1d,这个时候依然是18个level;也可以增加⼀个1d,这个时候总共就有19个level。

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

实现原理
延迟队列的核⼼思路是:所有的延迟消息由producer发出之后,都会存放到同⼀个topic(SCHEDULE_TOPIC_XXXX)下,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由
定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可⻅,从⽽被consumer消费。
⽣产者在发送延迟消息⾮常简单,只需要设置⼀个延迟级别即可,注意不是具体的延迟时间,如:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

延迟消息在RocketMQ Broker端的流转如下图所示:

延迟消息
可以看到,总共有6个步骤,下⾯会对这6个步骤进⾏详细的讲解:

  1. 修改消息Topic名称和队列信息
  2. 转发消息到延迟主题SCHEDULE_TOPIC_XXXX的CosumeQueue中
  3. 延迟服务消费SCHEDULE_TOPIC_XXXX消息
  4. 将信息重新存储到CommitLog中
  5. 将消息投递到⽬标Topic中
  6. 消费者消费⽬标topic中的数据

第⼀步:修改消息Topic名称和队列信息

RocketMQ Broker端在存储⽣产者写⼊的消息时,⾸先都会将其写⼊到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到⽬标Topic的指定队列(ConsumeQueue)中。
由于消息⼀旦存储到ConsumeQueue中,消费者就能消费到,⽽延迟消息不能被⽴即消费,所以这⾥将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
同时,还会将消息原来要发送到的⽬标Topic和队列信息存储到消息的属性中。

如果是延迟消息,修改topic,通过timelevel判断是延时消息
如果设置的级别超过了最大级别,重置延迟级别
更改topic
相同的延迟等级发送到同一个队列,不同等级不同队列
记录真正的topic和队列

/store/consumequeue/SCHEDULE_TOPIC_XXXX/2
相关源码如下所示:
org.apache.rocketmq.store.CommitLog#putMessage

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {}

第⼆步:转发消息到延迟主题的CosumeQueue中

CommitLog中的消息转发到CosumeQueue中是异步进⾏的。在转发过程中,会对延迟消息进⾏特殊处理,主要是计算这条延迟消息需要在什么时候进⾏投递。

投递时间=消息存储时间(storeTimestamp) + 延迟级别对应的时间

需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,CosumeQueue单个存储单元组成结构如下图所示:
组成结构
其中:
Commit Log Offset:记录在CommitLog中的位置。
Size:记录消息的⼤⼩
Message Tag HashCode:记录消息Tag的哈希值,⽤于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。这也是为什么java中hashCode⽅法返回⼀个int型,只占⽤4个字节,⽽这⾥Message Tag HashCode字段却设计成8个字节的原因。
相关源码参⻅:
CommitLog#checkMessageAndReturnSize

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer
byteBuffer, final boolean checkCRC,
 final boolean readBody) {}

第三步:延迟服务消费SCHEDULE_TOPIC_XXXX消息

Broker内部有⼀个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到⽬标Topic中。
ScheduleMessageService在启动时,其会创建⼀个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责⼀个延迟级别的消费与投递。
相关源码如下所示:
ScheduleMessageService#start

public void start() {}

需要注意的是,每个TimeTask在检查消息是否到期时,⾸先检查对应队列中尚未投递第⼀条消息,如果这条消息没到期,那么之后的消息都不会检查。如果到期了,则进⾏投递,并检查之后的消息是否到期。

第四步:将信息重新存储到CommitLog中

在将消息到期后,需要投递到⽬标Topic。由于在第⼀步已经记录了原来的Topic和队列信息,因此这⾥重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这⾥需要重新计算tag的哈希值后再存储。
源码参见:DeliverDelayedMessageTimerTask#messageTimeup⽅法

private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {}

第五步:将消息投递到目标Topic中

第六步:消费者消费目标topic中的数据

不同的延迟级别放在不同的队列序号下(queueId=delayLevel-1)。每⼀个延迟级别对应的延迟消息转换为普通消息的位置标识存放在~/store/config/delayOffset.json⽂件内。
key为对应的延迟级别,value对应不同延迟级别转换为普通消息的offset值。

{
 "offsetTable":{3:202,4:2,5:2,6:2,7:2,8:2,9:2,10:2,11:2
 }

ScheduledMessageProducer.java

package com.example.rocketmq.demo.scheduled;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        // Launch producer
        producer.start();
        for (int i = 0; i < 2; i++) {
            Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            //设置延时等级,3 是等级 不是3s
            message.setDelayTimeLevel(3);

            // Send the message
            producer.send(message);
        }

        // Shutdown producer after use.
        producer.shutdown();
    }

}

ScheduledMessageConsumer.java

package com.example.rocketmq.demo.scheduled;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ScheduledMessageConsumer {

    public static void main(String[] args) throws Exception {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // Subscribe topics
        consumer.subscribe("TopicTest", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/979845.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

筑牢安全防线:工商业场所燃气泄漏防护新方案

燃气安全是企业经营不可逾越的生命线。在餐饮后厨、化工车间、酒店锅炉房等场所&#xff0c;可燃气体一旦泄漏&#xff0c;极易引发严重事故。如何实现精准监测、快速响应&#xff0c;成为工业及商业领域安全管理的核心诉求。旭华智能深耕安全监测领域&#xff0c;推出的工业及…

Android15 Camera HAL Android.bp中引用Android.mk编译的libB.so

背景描述 Android15 Camera HAL使用Android.bp脚本来构建系统。假设Camera HAL中引用了另外一个HAL实现的so &#xff08;例如VPU HAL&#xff09;&#xff0c; 恰巧被引用的这个VPU HAL so是用Android.mk构建的&#xff0c;那Camera HAL Android.bp在直接引用这个Android.mk编…

基于SSM实现的bbs论坛系统功能实现八

一、前言介绍&#xff1a; 1.1 项目摘要 随着互联网技术的不断进步和普及&#xff0c;网络社区已成为人们获取信息、交流意见、分享经验的重要场所。BBS&#xff08;Bulletin Board System&#xff0c;电子公告板系统&#xff09;论坛系统作为网络社区的一种重要形式&#xf…

Javaweb数据库多表查询 内连接 外连接 子查询

内连接 外连接 左外连接&#xff0c;左边是全部表 表名&#xff0c;即使没有匹配右边的数据&#xff0c;也要查询出来 子查询 案例 1.没有说所有的部门&#xff0c;所有的员工&#xff0c;用内连接&#xff08;隐式内连接&#xff09;

零知识证明与 ZK Rollups 详解

零知识证明与 ZK Rollups 详解 &#x1f510; 1. 零知识证明基础 1.1 什么是零知识证明&#xff1f; 零知识证明&#xff08;ZKP&#xff09;允许证明者向验证者证明一个陈述的真实性&#xff0c;而无需透露除了该陈述是真实的这一事实之外的任何信息。 1.2 核心特性 完整性…

《操作系统 - 清华大学》 8 -11:进程管理:上下文切换

进程管理之上下文切换与进程控制详解 一、上下文切换的定义 在多程序运行环境下&#xff0c;程序以进程形式存在&#xff0c;且多个进程共享CPU资源。不同时刻&#xff0c;进程需要切换以获取CPU执行权&#xff0c;这个切换过程被称为进程的上下文切换。“上下文”英文为“co…

Unity中动态切换光照贴图的方法

关键代码&#xff1a;LightmapSettings.lightmaps lightmapDatas; LightmapData中操作三张图&#xff1a;lightmapColor,lightmapDir,以及一张ShadowMap 这里只操作前两张&#xff1a; using UnityEngine; using UnityEngine.EventSystems; using UnityEngine.UI;public cl…

微服务笔记 2025/2/15

微服务是一种软件架构风格&#xff0c;它是以专注于单一职责的很多小型项目为基础&#xff0c;组合出复杂的大型应用。 微服务是一种架构。 微服务是一种架构。 微服务是一种架构。 以前自己做项目最常用的架构是单体架构。单体项目不适合开发大型项目。 学习微服务技术来解…

Android 端侧运行 LLM 框架 MNN 及其应用

MNN Chat Android App - 基于 MNN 引擎的智能聊天应用 一、MNN 框架简介与工作原理1.1 什么是 MNN&#xff1f;1.2 MNN 的工作原理 二、MNN Chat Android App2.1 MNN Chat 的功能2.2 MNN Chat 的优势2.3 MNN Chat Android App 的使用 三、总结 随着移动端人工智能需求的日益增长…

基于Python 宠物用品库存管理系统开发

Python 宠物用品库存管理系统开发 一、项目背景与需求分析 在宠物行业蓬勃发展的当下&#xff0c;宠物用品店的商品种类繁多&#xff0c;库存管理变得尤为重要。为了提高管理效率、减少人为错误&#xff0c;我们可以开发一个宠物用品库存管理系统。该系统需要具备商品信息管理…

Linux---共享内存

1.ipcs命令 IPC机制是一个让人烦恼的问题&#xff1a;编写错误的程序或因为某些原因而执行失败的程序将把它的IPC资源&#xff08;如消息队列中的数据&#xff09;遗留在系统里&#xff0c;并且这些资源在程序结束后很长时间让然在系统中游荡&#xff0c;这导致对程序的新调用…

数据结构与算法:二叉树

目录 树的概念 二叉树 二叉树性质 二叉树的遍历 前序遍历 中序遍历 后序遍历 层序遍历 二叉树节点个数 二叉树叶子节点个数 二叉树高度 二叉树第k层节点个数 二叉树查找值为x的节点 判断二叉树是否是完全二叉树 二叉树销毁 树的概念 树型结构是一类重要的非线性…

中科大计算机网络原理 1.5 Internt结构和ISP

一、互联网的层次化架构 ‌覆盖范围分层‌ ‌主干网&#xff08;Tier-1级&#xff09;‌ 国家级或行业级核心网络&#xff0c;承担跨区域数据传输和全球互联功能。例如中国的四大主干网&#xff08;ChinaNET、CERNET等&#xff09;以及跨国运营商&#xff08;如AT&T、Deuts…

AI编程界的集大成者——通义灵码AI程序员

一、引言 随着软件行业的快速发展和技术的进步&#xff0c;人工智能&#xff08;AI&#xff09;正在成为软件开发领域的一个重要组成部分。近年来&#xff0c;越来越多的AI辅助工具被引入到开发流程中&#xff0c;旨在提高效率、减少错误并加速创新。在这样的背景下&#xff0…

GPT-4.5震撼登场,AI世界再掀波澜!(3)

GPT-4.5震撼登场&#xff0c;AI世界再掀波澜! GPT-4.5震撼登场&#xff0c;AI世界再掀波澜!(2) &#xff08;一&#xff09;伦理困境&#xff1a;如何抉择 GPT-4.5 的强大功能在为我们带来诸多便利的同时&#xff0c;也引发了一系列深刻的伦理问题&#xff0c;这些问题犹如高…

electron-builder打包时github包下载失败【解决办法】

各位朋友们&#xff0c;在使用electron开发时&#xff0c;选择了electron-builder作为编译打包工具时&#xff0c;是否经常遇到无法从github上下载依赖包问题&#xff0c;如下报错&#xff1a; Get "https://github.com/electron/electron/releases/download/v6.1.12/ele…

【Linux】命令行参数 | 环境变量(四)

目录 前言&#xff1a; 一、命令行参数&#xff1a; 1.main函数参数 2.为什么有它&#xff1f; 二、环境变量&#xff1a; 1.main函数第三个参数 2.查看shell本身环境变量 3.PATH环境变量 4.修改PATH环境变量配置文件 5.HOME环境变量 6.SHELL环境变量 7.PWD环境变…

计算机毕业设计Python+DeepSeek-R1大模型游戏推荐系统 Steam游戏推荐系统 游戏可视化 游戏数据分析(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

Python实现GO鹅优化算法优化BP神经网络回归模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后关注获取。 1.项目背景 传统BP神经网络的局限性&#xff1a;BP&#xff08;Back Propagation&#xff09;神经网络作为一种…

1.忆往昔—Java发展史

在编程世界的远古时代&#xff0c;C语言和C统治着大地&#xff0c;但它们复杂且难以驾驭。1995年5月23日&#xff0c;Java 1.0正式发布&#xff0c;它像一把神奇的钥匙&#xff0c;打开了“一次编写&#xff0c;到处运行”的大门。 早在1991年Java就已经初见雏形&#xff0c;不…