RocketMQ源码 发送 延迟消息 源码分析

前言

rocketMQ 支持的延迟消息,简单理解就是对于生产者发送的消息,支持设置固定时间的延迟级别,在到达指定的延迟时间时,才会投递到消费者队列,消费者才能消费到消息。

延迟队列和普通消息的发送流程,主要流程都是一致的,区别在于:

可以参考源码架构图来看,在 DledgerCommitLog 组件写入消息之前,会针对设置了setDelayTimeLevel 延迟级别的消息,改写  topic 为 RMQ_SYS_SCHEDULE_TOPIC,那么 ReputMessageService 消息重分发现场就会将消息投递到 RMQ_SYS_SCHEDULE_TOPIC 下。

再由 ScheduleMessageService 调度线程,定时将 RMQ_SYS_SCHEDULE_TOPIC 下的消息取出,判断是否到达延迟时间,如果到达延迟时间,就将消息重新投递到真实的Topic中,这样消费者就能消费到真是的消息。

源码版本:4.9.3

源码架构图

源码分析

1. 源码入口

本地源码入口从一个代码example工程中的一个示例,开始org.apache.rocketmq.example.schedule.ScheduledMessageProducer#main

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
        
        // Shutdown producer after use.
        producer.shutdown();
    }
    
}

... 发送流程和普通消息一致,对通用流程感兴趣可以看下另一篇文章,RocketMQ源码 发送消息源码分析-CSDN博客...

2. DledgerCommitLog 存储消息

broker接受的消息后,会调用 DLedgerCommitLog 存储组件,进行消息存储写入 CommitLog组件。在写的过程中有一个 setMessageInfo 信息的函数,会改写 topic 为 RMQ_SYS_SCHEDULE_TOPIC。

org.apache.rocketmq.store.dledger.DLedgerCommitLog#asyncPutMessage

    // 异步写入消息
    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {

        // 存储统计服务
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        // 事务类型
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

        // 设置消息信息:包含延迟消息的处理,改写延迟消息topic和queueId
        setMessageInfo(msg, tranType);

        final String finalTopic = msg.getTopic();
    }
    private void setMessageInfo(MessageExtBrokerInner msg, int tranType) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

        //should be consistent with the old version
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

            // 延时消息,延迟投递
            // Delay Delivery
            // 如果延时级别大于0
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                // 系统级延时调度topic,用于延时调度投递
                String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 根据延时级别获取对应的队列,队列的id为delayLevel-1
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                // 设置topic和queueId
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }
    }

... ReputMessageService 重分发服务会定时将延迟消息投递到 RMQ_SYS_SCHEDULE_TOPIC ...

3. ScheduleMessageService 定时调度消息服务

