分布式事务AP控制方案(上)

分布式事务控制方案

本篇文章给出一种要求高可用性(AP思想)的分布式事务控制方案

下篇新鲜出炉:点我查看

  • 分布式事务控制方案
  • 1、业务背景
  • 2、本地消息表的设计
  • 3、对消息表的操作
  • 4、任务调度
  • 5、任务流程控制的抽象类
  • 6、课程发布的实现类
  • 7、总结

1、业务背景

业务背景:在线学习平台,教学机构在上传课程时,需要将课程内容同步到数据库,缓存,文件系统,搜索系统,这里需要用到分布式事务,来确保四个组件的业务顺利完成。

CAP理论中,分布式系统只能满足一致性C、可用性A、分区容错性P三者中的两个,由于分布式系统天然要求分区容错,否则就是单体项目,所以只能选CP或AP

其中CP可以使用Seata框架基于AT和TCC模式去实现,AP也有多种实现方式。

我们的业务背景中这四个组件并不要求强一致性,而是要求高可用性,如果其中某个组件没有完成数据同步,那之前已经完成的组件不必回退到事务开始前的状态,所以我们实现AP思想,采用本地消息表+任务调度完成最终一致性

具体的项目环境:SpringBoot框架,数据库MySQL,使用MyBatis-Plus快速开发,缓存Redis,分布式文件系统MinIO,搜索系统ES

2、本地消息表的设计

在业务背景中,如果用户要进行课程发布,我们向MySQL中的消息表里插入一条记录,记录中应当包含四个组件(MySQL,Redis,MinIO,ES)完成的状态,如果四个组件全部完成,就删除这条记录,向历史消息表中插入一条记录,如果四个组件有哪个没有完成,通过查询记录就可以从未完成的地方重新进行数据同步,从而实现最终一致性。

我们除了课程这个业务场景,还会在其他业务场景执行相似的业务,所以我们要考虑如何进行代码的抽象、封装和复用。

我们发现在消息表中没有必要注明每个具体的组件,而是通过小任务一,小任务二…的方式设计数据表,具体的业务逻辑由具体的业务代码实现,而具体的业务代码通过继承抽象的类,来实现对数据库的控制。

设计一个抽象的类,这个类应当实现对数据表的处理,并提供一个接口,让具体的业务代码实现这个接口。

3、对消息表的操作

首先创建数据库和数据表,表中字段包括业务相关字段(消息类型代码,关联业务信息、代码等等),小任务的状态、上一次成功失败时间、重试次数(暂定五个小任务,提高适用性)。

其次创建一个微服务模块,添加MyBatis-Plus依赖和配置

<!-- MySQL 驱动 -->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>
<!-- mybatis plus的依赖 -->
<dependency>
	<groupId>com.baomidou</groupId>
	<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>

spring:
  application:
    name: service
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/table?serverTimezone=UTC&userUnicode=true&useSSL=false&
    username: xxxx
    password: xxxx

实现DAO和service层开发
在这里插入图片描述

在实现流程控制的抽象类之前我们要思考一个问题,具体的业务代码在实现了任务后,如何开启调用任务的执行?

首先肯定不是在发布课程的方法中直接调用方法,这是同步调用,并且只能执行一次,不适合AP思想的分布式事务。对于数据同步实时性要求不高的技术解决方案有很多,例如MQ、Canal、Logstash、任务调度等等

我们可以在插入数据表后,向消息队列添加一条消息,消费者收到消息后检查数据库是否存在对应记录,没有就执行一次任务,如果任务执行失败,就像消息队列添加一条消息。

我们也可以使用中间件canal,解决耦合性问题,canal通过模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 请求,得到 binglog 日志,

我们也可以通过任务调度的方式执行任务,在任务流程代码中查询数据库,根据数据库中的记录执行任务。

这里我们采用任务调度的方式执行任务。

4、任务调度

任务调度是指对计算任务进行合理安排和调度的过程。分布式任务调度是指在分布式系统中,将任务分割成若干份,根据调度规则交由不同的实例并行执行。

