RocketMQ源码 发送顺序消息源码分析

前言

rocketmq 发送顺序消息和普通消息的主流程区别大部分一致的,区别在于:普通消息发送时,从所有broker的队列集合中 轮询选择一个队列,而顺序队列可以提供用户自定义消息队列选择器,从NameServer 分配的顺序 broker集合中选择一个队列。

源码版本:4.9.3

源码架构图

源码分析

发送普通消息源码在另外一篇文章https://blog.csdn.net/hzwangmr/article/details/135411495,这里主要阅读和普通消息有差异的部分。

顺序消息源码入口

org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.MessageQueueSelector, java.lang.Object)

可以看到系统提供了一个 MessageQueueSelector 消息队列选择器,用于自定义选择队列的逻辑。

    /**
     * Same to {@link #send(Message)} with message queue selector specified.
     *
     * @param msg Message to send.
     * @param selector Message queue selector, through which we get target message queue to deliver message to.
     * @param arg Argument to work along with message queue selector.
     * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
     * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
     * @throws MQClientException if there is any client error.
     * @throws RemotingException if there is any network-tier error.
     * @throws MQBrokerException if there is any error with broker.
     * @throws InterruptedException if the sending thread is interrupted.
     */
    @Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg, selector, arg);
    }

触发队列选择器源码

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl

这里可以看到在调用发送消息核心 sendKernelImpl() 方法之前,会调用 selector.select() 函数,执行我们自定的选择逻辑。那么自定义的逻辑具体是什么呢?接着往下看

    private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        // 获取topic路由数据
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
                // 解析发布消息队列
                List<MessageQueue> messageQueueList =
                    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
                // 克隆消息
                Message userMessage = MessageAccessor.cloneMessage(msg);
                String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
                userMessage.setTopic(userTopic);

                // 利用消息队列选择器,选择一个队列
                mq = mQClientFactory.getClientConfig().queueWithNamespace(
                        // 自定义选择队列
                        selector.select(messageQueueList, userMessage, arg)
                );
            } catch (Throwable e) {
                throw new MQClientException("select message queue threw exception.", e);
            }

            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if (mq != null) {
                // 如果选择出来的 MessageQueue 存在,这调用核心发送消息函数,发送消息
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }

        validateNameServerSetting();
        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }

源码工程自带发送顺序消息实例

可以看到,工程自带的发送顺序消息的example实例,针对 id相同的数据,选择了相同的消息队列,这样对于同一个实体的数据变化一定是有顺序的。

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 自定义选择逻辑,可以理解为将相同的 orderId订单id,投递到相同的队列中
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

其他

topic对应的顺序队列怎么来的?

mq生产者客户端,在发送消息前,会从 NameServer中读取消息队里指定 topic对应的 topic路由信息,然后写到转换和缓存在内存多个数据结构里。其中,有一处就是下方的将 topic路由数据转换为topic 已发布信息,这里写入了messageQueueList (顺序消息待选择队列)。

转换逻辑是:

  1. 将路由信息写入topic发布信息;
  2. 判断当前路由信息是不是顺序 topic 配置;
  3. 解析顺序消息 topic配置;
  4. 遍历 broker,遍历broker 队列数量;
    1. 封装消息队列数据。topic -> broker ->队里id;
  5. 顺序队列数据,添加到 MessageQueueList 集合中;
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        // 写入路由数据
        info.setTopicRouteData(route);
        // 顺序消息配置,顺序消息分配了多少个broker
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            // 是顺序消息
            // 解析顺序消息topic配置
            String[] brokers = route.getOrderTopicConf().split(";");

            // 遍历顺序 broker,从namersrv拿到的broker数量
            for (String broker : brokers) {
                String[] item = broker.split(":");

                // 特定broker中的队列数量
                int nums = Integer.parseInt(item[1]);
                for (int i = 0; i < nums; i++) {
                    // 分装队列数据,topic--brokerName--队列id
                    MessageQueue mq = new MessageQueue(topic, item[0], i);
                    info.getMessageQueueList().add(mq);
                }
            }
            // 是顺序topic,打标
            info.setOrderTopic(true);
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/298665.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

