SpringBoot + 事务钩子函数

一、案例背景

拿支付系统相关的业务来举例。在支付系统中,我们需要记录每个账户的资金流水(记录用户A因为哪个操作扣了钱,因为哪个操作加了钱),这样我们才能对每个账户的账做到心中有数,对于支付系统而言,资金流水的数据可谓是最重要的。

因此,为了防止支付系统的老大徇私舞弊,CTO提了一个流水存档的需求:要求支付系统对每个账户的资金流水做一份存档,要求支付系统在写流水的时候,把流水相关的信息以消息的形式推送到kafka,由存档系统消费这个消息并落地到库里(这个库只有存档系统拥有写权限)。

整个需求的流程如下所示:

在这里插入图片描述

整个需求的流程还是比较简单的,考虑到后续会有其他事业部也要进行数据存档操作,建议支付系统团队内部开发一个二方库,这个二方库的主要功能就是发送消息到kafka中去。

二、确定方案

既然要求开发一个二方库,因此,我们需要考虑如下几件事情:

1、技术栈使用的springboot,因此,这里最好以starter的方式提供

2、二方库需要发送消息给kafka,最好是二方库内部基于kafka生产者的api创建生产者,不要使用Spring自带的kafkaTemplate,因为集成方有可能已经使用了kafkaTemplate。不能与集成方造成冲突。

3、减少对接方的集成难度、学习成本,最好是提供一个简单实用的api,业务侧能简单上手。

4、发送消息这个操作需要支持事务,尽量不影响主业务

在上述的几件事情中,最需要注意的应该就是第4点:发送消息这个操作需要支持事务,尽量不影响主业务。 这是什么意思呢?首先,尽量不影响主业务,这个最简单的方式就是使用异步机制。其次,需要支持事务是指:假设我们的api是在事务方法内部调用的,那么我们需要保证事务提交后再执行这个api。 那么,我们的流水落地api应该要有这样的功能:

在这里插入图片描述

内部可以判断当前是否存在事务,如果存在事务,则需要等事务提交后再异步发送消息给kafka。如果不存在事务则直接异步发送消息给kafka。而且这样的判断逻辑得放在二方库内部才行。那现在摆在我们面前的问题就是:我要如何判断当前是否存在事务,以及如何在事务提交后再触发我们自定义的逻辑呢?

三、TransactionSynchronizationManager

这个类内部所有的变量、方法都是static修饰的,也就是说它其实是一个工具类。是一个事务同步器。下述是流水落地API的伪代码,这段代码就解决了我们上述提到的疑问:

private final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendLog() {
    // 判断当前是否存在事务
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        // 无事务,异步发送消息给kafka
        
        executor.submit(() -> {
            // 发送消息给kafka
            try {
                // 发送消息给kafka
            } catch (Exception e) {
                // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
            }
        });
        return;
    }

    // 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

        @Override
        public void afterCompletion(int status) {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                // 事务提交后,再异步发送消息给kafka
                executor.submit(() -> {
                    try {
                     // 发送消息给kafka
                    } catch (Exception e) {
                     // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                    }
                });
            }
        }

    });

}

代码比较简单,其主要是TransactionSynchronizationManager的使用。

3.1、判断是否存在事务?TransactionSynchronizationManager.isSynchronizationActive() 方法
我们先看下这个方法的源码:

// TransactionSynchronizationManager.java类内部的部分代码

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
   new NamedThreadLocal<>("Transaction synchronizations");

public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
}

很明显,synchronizations是一个线程变量(ThreadLocal)。那它是在什么时候set进去的呢?

这里的话,可以参考下这个方法:org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization,其源码如下所示:

/**
  * Activate transaction synchronization for the current thread.
  * Called by a transaction manager on transaction begin.
  * @throws IllegalStateException if synchronization is already active
  */
public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
        throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
}

由源码中的注释也可以知道,它是在事务管理器开启事务时调用的。换句话说,只要我们的程序执行到带有事务特性的方法时,就会在线程变量中放入一个LinkedHashSet,用来标识当前存在事务。只要isSynchronizationActive返回true,则代表当前有事务。

因此,结合这两个方法我们是指能解决我们最开始提出的疑问:要如何判断当前是否存在事务

3.2、如何在事务提交后触发自定义逻辑?
我们来看下这个方法的源代码:

/**
  * Register a new transaction synchronization for the current thread.
  * Typically called by resource management code.
  * <p>Note that synchronizations can implement the
  * {@link org.springframework.core.Ordered} interface.
  * They will be executed in an order according to their order value (if any).
  * @param synchronization the synchronization object to register
  * @throws IllegalStateException if transaction synchronization is not active
  * @see org.springframework.core.Ordered
  */
public static void registerSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException {

    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    if (!isSynchronizationActive()) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchronizations.get().add(synchronization);
}

这里又使用到了synchronizations线程变量,我们在判断是否存在事务时,就是判断这个线程变量内部是否有值。那我们现在想在事务提交后触发自定义逻辑和这个有什么关系呢?

