【RocketMQ】顺序消费消息实现原理分析

一、顺序消息概述

1.1、什么是顺序消息

顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

1.2、顺序消息的类型

全局顺序消息

对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费(单生产者单线程,单消费者单线程)

  • 适用场景

  • 适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。

分区顺序消息

对于指定的一个Topic,所有消息根据Sharding Key进行划分到不同队列中,同一个队列内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一队列内同一Sharding Key的消息保证顺序,不同队列之间的消息顺序不做要求。

  • 适用场景

  • 适用于性能要求高,以Sharding Key作为划分字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。

1.3、顺序消息的使用场景

顺序消息通常用于业务上有先后顺序要求的场景,比如订单创建、支付、发货等操作,必须按照一定的顺序执行,否则会导致错误的结果。RocketMQ提供了顺序消息的特性,可以满足这种业务场景的需求。

举个例子,某电商网站有一个订单创建系统,用户下单后需要按照如下顺序依次处理:

  1. 创建订单;

  2. 扣减库存;

  3. 发送短信通知用户。

如果这些操作不按照正确的顺序执行,就会出现一些问题。比如库存已经被扣减了,但是订单还没创建成功,这样可能会导致订单创建失败。RocketMQ的顺序消息可以保证这些操作按照正确的顺序执行。

二、顺序消息的实现原理

2.1、顺序消息实现案例

接下来看RoceketMQ源码中提供的顺序消息例子(稍微做了一些修改):

生产者
public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 创建生产者
            DefaultMQProducer producer = new DefaultMQProducer("生产者组");
            // 启动
            producer.start();
            // 创建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成订单ID
                int orderId = i % 10;
                // 创建消息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 获取订单ID
                        Integer id = (Integer) arg;
                        // 对消息队列个数取余
                        int index = id % mqs.size();
                        // 根据取余结果选择消息要发送给哪个消息队列
                        return mqs.get(index);
                    }
                }, orderId); // 这里传入了订单ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
消费者
public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅主题
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 注册消息监听器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 打印消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

从例子中可以看出生产者在发送消息的时候,通过订单ID作为路由信息,将同一个订单ID的消息发送到了同一个消息队列中,保证同一个订单ID相关消息有序发送,接下来就看消费者是如何保证消息的顺序消费的。

2.2、定时任务对消息队列加锁

消费者在启动的时候,会对是否是顺序消费进行判断(监听器是否是MessageListenerOrderly类型来判断),如果是顺序消费,会使用ConsumeMessageOrderlyService,并调用它的start方法进行启动,在集群模式模式下,start方法中会启动一个定时加锁的任务,周期性的对该消费者负责的消息队列进行加锁。

为什么集群模式下需要加锁?

  • 因为广播模式下,消息队列会分配给消费者下的每一个消费者,而在集群模式下,一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,所以在广播模式下不存在竞争关系,也就不需要对消息队列进行加锁,而在集群模式下,有可能因为负载均衡等原因将某一个消息队列分配到了另外一个消费者中,因此在顺序消费情况下,集群模式下需要对消息队列加锁,当某个消息队列被锁定时,其他的消费者不能进行消费。

加锁的具体逻辑如下,首先获取当前消费者负责的所有消息队列MessageQueue,返回数据是一个MAP,key为broker名称,value为broker下的消息队列,接着对MAP进行遍历,处理每一个broker下的消息队列:

  1. 根据broker名称查找broker的详细信息;

  2. 创建加锁请求,在请求中设置要加锁的消息队列,将请求发送给broker,表示要对这些消息队列进行加锁;

  3. Broker返回请求处理结果,响应结果中包含了加锁成功的消息队列,对于加锁成功的消息队列将消息队列MessageQueue,将其对应的ProcessQueue中的locked属性置为true表示该消息队列已加锁成功,如果响应中未包含某个消息队列的信息,表示此消息队列加锁失败,需要将其对应的ProcessQueue对象中的locked属性置为false表示加锁失败;

2.3、顺序消息拉取