投递延迟消息定时任务,会定时 100ms 扫描 RMQ_SYS_SCHEDULE_TOPIC 主题下的队列中的消息,计算消息的延迟时间,是不是已经到期,如果到期,就将消息投递倒真实的 Topic中。

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask

    // 投递延时消息定时任务
    class DeliverDelayedMessageTimerTask implements Runnable {
        // 延时级别
        private final int delayLevel;
        // 偏移量
        private final long offset;

        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }

        @Override
        public void run() {
            try {
                if (isStarted()) {
                    // 执行延时消息投递
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
            }
        }

        /**
         * 计算正确的延时投递时间,如果延时消息已经过期,返回当前时间
         * @return
         */
        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {

            long result = deliverTimestamp;

            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
            if (deliverTimestamp > maxTimestamp) {
                result = now;
            }

            return result;
        }

        // 执行延时消息投递
        public void executeOnTimeup() {
            // 根据topic、queueId获取对应的CQ
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            if (cq == null) {
                // 延时指定时间后,再次执行
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
                return;
            }

            // 根据偏移量读取到CQ内存映射数据
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ == null) {
                long resetOffset;
                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                        this.offset, resetOffset, cq.getQueueId());
                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                        this.offset, resetOffset, cq.getQueueId());
                } else {
                    resetOffset = this.offset;
                }

                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
                return;
            }

            long nextOffset = this.offset;
            try {
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

                // 遍历CQ内存映射队列
                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferCQ.getByteBuffer().getLong(); // 物理偏移量
                    int sizePy = bufferCQ.getByteBuffer().getInt(); // 物理大小
                    long tagsCode = bufferCQ.getByteBuffer().getLong(); // deliverTimestamp

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // 计算正确的延时投递时间
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    // 下一个消息的偏移量 = 起始偏移量 + 遍历的步长
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;
                    // 如果延时时间还未到,则继续等待,100毫秒后再次执行
                    if (countdown > 0) {
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }

                    // 读取消息
                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                    if (msgExt == null) {
                        continue;
                    }

                    // 转换消息,还原消息真实的topic
                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                        continue;
                    }

                    boolean deliverSuc;
                    if (ScheduleMessageService.this.enableAsyncDeliver) {
                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                    } else {
                        // 将消息同步投递到,真实的topic中
                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                    }

                    if (!deliverSuc) {
                        // 如果投递失败,则继续等待,100毫秒后再次执行
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                }

                // 计算下一个消息的偏移量,下一个偏移量 = 起始偏移量 + 遍历的步长
                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
            } catch (Exception e) {
                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
            } finally {
                bufferCQ.release();
            }

            // 进行下一次调度
            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
        }

        // 调度下一个延时投递消息
        public void scheduleNextTimerTask(long offset, long delay) {
            // 延时投递消息
            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
        }

        // 同步投递消息
        private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
            int sizePy) {
            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
            PutMessageResult result = resultProcess.get();
            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
            if (sendStatus) {
                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
            }
            return sendStatus;
        }

        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
            int sizePy) {
            Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);

            //Flow Control
            int currentPendingNum = processesQueue.size();
            int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
                .getScheduleAsyncDeliverMaxPendingLimit();
            if (currentPendingNum > maxPendingLimit) {
                log.warn("Asynchronous deliver triggers flow control, " +
                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
                return false;
            }

            //Blocked
            PutResultProcess firstProcess = processesQueue.peek();
            if (firstProcess != null && firstProcess.need2Blocked()) {
                log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
                return false;
            }

            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
            processesQueue.add(resultProcess);
            return true;
        }

        // 异步投递消息
        private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
            long offsetPy, int sizePy, boolean autoResend) {
            CompletableFuture<PutMessageResult> future =
                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
            return new PutResultProcess()
                .setTopic(msgInner.getTopic())
                .setDelayLevel(this.delayLevel)
                .setOffset(offset)
                .setPhysicOffset(offsetPy)
                .setPhysicSize(sizePy)
                .setMsgId(msgId)
                .setAutoResend(autoResend)
                .setFuture(future)
                .thenProcess();
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/303693.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Dijkstra算法——邻接矩阵实现+路径记录

本文是在下面这篇文章的基础上做了一些补充&#xff0c;增加了路径记录的功能。具体Dijkstra的实现过程可以参考下面的这篇文章。 [jarvan&#xff1a;Dijkstra算法详解 通俗易懂](Dijkstra算法详解 通俗易懂 - jarvan的文章 - 知乎 https://zhuanlan.zhihu.com/p/338414118) …

软考高级选择考哪个好?

&#x1f4d2;软考高级总共5个科目&#xff0c;同样是高级证书&#xff0c;认可度也有区别! 大家一般在「信息系统项目管理师」✔️和「系统架构设计师」✔️二选一 1️⃣信息系统项目管理师 ❤️信息系统项目管理师也叫「高项」&#xff0c;考试内容主要是「项目管理」相关&am…

006-Zynq图像传输中cache刷新对视频的影响(讲究一个恰到好处)

文章目录 前言一、cache是什么玩意儿&#xff1f;二、解决方法1.Xil_DCacheInvalidateRange函数2.未刷新前的问题3.带刷新后的效果 总结 前言 也是移植过程中遇到的一个问题&#xff0c;尝试了一些解决方案&#xff0c;也算是解决了这个问题。 这个问题出现在通过以太网传输分…

为什么要使用云原生数据库?云原生数据库具体有哪些功能?

相比于托管型关系型数据库&#xff0c;云原生数据库极大地提高了MySQL数据库的上限能力&#xff0c;是云数据库划代的产品&#xff1b;云原生数据库最早的产品是AWS的 Aurora。AWS Aurora提出来的 The log is the database的理念&#xff0c;实现存储计算分离&#xff0c;把大量…

基于YOLOv8全系列【n/s/m/l/x】开发构建道路交通场景下CCTSDB2021交通标识检测识别系统

交通标志检测是交通标志识别系统中的一项重要任务。与其他国家的交通标志相比&#xff0c;中国的交通标志有其独特的特点。卷积神经网络&#xff08;CNN&#xff09;在计算机视觉任务中取得了突破性进展&#xff0c;在交通标志分类方面取得了巨大的成功。CCTSDB 数据集是由长沙…

CodeGPT,你的智能编码助手—CSDN出品

CodeGPT是由CSDN打造的一款生成式AI产品&#xff0c;专为开发者量身定制。 无论是在学习新技术还是在实际工作中遇到的各类计算机和开发难题&#xff0c;CodeGPT都能提供强大的支持。其涵盖的功能包括代码优化、续写、解释、提问等&#xff0c;还能生成精准的注释和创作相关内…

分布式系统架构设计之分布式消息队列架构解析

分布式消息队列架构是构建在分布式系统之上的消息队列架构&#xff0c;旨在提高高性能、高可用性和可伸缩性。它包括以下架构相关部分&#xff1a; 1、架构优势 分布式消息队列架构的优势主要体现在以下几个方面&#xff1a; 01 高可用性 在分布式消息队列架构中&#xff0…

十九:爬虫最终篇-平安银行商城实战

平安银行商场实战 需求 获取该商城商品信息 目标网址 https://m.yqb.com/bank/product-item-50301196.html?mcId1583912328849970&loginModepab&historyy&sceneModem&traceid30187_4dXJVel1iop详细步骤 1、寻找数据接口 2、对比payload寻找可疑参数 3、多…

上海亚商投顾:沪指再度失守2900点 全市场超4800只个股下跌

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 三大指数昨日继续调整&#xff0c;沪指跌超1%再度失守2900点&#xff0c;深成指、创业板指均创出调整新低&…

【算法练习】leetcode算法题合集之二叉树篇

递归遍历基础篇 前序遍历&#xff0c;中序遍历&#xff0c;后序遍历是根据处理根节点的位置来命名的。 树的处理大多用到了递归&#xff0c;递归需要知道终止条件。 前序遍历&#xff08;中左右&#xff09; 144.二叉树的前序遍历 中左右&#xff0c;先处理根节点&#xff0c;…

ASP .net core微服务实战

>>>>>>>>>>>>>>开发<<<<<<<<<<<<<<<< 0)用户 用户到nginx之间需要用https&#xff0c;避免被监听。 1)nginx // 做统一的分发&#xff0c;到微服务&#xff0c;相当于网关,提供统…

