如何使用rocketmq实现分布式事务?

什么是rocketmq事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

RocketMQ的分布式事务又称为“半消息事务”。

事务消息处理流程

RocketMQ是靠半消息机制实现分布式事务

事务消息:MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。

半消息:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。

半消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

事务消息交互流程如下图所示。

图片

1. 生产者将消息发送至Apache RocketMQ服务端。

2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

3. 生产者开始执行本地事务逻辑。

4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

• 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

• 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。

6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

图片

事务消息

• 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

• 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

• 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

• 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

• 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。

• 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

• 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

示例

下面是使用 RocketMQ 实现事务的一个例子:

生产者实现事务监听器:

首先,需要实现一个 RocketMQ 的事务监听器接口RocketMQLocalTransactionListener,这个接口定义了在发送和确认事务消息时的回调方法。您需要根据业务逻辑来实现这些方法。

executeLocalTransaction 方法:

这个方法在发送事务消息时被调用,用于执行本地事务。具体步骤如下:

1. 获取消息中的事务 ID。

2. 根据事务索引来模拟本地事务执行的状态。

3. 将事务状态放入 localTrans 映射中,以备后续 checkLocalTransaction 方法使用。

根据您的代码,executeLocalTransaction 方法中模拟了三种状态:

• 如果状态为 0,表示本地事务成功,返回 RocketMQLocalTransactionState.COMMIT,消息将被提交。

• 如果状态为 1,表示本地事务失败,返回 RocketMQLocalTransactionState.ROLLBACK,消息将被回滚。

• 如果状态为 2,表示本地事务状态未知,返回 RocketMQLocalTransactionState.UNKNOWN

checkLocalTransaction 方法:

这个方法在消息的确认状态时被调用,用于检查本地事务的状态。具体步骤如下:

  1. 获取消息中的事务 ID。

  2. 根据之前保存在 localTrans 映射中的事务状态,决定消息的提交、回滚或未知。

checkLocalTransaction 方法会根据之前在 executeLocalTransaction 方法中保存的状态来返回相应的事务状态。

@RocketMQTransactionListener  
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {  
    private AtomicInteger transactionIndex = new AtomicInteger(0);  
  
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();  
  
    @Override  
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {  
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);  
        System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",  
        transId);  
        int value = transactionIndex.getAndIncrement();  
        int status = value % 3;  
        localTrans.put(transId, status);  
        if (status == 0) {  
            // Return local transaction with success(commit), in this case,  
            // this message will not be checked in checkLocalTransaction()  
            System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());  
            return RocketMQLocalTransactionState.COMMIT;  
        }  

        if (status == 1) {  
            // Return local transaction with failure(rollback) , in this case,  
            // this message will not be checked in checkLocalTransaction()  
            System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());  
            return RocketMQLocalTransactionState.ROLLBACK;  
        }  

        System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");  
        return RocketMQLocalTransactionState.UNKNOWN;  
    }  

    @Override  
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {  
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);  
        RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;  
        Integer status = localTrans.get(transId);  
        if (null != status) {  
            switch (status) {  
            case 0:  
                retState = RocketMQLocalTransactionState.COMMIT;  
                break;  
            case 1:  
                retState = RocketMQLocalTransactionState.ROLLBACK;  
                break;  
            case 2:  
                retState = RocketMQLocalTransactionState.UNKNOWN;  
                break;  
            }  
        }  
        System.out.printf("------ !!! checkLocalTransaction is executed once," +  
        " msgTransactionId=%s, TransactionState=%s status=%s %n",  
        transId, retState, status);  
        return retState;  
    }  
}

消费者

@Service  
@RocketMQMessageListener(topic = "${demo.rocketmq.transTopic}", consumerGroup = "string_trans_consumer")  
public class StringTransactionalConsumer implements RocketMQListener<String> {  
    @Override  
    public void onMessage(String message) {  
        System.out.printf("------- StringTransactionalConsumer received: %s \n", message);  
    }  
}

这些步骤基本上涵盖了使用 RocketMQ 实现事务的主要过程。可以根据具体的业务需求和环境进行调整和配置。

总结

