SpringBoot中使用RocketMQ实现事务消息来保证分布式事务的一致性(有代码)

前言

分布式事务是分布式系统中非常常见的问题。是非常必要钱常见的。实现的方式也是多种多样。今天这个视频主要来分享一下RocketMQ实现事务消息来保证分布式事务的一致性。不知道大家使用过这种方式没有。这种分布式事务的原理其实和本地消息表一样。

本地消息表实现分布式事务的基本原理

本地消息表实现分布式事务的基本原理是通过两个阶段的事务处理来保证分布式环境中的数据一致性。以下是其基本步骤:
大致就是将本地消息表和要执行的第一个业务逻辑放在一个事务中,这样就可以一起成功一起失败。当第一阶段成功后。根据本地消息表中的记录去让下游的业务执行成功。扫描本地消息表中的消息然后执行下游业务。执行成功后在删除本地消息表中消息。不成功则重试。

1.本地事务:

在开始分布式事务时,首先执行本地操作。例如,更新某个服务的数据。
如果本地操作成功,事务进入下一步;如果失败,则回滚本地事务,并结束流程。
消息记录:
创建一条消息记录,通常称为“本地消息”,将需要在后续阶段执行的远程操作信息保存在本地数据库的一个消息表中。这个消息记录包含了执行远程操作所需的所有数据。
消息发送:

将本地消息发送到消息队列,如RocketMQ或其他消息中间件。此时,消息队列并不保证消息已经被消费,只是简单地将消息放入队列。
消息消费:

消息队列的消费者监听并处理消息。消费者通常是另一个服务,它接收消息并执行相应的远程操作,比如更新另一个服务的数据。
确认与补偿:

如果远程操作成功,消费者会发送一个确认信号(ACK),通知生产者操作已完成。这时,生产者可以删除本地消息表中的记录。
如果远程操作失败,消费者可能会尝试重新消费消息,或者根据策略回滚本地事务,然后通知生产者消息处理失败。
最终一致性:

尽管可能有短暂的延迟,但最终所有服务的数据状态会达到一致,因为本地操作和远程操作都会成功完成,或者在失败时都会回滚。
异常处理:

为了处理异常情况,系统通常会有超时和重试机制。如果消费者长时间没有确认,生产者可能会重新发送消息,或者在一定时间后回滚本地事务。
本地消息表方案的优点在于它避免了分布式事务的复杂性,实现了最终一致性,而不是强一致性。但是,它也有一些缺点,比如增加了系统的复杂性,需要维护额外的消息表,以及可能出现消息丢失或重复消费的问题。因此,它更适合对实时性要求不高,但对最终一致性有要求的场景。

本地消息表是一种最终一致性方案。并不是强一致性方案。

rocketmq事务消息

今天重点来说一下rocketmq事务消息是怎么做的。先理解一下Rocketmq事务消息
在这里插入图片描述

这种类似的图片挺多的。简单的来看一下 然后一会结合代码看一下。生产者先送消息到MQserve。然后mq去执行本地事务。通过回查的方式来保证第一阶段消息执行的成功。然后下游消费者来消费这个消息。

代码

我们需要实现分布式事务的两个服务分别是用户中心的服务以及im业务服务。功能是注册的功能。用户的注册信息基本信息存储在用户中心表。然后其他信息存储在im_user表里面。这个听起来有点奇怪。因为我这套代码是计划用户中心存储多个app的用户信息。通义提供鉴权服务什么的。然后基本信息存储在自己的业务用户表里面。大概是这样的设计思路。可以看代码。

	/**
	 * 使用rocketmq实现事务
	 * @param dto
	 * @return
	 * @throws Exception
	 */
	@ApiOperation("使用邮箱和密码注册")
	@PostMapping("/sys/registByWeb")
	public GenericResponse registByWebTX(@RequestBody SysRegisterForm dto) throws Exception {
		String uuid = UUID.randomUUID().toString() + new Random().nextInt();
		SysUserEntity sysUserEntity = new SysUserEntity();
		sysUserEntity.setPassword(dto.getPassword());
		sysUserEntity.setUsername(dto.getUsername());
		sysUserEntity.setOpenid(uuid);
		//注册需要的实体类
		RegisterFeign registerFeign = new RegisterFeign();
		registerFeign.setOpenid(uuid);
		registerFeign.setUsername(dto.getUsername());
		registerFeign.setEmail(dto.getEmail());
		TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,
				MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);
		String sendStatus = sendResult.getSendStatus().name();
		String localTXState = sendResult.getLocalTransactionState().name();
		logger.info("sendStatus---" + sendStatus);
		logger.info("localTXState---"+localTXState);

		// 注意:这里不能立即返回成功,因为事务还未完成,实际应用中可能需要设计异步回调通知客户端事务结果
		// 以下仅为示例逻辑,实际应用中需根据业务需求调整
		return GenericResponse.response(ServiceError.NORMAL);
	}

