高并发场景下分布式事务处理方案探讨及代码实现

        本文将深入探讨高并发场景下,分布式事务处理的方案。随着互联网的快速发展,对系统性能和稳定性的需求也日益增长,尤其在高并发场景下,分布式事务成为重中之重。在本文中,我将分享我对分布式事务的理论理解,并结合个别典型业务应用场景,最终通过代码实现来展示解决方案。希望通过这篇博客能够对读者在面对类似挑战时提供一些帮助和启发。

分布式事务基础理论

什么是事务

        事务指的就是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作都成功,要么所有的操作都被撤销。分为两种:1)一个是本地事务:本地事物其实可以认为是数据库提供的事务机;2)一个是分布式事务;

什么是分布式事务

        指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用。分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

产生的原因

        1)业务发展,数据库的拆分-分库分表;
        2)SOA和微服务架构的使用;
        3)多个微服务之间调用异常:网络异常、请求超时、数据库异常、程序宕机等;

常见分布式事务解决方案

        1)2PC 和 3PC:两/三阶段提交, 基于XA协议;
        2)TCC:Try、Confirm、Cancel;
        3)事务消息:最大努力通知型;

分布式事务分类

        1)刚性事务:遵循ACID,加锁概念,事务性比较强,强一致性;
        2)柔性事务:遵循BASE理论,最终一致性;

分布式事务框架

         1)TX-LCN:支持2PC、TCC等多种模式,更新慢(个人感觉处于停滞状态);
                github:https://github.com/codingapi/tx-lcn
        2)Seata(免费版):支持 AT、TCC、SAGA 和 XA 多种模式,背靠阿里,专门团队推广;
                github:https://github.com/seata/seata
              GTS(商业版): 阿里云商业化产品
                官网:https://www.aliyun.com/aliware/txc
        3)RocketMq:自带事务消息解决分布式事务
                github:https://github.com/apache/rocketmq

最终一致性

        

什么是Base理论

        CAP 中的一致性和可用性进行一个权衡的结果,核心思想就是:我们无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性, 来自 ebay 的架构师提出。

基本可用(Basically Available)

        假设系统,出现了不可预知的故障,但还是能用, 可能会有性能或者功能上的影响,比如RT是10ms,变成50ms。

软状态(Soft state)

        允许系统中的数据存在中间状态,并认为该状态不影响系统的整体可用性,即允许系统在多个不同节点的数据副本存在数据延时。

最终一致性(Eventually consistent)

        系统能够保证在没有其他新的更新操作的情况下,数据最终一定能够达到一致的状态,因此所有客户端对系统的数据访问最终都能够获取到最新的值。

关于数据一致性

        1)强一致:操作后的能立马一致且可以访问;
        2)弱一致:容忍部分或者全部访问不到;
        3)最终一致:弱一致性经过多一段时间后,都一致且正常;

CAP权衡的结果

事务消息

        消息队列提供类似Open XA的分布式事务功能,通过消息队列事务消息能达到分布式事务的最终一致;

半事务消息

        暂不能投递的消息,发送方已经成功地将消息发送到了消息队列服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查

        由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。


交互图

(来源rocketmq官方文档) 

        目前较为主流的MQ,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ支持事务消息,如果其他队列需要事务消息,可以开发个消息服务,自行实现半消息和回查功能。事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性,同时将传统的大事务可以被拆分为小事务,能提升效率,不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。

        缺点:不能实时保证数据一致性,极端情况下需要人工补偿,比如 假如生产者成功处理本地业务,消费者始终消费不成功。

高并发下分布式事务案例

        高并发场景下如果通过加锁这种强一致性分布式事务来保证数据一致性,那么其性能将大大降低,这种强一致性的事务更适合于常规的管理后台和并发量不高的场景。这里以短链创建,流量包扣减的业务场景为例,设计其高并发分布式事务可行性方案,其方案理论正式借助于前面所提到的Base理论,确保其最终一致性。