使用半消息实现分布式事务在提供分布式事务支持和保证消息传递的原子性方面具有优势,但需要引入MQ并提供查询事务接口。在选择是否使用半消息实现分布式事务时,需要根据具体的业务需求和系统性能要求来进行权衡和选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/402482.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

OpenAI 的 GPTs 提示词泄露攻击与防护实战:防御卷(一)

前面的OpenAI DevDay活动上&#xff0c;GPTs技术的亮相引起了广泛关注。随着GPTs的创建权限开放给Plus用户&#xff0c;社区里迅速涌现了各种有趣的GPT应用&#xff0c;这些都是利用了Prompt提示词的灵活性。这不仅展示了技术的创新潜力&#xff0c;也让人们开始思考如何获取他…

C++学习Day09之系统标准异常

目录 一、程序及输出1.1 系统标准异常示例1.2 标准异常表格 二、分析与总结 一、程序及输出 1.1 系统标准异常示例 #include<iostream> using namespace std; #include <stdexcept> // std 标准 except 异常class Person { public:Person(int age){if (age <…

短小精悍的npm入门级保姆教程,一篇包会

npm是什么&#xff1f; npm是一个强大的包管理工具&#xff0c;它使开发人员能够轻松地安装、更新和管理项目依赖的包。通过初始化一个package.json 文件&#xff0c;我们可以描述你的项目并记录其依赖关系。使用npm install命令&#xff0c;我们可以安装和管理包。使用npm pu…

SQL注入漏洞解析

什么是SQL注入 原理&#xff1a; SQL注入即是指web应用程序对用户输入数据的合法性没有判断或过滤不严&#xff0c;攻击者可以在web应用程序中事先定义好的查询语句的结尾上添加额外的SQL语句&#xff0c;在管理员不知情的情况下实现非法操作&#xff0c;以此来实现欺骗数据库服…

消息中间件之RocketMQ源码分析(十三)

Broker消息存储机制 RocketMQ首先将消息数据写入操作系统PageCache&#xff0c;然后定时将数据刷入磁盘。接下来主要分析RocketMQ是如何接收发送消息请求并将消息写入PageCache的&#xff0c;整个过程如图 Commit目录下有多个CommitLog文件&#xff0c;其实CommitLog只有一个…

前端构建效率优化之路

项目背景 我们的系统&#xff08;一个 ToB 的 Web 单页应用&#xff09;前端单页应用经过多年的迭代&#xff0c;目前已经累积有大几十万行的业务代码&#xff0c;30 路由模块&#xff0c;整体的代码量和复杂度还是比较高的。 项目整体是基于 Vue TypeScirpt&#xff0c;而构…

PostgreSQL与MySQL,谁更胜一筹

前言 PostgreSQL与MySQL都是优秀的开源数据库。在日常学习中&#xff0c;新手可能接触最多的是MySql,但是实际工作中&#xff0c;两者的应用场景其实都很广。我之前的做过上网流量销售业务&#xff0c;用的是MySQL,现在接触广告业务&#xff0c;用的是pg数据库&#xff0c;每天…

C语言:指针(一)

目录 1.内存和地址2. 指针变量和地址2.1 取地址操作符&#xff08;&&#xff09;2.2 指针变量和解引用操作符&#xff08;*&#xff09;2.2.1 指针变量2.2.2 解引用操作符&#xff08;*&#xff09; 2.3 指针变量的大小 3.指针变量的类型和意义3.1 指针的解引用3.2 指针 -指…

SQL注入漏洞解析--less-3

1.首先我们打开第三关看一下 2.这个和之前1.2关提示都是一样&#xff0c;所以我们先输入id数字看一下显示什么 3.这里正常回显&#xff0c;当我们后边加上时可以看到页面报错信息。可推断sql语句是单引号字符型且有括号&#xff0c;所以我们需要闭合单引号且也要考虑括号。 4…

FISCO BCOS(十七)利用脚本进行区块链系统监控

要利用脚本进行区块链系统监控&#xff0c;你可以使用各种编程语言编写脚本&#xff0c;如Python、Shell等 利用脚本进行区块链系统监控可以提高系统的稳定性、可靠性&#xff0c;并帮助及时发现和解决潜在问题&#xff0c;从而确保区块链网络的正常运行。本文可以利用脚本来解…