这里实现注册功能。然后
TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,
MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);
这行代码用来发送事务消息;
需要给rocketmq配置一个生产者端的消息监听器

@Slf4j
@RocketMQTransactionListener
public class UserRegistrationTransactionListener implements RocketMQLocalTransactionListener {


    @Autowired
    private SysUserService sysUserService;


    @Autowired
    SysUserDao sysUserDao;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
        // 执行本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
//            OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);
            SysUserEntity  sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);
            sysUserService.saveUser(sysUserEntity);
        } catch (Exception e) {
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
//        return  RocketMQLocalTransactionState.UNKNOWN;
        return result;
    }


    /**
     * 步骤四
     * 描述:mq回调检查本地事务执行情况
     * @param msg
     * @return
     */

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());
        // 检查本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            SysUserEntity  sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);
//            OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);
//            List<OrderEntity> list = orderService.selectOrder(order);
            List<Map> list = sysUserDao.queryUserByOpenid(sysUserEntity.getOpenid(),sysUserEntity.getUsername());
            if(list.size()<=0){
                result = RocketMQLocalTransactionState.UNKNOWN;
            }

        } catch (Exception e) {
            // 异常就回滚
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }


}

@RocketMQTransactionListener注意这个注解不能落下。
然后可以配置一下下游消费者。

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumeRegister", topic = "TX_REGISTER_ADD",consumeMode = ConsumeMode.ORDERLY)
public class RegisterListener implements RocketMQListener<RegisterFeign> {

    @Autowired
    private WeChatService weChatService;


    /**
     *
     * @param dto
     */
    @Override
    public void onMessage(RegisterFeign dto) {
        log.info("接收到消息,开始消费..dto" + dto);
        weChatService.registByOpenid(dto);

    }

}

我们在这个地方来接受一下消息。然后调用这个服务的保存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/607820.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

1.基于python的单细胞数据预处理-质量控制

目录 质量控制过滤低质量细胞的指南双细胞过滤手动过滤低质量读数细胞自动过滤低质量读数细胞环境RNA校正 参考&#xff1a; [1] https://github.com/Starlitnightly/single_cell_tutorial [2] https://github.com/theislab/single-cell-best-practices 质量控制 原始的单细胞…

模拟实现链表的功能

1.什么是链表&#xff1f; 链表是一种物理存储结构上非连续存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的引用链接次序实现的 。 实际中链表的结构非常多样&#xff0c;以下情况组合起来就有8种链表结构&#xff1a; 单向或者双向 带头或者不带头 …

猫头虎分享已解决Bug || Node.js安装失败Error: unable to connect to https://nodejs.org/猫头虎

猫头虎分享已解决Bug || Node.js安装失败Error: unable to connect to https://nodejs.org/猫头虎 博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — …

活动回顾 |观测云 AI Agent 探索实践

亚马逊云科技“构建全球化软件和互联网新生态——ISV 行业”论坛上&#xff0c;观测云产品架构师刘锐发表了题为“AI Agent 可观测性探索与实践”的主题演讲&#xff0c;不仅展示了观测云在人工智能领域的前沿技术&#xff0c;更强调了在日益复杂的系统环境中&#xff0c;实现有…

autoware.universe 使用之Rosbag replay simulation放包仿真

本文将按照官方文档&#xff0c;通过播放rosbag录制包进行可视化模拟&#xff0c;中间也报了很多错误&#xff0c;特此记录下来&#xff0c;以免后续踩坑。 电脑配置如下&#xff1a;    ubuntu20.04    cuda: cuda-11.6    nvidia-driver 535    ros2: foxy 关于auto…

「MDN web 入门」学习笔记

目录 写在前面 1. MDN 简介 1.1 MDN 的主要特点 1.2 MDN 的主要功能 1.3 MDN 网页开发的指南 2. 安装基础软件 2.1 专业人士工具 2.2 初学者基本工具 3. 设计网站外观 3.1 计划 3.2 绘制草图 3.3 选定素材 3.4 文本 3.5 主题颜色 3.6 图像 3.7 字体 4. 处理文…

Redis(无中心化集群搭建)

文章目录 1.无中心化集群1.基本介绍2.集群说明 2.基本环境搭建1.部署规划&#xff08;6台服务器&#xff09;2.首先删除上次的rdb和aof文件&#xff08;对之前的三台服务器都操作&#xff09;1.首先分别登录命令行&#xff0c;关闭redis2.清除/root/下的rdb和aof文件3.把上次的…

