多线程事务

一、业务场景

        我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。

二、解决方案

        我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。

 

1、工具类代码: 

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author poxiao
 * @create 2023-01-05 22:22
 * <p>
 * 多线程事务管理器
 * 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议
 * 解决基于线程池的多线程事务一致性问题
 */
@Slf4j
public class MultiThreadingTransactionManager {

    /**
     * 事务管理器
     */
    private final PlatformTransactionManager transactionManager;

    /**
     * 超时时间
     */
    private final long timeout;

    /**
     * 时间单位
     */
    private final TimeUnit unit;

    /**
     * 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0
     */
    private CountDownLatch oneStageLatch = null;

    /**
     * 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚
     */
    private final CountDownLatch twoStageLatch = new CountDownLatch(1);

    /**
     * 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)
     */
    private final AtomicBoolean isSubmit = new AtomicBoolean(true);

    /**
     * 构造方法
     * @param transactionManager 事务管理器
     * @param timeout 超时时间
     * @param unit 时间单位
     */
    public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {
        this.transactionManager = transactionManager;
        this.timeout = timeout;
        this.unit = unit;
    }

    /**
     * 线程池方式执行任务,可保证线程间的事务一致性
     * @param runnableList 任务列表
     * @param executor 线程池
     * @return
     */
    public boolean execute(List<Runnable> runnableList, Executor executor) {

        // 排除null值
        runnableList.removeAll(Collections.singleton(null));

        // 属性初始化
        innit(runnableList.size());

        // 遍历任务列表并放入线程池
        for (Runnable runnable : runnableList) {
            // 创建线程
            Thread thread = new Thread() {
                @Override
                public void run() {
                    // 如果别的线程执行失败,则该任务就不需要再执行了
                    if (!isSubmit.get()) {
                        log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");
                        oneStageLatch.countDown();
                        return;
                    }
                    // 开启事务
                    TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
                    try {
                        // 执行业务逻辑
                        runnable.run();
                    } catch (Exception e) {
                        // 执行体发生异常,设置回滚
                        isSubmit.set(false);
                        log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);
                    }
                    // 计数器减一
                    oneStageLatch.countDown();
                    try {
                        //等待所有线程任务完成,监控是否有异常,有则统一回滚
                        twoStageLatch.await();
                        // 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时
                        if (isSubmit.get()) {
                            // 提交
                            transactionManager.commit(transactionStatus);
                            log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);
                        } else {
                            // 回滚
                            transactionManager.rollback(transactionStatus);
                            log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            executor.execute(thread);
        }

        /**
         * 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0
         * 主线程发起第二阶段,执行阶段(提交或回滚),根据
         */
        try {
            // 主线程等待所有线程执行完成,超时时间设置为五秒
            oneStageLatch.await(timeout, unit);
            long count = oneStageLatch.getCount();
            System.out.println("countDownLatch值:" + count);
            // 主线程等待超时,子线程可能发生长时间阻塞,死锁
            if (count > 0) {
                // 设置为回滚
                isSubmit.set(false);
                log.info("主线线程等待超时,任务即将全部回滚");
            }
            twoStageLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败
        return isSubmit.get();
    }

    /**
     * 初始化属性
     * @param size 任务数量
     */
    private void innit(int size) {
        oneStageLatch = new CountDownLatch(size);
    }
}

2、业务代码:

(1)线程池参数

我这里采用自定义线程池,线程池参数如下:

@Configuration
public class ThreadPoolConfig {
    // 获取服务器的cpu个数
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数
    private static final int COUR_SIZE = CPU_COUNT * 4;
    private static final int MAX_COUR_SIZE = CPU_COUNT * 8;

    // 接下来配置一个bean,配置线程池。
    @Bean
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数
        threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数
        threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)
        threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略
        return threadPoolTaskExecutor;
    }

}

(2)任务业务正常,无异常抛出时正常提交事务情况

public Result<?> testTransaction() throws SQLException {

        List<User> users = new LinkedList<>();
        User user = new User();
        user.setName("1111");
        users.add(user);
        User user1 = new User();
        user1.setName("2222");
        users.add(user1);
        MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);
        List<Runnable> runnableList = new ArrayList<>();
        users.forEach((x) -> {
            runnableList.add(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);
                    secondUserMapper.insertUser(x);
                }
            });
        });
        multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);

        return Result.success(1);
    }

执行时的日志: 

