RocketMQ学习笔记:分布式事务

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、分布式事务的难题
  • 2、解决方式
    • 2.1、半事务消息和事务回查
    • 2.2、代码样例
      • 2.2.1、TransactionListener
      • 2.2.2、TransactionMQProducer
      • 2.2.3、MessageListenerConcurrently
      • 2.2.4、流程图


1、分布式事务的难题

现有两个系统,A向B转钱。A系统扣钱和B系统加钱就应该属于同一个事务,任何一个失败都要回滚。两个系统之间唯一的通信方式就是RocketMQ
请添加图片描述

以最朴素的想法,现在就有两个实现分布式事务的方案。但这两个都有比较大的不可靠性。

  • A系统先扣钱再发送MQ:这样的弊端是无法确定消息有没有发送到MQ,或者消息有没有被MQ保存。总之这做法缺少一些回查的机制。
  • A系统先发送MQ再扣钱:这样的弊端是发送消息后,A系统可能出现错误回滚。而B收到了消息就正常消费,完全不知道A那边出了问题。

2、解决方式

2.1、半事务消息和事务回查

  • 半事务消息:半事务消息是指向RocketMQ发送一条消息,但这个消息只存放在CommitLog中,并不在ConsumeQueue展示。也就是说该消息被RocketMQ接收了,但是消费者却无法消费到这条消息。
  • 事务回查:在半事务消息发送成功后。A系统执行事务,如果成功则MQ将消息变成正常消息,失败则不发送消息。这里如果业务太复杂还不能确定事务是否完成的话,还可以发送UNKNOWN给MQ,这样MQ就会有定时器去检查事务是否完成。
    RocketMQ会向生产者询问是否可以把半事务变成正常的消息让消费者可以消费到。在这篇文章的例子就是询问A系统扣款有没有扣成功。如果成功了那就让B系统消费消息。

请添加图片描述

所以呢,通过半事务消息事务回查就能保证A系统和发送消息具有事务,即扣款失败则不发送消息,扣款成功则发送消息。所以半事务消息至少保证了生产者和MQ之间的原子性。MQ和消费者之间的原子性需要另外处理。

消费者需要保证幂等性,失败后重试,即使称为死信后也特殊处理等操作来保证事务。这个例子中B系统成功加钱的话那交易结束,如果尝试多次后还是失败,那就需要一个机制来通知A系统,让他把扣掉的钱加回去。

2.2、代码样例

2.2.1、TransactionListener

一个接口规范,我们需要实现这个接口来定义本地事务和事务回查。

就是本地事务具体执行,成功后怎么办,失败了怎么办。定时的事务回查如何检查事务有没有完成。这些东西都要定义在TransactionListener的实现中。


TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // 执行本地事务,A扣100块
                // 如果成功
                // return LocalTransactionState.COMMIT_MESSAGE;
                // 如果失败
                // return LocalTransactionState.ROLLBACK_MESSAGE;
                //或者业务比较复杂,不想在这个阶段就关闭事务,可以返回Unknown,之后就需要MQ定时事务回查
                return LocalTransactionState.UNKNOW;
            }

            @Override
            // 事务回查,默认一分钟一次
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("事务回查, " + new SimpleDateFormat("yyyyMMdd, HH:mm:ss").format(new Date()));
                // 如果成功
                // return LocalTransactionState.COMMIT_MESSAGE;
                // 如果失败
                // return LocalTransactionState.ROLLBACK_MESSAGE;
                // 业务比较长,还不确定成功或失败,返回unknown,下次再查
                return LocalTransactionState.UNKNOW;
            }
        };

2.2.2、TransactionMQProducer

半事务消息的生产者,在DefaultMQProducer的基础上新增了一个重要的参数,类型是ExecutorService。这个线程池是用来生产线程去完成事务回查。

但是事务回查的逻辑不需要定义在线程的run()方法中,这一部分放在TransactionListener中。

 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
        producer.setNamesrvAddr("localhost:9876");
        // build a thread pool used to for MQ to call back to check transaction
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100
                , TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), (r) -> {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
        );
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        try{
            Message msg = new Message("transaction_producer", null, "A give B 100 dollar".getBytes());
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        }catch(Exception e) {
            // rollback
            System.out.println("rollback");
        }
        producer.shutdown();

2.2.3、MessageListenerConcurrently