java使用File创建空文件和创建单级文件、多级文件、删除、获得文件夹下的文件和文件夹

1、使用createNewFile创建文件 package com.controller;import org.springframework.web.bind.annotation.*;import java.io.File; import java.io.IOException; import java.util.LinkedList;RestController CrossOrigin RequestMapping("/admin") public class Ad…

IO进程线程作业day5

1> 将互斥机制的代码实现重新敲一遍 #include <myhead.h> int num520;//定义一个全局变量 pthread_mutex_t mutex;//创建锁 //线程1任务 void *task1(void *arg) {puts("任务1");pthread_mutex_lock(&mutex);//上锁num1314;sleep(1);printf("tas…

Liunx使用nginx和http搭建yum-server仓库

文章目录 1. yum-server的搭建方式2. nginx搭建yum-server仓库2.1. 安装配置nginx2.2 配置yum-server的rpm2.3. 同步yum源相关包2.3.1 rsync同步源3.3.1 reposync同步源 2.4. 配置客户端访问yum配置2.5. 验证测试 3. http服务搭建yum-server仓库3.1. 安装配置http3.2 配置yum-s…

代码随想录算法训练营第一天

● 今日学习的文章链接和视频链接 ● 自己看到题目的第一想法 1. 704二分法&#xff1a; 方法一&#xff1a; 整个数组是 左闭右闭区间 [ ] left指针指向数组开始下标&#xff0c; right 指针指向数组最后下表nums.size()-1, mid为 (leftright) /2循环条件 left<rightnu…

论文精读--Noisy Student

一个 EfficientNet 模型首先作为教师模型在标记图像上进行训练&#xff0c;为 300M 未标记图像生成伪标签。然后将相同或更大的 EfficientNet 作为学生模型并结合标记图像和伪标签图像进行训练。学生网络训练完成后变为教师再次训练下一个学生网络&#xff0c;并迭代重复此过程…

unity学习(34)——角色选取界面(跨场景坑多)

先把SelectMenu中的camera的audio listener去掉。 现在还是平面&#xff0c;直接在camera下面添加两个panel即可&#xff0c;应该是用不到canvas了&#xff0c;都是2D的UI。 加完以后问题来了&#xff0c;角色选择界面的按钮跑到主界面上边了&#xff0c;而且现在账号密码都输…

国外创意品牌案例:英国北方铁路公司发布“Try the train”活动

近期&#xff0c;英国北方铁路公司&#xff08;Northern Trains&#xff09;发起了一项名为“Try the train” 的活动&#xff0c;旨在帮助那些对火车感到恐惧的人在搭乘火车时感到更舒适&#xff0c;以解锁公司业务新的增长领域&#xff0c;吸引更多的人在通勤、上学、出游、参…

【蓝桥杯单片机入门记录】静态数码管

目录 一、数码管概述 &#xff08;1&#xff09;认识数码管 &#xff08;2&#xff09;数码管的工作原理 &#xff08;3&#xff09;LED数码管驱动方式-静态显示 二、数码管电路图 三、静态数码管显示例程 &#xff08;1&#xff09;例程1&#xff1a;数码管显示某一位&a…

发布 rust 源码包 (crates.io)

rust 编程语言的包 (或者 库, library) 叫做 crate, 也就是软件中的一个组件. 一个完整的软件通常由多个 crate 组成, rust 编译器 (rustc) 一次编译一整个 crate, 不同的 crate 可以同时并行编译. rust 官方有一个集中发布开源包的网站 crates.io. 发布在这上面的 crate 可以…

个性化纹身设计,Midjourney带你探索独一无二的艺术之美

hello,大家好&#xff0c;欢迎回来。 在当今社会&#xff0c;纹身已经变得非常常见。 在寻求与众不同的个性化纹身时&#xff0c;你是否曾经为了找不到独特的设计而苦恼&#xff1f; 现在&#xff0c;Midjourney将为你打开一扇全新的艺术之门&#xff0c;引领你探索纹身设计…