执行成功后数据库多次了两条数据 

 (3)展示出现异常任务时回滚事务情况

 public Result<?> testTransaction() throws SQLException {

        List<User> users = new LinkedList<>();
        User user = new User();
        user.setName("1111");
        users.add(user);
        User user1 = new User();
        user1.setName("2222");
        users.add(user1);
        MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);
        List<Runnable> runnableList = new ArrayList<>();
         //模拟任务出现异常
         runnableList.add(() -> {
             int a = 10 / 0;
         });
        users.forEach((x) -> {
            runnableList.add(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);
                    secondUserMapper.insertUser(x);
                }
            });
        });
        multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);

        return Result.success(1);
    }

执行时的日志:  

数据库没有新增的数据 

 

参考文章: 

Spring多线程事务解决方案-CSDN博客

 两阶段VS三阶段提交协议_两阶段提交-CSDN博客

详解Spring多线程下如何保证事务的一致性-51CTO.COM 

多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/642919.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【一个糟糕的词:省流】

今日思考&#xff0c;博主分享&#x1f4dd;&#xff0c;原文如下&#xff0c; 我最近听到了一个特别糟糕的词叫省流。我甚至认为这个词可以用来衡量一个人的智商啊&#xff0c;我们可以把一个知识简单的分成三部分问题&#xff0c;答案思维方式就是这个答案是怎么推导出来的啊…

【FPGA】VGA显示文字、彩条、图片——基于DE2-115

文章目录 前言一、VGA概述1.1 简述1.2 管脚定义1.3 VGA显示原理1.4 VGA时序标准1.5 VGA 显示模式及相关参数 二、VGA显示自定义的汉字字符2.1 点阵汉字生成2.2 生成BMP文件2.3 生成txt文件2.4 实现效果 三、VGA显示条纹3.1 实现流程3.2 实现效果 四、VGA输出一幅彩色图像4.1 bm…

代码随想录——找树左下角的值(Leetcode513)

题目链接 层序遍历 思路&#xff1a;使用层序遍历&#xff0c;记录每一行 i 0 的元素&#xff0c;就可以找到树左下角的值 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}*…

基于SSH的母婴用品销售管理系统带万字文档

文章目录 母婴商城系统一、项目演示二、项目介绍三、系统部分功能截图四、万字论文参考五、部分代码展示六、底部获取项目源码和万字论文参考&#xff08;9.9&#xffe5;带走&#xff09; 母婴商城系统 一、项目演示 母婴商城系统 二、项目介绍 基于SSH的母婴商城系统 系统…

了解K8s集群kubectl命令进行陈述式资源管理

前言 在 Kubernetes 集群中&#xff0c;通过陈述式和声明式资源管理是确保应用程序高效运行的关键。认识这两种管理方法&#xff0c;能够更好地掌握 Kubernetes 集群的运维和管理。 目录 一、K8s 资源管理操作分类 1. 陈述式 2. 声明式 3. K8s 集群管理常用命令概览 二…

lenovo联想小新Pro 16 APH8 2023款(83AR)笔记本电脑原装出厂Windows11系统镜像安装包下载

恢复出厂开箱状态OEM预装win11系统&#xff0c;自带恢复重置初始化还原功能 下载链接&#xff1a;https://pan.baidu.com/s/1n_mPM4ZrLPrmXpCTukuyCQ?pwdmnwj 提取码&#xff1a;mnwj 联想原装系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、Office办公软件、联想…

C++成员函数 - 析构函数

析构函数 析构函数 是特殊的成员函数&#xff0c;其 特征 如下&#xff1a; 1. 析构函数名是在类名前加上字符 ~ 。 2. 无参数无返回值类型。 3. 一个类只能有一个析构函数。若未显式定义&#xff0c;系统会自动生成默认的析构函数。注意&#xff1a;析构函数不能重 载 …

OpenHarmony 实战开发——一文总结ACE代码框架

一、前言 ACE_Engine框架是OpenAtom OpenHarmony&#xff08;简称“OpenHarmony”&#xff09;的UI开发框架&#xff0c;为开发者提供在进行应用UI开发时所必需的各种组件&#xff0c;以及定义这些组件的属性、样式、事件及方法&#xff0c;通过这些组件可以方便进行OpenHarmo…

AI大模型探索之路-基础篇5:GLM-4解锁国产大模型的全能智慧与创新应用

目录 前言一、GLM4大模型总体概述二、GLM4和GPT4功能对比三、GLM4和GPT4性能对比1、基础能力&#xff08;英文&#xff09;2、指令跟随能力3、对齐能力4、长文本能力5、多模态-文生图 四、GLM-4 ALL Tools1、文生图2、代码解释器3、网页浏览4、Function Call5、多工具自动调用 …