消费者部分就比较简单,只要listener是MessageListenerConcurrently就好。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionalTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        try{
            for(MessageExt msg: list) {
                // simulate DB action
                System.out.println("update B where transactionId" + msg.getTransactionId());
                System.out.println("Success consume msg: " + msg.getMsgId());
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Failed to consume meg, try more times");
            // means that failed to consume this msg. In next time will still consume this msg.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        // means that success to consume this msg. In the next time will consume next msg.
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.start();
while(true){
}

2.2.4、流程图

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/491785.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

浅谈双亲委派模型

本文浅析了双亲委派的基本概念、实现原理、和自定义类加载器的正确姿势。 对于更细致的加载loading过程、初始化initialization顺序等问题&#xff0c;文中暂不涉及&#xff0c;后面整理笔记时有相应的文章。 JDK版本&#xff1a;oracle java 1.8.0_102 基本概念 定义 双亲委…

智慧城市解决方案大全:标准规范顶层设计指南、整体解决方案、厂商售前宣讲PPT、招投标、智慧城市白皮书等全套680份,一次性打包下载

关键词&#xff1a;智慧城市&#xff0c;智慧城市解决方案&#xff0c;智慧城市发展的前景与趋势&#xff0c;智慧城市概念主力流出&#xff0c;智慧城市项目包括哪些方面&#xff0c;智慧城市项目方案&#xff0c;智慧城市宣传片&#xff0c;智慧城市白皮书&#xff0c;智慧城…

Linux如何将桌面版转为mini版-解决中文字体变为英文字体

中文字体转为英文字体 我们进入Rocky-Linux后&#xff0c;ls或者打开文件夹发现有中文 我们执行命令 sudo localedef -i en_US -f UTF-8 en_US.UTF-8将其转为英文&#xff0c;并且重启机器 此时中文转化为英文 桌面版linux转为MINN版 1. 我们可以卸载桌面版 sudo dnf gr…

c++ 跳转搜索(Jump Search)

与二分搜索一样&#xff0c;跳转搜索是一种针对排序数组的搜索算法。基本思想是通过按固定步骤向前跳跃或跳过某些元素来代替搜索所有元素来检查更少的元素&#xff08;比线性搜索&#xff09;。例如&#xff0c;假设我们有一个大小为 n 的数组 arr[] 和一个大小为 m 的块&…

PEReDi 完全隐私的央行数字货币方案

第一个对完全隐私保护建模的方案&#xff0c;基于账户模型&#xff0c;要求交易双方都在线。 角色分类 中央银行 B B B&#xff1a;负责发行数字货币和货币政策&#xff0c;但不控制用户账户的状态&#xff0c;没有能力对交易的发送者或接收者进行去匿名化或披露与特定交易相…

数据结构-队列-005

1链式队列 运行结果如下&#xff1a; 1.1链式队列结点定义 /*自定义一个数据类型*/ typedef struct student {char name[32];char sex;int age; }DATA_TYPE;/*定义一个链式队列结点*/ typedef struct link_queue_node {DATA_TYPE data;//数据域struct link_queue_node *pne…

SpringBoot和SpringCloud面试题

1、SpringBoot 1.1 和Spring对比 1.2 SpringBoot自动装配 springboot的自动装配实际上就是为了从spring.factories文件中获取到对应的需要进行自动装配的类&#xff0c;并生成相应的Bean对象&#xff0c;然后将它们交给spring容器来帮我们进行管理 原理 SpringBootApplicatio…

BUUCTF-Misc13

[ACTF新生赛2020]outguess1 1.打开附件 2.outguess outguess -k "abc" -r mmm.jpg flag.txt “-k “abc”” 表示使用密码 “abc” 进行解密&#xff1b; “-r” 表示提取信息的操作&#xff1b; “mmm.jpg” 是包含隐藏信息的源图像文件&#xff1b; “flag.txt” …

共用体详解

1 共用体的概念 有时需要使几种不同类型的变量存放到同一段内存单元中。例如,可把一个整型变量、一个字符型变量、一个实型变量放在同一个地址开始的内存单元中(见图11.24)。以上3个变量在内存中占的字节数不同,但都从同一地址开始(图中设地址为1000)存放。也就是使用覆盖技术…

“数据持久化”和“缓存与数据库不一致”到底有什么区别?

之前&#xff0c;我一直把“数据持久化”和“缓存与数据库不一致问题”给搞混了。我当时复习的时候基本上就没有思考&#xff0c;就是纯背诵&#xff0c;数据持久化是什么&#xff0c;数据持久化有两种方式&#xff0c;这两种方式特点是什么&#xff0c;然后巴拉巴拉一堆。缓存…

LC 100.相同的树

100. 相同的树 给你两棵二叉树的根节点 p 和 q &#xff0c;编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 示例 1&#xff1a; 输入&#xff1a; p [1,2,3], q [1,2,3] 输出&#xff1…

【pytest、playwright】构建POM项目,以及解决登录问题,allure环境问题

目录 前言 1、文件目录 2、安装依赖 3、POM项目实战-案例&#xff1a;打开指定页面 目录结构&#xff1a; pages中的代码&#xff1a; cases中的代码&#xff1a; 4、解决登录问题 问题&#xff1a; 解决方案&#xff1a; 获取登录的用户信息&#xff08;cookie&a…

GTC 2024 火线评论:DPU 重构文件存储访问

编者按&#xff1a;英伟达2024 GTC 大会上周在美国加州召开&#xff0c;星辰天合 CTO 王豪迈在大会现场参与了 GPU 与存储相关的最新技术讨论&#xff0c;继上一篇《GTC 2024 火线评论&#xff1a;GPU 的高效存储利用》之后&#xff0c;这是他发回的第二篇评论文章。 上一篇文章…

数据意外变化导致条件判断流程异常

1. 问题描述 用户使用的 MCU 型号是 STM32H750VB。 在客户的代码中有多个条件语句&#xff0c;在条件里面的变量数值没有变化的情况下执行了条件里面的逻辑。 有点类似如下 C 语句 : If(变量 A !0) {//执行一些指令 }即变量 A 在明明没有变化且条件不满足的情况下, 程序运行时…

程序员卷王的简历

这真是一份淋漓尽致、低入尘埃、舔到骨髓的优势。 但从一个hr的角度来看&#xff0c;依然有可以继续提升的地方。 比如&#xff1a; 优势第一条本身就有问题&#xff0c;不懂劳动法&#xff1f;你怎么还会有劳动法这个概念&#xff01;你知道“劳动法”本身&#xff0c;这个…

自动采集实时海量主流电商平台API数据接口,让你拥有一手绝对好牌!

前言 你是否曾为获取重要数据而感到困扰&#xff1f;是否因为数据封锁而无法获取所需信息&#xff1f;是否因为数据格式混乱而头疼&#xff1f;现在&#xff0c;所有这些问题都可以迎刃而解。 平时需要从某些电商网站上抓取数据&#xff0c;那么这里以淘宝为示例给大家演示。这…

selenium元素定位--xpath定位--层级与逻辑组合定位

其他元素非唯一时&#xff0c;又不想用xpath绝对定位时&#xff0c;需要用到层级与逻辑定位. 一、层级属性结合定位&#xff1a; 遇到元素没有class、name、id等或属性动态变化情况时&#xff0c;可以找父节点元素&#xff0c;父级节点没有id时&#xff0c;可以继续往上找id&…

HeidiSQL导出SQL文件

目前开发阶段的数据库可视化工具逐渐转为了HeidiSQL&#xff0c;本文讲一讲导出到sql文件的小细节&#xff0c;给自己做个记录补充。 安装或数据库可视化工具比较可参考&#xff1a; windows下全免费手动搭建php8mysql8开发环境及可视化工具安装 导出 原来用Navicat的时候&am…

git下载安装教程

git下载地址 有一个镜像的网站可以提供下载&#xff1a; https://registry.npmmirror.com/binary.html?pathgit-for-windows/图太多不截了哈哈&#xff0c;一直next即可。

macOS Sonoma 14.4.1 (23E224) 正式版发布,ISO、IPSW、PKG 下载

macOS Sonoma 14.4.1 (23E224) 正式版发布&#xff0c;ISO、IPSW、PKG 下载 2024 年 3 月 26 日凌晨&#xff0c;macOS Sonoma 14.4.1 更新修复了一个可能导致连接到外部显示器的 USB 集线器无法被识别的问题。它还解决了可能导致 Java 应用程序意外退出的问题&#xff0c;并修…