异常处理:全面覆盖与精细化管理的平衡

异常处理&#xff1a;全面覆盖与精细化管理的平衡 在软件开发中&#xff0c;异常处理是保证系统稳定性和用户体验的重要环节。对于是否应当全面覆盖所有异常并设立兜底机制&#xff0c;业界存在着两种主流思路&#xff1a;全面覆盖原则和精细化处理。如何在这两者间取得平衡&a…

Unity文字转语音(使用RT-Voice PRO [2023.1.0])

参考文章Unity插件——文字转朗读语音RtVioce插件功能/用法/下载_rtvoice-CSDN博客 一、使用步骤 1.导入进Unity&#xff08;插件形式为 .unitypackage&#xff09; https://download.csdn.net/download/luckydog1120446388/88717512 2.添加所需Prefab 1&#xff09;.右键可…

【科技素养题】少儿编程 蓝桥杯青少组科技素养题真题及解析第22套

少儿编程 蓝桥杯青少组科技素养题真题及解析第22套 1、植物的叶子多为绿色,这主要是因为它们含有 A、绿色色素 B、叶绿素 C、花青素 D、细胞 答案:B 考点分析:主要考查小朋友们生物知识的储备;叶绿素是植物叶子中的一种色素,它可以吸收太阳光中的能量并转化为植物所…

【深度学习:Domain Adversarial Neural Networks (DANN) 】领域对抗神经网络简介

【深度学习&#xff1a;Domain Adversarial Neural Networks】领域对抗神经网络简介 前言领域对抗神经网络DANN 模型架构DANN 训练流程DANN示例 GPT示例 前言 领域适应&#xff08;DA&#xff09;指的是当不同数据集的输入分布发生变化&#xff08;这种变化通常被称为共变量变…

synchronized和lock的区别

synchronized和lock的区别 1&#xff09;synchronized是一个关键字&#xff0c;lock是一个java类&#xff1b; 2&#xff09;synchronized无法判断获取锁的状态&#xff0c;lock可以判断是否获取到了锁&#xff1b; 3&#xff09;synchronized会自动释放锁&#xff0c;lock必须…

《罗素论教育》笔记

目录 全书架构 书简介 经典摘录 一、教育的理想 教育的基本原理 教育的目的 二、品性的教育 一岁前的教育 主要是2岁到6岁的教育 三、智力教育 14岁前的课程安排 最后的学年 大学教育 四、结束语 全书架构 书简介 经典摘录 一、教育的理想 教育的基本原理 1、我…

Python从入门到网络爬虫(读写Excel详解)

前言 Python操作Excel的模块有很多&#xff0c;并且各有优劣&#xff0c;不同模块支持的操作和文件类型也有不同。最常用的Excel处理库有xlrd、xlwt、xlutils、xlwings、openpyxl、pandas&#xff0c;下面是各个模块的支持情况&#xff1a; 工具名称.xls.xlsx获取文件内容写入…

LitJson-Json字符串转对像时:整型与字符串或字符串转:整型进的类型不一致的处理

目录 问题描述上代码测试代码各位看官&#xff0c;打赏个1元吧 Json数据格式是大家在游戏开中常量用的一种数据格式&#xff0c;某种程度上可以说是必备的。对unity开发来说&#xff0c;LitJson这个json库应该是被使用最多的json库了。 问题描述 今天说要的其中的这个api: Jso…

2024年中国电子学会青少年编程等级考试安排的通知

各有关单位、全体考生: 中国电子学会青少年等级考试&#xff08;以下简称等级考试&#xff09;是中国电子学会为落实《全民科学素质行动规划纲要》&#xff0c;提升青少年电子信息科学素质水平而开展的社会化评价项目。等级考试自2011年启动以来&#xff0c;作为中国电子学会科…