我们在上面构建流水落地api的伪代码中有向synchronizations内部添加了一个TransactionSynchronizationAdapter,内部并重写了afterCompletion方法,其代码如下所示:

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

    @Override
    public void afterCompletion(int status) {
        if (status == TransactionSynchronization.STATUS_COMMITTED) {
            // 事务提交后,再异步发送消息给kafka
            executor.submit(() -> {
                    try {
                     // 发送消息给kafka
                    } catch (Exception e) {
                     // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                    }
            });
        }
    }

});

我们结合registerSynchronization的源码来看,其实这段代码主要就是向线程变量内部的LinkedHashSet添加了一个对象而已,但就是这么一个操作,让Spring在事务执行的过程中变得“有事情可做”。这是什么意思呢?

是因为Spring在执行事务方法时,对于操作事务的每一个阶段都有一个回调操作,比如:trigger系列的回调

在这里插入图片描述

invoke系列的回调

在这里插入图片描述
而我们现在的需求就是在事务提交后触发自定义的函数,那就是在invokeAfterCommit和invokeAfterCompletion这两个方法来选了。首先,这两个方法都会拿到所有TransactionSynchronization的集合(其中会包括我们上述添加的TransactionSynchronizationAdapter)。

但是要注意一点:invokeAfterCommit只能拿到集合,invokeAfterCompletion除了集合还有一个int类型的参数,而这个int类型的参数其实是当前事务的一种状态。也就是说,如果我们重写了invokeAfterCompletion方法,我们除了能拿到集合外,还能拿到当前事务的状态。

因此,此时我们可以根据这个状态来做不同的事情,比如:可以在事务提交时做自定义处理,也可以在事务回滚时做自定义处理等等。

四、总结

上面有说到,我们判断当前是否存在事务、添加钩子函数都是依赖线程变量的。因此,我们在使用过程中,一定要避免切换线程。否则会出现不生效的情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/953791.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于微信小程序的智能停车场管理系统设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

接上一主题,实现QtByteArray任意进制字符串转为十进制数

函数&#xff1a; /// <summary>/// n进制字符串转为十进制数&#xff0c;snDefine的长度最小为二进制数。/// 例子&#xff1a;/// _pn(_Math::strNToInt(_t("1010"), _t("01")));/// _pn(_Math::strNToInt(_t("-1010"), _t("0123…

小游戏前端地区获取

目前前端获取除了太平洋&#xff0c;没有其它的了。 //在JS中都是使用的UTF-8&#xff0c;然而requst请求后显示GBK却是乱码&#xff0c;对传入的GBK字符串&#xff0c;要用数据流接收&#xff0c;responseType: "arraybuffer" tt.request({url: "https://whoi…

sosadmin相关命令

sosadmin命令 以下是本人翻译的官方文档&#xff0c;如有不对&#xff0c;还请指出&#xff0c;引用请标明出处。 原本有个对应表可以跳转的&#xff0c;但是CSDN的这个[](#)跳转好像不太一样&#xff0c;必须得用html标签&#xff0c;就懒得改了。 sosadmin help 用法 sosadm…

人工智能提高安全性的8种方法

人工智能提高安全性的8种方法 人工智能&#xff08;AI&#xff09;通过增强威胁检测、简化响应和加强各个领域的防御&#xff0c;正在彻底改变网络安全。根据HPE的见解&#xff0c;以下是基于AI改善安全性的八种关键方式。 ​ ​ 高级威胁检测和实时监控&#xff1a; 人工智能…

Android SDK下载安装(图文详解)

安装完sdk&#xff0c;就可以直接使用adb命令了&#xff0c;我们做app自动化测试&#xff0c;也需要sdk环境的依赖。 1. 下载Android SDK 网盘下载地址&#xff1a;https://pan.quark.cn/s/8398e52cefc9 官网下载地址&#xff1a;https://www.androiddevtools.cn/ &#xff08;…

25/1/13 嵌入式笔记 继续学习Esp32

PWM&#xff08;Pulse Width Modulation&#xff0c;脉宽调制&#xff09; 是一种通过快速切换高低电平来模拟中间电压值的技术。它广泛应用于控制 LED 亮度、电机速度、音频生成等场景。 analogWrite函数:用于在微控制器&#xff08;如 Arduino&#xff09;上生成模拟信号。 …

uniapp区域滚动——上划进行分页加载数据(详细教程)

##标题 用来总结和学习&#xff0c;便于自己查找 文章目录 一、为什么scroll-view?          1.1 区域滚动页面滚动&#xff1f;          1.2 代码&#xff1f; 二、分页功能&#xff1f;          2.1 如何实现&#xff…

【Git版本控制器--1】Git的基本操作--本地仓库

