RocketMQ 顺序消息和事务消息及其原理

RocketMQ 顺序消息和事务消息

  • 1、Spring Cloud Alibaba RocketMq 架构图
  • 2、RocketMQ 顺序消息
  • 2.1、RockerMQ 实现顺序消费
      • 2.1.1、顺序发消息
      • 2.1.2、顺序收消息
    • 2.2、顺序发送的技术原理
    • 2.3、顺序消费的技术原理
  • 3、RocketMQ 的事务消息
    • 3.1、RocketMQ 事务消息流程
    • 3.2、事务消息代码示例
    • 3.3、事务消息注意项

1、Spring Cloud Alibaba RocketMq 架构图

 
先上一张图,大致了解 Spring Cloud Alibaba RocketMq 架构图:
在这里插入图片描述

2、RocketMQ 顺序消息

 
RocketMQ的顺序消息分为:局部有序和全局有序。

  • 局部有序:指发送同一个队列的消息有序,可以在发送消息时指定队列,在消费消息时也按顺序消费。
    • 示例场景:同一订单ID的消息要保证有序,不同订单ID的消息没有约束,相互不影响,且不同订单ID之间的消息是并行的。
  • 全局有序:设置 Topic只有一个队列可以实现全局有序,创建Topic时手动设置。此类场景极少,性能差,一般不推荐使用。

 

2.1、RockerMQ 实现顺序消费

 
顺序消息分为两部分:顺序发送(发消息)、顺序消费(收消息)。以下为实现局部有序示例:

2.1.1、顺序发消息

 
顺序发消息方式一

  • ① 修改生产端项目配置为同步发送

    #在项目spring.propeties配置文件中,设置同步发送。(默认市异步发送)
    # 所有通过 RocketMQTemplate 或其他相关组件发送的消息都会以同步的方式发送。
    spring.cloud.stream.rocketmq.output.producer.sync=true
    
  • ② MessageBuilder 设置Header信息头,表示该消息是一条顺序消息,将消息固定发送到指定的消息队列。

    @RestController
    public class sendMsgController {
        
        @Autowired
        private Source source;
        
        @GetMapping(value = "/sendOrderlyMsg")
        public String sendOrderlyMsg() {
            List<String> list = Arrays.asList("状态1","状态2","状态3");
            for (String msg : list) {
                // 这里指定消息发送到第0各消息队列
                MessageBuilder builder = MessageBuilder.withPayload(msg)
                       .setHeader(BinderHeaders.PARTITION_HEADER, 0);
                Message message = builder.build();
                source.output().send(message);
            }
            return "success";
        }
        
    }
    

顺序发消息方式二

@Slf4j
@SpringBootTest
public class SendSyncOrderlyMessageTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void sendOrderedStringMessage() {
        String message = "这是一条同步顺序消息:";
        for (int i = 0; i < 5; i++) {
            // hashkey是为了确保这些消息被路由到同一个消息队列,这样消费者就能够按照顺序处理它们
            rocketMQTemplate.syncSendOrderly("orderilMessageTopic", message + i, "syncOrderlyHashKey");
        }
    }
}

 

2.1.2、顺序收消息

  • 修改消费端项目配置为顺序消费。

    # 指定为顺序消费,这里默认配置为并发消费
    spring.cloud.stream.rocketmq.binding.input.consumer.orderly=true
    

2.2、顺序发送的技术原理

 
       RocketMQ 发送消息的三种方式:同步、异步、单向。RocketMQ 发送顺序消息的原理,就是同一类消息发送到相同的队列即可。为保证先发送的消息先存储到消息队列,必须使用同步发送的方式,否则可能出现先发的消息后到消息队列,此时消息就会乱序。

  • 同步:发送网络请求后会同步等待 Broker 服务器的返回结果,支持发送失败重试,适用于比较重要的消息通知场景。
  • 异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试,适用于对响应时间要求更高的场景。
  • 单向:单向和异步发送的原理一致,但是不支持回调。适用于响应时间非常短,对可靠性要求不高的场景,如日志收集。

 
撸一撸源码,首先看看 org.apache.rocketmq.spring.core.RocketMQTemplate#syncSendOrderly()

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    
    // …… 省略该类其他代码
     private DefaultMQProducer producer;
    
     private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();


    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
        return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
    }


    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            long now = System.currentTimeMillis();
            // 转成 RocketMQ API 中的 Message 对象
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            // 调用发送消息接口,发送消息,选择队列由 messageQueueSelector 实现
            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
            long costTime = System.currentTimeMillis() - now;
            if (log.isDebugEnabled()) {
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            }
            return sendResult;
        } catch (Exception e) {
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }
    
    // …… 省略该类其他代码
    
}

 
RocketMQTemplate#syncSendOrderly() 中选择队列的过程,由 MessageQueueSelector 和 hashKey 在实现类 SelectMessageQueueByHash 中完成。

