(项目实战)业务场景中学透RocketMQ5.0-事务消息在预付卡系统中的应用

在这里插入图片描述

1 什么是事务消息

RocketMQ中事务消息主要是解决分布式场景下各业务系统事务一致性问题,常见的分布式事务解决方案有传统XA事务方案、TCC、本地消息表、MQ事务等。今天我们基于RocketMQ事务消息解决预付卡系统资金账户子系统和会员积分子系统、短信子系统分布式事务业务场景
在这里插入图片描述

1.1 事务消息功能原理

事务消息是 RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程
在这里插入图片描述

  1. 生产者将半事务消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

2 分布式事务的应用场景

分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。接下来我们基于预付卡系统-资金账户子系统和会员积分子系统、短信子系统业务流程来对分布式应用场景进行分析
在这里插入图片描述

  1. 预付卡系统-资金账户子系统为整个分布式应用场景中的上游系统,上游系统资金账户的变动会影响下游系统(会员积分子系统,短信子系统)事务的执行
  2. 在上游系统事务执行成功后会提交事务消息到下游系统,这时下游系统(会员积分和短信子系统)执行自己的业务逻辑
  3. 如果上游系统未执行成功最终状态为rollback则不会发送消息到下游系统,这时下游系统就不会执行相关业务逻辑保证了事务的一致性

3 基于RocketMQ事务消息解决卡系统分布式事务

在这里插入图片描述

  1. 预付卡系统-资金账户模块向RocketMQ服务端发送半事务消息,该消息暂时不会投递下游系统
  2. RocketMQ服务端向资金账户返回接收成功
  3. 执行资金账户本地事务
  4. 向RocketMQ二次提交事务执行结果(commit或rollback)
  5. 如果RocketMQ服务端出现异常情况会在资金账户系统(生产者)回查状态再执行第4步
  6. 检查资金账户本地事务最终执行结果
  7. 根据资金账户本地事务最终状态再次进行二次提交
  8. 如果事务状态最终为commit则会交付消息到会员积分子系统,短信子系统(消息订阅者),如果状态为rollback则不交付消息到订阅者

4 RocketMQ事务消息核心代码实现

本技术文档采用SpringCloud2021.x和RocketMQ5.0进行代码实现

Spring Cloud Alibaba VersionSpring Cloud VersionSpring Boot Version
2021.xSpring Cloud 2021.x2.7.18

4.1 在SpringCloud中集成RocketMQ流程

4.1.1 引入RocketMQ Stream Starter
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
4.1.2 修改application.properties配置文件
server.port=1001
spring.application.name=ecard-tx-pay
spring.cloud.stream.function.definition=consumer;
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.group=output_1
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.transactionListener=ecardTransactionListener
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.producerType=Trans
spring.cloud.stream.bindings.producer-out-0.destination=tx
spring.cloud.stream.bindings.consumer-in-0.destination=tx
spring.cloud.stream.bindings.consumer-in-0.group=tx-group
logging.level.org.springframework.context.support=debug

4.1.3 代码实现

1 事务消息生产者

package cn.itbeien.ecard.tx;

import java.util.function.Consumer;

import cn.itbeien.ecard.common.SimpleMsg;
import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;

/**
 * @author beien
 * @date 2024-06-20 22:15
 * Copyright© 2024 beien
 * 发送半事务消息到RocketMQ服务端
 */
@Service
@Slf4j
public class EcardTXService {
  
	@Autowired
	private StreamBridge streamBridge;

	//生产半事务消息
	@Bean
	public void producer() {
      String orderId = "10001";
		MessageBuilder builder = MessageBuilder.withPayload(new SimpleMsg("Hello Tx msg " + orderId));
				builder.setHeader("test", String.valueOf(i))
						.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
				builder.setHeader(RocketMQConst.USER_TRANSACTIONAL_ARGS, "binder");
				Message<SimpleMsg> msg = builder.build();
				streamBridge.send("producer-out-0", msg);
				log.info("send Msg:" + msg.toString());
	}
}

2 事务消息监听

package cn.itbeien.ecard.tx;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import org.springframework.stereotype.Component;

/**
 * @author beien
 * @date 2024-06-20 22:09
 * Copyright© 2024 beien
 * 执行本地事务和事务状态检查
 */
@Component("ecardTransactionListener")
@Slf4j
public class TransactionListenerImpl implements TransactionListener {

	/**
	 * @param msg messages
	 * @param arg message args
	 * @return Transaction state
	 * 执行本地事务方法 对应事务消息执行流程中的第3步
	 */
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		Object num = msg.getProperty("test");

		if ("1".equals(num)) {
			log.info("executer: " + new String(msg.getBody()) + " unknown");
			return LocalTransactionState.UNKNOW;
		}
		else if ("2".equals(num)) {
			log.info("executer: " + new String(msg.getBody()) + " rollback");
			return LocalTransactionState.ROLLBACK_MESSAGE;
		}
		log.info("executer: " + new String(msg.getBody()) + " commit");
		return LocalTransactionState.COMMIT_MESSAGE;
	}

	/**
	 * @param msg messages
	 * @return Transaction state
	 * 检查本地事务方法,根据事务执行结果返回事务状态
	 */
	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		log.info("check: " + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE;
	}

}