动态编译 - Dynamically Compile and Load External Java Classes

文章目录 概述Code 概述 动态编译和加载外部Java类的核心流程可以概括为以下几个步骤&#xff1a; 读取源代码: 首先&#xff0c;需要获取到外部的Java源代码。这通常是通过读取文件、网络资源或者数据库中的源代码字符串来实现的。编译源代码: 接下来&#xff0c;需要使用Ja…

PHP在线sqlite转html表格小功能(sqlite2html)

6KB PHP实现在线sqlite转html表格小功能(支持大文件上传,得到一表一文件) 可自定义&#xff1a;上传限制大小&#xff1b;支持后缀格式!下载格式位压缩包&#xff0c;内含一表一个html文件。 作用&#xff1a;程序员实用工具&#xff0c;上传sqlite数据得到html表格数据供本地…

[ESP32]如何透過Modbus和Serial port擷取工業數顯表頭資料?

[ESP32]ESP32 as Modbus Master and Receive Data from Gauge with Serial Port 對於既有老舊的工業或實驗設備機台&#xff0c;嵌入工業數顯表頭並顯示設備運作參數和數據&#xff0c;以讓巡檢人員或操作人員手抄記錄數據&#xff0c;是常見作法。然而&#xff0c;若可將既有設…

个人笔记:分布式大数据技术原理(一)Hadoop 框架

Apache Hadoop 软件库是一个框架&#xff0c;它允许使用简单的编程模型&#xff0c;实现跨计算机集群的大型数据集的分布式处理。它最初的设计目的是为了检测和处理应用程序层的故障&#xff0c;从单个机器扩展到数千台机器&#xff08;这些机器可以是廉价的&#xff09;&#…

环形缓冲区优点及实现

环形缓冲区优点及实现 目录 环形缓冲区优点及实现一、环形缓冲区概念二、环形缓冲区优点1、一个有缺陷的数据读写示例2、使用环形缓冲区解决数据读写缺陷 三、环形缓冲区实现代码 一、环形缓冲区概念 环形缓冲区是一种特殊的缓冲区&#xff0c;其读指针和写指针都指向同一个缓…

MySQL之视图索引执行计划

目录 一.视图 二.执行计划 2.1.什么是执行计划 2.2.执行计划的作用 三.使用外连接、内连接和子查询进行举例 四.思维导图 好啦今天就到这里了哦&#xff01;&#xff01;&#xff01;希望能帮到你哦&#xff01;&#xff01;&#xff01; 一.视图 含义 &#xff1a;在数…

【BIAI】lecture 3 - GD BP CNN Hands-on

GD & BP & CNN & Hands-on 专业术语 gradient descent (GD) 梯度下降 back propagation (BP) 向传播 Convolutional Neural Network (CNN) 卷积神经网络 forward propagation 前向传播 biologically symmetry 生物对称性 synaptic 突触 axon 轴突 课程大纲 The go…

webgl调试之排查内存泄漏

内存泄漏自然而然是要看内存是不是涨了 然后我们如何确认泄露了呢&#xff0c;我们需要把代码梳理清楚&#xff0c;知道哪个时机&#xff0c;在delete&#xff0c;在create&#xff0c;那么这个时候&#xff0c;按道理&#xff0c;delete了n个对象&#xff0c;create了N个对象&…

Redis 键中冒号的用途是什么?可以使匹配查询更快吗?

Redis 键中冒号的用途是什么在Redis中&#xff0c;冒号&#xff08;:&#xff09;用作键的分隔符&#xff0c;它的主要作用是创建层次结构和命名空间。通过在键中使用冒号&#xff0c;可以将键分为多个部分&#xff0c;从而更好地组织和管理数据。 以下是冒号在Redis键中的用途…

2024苹果Mac电脑免费文件数据恢复软件EasyRecovery

EasyRecovery是一个操作安全、价格便宜、用户自主操作的非破坏性的只读应用程序&#xff0c;它不会往源驱上写任何东西&#xff0c;也不会对源驱做任何改变&#xff01;EasyRecovery是一个操作安全、价格便宜、用户自主操作的非破坏性的只读应用程序&#xff0c;它不会往源驱上…

