RocketMQ事务消息实现分布式事务

文章目录

    • 简介
    • 实现原理
    • 实现逻辑

简介

RocketMQ事务消息
RocketMQ在4.3.0版中支持分布式事务消息,这里RocketMQ的事务消息是采用2PC(两段式协议) +补偿机制(消息回查)的分布式事务功能。提供消息发送与业务落库的一致性。
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。

RocketMQ的事务消息实现方式主要包括以下几个步骤:
1 生产者发送half消息到Broker。half消息在消费者看来是不可见的,这样可以避免消费者消费到事务未提交的数据,类似于数据库的隔离级别读已提交级别,避免脏读。
2.生产者创建订单,根据创建订单成功与否,向Broker发送commit或rollback。
3.生产者还可以提供Broker回调接口,当Broker发现一段时间half消息没有收到任何操作命令,会主动调用此接口来查询订单是否创建成功。
4.一旦half消息commit了,消费者库存系统就会来消费,如果消费成功,则消息销毁,分布式事务成功结束。
5.如果消费失败,则根据重试策略进行重试,最后还失败则进入死信队列,等待进一步处理。

在这里插入图片描述
在这里插入图片描述

实现原理

RocketMQ的事务消息实现原理基于两阶段提交协议(Two-Phase Commit Protocol),具体流程如下:
1.发送准备消息:当一个事务消息需要发送时,生产者会发送一个准备消息,该消息包含了实际的业务数据和事务的标识符。
2.执行本地事务:接收到准备消息后,生产者会执行一个本地事务,如果本地事务执行成功,则返回COMMIT状态,否则返回ROLLBACK状态。
3.提交或回滚本地事务:当生产者返回COMMIT状态时,代表本地事务已经执行成功,此时,RocketMQ会将消息标记为可提交状态,并发送一个commit消息给消费者;当生产者返回ROLLBACK状态时,代表本地事务执行失败,此时,RocketMQ会将消息标记为可回滚状态,并发送一个rollback消息给消费者。
4. 消费者处理消息:消费者在接收到commit或rollback消息后,会根据消息的状态来执行相应的操作。
5.提交或回滚事务:如果所有的消费者都接收到了commit消息,则代表该事务消息已经提交,此时生产者会提交本地事务,否则,如果有任何一个消费者接收到了rollback消息,则代表该事务消息已经回滚,生产者会回滚本地事务。
事务消息的实现原理基于两阶段提交协议,这是一种经典的分布式事务协议,可以保证数据的一致性和可靠性。RocketMQ通过实现TransactionListener接口来支持事务消息,同时也提供了许多配置选项和工具类来方便用户进行使用和扩展。

实现逻辑

RocketMQ的事务消息通过TransactionListener接口来实现。下面是事务消息的基本实现步骤:
1.实现事务监听器:首先,你需要实现RocketMQ提供的TransactionListener接口,该接口包括两个方法:executeLocalTransaction和checkLocalTransaction。
○ executeLocalTransaction方法用于执行本地事务,当发送事务消息时,RocketMQ会调用此方法来执行本地事务。在该方法内部,你需要执行实际的业务逻辑,并根据执行结果返回事务状态,可以是提交、回滚或是未知状态。
○ checkLocalTransaction方法用于检查本地事务状态,当RocketMQ没有收到事务消息的确认或者取消时,会调用此方法来检查本地事务的状态,然后对消息进行处理。
2.发送事务消息:在发送事务消息时,你需要指定事务监听器,并在executeLocalTransaction方法中执行实际的业务逻辑,然后根据业务逻辑的执行结果返回事务状态。
3.事务状态检查:RocketMQ会定期调用checkLocalTransaction方法来检查本地事务的状态,然后对消息进行处理,例如提交或者回滚。
4.事务消息处理:在消息消费端,你需要根据消息的实际状态来执行相应的处理逻辑。
下面是一个简单的示例代码,演示了如何使用RocketMQ的事务消息:

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // 设置事务监听器
        TransactionListener transactionListener = new MyTransactionListener();
        producer.setTransactionListener(transactionListener);

        // 启动生产者
        producer.start();

        try {
            // 创建事务消息
            Message msg = new Message("TopicTest", "TagA", "KEY1", "Hello, RocketMQ".getBytes());

            // 发送事务消息
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

class MyTransactionListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务,例如数据库操作等
        // 返回本地事务的状态,可以是COMMIT、ROLLBACK或UNKNOW
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务状态,返回本地事务的状态,例如COMMIT、ROLLBACK或UNKNOW
    }
}

上述代码展示了如何创建一个事务消息生产者,并实现一个简单的事务监听器。在实际应用中,你需要根据业务需求和具体场景来实现更复杂的事务逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/266165.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

强化学习(五)-Deterministic Policy Gradient (DPG) 算法及公式推导

针对连续动作空间,策略函数没法预测出每个动作选择的概率。因此使用确定性策略梯度方法。 0 概览 1 actor输出确定动作2 模型目标: actor目标:使critic值最大 critic目标: 使TD error最大3 改进: 使用两个target 网络…

Redis缓存数据一致性

实际业务中常使用Redis缓存来提升读写效率,减少存储层的压力。因为数据在缓存和DB中各存储一份,所以会出现数据一致性的问题。总体来说导致数据不一致的原因主要有两个。请求并发和操作非原子。 请求并发是指同时可能有多个读写请求同时请求Cache或者DB&…

【C++】bind绑定包装器全解(代码演示,例题演示)

前言 大家好吖,欢迎来到 YY 滴C系列 ,热烈欢迎! 本章主要内容面向接触过C的老铁 主要内容含: 欢迎订阅 YY滴C专栏!更多干货持续更新!以下是传送门! YY的《C》专栏YY的《C11》专栏YY的《Linux》…