认识卷积神经网络

我们现在开始了解卷积神经网络&#xff0c;卷积神经网络是深度学习在计算机视觉领域的突破性成果&#xff0c;在计算机视觉领域&#xff0c;往往我们输入的图像都很大&#xff0c;使用全连接网络的话&#xff0c;计算的代价较高&#xff0c;图像也很难保留原有的特征&#xff0…

oracle 数据库找到UDUMP的文件名称

oracle 数据库找到UDUMP的文件名称 select p.value||\||i.instance_name||_ora_||spid||.trc as "trace_file_name" from v$parameter p ,v$process pro, v$session s, (select sid from v$mystat where rownum1) m, v$instance i where lower(p.name)user_dump_…

Java_File

介绍&#xff1a; File对象表示路径&#xff0c;可以是文件&#xff0c;也可以是文件夹。这个路径可以是存在的&#xff0c;也可以是不存在的&#xff0c;带盘符的路径是绝对路径&#xff0c;不带盘符的路径是相对路径&#xff0c;相对路径默认到当前项目下去找 构造方法&…

英伟达推出视觉语言模型:VILA

NVIDIA和MIT的研究人员推出了一种新的视觉语言模型(VLM)预训练框架&#xff0c;名为VILA。这个框架旨在通过有效的嵌入对齐和动态神经网络架构&#xff0c;改进语言模型的视觉和文本的学习能力。VILA通过在大规模数据集如Coy0-700m上进行预训练&#xff0c;采用基于LLaVA模型的…

三.Django--ORM(操作数据库)

目录 1 什么是ORM 1.1 ORM优势 1.2ORM 劣势 1.3 ORM与数据库的关系 2 ORM 2.1 作用 2.2 连接数据库 2.3 表操作--设置字段 2.4 数据库的迁移 写路由增删改查操作 项目里的urls.py: app里的views.py: 注意点: 1 什么是ORM ORM中文---对象-关系映射 在MTV,MVC设计…

2024面试自动化测试面试题【含答案】

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

若依框架dialog弹窗取消点击空白出关闭

如果想全局取消的话就找到main.js在里面加上下面的一行代码&#xff0c;添加完成之后记得清楚浏览器缓存重新加载js文件。 Element.Dialog.props.closeOnClickModal.default false;如果想指定某个弹窗取消点击空白处关闭&#xff0c;那么就找到那个弹窗加上。添加完毕之后刷新…

扩散模型~

推荐&#xff1a;write_own_pipeline.ipynb - Colab (google.com) 基本管道 一直显示NVIDIA有问题&#xff0c;所以就把.to("cuda")去掉了&#xff0c;使用Colab运行的&#xff0c;代码如下&#xff1a; from diffusers import DDPMPipelineddpm DDPMPipeline.fr…

哈希题目总结

以下列举了可以用哈希方法&#xff08;包括但不限于用HashMap和HashSet&#xff09;的题目&#xff0c;实质上是把东西丢给这些数据结构去维护。请注意有些题目中用哈希是最优解&#xff0c;有些题目中不是最优解&#xff0c;可以自行探索其时间复杂度和空间复杂度的区别&#…

java入门1.1.1版本

前言&#xff1a; 上面的内容是1.0.0~1.1的内容总结 秉持着先做再定义的理念&#xff0c;这里会带着大家先体验一下类与对象 第一步&#xff1a;新建一个java文件 鼠标右键 → 新建 → 文本文档 → 右键 → 点击重名 → 全选 → hello.java 第二步&#xff1a;用笔记本打开 …

阿里云开发uniapp之uni-starter

一、为什么使用uni-starter uni-starter是集成商用项目常见功能的、云端一体应用快速开发项目模版。 一个应用有很多通用的功能&#xff0c;比如登录注册、个人中心、设置、权限管理、拦截器、banner... uni-starter将这些功能都已经集成好&#xff0c;另外&#xff0c;uni-s…

2023-2024年SaaS行业报告合集(精选22份)

SaaS行业报告/方案&#xff08;精选21份&#xff09; 2023-2024年 报告来源&#xff1a;2023-2024年SaaS行业报告合集&#xff08;精选22份&#xff09; 【以下是资料目录】 2024中国HCM SaaS领导者竞争力持续增强的行业龙头 2024年中国企业级SaaS行业研究报告 2024年SaaS…

基于Transformer网络的多步预测模型

包括完整流程数据代码处理&#xff1a; 多步预测数据集制作、数据加载、模型定义、参数设置、模型训练、模型测试、预测可视化、多步预测、模型评估 ● 环境框架&#xff1a;python 3.9 pytorch 1.8 及其以上版本均可运行 ● 使用对象&#xff1a;论文需求、毕业设计需求者…