目录 初识git 本地仓库 认识工作区、暂存区、版本库 add操作与commit操作 master文件与commit id 修改文件 版本回退 撤销修改 删除文件 初识git Git 是一个分布式版本控制系统&#xff0c;主要用于跟踪文件的更改&#xff0c;特别是在软件开发中。 为什么要版本…

【STM32-学习笔记-7-】USART串口通信

文章目录 USART串口通信Ⅰ、硬件电路Ⅱ、常见的电平标准Ⅲ、串口参数及时序Ⅳ、STM32的USART简介数据帧起始位侦测数据采样波特率发生器 Ⅴ、USART函数介绍Ⅵ、USART_InitTypeDef结构体参数1、USART_BaudRate2、USART_WordLength3、USART_StopBits4、USART_Parity5、USART_Mode…

Spring MVC简单数据绑定

【图书介绍】《SpringSpring MVCMyBatis从零开始学&#xff08;视频教学版&#xff09;&#xff08;第3版&#xff09;》_springspringmvcmybatis从零开始 代码、课件、教学视频与相关软件包下载-CSDN博客 《SpringSpring MVCMyBatis从零开始学(视频教学版)&#xff08;第3版&…

初识JVM HotSopt 的发展历程

目录 导学 目前企业对程序员的基本要求 面向的对象 实战 学习目标 JVM 是什么 JVM 的三大核心功能 各大 JVM look 看一下虚拟机 HotSopt 的发展历程 总结 导学 目前企业对程序员的基本要求 面向的对象 实战 学习目标 JVM 是什么 JVM 的三大核心功能 即时编译 主要是…

【pytorch】注意力机制-1

1 注意力提示 1.1 自主性的与非自主性的注意力提示 非自主性提示&#xff1a; 可以简单地使用参数化的全连接层&#xff0c;甚至是非参数化的最大汇聚层或平均汇聚层。 自主性提示 注意力机制与全连接层或汇聚层区别开来。在注意力机制的背景下&#xff0c;自主性提示被称为查…

大数据技术Kafka详解 ⑤ | Kafka中的CAP机制

目录 1、分布式系统当中的CAP理论 1.1、CAP理论 1.2、Partitiontolerance 1.3、Consistency 1.4、Availability 2、Kafka中的CAP机制 C软件异常排查从入门到精通系列教程&#xff08;核心精品专栏&#xff0c;订阅量已达600多个&#xff0c;欢迎订阅&#xff0c;持续更新…

ESP-IDF学习记录(5) 画一块esp32-c3 PCB板

最近看了半个多月&#xff0c;趁着嘉立创官方活动&#xff0c;研究esp32-c3规格书&#xff0c;白嫖PCB 和元器件。原本计划按照官方推荐的搞个四层板&#xff0c;结果打样太贵&#xff0c;火速改成双层板&#xff0c;用了官方的券。小于10*10,也可以使用嘉立创的免费打样。 下面…

nginx 实现 正向代理、反向代理 、SSL(证书配置)、负载均衡 、虚拟域名 ,使用其他中间件监控

我们可以详细地配置 Nginx 来实现正向代理、反向代理、SSL、负载均衡和虚拟域名。同时&#xff0c;我会介绍如何使用一些中间件来监控 Nginx 的状态和性能。 1. 安装 Nginx 如果你还没有安装 Nginx&#xff0c;可以通过以下命令进行安装&#xff08;以 Ubuntu 为例&#xff0…

Netty 入门学习

前言 学习Spark源码绕不开通信&#xff0c;Spark通信是基于Netty实现的&#xff0c;所以先简单学习总结一下Netty。 Spark 通信历史 最开始: Akka Spark 1.3&#xff1a; 开始引入Netty&#xff0c;为了解决大块数据&#xff08;如Shuffle&#xff09;的传输问题 Spark 1.6&…

鸿蒙报错Init keystore failed: keystore password was incorrect

报错如下&#xff1a; > hvigor ERROR: Failed :entry:defaultSignHap... > hvigor ERROR: Tools execution failed. 01-13 16:35:55 ERROR - hap-sign-tool: error: Init keystore failed: keystore password was incorrect * Try the following: > The key stor…

IDEA的Git界面(ALT+9)log选项不显示问题小记

IDEA的Git界面ALT9 log选项不显示问题 当前问题idea中log界面什么都不显示其他选项界面正常通过命令查询git日志正常 预期效果解决办法1. 检查 IDEA 的 Git 设置2. 刷新 Git Log (什么都没有大概率是刷新不了)3. 检查分支和日志是否存在4. 清理 IDEA 缓存 (我用这个成功解决)✅…

ffmpeg硬件编码

使用FFmpeg进行硬件编码可以显著提高视频编码的性能&#xff0c;尤其是在处理高分辨率视频时。硬件编码利用GPU或其他专用硬件&#xff08;如Intel QSV、NVIDIA NVENC、AMD AMF等&#xff09;来加速编码过程。以下是使用FFmpeg进行硬件编码的详细说明和示例代码。 1. 硬件编码支…