XXL-JOB,是一个轻量级分布式任务调度平台,开发迅速,学习简单,易扩展。包含调度中心(管理执行器、任务、日志,监控运维),执行器(注册服务,执行服务,执行结果上报,记录日志)和任务(具体的业务代码)。

如何确保任务不重复执行?任务调度采用分片广播的方式,查询数据表得到任务的id(自增id),模上分片总数,如果等于当前执行器的任务号,就执行该任务,否则不执行,对于任务分配超出执行器执行能力的情况,通过合理设置任务广播频率,以及设置任务拒绝策略为丢弃任务来确保没有任务被重复执行。

执行流程:

启动XXL-JOB调度中心,创建执行器和任务,任务执行实现通过cron表达式设置为每小时一次,设置分片广播和丢弃策略。

在课程发布微服务中添加XXL-JOB的依赖、配置文件和配置类,创建任务方法,在该方法前添加注解@XxlJob(“JobHandler”)

@XxlJob("JobHandler")
public void coursePublishJobHandler() throws Exception {
	// 分片参数
	int shardIndex = XxlJobHelper.getShardIndex();
	int shardTotal = XxlJobHelper.getShardTotal();
	// 在下一节中实现process,这里仅是测试方法
	System.out.println("XXL-JOB任务调度测试成功");
	// process(......);
}

启动微服务,可以在物理地址中看到一个实例,执行一次任务,在输出窗口看到
在这里插入图片描述

我们可以发现,任务的触发是根据创建任务时设置的执行时间来完成的,但是从用户的角度出发,有些用户允许在一段时间以内完成数据同步,例如一到两个工作日内完成,但是有的用户希望在发布课程后能及时的完成数据同步,例如一小时内完成数据同步,这就需要在代码端对xxl-job的控制中心进行通知,xxl-job也提供了这个接口

打开从github下载的xxl-job项目,在JobInfoController的接口中有体现,通过调用start方法,传入任务的id,来触发一次任务。

下面是代码逻辑


	@Override
	public ReturnT<String> start(int id) {
		XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);

		// valid
		ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(xxlJobInfo.getScheduleType(), ScheduleTypeEnum.NONE);
		if (ScheduleTypeEnum.NONE == scheduleTypeEnum) {
			return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type_none_limit_start")) );
		}

		// next trigger time (5s后生效,避开预读周期)
		long nextTriggerTime = 0;
		try {
			Date nextValidTime = JobScheduleHelper.generateNextValidTime(xxlJobInfo, new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS));
			if (nextValidTime == null) {
				return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );
			}
			nextTriggerTime = nextValidTime.getTime();
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
			return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );
		}

		xxlJobInfo.setTriggerStatus(1);
		xxlJobInfo.setTriggerLastTime(0);
		xxlJobInfo.setTriggerNextTime(nextTriggerTime);

		xxlJobInfo.setUpdateTime(new Date());
		xxlJobInfoDao.update(xxlJobInfo);
		return ReturnT.SUCCESS;
	}

上述代码逻辑:
首先根据传入的任务id获取任务,判断任务的触发方式,如果为空,就报错,返回异常码500。
然后设置任务下一次的触发时间,为当前时间的5s后,更新触发状态,返回success。

我们可以在课程发布的逻辑中中调用这个方法,来实现及时同步课程内容。

注意,需要在调用方法头添加注解@PermissionLimit(limit = false),来绕开登录验证,但是这增加了代码的不安全性,需要对这种权限的使用进行限制。


	@RequestMapping("/startJob")
	@ResponseBody
	@PermissionLimit(limit = false)
	public ReturnT<String> startJob(@RequestBody XxlJobInfo jobInfo) {
		return xxlJobService.start(jobInfo.getId());
	}

5、任务流程控制的抽象类

接下来实现抽象类,在这个类中需要提供任务执行的流程,而非具体的代码,提供一个抽象方法,业务代码通过实现这个抽象方法,在这个方法中实现具体的业务执行代码

此时我们已经得到分片总数和分片号,通过查询数据库中记录,自增id模上分片总数等于分片号的方式判断是否由当前执行实例实行