上面可知,在使用顺序消息时,定时任务会周期性的对当前消费者负责的消息队列进行加锁,不过由于负载均衡等原因,有可能给当前消费者分配了新的消息队列,此时还未来得及通过定时任务加锁,所以消费者在构建消息拉取请求前会再次进行判断,如果是新分配到当前消费者的消息队列,同样会向Broker发送请求,对MessageQueue进行加锁,加锁成功将其对应的ProcessQueue中的locked属性置为true才可以拉取消息。

2.4、顺序消息消费

消息拉取成功之后,会将消息提交到线程池中进行处理,对于顺序消费处理逻辑如下:

  • 获取消息队列MessageQueue的对象锁,每个MessageQueue对应了一把Object对象锁,然后使用synchronized进行加锁,这里加锁的原因是因为顺序消费使用的是线程池,由多个线程同时进行消费,所以某个线程在处理某个消息队列的消息时需要对该消息队列MessageQueue加锁,防止其他线程并发消费该消息队列的锁,破坏消息的顺序性;
public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        // 获取消息队列对应的对象锁,也就是一个Object类型的对象
        Object objLock = this.mqLockTable.get(mq);
        // 如果获取为空
        if (null == objLock) {
            // 创建对象
            objLock = new Object();
            // 加入到Map中
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }
        return objLock;
    }
}
  • 上一步获取锁成功之后,会再次校验该MessageQueue对应的ProcessQueue中的锁(locked状态),看是否过期或者已经失效,过期或者失效稍后会重新进行加锁;
  • 获取ProcessQueue的中的consumeLock消费锁,获取成功之后调用消息监听器的consumeMessage方法开始消费消费;
public class ProcessQueue {
   // 消息消费锁
   private final Lock consumeLock = new ReentrantLock();

   public Lock getConsumeLock() { // 获取消息消费锁
         return consumeLock;
   }
}
  • 消息消费完毕,释放ProcessQueueconsumeLock消费锁;

  • 方法执行完毕,释放MessageQueue对应的Object对象锁;

在第1步中就已经获取了MessageQueue对应的Object对象锁对消息队列进行加锁了,那么为什么在第3步消费消息之前还要再加一个消费锁呢?

猜测有可能是在消费者进行负载均衡时,当前消费者负责的消息队列发生变化,可能移除某个消息队列,那么消费者在进行消费的时候就要获取ProcessQueueconsumeLock消费锁进行加锁,相当于锁住ProcessQueue,防止正在消费的过程中,ProcessQueue被负载均衡移除。

既然如此,负载均衡的时候为什么不使用MessageQueue对应的Object对象锁进行加锁而要使用ProcessQueue中的consumeLock消费锁?

这里应该是为了减小锁的粒度,因为消费者在MessageQueue对应的Object加锁后,还进行了一系列的判断,校验都成功之后获取ProcessQueue中的consumeLock加锁,之后开始消费消息,消费完毕释放所有的锁,如果负载均衡使用MessageQueueObject对象锁需要等待整个过程结束,锁的粒度较粗,这样显然会降低性能,而如果使用消息消费锁,只需要等待第3步和第4步结束就可以获取锁,减少等待的时间,而且消费者在进行消息消费前也会判断ProcessQueue是否被移除,所以只要保证consumeMessage方法在执行的过程中(消息被消费的过程)ProcessQueue不被移除即可。

2.5、总结

消费者端,是通过加锁来保证消息的顺序消费,一共有三把锁:

  • 向Broker申请的消息队列 集群模式下一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,为了避免负载均衡等原因引起的变动,消费者会向Broker发送请求对消息队列进行加锁,如果加锁成功,记录到消息队列对应的ProcessQueue中的locked变量中。

  • 消息队列 对应MessageQueue对应的Object对象锁,消费者在处理拉取到的消息时,由于可以开启多线程进行处理,所以处理消息前需要对MessageQueue加锁,锁住要处理的消息队列,主要是处理多线程之间的竞争,保证消息的顺序性。

  • 消息消费锁 对应ProcessQueue中的consumeLock,消费者在调用consumeMessage方法之前会加消费锁,主要是为了避免在消费消息时,由于负载均衡等原因,ProcessQueue被删除