FTP协议——BFTPD安装(Linux)

1、简介 BFTPD&#xff0c;全称为 Brutal File Transfer Protocol Daemon&#xff0c;是一个用于Unix和类Unix系统的轻量级FTP服务器软件。它的设计理念是提供一个简单、快速、安全的FTP服务器解决方案&#xff0c;特别适用于需要低资源占用的环境。 2、步骤 环境&#xff1…

【介绍下Pwn,什么是Pwn?】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

第十课,while循环

一&#xff0c;认识循环是什么 循环普遍存在于日常生活中&#xff0c;同样&#xff0c;在程序中&#xff0c;循环功能也是至关重要的基础功能。 当程序需要重复执行某一段代码&#xff0c;利用循环可以轻松完成工作 例如我要你打印100次上课&#xff0c;直接写100次print&…

javas-core VS java-object-diff

对照工具选择 javas-core 和 java-object-diff ,对比demo https://github.com/kofgame/objectdiff-vs-javers&#xff0c;都为同源对比&#xff0c;都支持嵌套对象。 使用JMH测试方法进行性能测试&#xff0c;使用题库的QuestionResponseVO对象来进行对照对比&#xff0c;进行…

可重构柔性装配产线,为智能制造领域带来了新的革命性变革

随着科技的飞速发展&#xff0c;个性化需求逐渐成为市场的主导。在这个充满变革的时代&#xff0c;制造业正面临着前所未有的挑战和机遇。如何快速响应市场需求、提高生产效率、保证产品质量&#xff0c;成为每一家制造企业必须思考的问题。 在这样的背景下&#xff0c;富唯智…

竞赛 基于深度学习的动物识别 - 卷积神经网络 机器视觉 图像识别

文章目录 0 前言1 背景2 算法原理2.1 动物识别方法概况2.2 常用的网络模型2.2.1 B-CNN2.2.2 SSD 3 SSD动物目标检测流程4 实现效果5 部分相关代码5.1 数据预处理5.2 构建卷积神经网络5.3 tensorflow计算图可视化5.4 网络模型训练5.5 对猫狗图像进行2分类 6 最后 0 前言 &#…

大模型应用商业化落地关键:给企业带来真实的业务价值

2024 年被很多人称为大模型应用的元年&#xff0c;毫无疑问&#xff0c;大模型已经成为共识&#xff0c;下一步更急迫的问题也摆在了大家的面前——大模型到底能够用在哪&#xff1f;有哪些场景能落地&#xff1f;怎么做才能创造真正的价值&#xff1f; 在刚刚过去的 AICon 全…

分布式版本控制工具 git

git 是什么 分布式版本控制工具。github 是代码托管平台。 git 有什么用 保存文件的所有修改记录。使用版本号&#xff08;sha1 哈希值&#xff09; 进行区分。随时可浏览历史版本记录。可还原到历史指定版本。对比不同版本的文件差异。 为什么要使用 git 多人协作开发一个大…

C# 结合 JS 暴改腾讯 IM SDK Demo

目录 关于腾讯 IM SDK Demo 范例运行环境 设计思路 服务端生成地址 IM 服务端接收 IM 客户端程序 小结 关于腾讯 IM SDK Demo 腾讯云即时通信 IM SDK 提供了单聊、群聊、关系链、消息漫游、群组管理、资料管理、直播弹幕等功能&#xff0c;并提供完备的 App 接入及管…

【STM32CubeIDE】软件硬件SPI+六针OLED使用

前言 本文将介绍STM32 6针OLED的使用&#xff0c;分别使用软件和硬件两种SPI驱动方式&#xff0c;最终实现OLED显示TEST-ok字符和数字累加刷新显示 软件平台&#xff1a;STM32CubeIDEHAL库 硬件&#xff1a;STM32F103ZET6(正点原子战舰V3)六针OLED 题外话&#xff1a; 最…

开源博客项目Blog .NET Core源码学习(26:App.Hosting项目结构分析-14)

后台管理页面的系统管理下主要包括用户管理、角色管理、按钮管理和菜单管理&#xff0c;其中创建用户时要指定角色&#xff0c;创建角色时需指定菜单权限&#xff0c;按钮管理也是基于各菜单项进行设置&#xff0c;只有菜单管理相对独立&#xff0c;因此本文学习并分析App.Host…