package org.apache.rocketmq.client.producer.selector;

import java.util.List;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class SelectMessageQueueByHash implements MessageQueueSelector {

    // hash 值相同且 队列数相同,则消息发送到的队列相同
    // 这里的 mqs队列列表,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存获取。
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 根据 hashKey计算hash值,再用hash值和队列数 mqs.size()取模,得到一个索引值,结果小于队列数。
        int value = arg.hashCode() % mqs.size();
        if (value < 0) {
            // 索引取绝对值
            value = Math.abs(value);
        }
        // 根据索引值从队列中取出一个队列
        return mqs.get(value);
    }
}

 

2.3、顺序消费的技术原理

 
RocketMQ支持集群消费、广播消费两种模式。

  • 广播消费 模式:每条消息会被 ConsumerGroup 的每个 Consumer 消费。
  • 集群消费 模式:每条消息只会被 ConsumerGroup 的一个 Consumer 消费。(默认模式)
     

       顺序消费原理是,同一个消息队列只允许 Consumer 中的一个消费线程拉取消费。Consumer 中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到 Broker 时,会先申请独占锁,获得锁的请求则允许消费。

       消息消费成功后,会向 Broker 提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息。顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中后续的消息,从而保证顺序消费。

       顺序消费需特别注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。
 
撸一下 org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService 类源码:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();
    // ……省略其他代码
    class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
	    // ……省略其他代码
        @Override
        public void run() {

            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
 
                // ……省略其他代码 
				try {
					this.processQueue.getConsumeLock().lock();
					if (this.processQueue.isDropped()) {
						log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
							this.messageQueue);
						break;
					}

					status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
				} catch (Throwable e) {
					log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
						RemotingHelper.exceptionSimpleDesc(e),
						ConsumeMessageOrderlyService.this.consumerGroup,
						msgs,
						messageQueue), e);
					hasException = true;
				} finally {
					this.processQueue.getConsumeLock().unlock();
				}
				// ……省略其他代码
            }
        }
    }
}

 
org.apache.rocketmq.client.impl.consumer.ProcessQueue 类源码:

public class ProcessQueue {
    
    // ……省略其他代码
    private final Lock consumeLock = new ReentrantLock();
    
    public Lock getConsumeLock() {
        return consumeLock;
    }
    // ……省略其他代码
    
}

 

3、RocketMQ 的事务消息

 
RocketMQ 采用2PC方案来提交事务消息。

  • 第一阶段Producer 向 Broker 发送预处理消息(也称半消息),此时消息还未被投递出去,Consumer 不能消费
  • 第二阶段Producer 向 Broker 发送提交或回滚消息

 

3.1、RocketMQ 事务消息流程

 
RocketMQ 事务消息流程:

  • 1、发送预处理消息成功后,开始执行本地事务。

  • 2、如果本地事务执行成功,发送提交请求提交事务消息,消息会投递给 Consumer.
    在这里插入图片描述

  • 3、如果本地事务执行失败,发送回滚请求回滚事务消息,消息不会投递给 Consumer.
    在这里插入图片描述

  • 4、如果本地事务状态未知,网络故障或 Producer 宕机,Broker 未收到二次确认的消息。由 Broker 端发送请求给 Producer 进行消息回查,确认提交或回滚。如果消息状态一直未被确认,需要人工介入处理。
    在这里插入图片描述
     

3.2、事务消息代码示例

 

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.apache.rocketmq.common.message.MessageConst;
 
public class TransactionalMessageExample {
 
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
 
    public void sendTransactionalMessage() {
        // 构建消息
        Message<String> message = MessageBuilder.withPayload("Transactional message")
                .setHeader(RocketMQHeaders.TRANSACTION_ID, "transactionId-test") // 设置事务ID
                .build();
 
        // 发送事务消息
        rocketMQTemplate.sendMessageInTransaction("Topic:Tag", message, new RocketMQLocalTransactionListener() {
            // 执行本地事务,及本地事务执行成功后的操作
            @Override
            public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    // 获取事务ID : transactionId-test
                    String transactionId = (String)message.getHeaders()
                        .get(RocketMQHeaders.TRANSACTION_ID);
                    // 假设以事务ID 作为主键,执行本地事务, save(transactionId) 为执行数据库操作
                    boolean result = save(transactionId);
                    return result ? RocketMQLocalTransactionState.COMMIT : 
                        RocketMQLocalTransactionState.ROLLBACK;
                } catch (Exception e) {
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
            }
            
            // 检查本地事务状态,返回事务状态
            @Override
            public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
                // 获取事务ID : transactionId-test
                String transactionId = (String)message.getHeaders()
                    .get(RocketMQHeaders.TRANSACTION_ID);
                // 假设 isSuccess(transactionId) 是以事务ID为主键,查询本地事务执行情况
                if (isSuccess(transactionId)) {
                    return RocketMQLocalTransactionState.COMMIT;
                }
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        });
    }
}

 