MySQL第四战:视图以及常见面试题(上)

目录 目录&#xff1a; 一.视图 1.介绍什么是视图 2.视图的语法 语法讲解 实例操作 二.MySQL面试题 1.SQL脚本 2.面试题实战 三.思维导图 目录&#xff1a; 随着数字化时代的飞速发展&#xff0c;数据库技术&#xff0c;特别是MySQL&#xff0c;已经成为IT领域中不可…

短网址的新玩法,短到只剩域名

短网址大家应该都不陌生了&#xff0c;一句话就可以解释清楚&#xff0c;把一串很长的网址缩短到只有几个字符依然可以正常访问&#xff0c;缩短之后会更加简洁美观。 那大家见过的短网址一般长啥样呢&#xff0c;比如t.cn/xxxxx、dwz.cn/xxxxx、c1ns.cn/xxxxx。这些短网址都有…

初始MySQL

一、数据库 1.什么是数据库 数据库&#xff08; Database,简称DB &#xff09;&#xff1a;长期存放在计算机内&#xff0c;有组织、可共享的大量数据的集合&#xff0c;是一个数据“仓库” 2.数据库的作用 可以结构化存储大量的数据&#xff0c;方便检索和访问保持数据信息…

JVM工作原理与实战(八):类加载器的分类

专栏导航 JVM工作原理与实战 RabbitMQ入门指南 从零开始了解大数据 目录 专栏导航 前言 一、类加载器介绍 二、类加载器的分类 1.Java代码实现的类加载器 2.Java虚拟机底层源码实现的类加载器 3.默认的类加载器层次&#xff08;JDK8及之前的版本&#xff09; 总结 前言…

迅为RK3588开发板使用 FFMpeg 进行推流

Debian/Ubuntu 系统使用以下命令安装 FFMpeg &#xff0c;如下图所示&#xff1a; apt-get install ffmpeg 使用 ifconfig 查看开发板 ip 为 192.168.1.245 如下图所示&#xff1a; 使用 FFMpeg 推流一个 mp4 视频进行测试&#xff0c;作者将测试视频 test.mp4 放在了根目录下…

学习笔记——C++运算符之赋值运算符

上次我们说到C的运算符共有四种&#xff0c;分别是算术运算符&#xff0c;赋值运算符&#xff0c;比较运算符和逻辑运算符 &#xff0c;下面介绍赋值运算符&#xff0c;赋值运算符主要的种类及作用如下表所示。 #include<bits/stdc.h> using namespace std; int main(){…

求两个数之间的最小公约数

目录 前言 方法&#xff1a;求两个数之间的最小公约数 1.欧几里得算法 2.枚举法 3.公共因子积 4.更相减损术 5.Stein算法 解题&#xff1a;在链表中插入最大公约数 总结 前言 今天刷每日一题&#xff1a;2807. 在链表中插入最大公约数 - 力扣&#xff08;LeetCode&#xff09;…

jenkins安装报错:No such plugin: cloudbees-folder

jenkins安装报错&#xff1a;No such plugin: cloudbees-folder 原因是缺少cloudbees-folder.hpi插件 解决&#xff1a; 一&#xff0c;重新启动 http://xxx:8800/restart 二&#xff0c;跳到重启界面时&#xff0c;点击系统设置 三&#xff0c;找到安装插件&#xff0c;然…

Python基础-07(for循环、range()函数)

文章目录 前言一、for循环1.for循环结构2.参数 end&#xff08;使其输出时变为横向&#xff09; 二、range()函数1.range(常数)2.range(起始值&#xff0c;结束值)3.range(起始值&#xff0c;结束值&#xff0c;步长)4.例子 总结 前言 此章介绍循环结构中最常用的循环&#xf…

Go (一) 基础部分5 -- 单元测试,协程(goroutine),管道(channel)

一、单元测试 Go自带一个轻量级的"测试框架testing"和自带的"go test"命令来实现单元测试和性能测试。 1.确保每个函数时可运行&#xff0c;并且运行结果是正确的。 2.确保写出来的代码性能是好的。 3.单元测试能及时的发现程序设计或实现的逻辑错误&#…