三、参考

  • 【RocketMQ】【源码】顺序消息实现原理

  • RocketMQ顺序消息机制源码分析~

  • RocketMQ 顺序消费机制

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/246340.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

经典深度学习算法【1】:K-近邻算法(KNN)概述

最简单最初级的分类器是将全部的训练数据所对应的类别都记录下来&#xff0c;当测试对象的属性和某个训练对象的属性完全匹配时&#xff0c;便可以对其进行分类。但是怎么可能所有测试对象都会找到与之完全匹配的训练对象呢&#xff0c;其次就是存在一个测试对象同时与多个训练…

JVM虚拟机系统性学习-JVM调优之GC日志分析

JVM 调优 首先&#xff0c;为什么要 JVM 调优呢&#xff1f; JVM 调优的目的就是为了让应用程序使用最小的硬件消耗来承载更大的吞吐量 什么情况下需要 JVM 调优呢&#xff1f; 系统吞吐量下降&#xff0c;或系统延迟较高出现 OOMFull GC 频繁GC 停顿时间过长&#xff08;超…

UE虚幻引擎中程序无需运行也可调试

首先先新建一个蓝图类&#xff0c;在蓝图类中创建一个Custom event 事件&#xff0c;然后在右侧细节面板中搜索call in editor&#xff0c;编译保存之后&#xff0c;将该蓝图类拖拽到关卡场景中&#xff0c;在细节面板中即可看到该事件的按钮。

PE文件格式-PE文件头部

文章目录 PE文件头基本概念简介地址对齐 PE结构概述PE文件头部DOS MZ 头PE头&#xff08;NT头&#xff09;PE头标识 Signature标准PE头 IMAGE_FILE_HEADERMachineNumberOfSectionsTimeDateStampPointToSymbolTableNumberOfSymbolSizeOfOptionalHeaderCharacteristics 扩展PE头 …

亚信科技AntDB数据库——深入了解AntDB-M元数据锁的相关概念

AntDB-M在架构上分为两层&#xff0c;服务层和存储引擎层。元数据的并发管理集中在服务层&#xff0c;数据的存储访问在存储引擎层。为了保证DDL操作与DML操作之间的一致性&#xff0c;引入了元数据锁&#xff08;MDL&#xff09;。 AntDB-M提供了丰富的元数据锁功能&#xff…

【Hadoop】执行start-dfs.sh启动hadoop集群时,datenode没有启动怎么办

执行start-dfs.sh后&#xff0c;datenode没有启动&#xff0c;很大一部分原因是因为在第一次格式化dfs后又重新执行了格式化命令&#xff08;hdfs namenode -format)&#xff0c;这时主节点namenode的clusterID会重新生成&#xff0c;而从节点datanode的clusterID 保持不变。 在…

认识缓存,一文读懂Cookie,Session缓存机制。

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

基于VUE3+Layui从头搭建通用后台管理系统(前端篇)十四:系统设置模块相关功能实现

一、本章内容 本章使用已实现的公共组件实现系统管理中的系统设置模块相关功能,包括菜单管理、角色管理、日志管理、用户管理、系统配置、数据字典等。 1. 详细课程地址: 待发布 2. 源码下载地址: 待发布 二、界面预览 三、开发视频 3.1 B站视频地址:

硬件基础常识【4】--利用戴维宁定理求运放复杂反馈电阻网络的增益

最近学到了一种求带T型电阻网络反馈运放增益的方法 如图所示为T型电阻网络反馈的反相放大器 求解思路 沿X-Y断开&#xff0c;右侧利用戴维宁定理等效成电压源串电阻的形式 由戴维宁定理可得&#xff1a; V T H V o u t ∗ R 4 / ( R 3 R 4 ) ( 式 1 ) VTHVout*R4/(R3R4)…

布局前沿技术,紫光展锐推动6G创新融合发展

随着5G进入规模化商用阶段&#xff0c;6G研究已在全球范围内拉开帷幕。2023年6月&#xff0c;ITU发布了《IMT面向2030及未来发展的框架和总体目标建议书》&#xff0c;在升级5G三大应用场景的同时&#xff0c;扩展出三个跨领域场景&#xff0c;形成6G的六大应用场景&#xff0c…