非线性约束的优化问题_序列二次规划算法代码

1. 理论部分 2. 序列二次规划算法代码及解析 3.完整代码 1.理论部分 a.约束优化问题的极值条件 库恩塔克条件(Kuhn-Tucker conditions,KT条件)是确定某点为极值点的必要条件。如果所讨论的规划是凸规划,那么库恩-塔克条件也是充分条件。 &#xff…

5.OpenResty系列之深入理解(一)

本文基于Centos8进行实践,请读者自行安装OpenResty。 1. 内部调用 进入默认安装路径 cd /usr/local/openresty/nginx/conf vim nginx.conflocation /sum {# 只允许内部调用internal;content_by_lua_block {local args ngx.req.get_uri_args()ngx.print(tonumber…

Qt 多线程用法

文章目录 开发平台QThread 类 moveToThreadQtConcurrent::run QFutureWatcherQThreadPool QRunnable 开发平台 项目说明OSwin10 x64Qt6.6compilermsvc2022构建工具cmake QThread 类 moveToThread 写一个简单的例子吧,比较容易理解,方便入门. 也可以看出这种方式,对于线程…

服务器IBM x3650 m2 管理口访问故障处理

服务器的内存告警后,连接管理口查看信息,管理口状态灯显示正常,但是无法ping通和访问。 处理过程如下: 1、在centos 6.6中安装ipmitool,替换为阿里云的yum源,然后安装。 # wget -O /etc/yum.repos.d/Cen…

Unity自带的NavMesh寻路组件

最近看了一下Unity自带的NavMesh寻路组件,先说一下基本的使用: 首先先把AI Navgation的package包给安装上。 给场景地图添加上NavMeshSurface组件,然后进行烘焙,烘焙出对应的场景地图文件。 给移动物体添加对应的Nav MeshAgent组…

【雷达原理】雷达测速原理及实现方法

一、雷达测速原理 1.1 多普勒频率 当目标和雷达之间存在相对运动时,若雷达发射信号的工作频率为,则接收信号的频率为,其中为多普勒频率。将这种由于目标相对于辐射源运动而导致回波信号的频率发生变化的现象称为多普勒效应。 如图1-1所示&a…

FATFS文件系统

文件系统是为了存储和管理数据,而在存储设备上建立的一种组织结构。 Windows常用的文件系统: 1、FAT12 2、FAT16 3、FAT32 4、exFAT 5、NTFS FAT:File Alloction Table 文件分配表 在小型的嵌入式存储设备大多…

Bwapp学习笔记

1.基本sql语句 #求绝对值 select abs(-1) from dual; #取余数 select mod(10,3); #验证show databases结果是取之于schemata表的 show databases; select schema_name from information_schema.schemata; #查询当前的数据库 select database(); -- 查询数据库版本 s…

Java研学-Servlet 基础

一 概述 1 介绍 Servlet(Server Applet)是Java Servlet的简称,称为小服务程序或服务连接器,用Java编写的服务器端程序,具有独立于平台和协议的特性,主要功能在于交互式地浏览和生成数据,生成动…

【数据结构】什么是树?

🦄个人主页:修修修也 🎏所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 📌树的定义 📌树的相关概念 📌线性结构与树结构的对比 📌树的抽象数据类型 📌树的存储结构 &#x1f38…

叮咚,微信年度聊天报告(圣诞节版)请查收~丨GitHub star 16.8k+

微信年度聊天报告——圣诞节特别版,快发给心仪的ta吧~ 开源地址 GitHub开源地址:https://github.com/LC044/WeChatMsg 我深信有意义的不是微信,而是隐藏在对话框背后的一个个深刻故事。未来,每个人都能拥有AI的陪伴,…

Docker - 镜像 | 容器 日常开发常用指令 + 演示(一文通关)

目录 Docker 开发常用指令汇总 辅助命令 docker version docker info docker --help 镜像命令 查看镜像信息 下载镜像 搜索镜像 删除镜像 容器命令 查看运行中的容器 运行容器 停止、启动、重启、暂停、恢复容器 杀死容器 删除容器 查看容器日志 进入容器内部…

2024年【北京市安全员-B证】考试题库及北京市安全员-B证模拟试题

题库来源:安全生产模拟考试一点通公众号小程序 2024年【北京市安全员-B证】考试题库及北京市安全员-B证模拟试题,包含北京市安全员-B证考试题库答案和解析及北京市安全员-B证模拟试题练习。安全生产模拟考试一点通结合国家北京市安全员-B证考试最新大纲…

直接插入排序【从0-1学数据结构】

文章目录 💗 直接插入排序Java代码C代码JavaScript代码稳定性时间复杂度空间复杂度 我们先来学习 直接插入排序, 直接排序算是所有排序中最简单的了,代码也非常好实现,尽管直接插入排序很简单,但是我们依旧不可以上来就直接写代码,一定要分析之后才开始写,这样可以提…

手写线程池

手写线程池 线程池原理 线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池: 任务队列,存储需要处理的任务,由工作的线程来处理这些任务 通过线程池提供的API函数,将一个待处理的任务添加到任…

从归并排序引申到排序链表-图解

从归并排序引申到排序链表 文章目录 从归并排序引申到排序链表归并排序递归版非递归版 排序链表递归版非递归版 归并排序 递归版 //合并排序public static void mergeSort(int[] nums) {mergeSortHelper(0, nums.length, nums); //没有-1}private static void mergeSortHelper…

python 使用 pip 安装第三方库 导入不成功

本文是什么意思呢? 就是你需要使用一些库安装老师或者网上说的 通过pip 安装下载了第三方库,但是使用 import xxx from xxx import xx ,pycharm ide 导入的下面还有红色波浪线,导入不成功。 这是什么原因? 这是pyc…