3.3、事务消息注意项

 

  • ① RocketMQ 事务消息不支持延时消息和批量消息
  • 事务性消息可能不止一次被检查或消费
  • 事务回查的间隔时间:BrokerConfig.transactionCheckInterval,通过Broker的配置文件设置好。
  • ④ 事务性消息中用到了生产者群组,该机制是一种高可用机制,用以确保 RocketMQ 事务消息的可靠性。
  • ⑤ 单个消息的检查次数限制默认为15次,目的是为了避免单个消息被检查太多次,而导致队列消息累积。
    • 可以通过Broker配置的 transactionCheckMax 参数来修改该限制。
    • 如已经检查某条消息超过N次(N=transactionCheckMax)则Broker将丢弃此消息,并默认打印错误日志。
    • 可以通过重写AbstractTransactionCheckListener 类来自定义该行为。
  • ⑥ Broker配置文件的transactionMsgTimeout参数,也指定了RocketMQ 事务消息在特定时间长度之后被检查。
    • 当发送事务消息时,可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变transactionMsgTimeout参数的限制。
    • CHECK_IMMUNITY_TIME_IN_SECONDS 参数优先于 transactionMsgTimeout 参数。
  • ⑦ RocketMQ 事务消息的生产者ID,不能与其他类型消息的生产者ID共享。
    • 事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者
  • ⑧ 如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
     
     
     
     
     
     
     
     
    .

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/759885.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

微服务之服务保护策略【持续更新】

文章目录 线程隔离一、滑动窗口算法二、漏桶算法三、令牌桶算法 面试题1、Sentinel 限流和Gateway限流的区别 线程隔离 两种实现方式 线程池隔离&#xff08;Hystix隔离&#xff09;&#xff0c;每个被隔离的业务都要创建一个独立的线程池&#xff0c;线程过多会带来额外的CPU…

emptyDir + initContainer实现ConfigMap的动态更新(K8s相关)

1. 絮絮叨叨 K8s部署服务时&#xff0c;一般都需要使用ConfigMap定义一些配置文件例如&#xff0c;部署分布式SQL引擎Presto&#xff0c;会在ConfigMap中定义coordinator、worker所需的配置文件以node.properties为例&#xff0c;node.environment和node.data-dir的值将由Helm…

Transformer丨基础Transformer模型和代码详解

笔者在深度学习入门期间自学过Transformer&#xff0c;但是那时碍于急于求成&#xff0c;并未对其进行深度归纳与分享。 近期&#xff0c;笔者观察到不论是自然语言处理模型还是视觉模型&#xff0c;已经几乎从传统的CNN、RNN的网络结构设计全面转向基于Transformer的结构设计…

002-基于Sklearn的机器学习入门:回归分析(上)

本节及后续章节将介绍机器学习中的几种经典回归算法&#xff0c;所选方法都在Sklearn库中聚类模块有具体实现。本节为上篇&#xff0c;将介绍基础的线性回归方法&#xff0c;包括线性回归、逻辑回归、多项式回归和岭回归等。 2.1 回归分析概述 回归&#xff08;Regression&…

Vue3学习(一)

创建组件实例&#xff1a;我们传入 createApp 的对象实际上是一个组件 import { createApp } from vue // 从一个单文件组件中导入根组件 import App from ./App.vueconst app createApp(App) 大多数真实的应用都是由一棵嵌套的、可重用的组件树组成的。 App (root compone…

AI大模型的崛起:第四次工业革命的前奏?

在当今这个信息爆炸的时代&#xff0c;人工智能&#xff08;AI&#xff09;大模型的崛起引起了广泛的关注和讨论。有人将其视为第四次工业革命的前奏&#xff0c;然而&#xff0c;这真的可能吗&#xff1f;本文将探讨这一问题&#xff0c;并对中国AI大模型的发展进行简要分析。…

Android:移动垃圾软件

讲解政策相关,最近升级AI扫荡系统和证书防高风险,回复按留言时间来排,请耐心等待 移动垃圾软件 官方政策公告行为透明、信息披露清晰保护用户数据不要损害移动体验软件准则反垃圾软件政策Google API 服务用户数据政策官方政策公告 ​ 在 Google,我们相信,如果我们关注用户…