应用场景

        短链创建失败,但是流量包已经扣减,怎么解决分布式事务问题?

        解决方案 1 :
                1)流量包服务扣减库存前保存一个task任务,记录扣减的流量包;
                2)使用定时任务定时扫描task任务表,允许一段时间不同步,但需确保最终一致。如果一段时间过后检查发现短链未创建,则回滚流量包; 

        解决方案 2:
         采用延迟队列方式,调用扣减流量包服务,在执行扣减前先将扣减消息放入task任务,并发送一个延迟消息,待一定时间后回查短链写入状态,如写入异常则回滚当天流量包。

        解决方案 3:
        方案1与方案2的结合,分布式调度为兜底方案。

代码实现

 流量包锁定任务表

CREATE TABLE `traffic_task` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `account_no` bigint DEFAULT NULL,
  `traffic_id` bigint DEFAULT NULL,
  `use_times` int DEFAULT NULL,
  `lock_state` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '锁定状态锁定LOCK  完成FINISH-取消CANCEL',
  `biz_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '唯一标识',
  `gmt_create` datetime DEFAULT CURRENT_TIMESTAMP,
  `gmt_modified` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_biz_id` (`biz_id`) USING BTREE,
  KEY `idx_release` (`account_no`,`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 Lock枚举类

public enum TaskStateEnum {
    /**
     * 锁定
     */
    LOCK,
    /**
     * 完成
     */
    FINISH,
    /**
     * 取消,释放库存
     */
    CANCEL;
}

 流量包任务相关Manage

@Component
@Slf4j
public class TrafficTaskManagerImpl implements TrafficTaskManager {

    @Autowired
    private TrafficTaskMapper trafficTaskMapper;

    @Override
    public int add(TrafficTaskDO trafficTaskDO) {
        return trafficTaskMapper.insert(trafficTaskDO);
    }

    @Override
    public TrafficTaskDO findByIdAndAccountNo(Long id, Long accountNo) {
        TrafficTaskDO taskDO = trafficTaskMapper.selectOne(new QueryWrapper<TrafficTaskDO>()
                .eq("id", id).eq("account_no", accountNo));
        return taskDO;
    }

    @Override
    public int deleteByIdAndAccountNo(Long id, Long accountNo) {
        return trafficTaskMapper.delete(new QueryWrapper<TrafficTaskDO>()
                .eq("id", id).eq("account_no", accountNo));
    }
}

死信队列相关配置

//================流量包扣减,创建短链死信队列配置==================================
// 发送锁定流量包消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue 延迟队列,不能被监听消费
    /**
     * 第一个队列延迟队列,
     */
    private String trafficReleaseDelayQueue = "traffic.release.delay.queue";

    /**
     * 第一个队列的路由key
     * 进入队列的路由key
     */
    private String trafficReleaseDelayRoutingKey = "traffic.release.delay.routing.key";


    /**
     * 第二个队列,被监听恢复流量包的队列
     */
    private String trafficReleaseQueue = "traffic.release.queue";

    /**
     * 第二个队列的路由key
     *
     * 即进入死信队列的路由key
     */
    private String trafficReleaseRoutingKey="traffic.release.routing.key";

    /**
     * 过期时间,毫秒,1分钟
     */
    private Integer ttl = 60000;



    /**
     * 延迟队列
     */
    @Bean
    public Queue trafficReleaseDelayQueue(){

        Map<String,Object> args = new HashMap<>(3);
        args.put("x-message-ttl",ttl);
        args.put("x-dead-letter-exchange", trafficEventExchange);
        args.put("x-dead-letter-routing-key",trafficReleaseRoutingKey);

        return new Queue(trafficReleaseDelayQueue,true,false,false,args);
    }


    /**
     * 死信队列,普通队列,用于被监听
     */
    @Bean
    public Queue trafficReleaseQueue(){

        return new Queue(trafficReleaseQueue,true,false,false);

    }


    /**
     * 第一个队列,即延迟队列的绑定关系建立
     * @return
     */
    @Bean
    public Binding trafficReleaseDelayBinding(){

        return new Binding(trafficReleaseDelayQueue,Binding.DestinationType.QUEUE, trafficEventExchange,trafficReleaseDelayRoutingKey,null);
    }

    /**
     * 死信队列绑定关系建立
     * @return
     */
    @Bean
    public Binding trafficReleaseBinding(){

        return new Binding(trafficReleaseQueue,Binding.DestinationType.QUEUE, trafficEventExchange,trafficReleaseRoutingKey,null);
    }

保存Task表

//先更新,再扣减当前使用的流量包
int rows = trafficManager.addDayUsedTimes(accountNo,useTrafficVO.getCurrentTrafficDO().getId(),1);

TrafficTaskDO trafficTaskDO = TrafficTaskDO.builder().accountNo(accountNo)
                .bizId(trafficRequest.getBizId())
                .useTimes(1).trafficId(useTrafficVO.getCurrentTrafficDO().getId())
                .lockState(TaskStateEnum.LOCK.name()).build();

trafficTaskManager.add(trafficTaskDO);

 发送Mq消息

EventMessage usedTrafficEventMessage = EventMessage.builder()
                .accountNo(accountNo)
                .bizId(trafficTaskDO.getId() + "")
                .eventMessageType(EventMessageType.TRAFFIC_USED.name())
                .build();
//发送延迟信息,用于异常回滚,数据最终一致性
rabbitTemplate.convertAndSend(rabbitMQConfig.getTrafficEventExchange(),
                rabbitMQConfig.getTrafficReleaseDelayRoutingKey(), usedTrafficEventMessage);

return JsonData.buildSuccess();

 检查短链创建情况

@FeignClient(name = "link-service")
public interface ShortLinkFeignService {

    /**
     * 检查短链是否存在
     *
     * @param shortLinkCode 短链码
     * @return
     */
    @GetMapping(value = "/api/link/v1/check", headers = {"rpc-token=${rpc.token}"})
    JsonData simpleDetail(@RequestParam("shortLinkCode") String shortLinkCode);
}



    @Value("${rpc.token}")
    private String rpcToken;

    /**
     * rpc调用获取短链信息
     *
     * @return
     */
    @GetMapping("check")
    public JsonData simpleDetail(@RequestParam("shortLinkCode") String shortLinkCode, HttpServletRequest request) {
        String requestToken = request.getHeader("rpc-token");
        if (rpcToken.equalsIgnoreCase(requestToken)) {
            ShortLinkVO shortLinkVO = shortLinkService.parseShortLinkCode(shortLinkCode);
            return shortLinkVO == null ? JsonData.buildError("不存在") : JsonData.buildSuccess();
        } else {
            return JsonData.buildError("非法访问");
        }

    }

 延迟队列消费,回查短链是否创建状态,不成功则恢复流量包

else if(EventMessageType.TRAFFIC_USED.name().equalsIgnoreCase(messageType)){
            //流量包使用,检查是否成功使用
            //检查task是否存在
            //检查短链是否成功
            //如果不成功则恢复流量包,删除缓存key
            //删除task(也可以更新状态,定时删除也行)

            Long trafficTaskId = Long.valueOf(eventMessage.getBizId());
            TrafficTaskDO trafficTaskDO = trafficTaskManager.findByIdAndAccountNo(trafficTaskId, accountNo);

            //非空 且 是锁定状态
            if(trafficTaskDO!=null && trafficTaskDO.getLockState()
                    .equalsIgnoreCase(TaskStateEnum.LOCK.name())){

                JsonData jsonData = shortLinkFeignService.check(trafficTaskDO.getBizId());
                if(jsonData.getCode()!=0){
                    log.error("创建短链失败,流量包回滚");
trafficManager.releaseUsedTimes(accountNo,trafficTaskDO.getTrafficId(),trafficTaskDO.getUseTimes());
           }
                trafficTaskManager.deleteByIdAndAccountNo(trafficTaskId,accountNo);
            }

        }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/516500.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

多线程重点知识(个人整理笔记)

目录 1. java 多线程 1.1. 什么是进程?什么是线程? 1.1.1. 进程 1.1.2. 线程 1.1.3. 多线程 2. 并行和并发有什么区别&#xff1f; 3. 守护线程是什么&#xff1f; 4. 创建线程有哪几种方式&#xff1f; 4.1. 线程的常见成员方法 5. 线程安全问题 5.1. synchronize…

39.基于SpringBoot + Vue实现的前后端分离-无人智慧超市管理系统(项目 + 论文PPT)

项目介绍 随着互联网时代的发展&#xff0c;传统的线下管理技术已无法高效、便捷的管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;国家在环境要求不断提高的前提下&#xff0c;无人智慧超市管理系统建设也逐渐进入了信…

Spring Boot | Spring Boot的“数据访问“、Spring Boot“整合MyBatis“

目录: 一、Spring Boot”数据访问概述“二、Spring Boot”整合MyBatis”1. 基础环境搭建 (引入对应的“依赖启动器” 配置数据库的“相关参数”)① 数据准备 (导入Sql文件)② 创建项目&#xff0c;引入相应的启动器&#xff0c;编写数据库对应的“实体类”③额外添加pom.xml文…

尚硅谷50道Java面试题笔记 写的不全

b站链接&#xff1a;https://www.bilibili.com/video/BV1Bb411d7SL/?p4&vd_source714a8042f058b82c668750a0930ff9b0 1 mysql使用innodb引擎&#xff0c;请简述mysql索引的最左前缀如何优化orderby语句。 关键点&#xff1a; 如果排序字段不在索引列上&#xff0c;file…

Filter Listener Interceptor

文章目录 第一章 Filter1. 目标2. 内容讲解2.1 Filter的概念2.2 Filter的作用2.3 Filter的入门案例2.3.1 案例目标2.3.2 代码实现2.3.2.1 创建ServletDemo012.3.2.2 创建EncodingFilter 2.4 Filter的生命周期2.4.1 回顾Servlet生命周期2.4.1.1 Servlet的创建时机2.4.1.2 Servle…

趣学前端 | 类,我想好好继承它的知识点

背景 最近睡前习惯翻会书&#xff0c;重温了《JavaScript权威指南》。这本书&#xff0c;文字小&#xff0c;内容多。两年了&#xff0c;我才翻到第十章。因为书太厚&#xff0c;平时都充当电脑支架。 JavaScript 类 话说当年类、原型、继承&#xff0c;差点给我绕晕。 在J…

Excel、PowerQuery 和 ChatGPT 终极手册(下)

原文&#xff1a;Ultimate ChatGPT Handbook for Enterprises 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 使用 SUMIFS、SUMPRODUCT、AGGREGATE 和 MAX 函数查找数值数据 其中之一鲜为人知的事实是&#xff0c;当查找单个数值时&#xff0c;匹配和三角函数可能比查…

软考--软件设计师(软件工程总结1)

目录 1.定义 2.软件生存周期 3.软件过程&#xff08;即软件开发中遵循的一系列可预测的步骤&#xff09; ​编辑4.软件开发模型 5.需求分析&#xff08;软件需求分析&#xff0c;系统需求分析或需求分析工程&#xff09; 6. 需求工程 7.系统设计 8.系统测试 1.定义 软件…

Android Studio学习9——使用Logcat打印日志

在Android开发中&#xff0c;Logcat是一个工具&#xff0c;它允许开发者查看设备或模拟器的日志信息。开发者可以使用Log类来打印日志信息&#xff0c;这对于调试和错误排查非常有帮助。 v 或 verbose: 最低等级&#xff0c;显示所有消息。d 或 debug: 用于调试消息。i 或 info…

在集群中使用deepspeed如果端口被占用可以使用deepspeed参数更改

在集群中使用deepspeed如果端口被占用可以使用deepspeed参数更改 这一次G老师不好使了 在集群中使用deepspeed默认的端口号29500被占用&#xff0c;显示更改居然不起作用 G老师给的方法也不好使 #!/bin/bash MASTER_ADDRlocalhost MASTER_PORT29501 # 选择一个未被占用的端…

Qt | 发布程序(以 minGW 编译器为例)

1、注意:修改 pro 文件后,最好执行“构建”>“重新构建项目”,否则 pro 文件的更改将不会反应到程序上。 2、发布程序的目的:就是让编译后生成的可执行文件(如 exe 文件),能在其他计算机上运行。 一、编译后生成的各种文件简介 Qt Creator 构建项目后产生的文件及目录…

SCI一区 | Matlab实现NGO-TCN-BiGRU-Attention北方苍鹰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测

SCI一区 | Matlab实现NGO-TCN-BiGRU-Attention北方苍鹰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测 目录 SCI一区 | Matlab实现NGO-TCN-BiGRU-Attention北方苍鹰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测预测效果基本介绍模型…

鸿蒙原OS开发实例:【ArkTS类库单次I/O任务开发】

Promise和async/await提供异步并发能力&#xff0c;适用于单次I/O任务的场景开发&#xff0c;本文以使用异步进行单次文件写入为例来提供指导。 实现单次I/O任务逻辑。 import fs from ohos.file.fs; import common from ohos.app.ability.common;async function write(data:…

文心一言指令词宝典之生活篇

作者&#xff1a;哈哥撩编程&#xff08;视频号、抖音、公众号同名&#xff09; 新星计划全栈领域优秀创作者博客专家全国博客之星第四名超级个体COC上海社区主理人特约讲师谷歌亚马逊演讲嘉宾科技博主极星会首批签约作者 &#x1f3c6; 推荐专栏&#xff1a; &#x1f3c5;…

前端三剑客 —— CSS (第一节)

目录 CSS 什么是CSS CSS的几种写法&#xff1a; 行内样式 内嵌样式 外链样式 import 加载顺序 CSS选择器*** 基本选择器 ID选择器 标签选择器 类选择器 通用选择器 包含选择器 上节内容中提到了 前端三剑客 —— HTML 超文本标记语言&#xff0c;这节内容 跟大家…

基于注意力整合的超声图像分割信息在乳腺肿瘤分类中的应用

基于注意力整合的超声图像分割信息在乳腺肿瘤分类中的应用 摘要引言方法 Segmentation information with attention integration for classification of breast tumor in ultrasound image 摘要 乳腺癌是世界范围内女性最常见的癌症之一。基于超声成像的计算机辅助诊断&#x…

Day80:服务攻防-中间件安全HW2023-WPS分析WeblogicJettyJenkinsCVE

目录 中间件-Jetty-CVE&信息泄漏 CVE-2021-34429(信息泄露) CVE-2021-28169(信息泄露) 中间件-Jenkins-CVE&RCE执行 cve_2017_1000353 CVE-2018-1000861 cve_2019_1003000 中间件-Weblogic-CVE&反序列化&RCE 应用金山WPS-HW2023-RCE&复现&上线…

每日一题(leetcode75):颜色分类-双指针

采用双指针法&#xff0c;p0维护数字0&#xff0c;p1维护数字1。遇到1时&#xff0c;交换并且p1加1&#xff0c;遇到0时&#xff0c;交换并观察0的指针是不是小于1的指针&#xff0c;如果小于&#xff0c;那么之前0指针指向的数据1会被交换出去&#xff0c;所以要进一步进行交换…

全栈的自我修养 ———— react中router入门+路由懒加载

router 下载router配置view创建目录配置index.js 下载router npm install react-router-dom配置view 如下将组件倒出 const Login () > {return <div>这是登陆</div> } export default Login创建目录 配置index.js React.lazy有路由懒加载的功能&#xff0…

MySQL 表的增删改查

文章目录 一、什么是CRUD&#xff1f;二、新增&#xff08;Create&#xff09;1、单行数据 全列插入2、多行数据 指定列插入3、插入特殊类型 三、查询&#xff08;Retrieve&#xff09;1、全列查询2、指定列查询3、表达式查询4、指定别名5、去重6、排序7、条件查询基本查询&a…