3 事务消息订阅(消费)者

package cn.itbeien.pay.tx;

import java.util.function.Consumer;

import cn.itbeien.ecard.common.SimpleMsg;
import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;

/**
 * @author beien
 * @date 2024-06-20 22:20
 * Copyright© 2024 beien
 * 该部分代码为会员积分子系统、短信子系统用来处理预付卡系统账户扣款状态的方法
 */
@Service
@Slf4j
public class PayConsumerService {
  
	@Bean
	public Consumer<Message<SimpleMsg>> consumer() {
      /**
      *会员积分子系统、短信子系统根据订阅自动执行此业务逻辑
      * 根据消费金额记录积分,下发短信通知用户预付卡余额变更或下发消息到小程序
      */
		return msg -> {
			Object arg = msg.getHeaders();
			log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " ARG:"
				+ arg.toString());
		};
	}
}

5 关注我

欢迎关注我的视频号和公众号,视频号有相关技术和业务视频可学习支付业务/文旅行业数字化。探讨技术(系统架构、微服务、容器化、云原生、分布式事务),支付系统实战。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/730822.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JMeter的基本使用与性能测试,完整入门篇保姆式教程

Jmeter 的简介 JMeter是一个纯Java编写的开源软件&#xff0c;主要用于进行性能测试和功能测试。它支持测试的应用/服务/协议包括Web (HTTP, HTTPS)、SOAP/REST Webservices、FTP、Database via JDBC等。我们最常使用的是HTTP和HTTPS协议。 Jmeter主要组件 线程组&#xff08…

永辉超市:胖东来爆改,成色几何?

单日业绩暴涨14倍。来&#xff0c;看看&#xff0c;这是被胖东来爆改后重新开业后的门店&#xff0c; 不出意外的流量爆炸。胖东来爆改&#xff0c;真是解决实体商超困境的灵丹妙药吗&#xff1f; 今天我们聊聊——永辉超市 最近两年实体商超日子都不好过&#xff0c;去年13家…

在Worpress增加网站的二级目录,并转向到站外网站

在WordPress中&#xff0c;你可以通过添加自定义重定向来实现将某个二级目录&#xff08;例如 www.example.com/subdir&#xff09;重定向到站外网站。可以通过以下几种方法来实现&#xff1a; 方法一&#xff1a;使用 .htaccess 文件 如果你的服务器使用Apache&#xff0c;你…

使用上海云盾 CDN 和 CloudFlare 后 Nginx、 WordPress、 Typecho 获取访客真实 IP 方法

最近因为被 DDoS/CC 攻击的厉害,明月就临时的迁移了服务器,原来的服务器就空置下来了,让明月有时间对服务器进行了重置重新部署安装生产环境。因为站点同时使用了上海云盾和 CloudFlare(具体思路可以参考【国内网站使用国外 CloudFlare CDN 的思路分享】一文)两个 CDN 服务…

Java数据类型与运算符

1. 变量和类型 变量指的是程序运行时可变的量&#xff0c;相当于开辟一块空间来保存一些数据。 类型则是对变量的种类进行了划分&#xff0c;不同类型的变量具有不同的特性。 1.1 整型变量&#xff08;重点&#xff09; 基本语法格式&#xff1a; int 变量名 初始值;代码示…

20240621在飞凌的OK3588-C开发板linux系统的CAM3上接OV5645录像

20240621在飞凌的OK3588-C开发板linux系统的CAM3上接OV5645录像 2024/6/21 19:57 开发板&#xff1a;OK3588-C SDK&#xff1a;linux R4/Buildroot v4l2-ctl --list-devices v4l2-ctl --list-formats-ext -d /dev/video16 gst-launch-1.0 v4l2src device/dev/video16 num-bu…

【Git】 -- Part2 -- 分支管理

1. 分支 在 Git 中&#xff0c;分支&#xff08;Branch&#xff09;是用于在项目中创建独立开发线路的机制。分支使得开发者可以在不影响主干&#xff08;main 或 master&#xff09;的情况下进行实验、开发新功能或修复 Bug。 举个例子&#xff1a; 分⽀就好像是科幻电影⾥⾯…

鸿蒙开发:【进程模型概述】

进程模型概述 系统的进程模型如下图所示&#xff1a; 应用中&#xff08;同一包名&#xff09;的所有PageAbility、ServiceAbility、DataAbility、FormAbility运行在同一个独立进程中&#xff0c;即图中绿色部分的“Main Process”。 WebView拥有独立的渲染进程&#xff0c;即…

OS复习笔记ch11-2