MyBatis-Plus没有提供查询方法,在Mapper中进行实现

@Select("SELECT t.* FROM mq_message t WHERE t.id % #{shardTotal} = #{shardindex} and t.state='0' and t.message_type=#{messageType} limit #{count}")
List<MqMessage> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardindex") int shardindex, @Param("messageType") String messageType,@Param("count") int count);

在得到消息记录后,这是一个列表的形式,我们开启线程池,使用newFixedThreadPool,线程总数就是任务数,没有临时线程,使用CountDownLatch控制线程完成情况,每个线程中执行process方法,process是一个抽象方法,由具体的实现类进行实现,返回一个boolean变量,表示任务是否完成,并记录日志。


public abstract class MessageProcessAbstract {

    @Autowired
    MqMessageService mqMessageService;

    /**
     * @param mqMessage 执行任务内容
     * @return boolean true:处理成功,false处理失败
     * @description 任务处理
     * @author zkp15
     * @date 2023/9/21 19:47
     */
    public abstract boolean execute(MqMessage mqMessage);

    /**
     * @description 扫描消息表多线程执行任务
     * @param shardIndex 分片序号
     * @param shardTotal 分片总数
     * @param messageType  消息类型
     * @param count  一次取出任务总数
     * @param timeout 预估任务执行时间,到此时间如果任务还没有结束则强制结束 单位秒
     * @return void
     * @author zkp15
     * @date 2023/9/21 20:35
    */
    public void process(int shardIndex, int shardTotal,  String messageType,int count,long timeout) {

        try {
            //扫描消息表获取任务清单
            List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal,messageType, count);
            //任务个数
            int size = messageList.size();
            if(size<=0){
                return ;
            }

            //创建线程池
            ExecutorService threadPool = Executors.newFixedThreadPool(size);
            //计数器
            CountDownLatch countDownLatch = new CountDownLatch(size);
            messageList.forEach(message -> {
                threadPool.execute(() -> {
                    //处理任务
                    try {
                        boolean result = execute(message);
                        if(result){
                            //更新任务状态,删除消息表记录,添加到历史表
                            int completed = mqMessageService.completed(message.getId());
                            if (completed>0){
                                log.debug("任务执行成功:{}",message);
                            }else{
                                log.debug("任务执行失败:{}",message);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.debug("任务出现异常:{},任务:{}",e.getMessage(),message);
                    }
                    //计数
                    countDownLatch.countDown();

                });
            });

            //等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
            countDownLatch.await(timeout,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
           e.printStackTrace();
        }
    }
}


6、课程发布的实现类

在这个实现类中,继承上一节的抽象类,以及抽象方法execute,在execute中,分别执行数据库操作,建立缓存,上传分布式文件系统,建立搜索索引。


@Component
public class CoursePublishTask extends MessageProcessAbstract {

	......

    //任务调度入口
    @XxlJob("CoursePublishJobHandler")
    public void coursePublishJobHandler() throws Exception {
        // 分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        log.debug("shardIndex="+shardIndex+",shardTotal="+shardTotal);
        //参数:分片序号、分片总数、消息类型、一次最多取到的任务数量、一次任务调度执行的超时时间
        process(shardIndex,shardTotal,"course_publish",30,60);
    }

    //课程发布任务处理
    @Override
    public boolean execute(MqMessage mqMessage) {
        //获取消息相关的业务信息
        String businessKey1 = mqMessage.getBusinessKey1();
        long courseId = Integer.parseInt(businessKey1);
        // 课程发布表
        saveCourseToMQ(mqMessage, courseId);
        // 课程缓存
        saveCourseCache(mqMessage, courseId);
        // 课程静态化
        generateCourseHtml(mqMessage, courseId);
        // 课程索引
        saveCourseIndex(mqMessage, courseId);
        return true;
    }

	......

}

7、总结

本文在实际开发业务场景的基础上,给出了一种遵循AP思想的分布式事务控制方案,通过本地消息表+任务调度的方式实现。

项目亮点有:

  • 本地消息表通过任务123代替具体的任务,结合流程控制抽象类,只给出流程控制的代码,具体的业务实现由具体的实现类完成,从而实现解耦合,提高代码复用。

  • 任务流程控制中开启多实例和多线程,并行高效的执行任务。

  • 使用任务调度XXL-JOB进行任务执行,采用分片广播的方式,保证了任务执行的幂等性。其中控制中心提供了两种任务调度的规则,按照Cron的定时执行策略,和非登录任务执行通知的及时执行策略,为用户提供了多样化的体验

由于篇幅原因,四个小任务的实现,数据库、缓存、文件系统、搜索系统的数据同步,我们放在下一篇继续论述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/697496.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】C++ QT实现Huffman编码器与解码器(源码+课程论文+文件)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

Vue17-条件渲染

一、使用v-show属性做条件渲染 控制元素的显示和隐藏 v-show里面也能是表达式&#xff0c;只要表达式的值是boolean就行。 或者 当时结构还在&#xff1a; 二、使用v-if属性做条件渲染 结构也不在了 三、示例 方式一&#xff1a; 方式二&#xff1a; 当元素有很高的切换频率&am…

机器学习实验----支持向量机(SVM)实现二分类

目录 一、介绍 (1)解释算法 (2)数据集解释 二、算法实现和代码介绍 1.超平面 2.分类判别模型 3.点到超平面的距离 4.margin 间隔 5.拉格朗日乘数法KKT不等式 (1)介绍 (2)对偶问题 (3)惩罚参数 (4)求解 6.核函数解决非线性问题 7.SMO (1)更新w (2)更新b 三、代…

我在得物的这两年

写在前面 这篇文章非常简单&#xff0c;和大家简单聊聊我在得物的这两年&#xff0c;也是从学生到社会人的这两年。 我是2022年的6月加入得物实习&#xff0c;负责某个业务中台的后端研发&#xff0c;那一年我21岁&#xff0c;还在读大三&#xff0c;还在迷茫未来是读研还是工…

nw.js 如何调用activeX控件 (控件是C++编写的dll文件)

&#x1f3c6;本文收录于「Bug调优」专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&&…

【氵】Archlinux+KDE Plasma 6+Wayland 安装nvidia驱动 / 开启HDR

参考: NVIDIA - Arch Linux 中文维基 &#xff08;其实就是把 wiki 简化了一下 注&#xff1a;本教程适用 GeForce 930 起、10 系至 20 系、 Quadro / Tesla / Tegra K-系列以及更新的显卡&#xff08;NV110 以及更新的显卡家族&#xff09;&#xff0c;此处以 RTX3060 为例 …

Cyber Weekly #10

赛博新闻 1、最强开源大模型面世&#xff1a;阿里发布Qwen2 6月7日凌晨&#xff0c;阿里巴巴通义千问团队发布了Qwen2系列开源模型。该系列模型包括5个尺寸的预训练和指令微调模型&#xff1a;Qwen2-0.5B、Qwen2-1.5B、Qwen2-7B、Qwen2-57B-A14B以及Qwen2-72B。据Qwen官方博客…

1.奖牌的数量

上海市计算机学会竞赛平台 | YACSYACS 是由上海市计算机学会于2019年发起的活动,旨在激发青少年对学习人工智能与算法设计的热情与兴趣,提升青少年科学素养,引导青少年投身创新发现和科研实践活动。https://www.iai.sh.cn/problem/447 题目描述 小爱获得了 𝑎a 枚金牌,…

MATLAB实现磷虾算法(Krill herd algorithm)

1.算法介绍 磷虾算法&#xff08;Krill Herd Algorithm, KH&#xff09;是一种基于生物启发的优化算法&#xff0c;其原理模拟了南极磷虾&#xff08;Euphausia superba&#xff09;群体的聚集行为。该算法旨在通过模拟磷虾个体间的相互作用、觅食行为和随机扩散&#xff0c;来…

springboot3一些听课笔记

文章目录 一、错误处理机制1.1 默认1.2 自定义 二、嵌入式容器 一、错误处理机制 1.1 默认 错误处理的自动配置都在ErrorMvcAutoConfiguration中&#xff0c;两大核心机制&#xff1a; ● 1. SpringBoot 会自适应处理错误&#xff0c;响应页面或JSON数据 ● 2. SpringMVC的错…

知识图谱的应用---智慧农业

文章目录 智慧农业典型应用 智慧农业 智慧农业通过生产领域的智能化、经营领域的差异性以及服务领域的全方位信息服务&#xff0c;推动农业产业链改造升级;实现农业精细化、高效化与绿色化&#xff0c;保障农产品安全、农业竞争力提升和农业可持续发展。目前&#xff0c;我国的…

第1章Hello world 4/5:对比Rust/Java/C++创建和运行Hello world全过程:运行第一个程序

讲动人的故事,写懂人的代码 1.7 对比Rust/Java/C++创建和运行Hello world全过程 有了会听懂人类的讲话,还能做记录的编程助理艾极思,他们三人的讨论内容,都可以变成一份详细的会议纪要啦。 接下来,我们一起看看艾极思是如何记录下赵可菲创建和运行Java程序Hello world,…

基于Java-SpringBoot-VUE-MySQL的高校数字化迎新管理系统

基于Java-SpringBoot-VUE-MySQL的高校数字化迎新管理系统 登陆界面 联系作者 如需本项目源代码&#xff0c;可扫码或者VX:bob1638联系作者。 首页图表 系统功能持续更新中。。。 介绍 这是一款主要用于高校迎新的系统&#xff0c;主要是采用了SpringBoot2.X VUE2.6 ElementUI2.…

怎么避免电脑磁盘数据泄露?磁盘数据保护方法介绍

电脑磁盘是电脑存储数据的基础&#xff0c;而为了避免磁盘数据泄露&#xff0c;我们需要保护电脑磁盘。下面我们就来了解一下磁盘数据保护的方法。 磁盘加密 磁盘加密可以通过专业的加密算法来加密保护磁盘数据&#xff0c;避免电脑磁盘数据泄露。在这里小编推荐使用文件夹只读…

App UI 风格,尽显魅力

精妙无比的App UI 风格

PawSQL优化 | 分页查询太慢?别忘了投影下推

​在进行数据库应用开发中&#xff0c;分页查询是一项非常常见而又至关重要的任务。但你是否曾因为需要获取总记录数的性能而感到头疼&#xff1f;现在&#xff0c;让PawSQL的投影下推优化来帮你轻松解决这一问题&#xff01;本文以TPCH的Q12为案例进行验证&#xff0c;经过Paw…

利用阿里云PAI平台微调ChatGLM3-6B

1.介绍ChatGLM3-6B ChatGLM3-6B大模型是智谱AI和清华大学 KEG 实验室联合发布的对话预训练模型。 1.1 模型规模 模型规模通常用参数数量&#xff08;parameters&#xff09;来衡量。参数数量越多&#xff0c;模型理论上越强大&#xff0c;但也更耗费资源。以下是一些典型模型…

类和对象(上续)

前言&#xff1a;本文介绍类和对象中的一些比较重要的知识点&#xff0c;为以后的继续学习打好基础。 目录 拷贝构造 拷贝构造的特征&#xff1a; 自定义类型的传值传参 自定义类型在函数中的传值返回 如果返回值时自定义的引用呢&#xff1f; 在什么情况下使用呢&#…

前端技术入门指南

引言 在数字化时代&#xff0c;前端开发成为了连接用户与数字世界的重要桥梁。无论你是对编程充满好奇的新手&#xff0c;还是想要拓展自己技能领域的在职人员&#xff0c;前端开发都是一个值得学习和探索的领域。本文将带你走进前端技术的世界&#xff0c;为你提供一个入门指…

前端nvm的安装和使用nodejs多版本管理2024

nvm的安装和使用 1、简介 nvm是一个管理nodejs版本的工具。在实际的开发中&#xff0c;项目的开发依赖需要的nodejs版本运行环境不同&#xff0c;此时我们就需要使用nvm来进行不同nodejs版本的切换。其实就是一个方便的node版本管理工具。 注意&#xff1a;如果有安装过node&a…