unity—UGUI 点击按钮后,持续点击空格键会持续出发按钮

在unity开发当中&#xff0c;使用UGUI开发&#xff0c;无论是你代码绑定按钮事件&#xff0c;还是在Inspector窗口直接拖拽绑定的事件&#xff0c;点击按钮事件后&#xff0c;按空格键都会再次执行相关的方法。 默认情况下&#xff0c;Unity将空格键映射为UI按钮的Submit提交操…

【Hadoop】

Hadoop是一个开源的分布式离线数据处理框架&#xff0c;底层是用Java语言编写的&#xff0c;包含了HDFS、MapReduce、Yarn三大部分。 组件配置文件启动进程备注Hadoop HDFS需修改需启动 NameNode(NN)作为主节点 DataNode(DN)作为从节点 SecondaryNameNode(SNN)主节点辅助分…

Ubuntu18.04.6下samba服务的安装及配置

目录 01 安装samba服务&#xff1a; 03 重启samba服务 04 设置samba登录密码 05 测试 前言 虚拟机下Ubuntu18.04.6samba服务的安装及配置 01 安装samba服务&#xff1a; 命令行中输入 sudo apt-get install samba 02 配置 2.1 先创建一个需要共享的目录&#xff0c…

利用高级 CSPM 应对现代攻击

混合云和多云环境的快速增长造成了跨架构的复杂性&#xff0c;使得人们很难清楚、完整地了解技术堆栈中的各种平台。最近云攻击和破坏的激增引起了人们对团队应如何有效地保护和运行云中的应用程序的关注。 因为错误配置是云环境中安全威胁的首位&#xff0c;并且是基于云的攻…

kotlin 基础概览

继承类/实现接口 继承类和实现接口都是用的 : &#xff0c;如果类中没有构造器 ( constructor )&#xff0c;需要在父类类名后面加上 () &#xff1a; class MainActivity : BaseActivity(), View.OnClickListener 空安全设计 Kotlin 中的类型分为「可空类型」和「不可空类型」…

Java集合核心知识点总结

Java集合概述 从集合特点角度出发&#xff0c;Java集合可分为映射集、和单元素集合。如下图所示&#xff0c;单元素集合类图如下: collection包 : 工具单元素集合我们又可以分为&#xff0c;存储不可重复元素的Set集合&#xff0c;可顺序存储重复元素的List&#xff0c;以及F…

Processon的使用以及流程图的绘制

目录 一、ProcessOn 1.2 官方网站 门诊流程图 会议OA流程图 药库采购入库流程图 ​住院流程图 二、Axure自定义元件库 2.1 新建元件库 2.2 自定义元件 2.3 添加元件库 一、ProcessOn ProcessOn是一款在线的流程图、思维导图、组织结构图、网络拓扑图等多种图表类型…

OpenSSL的源码在哪里下载?

官方网站去下载&#xff0c;网址&#xff1a; https://www.openssl.org/source/ 比较老的版本的下载页面地址&#xff1a; https://www.openssl.org/source/old/ 由于某面板的OpenSSL模块的安装配置语句如下&#xff1a; --with-openssl/root/rpmbuild/BUILD/openssl-1.0.2u所…

STM32 寄存器配置笔记——USART DMA接收

一、简介 本文主要介绍STM32如何配合USART的IDLE中断实现USART DMA接收不定长的数据。其中使用的接收缓存还是延用前面博客写的乒乓缓存。使用DMA USART接收来替代中断方式或轮询方式的接收主要是为了提高代码的运行效率&#xff0c;中断方式的接收&#xff0c;每接收一个字节便…

Linux-----10、查找命令

# 查找命令 # 1、 命令查找 Linux下一切皆文件&#xff01; which 命令 &#xff1a;找出命令的绝对路径 whereis 命令 &#xff1a;找出命令的路径以及文档手册信息 [rootheima ~]# which mkdir /usr/bin/mkdir[rootheima ~]# whereis mkdir mkdir: /usr/bin/mkdir /usr/…