上一节我们学习的内容是I/O系统的特点和设备分类和差异&#xff0c;这一节我们将主要关注I/O控制方式、OS设计问题、I/O逻辑结构等。 I/O功能的演变 在专栏的ch1-2中&#xff0c;我们详细讲解了CPU与外设的三种交互方式&#xff0c;这里简单地带过。 &#xff08;1&#xff0…

MPLS-LDP(个人学习笔记)

定义 标签分发协议LDP&#xff08;Label Distribution Protocol&#xff09;是多协议标签交换MPLS的一种控制协议&#xff0c;负责转发等价类FEC的分类、标签的分配以及标签交换路径LSP的建立和维护等操作。LDP规定了标签分发过程中的各种消息以及相关处理过程 术语 LDP会话&a…

计算机组成原理 | 计算机系统概述

CPI:(Clockcycle Per Instruction)&#xff0c;指每条指令的时钟周期数。 时钟周期&#xff1a;对CPU来说&#xff0c;在一个时钟周期内&#xff0c;CPU仅完成一个最基本的动作。时钟脉冲是计算机的基本工作脉冲&#xff0c;控制着计算机的工作节奏。时钟周期 是一个时钟脉冲所…

ECharts 词云案例三:2024年阅读关键词

ECharts 词云案例三&#xff1a;2024年阅读关键词 引言 在数据可视化领域&#xff0c;ECharts 以其强大的功能性和灵活性&#xff0c;成为开发者和设计师的首选工具之一。继上一篇关于 ECharts 词云图的详细介绍后&#xff0c;本文将探索词云图的进阶应用——使用蒙版来创造更…

约束求解器方案设计

1.约束求解介绍 给定一个几何对象&#xff08;点、直线段、圆、圆弧、平面等&#xff09;的集合G和一个关于集合G中几何对象之间约束&#xff08;点的位置、直线段的长度、圆弧对应的圆心角角度、垂直、相切等&#xff09; 的集合C&#xff0c;则在二元组(G&#xff0c;C)中根…

【UIDynamic-动力学-附着行为-刚性附着 Objective-C语言】

一、接下来,我们来说这个附着行为啊, 1.我们之前举过例子,一个车坏了,另外一个车,拉着这个车,就是附着行为啊, 这个里边呢,我们新建一个项目, Name:09-附着行为-刚性附着, 附着行为呢,分为两个大类: 1)刚性附着 2)弹性附着 刚性附着,指的就是,两个物体之间…

第九届世界渲染大赛什么时候开始举办?

​第九届世界渲染大赛即将开启&#xff0c;全球设计师和艺术家将汇聚一堂&#xff0c;展现3D艺术的创新与美感。敬请期待这场业界顶级的视觉盛宴&#xff0c;让我们共同关注大赛的启幕时刻。 第九届世界渲染大赛开始时间 预计时间&#xff1a;2024年7月(中旬) 报名方法&#…

光纤三维布里渊温度和应变分布matlab模拟与仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 光纤三维布里渊温度和应变分布matlab模拟与仿真。其中 , 布里渊散射是光波与声波在光纤中传播时相互作用而产生的光散射过程 , 在不 同的条件下 , 布里渊散射又分…

51单片机STC89C52RC——4.1 独立按键(数码管显示按键值)

目录 目录 目的 一&#xff0c;STC单片机模块 二&#xff0c;矩阵按键模块 2.1 针脚定义 ​编辑 2.2 矩阵按键位置 2.3 如何理解按键按下后针脚的高低电平 2.3.1 错误理解1 2.3.2 错误理解2 2.3.3 正确判定按下的是那个按键的逻辑 2.3.4 判定按键按下的依次扫描程…

服务端代码编写中MySql大小写在Java中报错问题解决

报错信息&#xff1a; 原因&#xff1a;MySql和Java变量大小写产生的冲突。 经过查阅各个博客等&#xff0c;得出浅显结论&#xff08;不一定对&#xff09;&#xff1a;MySql大小写不敏感&#xff0c;Java大小写敏感&#xff0c;当Javabean转为MySql数据库表时&#xff0c;Ja…

58-DOS与DDOS分析(正常TCP会话与SYN Flood攻击、ICMP Flood 攻击、SNMP放大攻击等)

目录 正常 TCP 会话与 SYN Flood 攻击 1、正常的三次握手过程&#xff1a; 2、 SYN Flood 攻击 一、攻击windows系统&#xff1a; 二、攻击web网站 &#xff1a; 拒绝服务攻击工具-Hping3-Syn Flood 攻击 拒绝服务攻击工具--Hping3--ICMP Flood 攻击 sockstress攻击 So…

【Git】 -- Part1 -- 基础操作

1. Git简介 Git 是一个开源的分布式版本控制系统&#xff0c;由 Linus Torvalds 于 2005 年开发&#xff0c;主要用于源代码管理。Git 允许多名开发者共同合作处理同一个项目&#xff0c;跟踪每个文件的修改&#xff0c;并且在必要时回滚到之前的版本。 Linus Torvalds是Linux…