DIY智能音箱:基于STM32的低成本解决方案 (附详细教程)

摘要: 本文详细介绍了基于STM32的智能音箱的设计与实现过程&#xff0c;包括硬件设计、软件架构、语音识别、音乐播放等关键技术。通过图文并茂的方式&#xff0c;结合Mermaid流程图和代码示例&#xff0c;帮助读者深入理解智能音箱的工作原理&#xff0c;并提供实际操作指导。…

[图解]分析模式高阶+课程讲解03物品模式

1 00:00:00,280 --> 00:00:03,440 下一个要探讨的模式是物品模式 2 00:00:04,310 --> 00:00:08,300 说是物品模式&#xff0c;实际上更多的说物品规格 3 00:00:09,210 --> 00:00:12,560 首先&#xff0c;我们要区分一下物品和物品规格的定义 4 00:00:14,440 -->…

【C++】C++ 网店销售库存管理系统(源码+论文)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

抖音直播自动点赞脚本:让点赞变得简单

抖音直播自动点赞脚本&#xff1a;让点赞变得简单 简介 点赞是社交媒体上表达喜爱的一种方式&#xff0c;尤其在抖音这样的平台上&#xff0c;点赞不仅能够增加主播的人气&#xff0c;还能鼓励他们创作更多优质内容。然而&#xff0c;手动点赞往往既耗时又费力。为了解决这个…

算法与数据结构面试宝典——常见的数据结构都有哪些?详细示例(C#,C++)

文章目录 一、逻辑结构&#xff1a;线性与非线性线性数据结构非线性数据结构访问方式 二、数组&#xff08;Array&#xff09;三、链表&#xff08;LinkedList&#xff09;四、栈&#xff08;Stack&#xff09;五、队列&#xff08;Queue&#xff09;六、树&#xff08;Tree&am…

Android高级面试_6_性能优化

Android 高级面试-7&#xff1a;网络相关的三方库和网络协议等 1、网络框架 问题&#xff1a;HttpUrlConnection, HttpClient, Volley 和 OkHttp 的区别&#xff1f; HttpUrlConnection 的基本使用方式如下&#xff1a; URL url new URL("http://www.baidu.com")…

pytest测试框架pytest-random-order插件随机执行用例顺序

Pytest提供了丰富的插件来扩展其功能&#xff0c;本章介绍下pytest-random-order插件&#xff0c;随机设置pytest测试用例的运行顺序&#xff0c;并对随机性进行一些控制。 官方文档&#xff1a; https://pytest-cov.readthedocs.io/en/latest/index.html 适配版本说明&#x…

AI智能客服项目拆解(1) 产品大纲

本文作为拆解AI智能客服项目的首篇&#xff0c;以介绍产品大纲为主。后续以某AI智能客服产品为例&#xff0c;拆解相关技术细节。 AI智能客服是一种基于人工智能技术的客户服务解决方案&#xff0c;旨在提高客户满意度和优化企业运营。利用人工智能和自然语言处理技术&#xff…

如何为数据库中的位图添加动态水印

许多数据库存储了以blob或文件形式保存的位图&#xff0c;其中包括照片、文档扫描、医学图像等。当这些位图被各种数据库客户端和应用程序检索时&#xff0c;为了日后的识别和追踪&#xff0c;有时需要在检索时为它们添加唯一的水印。在某些情况下&#xff0c;人们甚至希望这些…

数字图像处理之【高斯金字塔】与【拉普拉斯金字塔】

数字图像处理之【高斯金字塔】与【拉普拉斯金字塔】 1.1 什么是高斯金字塔&#xff1f; 高斯金字塔&#xff08;Gaussian Pyramid&#xff09;是一种多分辨率图像表示方法&#xff0c;用于图像处理和计算机视觉领域。它通过对原始图像进行一系列的高斯平滑和下采样操作&#x…

istitle()方法——判断首字母是否大写其他字母小写

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 istitle()方法用于判断字符串中所有的单词首字母是否为大写而其他字母为小写。istitle()方法的语法格式如下&#xff1a; str.istitle() …

Java并发编程基础知识点

目录 Java并发编程基础知识点1、线程&#xff0c;进程概念及二者的关系进程相关概念线程相关概念进程与线程的关系补充小知识点&#xff1a; 2、线程的状态Java线程的状态&#xff1a;Java线程不同状态之间的切换图示 3、Java程序中如何创建线程&#xff1f;①、继承Thread类②…

【python】python知名品牌调查问卷数据分析可视化(源码+调查数据表)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…