DTM分布式事务

DTM分布式事务

从内网看到了关于事务在业务中的讨论,评论区大佬有提及DTM开源项目[https://dtm.pub/],开学开学

基础理论

一、Why DTM

​ 项目产生于实际生产中的问题,涉及订单支付的服务会将所有业务相关逻辑放到一个大的本地事务,导致大量耦合,复杂度大幅提升

​ java成熟的分布式事务解决方案,使用代价过高:大量业务用java重写

​ DTM,Distributed Transaction Manager, 其是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。

​ DTM提供了Saga、TCC、XA和二阶段消息模式以满足不同应用场景的需求,同时首创的子事务屏障技术有效解决幂等悬挂空补偿等异常问题。

​ DTM的优点:

  • 提供简单易用的接口,拆分具体业务接入分布式事务
  • 支持多语言栈
  • 核心技术子事务屏蔽,降低处理子事务乱的难度

二、 快速上手

1、Demo

​ 了解DTM的发展和特点,quick start一下8⃣️

// 运行dtm
git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
// 运行一个saga示例
go run qs/main.go

​ 上述Saga示例实现一个类似跨行转账的功能,包括两个事务分支:资金转出(TransOut)、资金转入(TransIn)。DTM保证TransIn和TransOut要么全成功,要么全回滚,保证最终金额的正确性。

  // 具体业务微服务地址
  const qsBusi = "http://localhost:8081/api/busi_saga"
  req := &gin.H{"amount": 30} // 微服务的载荷
  // DtmServer为DTM服务的地址,是一个url
  DtmServer := "http://localhost:36789/api/dtmsvr"
  saga := dtmcli.NewSaga(DtmServer, shortuuid.New()).
    // 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 补偿操作为url: qsBusi+"/TransOutCompensate"
    Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
    // 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransIn", 补偿操作为url: qsBusi+"/TransInCompensate"
    Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
  // 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
  err := saga.Submit()

在这里插入图片描述

2、时序图

在这里插入图片描述

​ 从以上时序图可以看出,DTM整个全局事务分为如下几步:

  1. 用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回
  2. DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回
  3. DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回
  4. DTM已完成所有的事务分支,将全局事务的状态修改为已完成

失败情况:

​ 在实际业务中,子事务可以出现失败,例如转入的子账号被冻结导致转账失败。因此可以对业务代码进行修改来模拟TransIn正向操作失败

func qsAddRoute(app *gin.Engine) {
	app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
		log.Printf("TransIn")
		c.JSON(200, "")
		// c.JSON(409, "") // Status 409 for Failure. Won't be retried
	})
	app.POST(qsBusiAPI+"/TransInCompensate", func(c *gin.Context) {
		log.Printf("TransInCompensate")
		c.JSON(200, "")
	})
	app.POST(qsBusiAPI+"/TransOut", func(c *gin.Context) {
		log.Printf("TransOut")
		c.JSON(200, "")
	})
	app.POST(qsBusiAPI+"/TransOutCompensate", func(c *gin.Context) {
		log.Printf("TransOutCompensate")
		c.JSON(200, "")
	})
}

​ 再次运行,整个事务最终失败,时序图:

在这里插入图片描述

在转入操作失败的情况下,TransIn和TransOut的补偿操作被执行,保证了最终的余额和转账前是一样的

在这里插入图片描述

三、二阶段消息Demo

业务场景:

​ 跨行转账是典型的分布式事务场景,在这里A需要跨行转账给B

假设需求场景:

​ 只有转出A可能失败,转入B是能够最终成功的

​ 二阶段消息是DTM首创的事务模式,用于替换本地事务表和事务消息这两种现有的方案

​ 二阶段消息能够保证本地事务的提交和全局事务的提交是原子性的,适合解决不需要回滚的分布式事务场景

1、核心代码
// SagaAdjustBalance 1
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
	if strings.Contains(result, dtmcli.ResultFailure) {
		return dtmcli.ErrFailure
	}
	_, err := dtmimp.DBExec(BusiConf.Driver, db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
	return err
}

调整用户的账号余额

app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			barrier := MustBarrierFromGin(c)
			return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
			})
		}))

barrier.Call主要用于处理幂等,保证重复调用不会多次调整余额

开启事务,进行分支调用

	gid := dtmimp.GetFuncName()
	req := busi.GenReqHTTP(30, false, false)
	msg := dtmcli.NewMsg(DtmServer, gid).
		Add(busi.Busi+"/SagaBTransIn", req)
	err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
		return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
	})

该代码保证了DoAndSubmitDB的业务提交和全局事务提交是原子性的,保证TransOut和TransIn同时成功or失败。 其中DoAndSubmitDB第一个参数为回查URL

	app.GET(BusiAPI+"/QueryPrepared", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
		logger.Debugf("%s QueryPrepared", c.Query("gid"))
		return string2DtmError(dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))
	}))
	app.GET(BusiAPI+"/QueryPreparedB", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
		logger.Debugf("%s QueryPreparedB", c.Query("gid"))
		bb := MustBarrierFromGin(c)
		db := dbGet().ToSQLDB()
		return bb.QueryPrepared(db)
	}))
2、原子性

在这里插入图片描述

四、SAGA型事务Demo

业务场景:

​ A需要跨行转账给B

假设场景:

​ 转出A和转入B都可能成功和失败,需要最终转入转出都成功or失败

核心思想:

​ 将长事务拆分为多个本地短事务,有Saga事务协调器来协调,如果各个本地事务成功完成则正常完成,如果某个步骤失败,则根据相反顺序依次调用补偿操作。

1、核心代码

​ 调整用户的账户余额

func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
	if strings.Contains(result, dtmcli.ResultFailure) {
		return dtmcli.ErrFailure
	}
	_, err := dtmimp.DBExec(BusiConf.Driver, db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
	return err
}

​ 正向操作/补偿操作的处理函数(源码新增的Demo并未在此作展示

		app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			barrier := MustBarrierFromGin(c)
			return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
			})
		}))
		app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			barrier := MustBarrierFromGin(c)
			return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")
			})
		}))
		app.POST(BusiAPI+"/SagaBTransOut", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			barrier := MustBarrierFromGin(c)
			return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return SagaAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult)
			})
		}))
		app.POST(BusiAPI+"/SagaBTransOutCom", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			barrier := MustBarrierFromGin(c)
			return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return SagaAdjustBalance(tx, TransOutUID, reqFrom(c).Amount, "")
			})
		}))

开启事务,进行分支调用

		req := &busi.ReqHTTP{Amount: 30}
	  // DtmServer为DTM服务的地址
		saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, shortuuid.New()).
			// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
      Add(busi.Busi+"/SagaBTransOut", busi.Busi+"/SagaBTransOutCom", req).
			// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
			Add(busi.Busi+"/SagaBTransIn", busi.Busi+"/SagaBTransInCom", req)
		// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
    logger.Debugf("busi trans submit")
		err := saga.Submit()
2、时序图

​ 与快速上手demo一致

在这里插入图片描述

3、处理网络异常

​ 假设提交给dtm的事务中, 调用转入操作时,出现短暂的故障——>dtm会重试未完成的操作,此时要求全局事务中的各个子事务时幂等的

​ 子事务屏障技术,提供了BranchBarrier工具类,提供Call函数来保证函数内部的业务最多被调用一次。

4、处理回滚

​ 事务失败交互的时序图:

在这里插入图片描述

  • TransIn的正向操作发生在提交之前,则补偿为空操作
  • TransIn的操作如果发生在提交后,则补偿操作会将数据提交一次

五、TCC型事务Demo

业务场景:

​ A需要跨行转账给B

假设场景:

​ 转出A和转入B都可能成功和失败,需要最终转入转出都成功or失败

​ 还有一个要求,假如发生回滚,SAGA 模式下会发生A发现自己的余额被扣减了,但是收款方B迟迟没有收到余额,那么会对A造成很大的困扰。业务上面希望不要出现这种情况

TCC分为3个阶段

  • Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
  • Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
  • Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。

在这里插入图片描述

1、核心代码

​ 冻结/解冻资金操作,会检查约束balance+trading_balance >= 0,如果约束不成立,执行失败(trading_balance 表示被冻结的金额

func tccAdjustTrading(db dtmcli.DB, uid int, amount int) error {
	affected, err := dtmimp.DBExec(BusiConf.Driver, db, `update dtm_busi.user_account
		set trading_balance=trading_balance+?
		where user_id=? and trading_balance + ? + balance >= 0`, amount, uid, amount)
	if err == nil && affected == 0 {
		return fmt.Errorf("update error, maybe balance not enough")
	}
	return err
}

func tccAdjustBalance(db dtmcli.DB, uid int, amount int) error {
	affected, err := dtmimp.DBExec(BusiConf.Driver, db, `update dtm_busi.user_account
		set trading_balance=trading_balance-?,
		balance=balance+? where user_id=?`, amount, amount, uid)
	if err == nil && affected == 0 {
		return fmt.Errorf("update user_account 0 rows")
	}
	return err
}

​ Try/Confirm/Cancel处理函数:

		app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			req := reqFrom(c)
			if req.TransOutResult != "" {
				return string2DtmError(req.TransOutResult)
			}
			bb := MustBarrierFromGin(c)
			if req.Store == Redis {
				return bb.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)
			} else if req.Store == Mongo {
				return bb.MongoCall(MongoGet(), func(sc mongo.SessionContext) error {
					return SagaMongoAdjustBalance(sc, sc.Client(), TransOutUID, -req.Amount, "")
				})
			}

			return bb.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return tccAdjustTrading(tx, TransOutUID, -req.Amount)
			})
		}))
		app.POST(BusiAPI+"/TccBTransOutConfirm", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			if reqFrom(c).Store == Redis || reqFrom(c).Store == Mongo {
				return nil
			}
			return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return tccAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount)
			})
		}))
		app.POST(BusiAPI+"/TccBTransOutCancel", dtmutil.WrapHandler(TccBarrierTransOutCancel))
		app.POST(BusiAPI+"/TccBTransInTry", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			req := reqFrom(c)
			if req.TransInResult != "" {
				return string2DtmError(req.TransInResult)
			}
			return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return tccAdjustTrading(tx, TransInUID, req.Amount)
			})
		}))
		app.POST(BusiAPI+"/TccBTransInConfirm", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return tccAdjustBalance(tx, TransInUID, reqFrom(c).Amount)
			})
		}))
		app.POST(BusiAPI+"/TccBTransInCancel", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
			return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {
				return tccAdjustTrading(tx, TransInUID, -reqFrom(c).Amount)
			})
		}))

开启事务,进行分支调用

	req := busi.GenReqHTTP(30, false, false)
	gid := dtmimp.GetFuncName()
	// TccGlobalTransaction 会开启一个全局事务
	err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
    // CallBranch 会将事务分支的Confirm/Cancel注册到全局事务上,然后直接调用Try
		_, err := tcc.CallBranch(req, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
		assert.Nil(t, err)
		return tcc.CallBranch(req, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
	})
2、处理网络异常

​ 同SAGA部分

3、处理回滚

​ 事务失败交互的时序图:

在这里插入图片描述

​ 跟成功的TCC差别就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的Cancel操作,保证全局事务全部回滚

  • TransIn的正向操作发生在提交之前,则补偿为空操作
  • TransIn的操作如果发生在提交后,则补偿操作会将数据提交一次

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/301904.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【性能测试入门】:压力测试概念!

压力测试可以验证软件应用程序的稳定性和可靠性。压力测试的目标是评估软件在极端负载条件下的鲁棒性和错误处理能力,并确保软件在紧急情况下不会崩溃。它甚至可以进行超出软件正常工作条件的测试,并评估软件在极端条件下的工作方式。 在软件工程中&…

简单介绍Java 的内存泄漏

java最明显的一个优势就是它的内存管理机制。你只需简单创建对象,java的垃圾回收机制负责分配和释放内存。然而情况并不像想像的那么简单,因为在Java应用中经常发生内存泄漏。 本教程演示了什么是内存泄漏,为什么会发生内存泄漏以及如何预防…

2024年第十届计算机与技术应用国际会议(ICCTA 2024)即将召开!

​ ​ 2024年第十届计算机与技术应用国际会议(ICCTA 2024) 会议时间:2024年5月15-17日 会议地点:奥地利维也纳 (线上线下会议) 会议官网: Home_ICCTA 2024 | Vienna, Austria 组织单位: 奥地利FH JOANN…

狂拿offer,这12道性能测试面试题你会多少?不要再被挖坑了

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 1、性能测试包含了…

NFS 共享存储实验

一、服务器部署 第一步、安装nfs和rpcbind包 [rootserver ~]# yum install -y nfs-utils rpcbind截图: 第二步、这里选择一个 lvm 挂载点做 NFS 共享目录 [rootserver ~]# df -HT截图: 第三步、修改配置文件 [rootserver ~]# vi /etc/exports /home …

2024农历新年是什么时候?电脑如何设置农历新年提醒

元旦的钟声已经远去,2024年的阳历新年就这样悄无声息地开始了。但对于我们很多人来说,真正的“过年”氛围,还得等到农历新年的到来。那么,今年的农历新年究竟是什么时候呢?答案是2月10日。 每当想到农历新年&#xff…

【docker笔记】Docker容器数据卷

Docker容器数据卷 卷就是目录或者文件,存在于一个或多个容器中,由docker挂载到容器,但不属于联合文件系统,因此能够绕过Union File System提供一些用于持续存储或共享数据的特性 卷的设计目的就是数据的持久化,完全独…

Element-Puls Form表单内嵌套el-table表格,根据表格复选框多选或单选动态设置行的验证规则

需求 根据 Table 表格内的复选框来控制当前选中行是否添加必填校验规则 效果图 实现思想 我们需要设置一个 flag 来标识已勾选的行,el-table渲染数据结构是数组对象形式,我们可以在每个对象中手动加如一个标识,例如默认:selected …

密码输入检测 - 华为OD统一考试

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 100分 题解&#xff1a; Java / Python / C 题目描述 给定用户密码输入流input&#xff0c;输入流中字符 ‘<’ 表示退格&#xff0c;可以清除前一个输入的字符&#xff0c;请你编写程序&#xff0c;输出最终得到的…

fpmarkets盘点成功交易者的十个习惯(一)

在交易中能够盈利一次&#xff0c;fpmarkets认为这种情况100%的交易者都会做到&#xff0c;但是要做到每次交易都能盈利&#xff0c;即使是巴菲特也做到&#xff0c;我们只需要做到整体盈利就可以了&#xff0c;那么如何做到呢&#xff1f;今天fpmarkets就总结一下成功交易者的…

代码随想录算法训练营Day08|344.反转字符串、541. 反转字符串II、卡码网:替换数字、151.翻转字符串里的单词、卡码网:右旋字符串

文章目录 一、344.反转字符串1. 双指针法 二、541. 反转字符串II1. 字符串解法 三、卡码网&#xff1a;替换数字四、151.翻转字符串里的单词1.使用库函数2.自行编写函数3.创建字符数组填充3.双反转移位 五、卡码网&#xff1a;右旋字符串1. 自行编写函数 总结 一、344.反转字符…

【软件测试】白盒测试 / 逻辑覆盖法

《语句覆盖法》 使程序中的每个可执行语句至少执行一次 所有的可执行语句得到执行语句覆盖测试是较弱的一种测试发现错误能力最弱的逻辑覆盖 《判定覆盖法》 使每一个判定获得每一种可能的结果至少一次 每个判定得到真值和假值判断覆盖法满足了语句覆盖&#xff0c;因此比语…

【AI视野·今日Sound 声学论文速览 第四十期】Wed, 3 Jan 2024

AI视野今日CS.Sound 声学论文速览 Wed, 3 Jan 2024 Totally 4 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Sound Papers Auffusion: Leveraging the Power of Diffusion and Large Language Models for Text-to-Audio Generation Authors Jinlong Xue, Yayue De…

十一、工具盒类(MyQQ)(Qt5 GUI系列)

目录 ​编辑 一、设计需求 二、实现代码 三、代码解析 四、总结 一、设计需求 抽屉效果是软件界面设计中的一种常用形式&#xff0c;可以以一种动态直观的方式在有限大小的界面上扩展出更多的功能。本例要求实现类似 QQ 抽屉效果。 二、实现代码 #include "dialog.…

2024年第二届语言、创新教育与文化交流国际学术会议(CLEC 2024)

2024年第二届语言、创新教育与文化交流国际学术会议(CLEC 2024) 2024 2nd International Conference on Language, Innovative Education and Cultural Communication 为迎接知识经济时代的挑战&#xff0c;创新教育被用来培养学生的创新精神与能力。知识的普遍性使得创新教育…

CSS同时使用背景图和渐变色

CSS同时使用背景图和渐变色 需求代码实现完整写法 需求 一个盒子&#xff0c;在拥有渐变色的前提下还需要同时拥有背景图层 类似如下的效果 代码实现 首先我们按照常规的写css的方式来写 <div class"box"></div>.box{width: 300px;height: 120px;bo…

「网络安全术语解读」SARIF详解

引言&#xff1a;什么是SARIF&#xff1f;它的产生背景是什么&#xff1f;SARIF主要包含哪些内容&#xff1f;使用SARIF有哪些好处&#xff1f; 1. SARIF简介 SARIF&#xff08;Static Analysis Results Interchange Format &#xff0c;静态分析结果交换格式&#xff09;是一…

jenkins 自由风格部署vue项目,参数化构建vue项目

1. 丢弃旧的构建 2. 是否需要install 3. git 4. 配置node16: 5. 脚本&#xff1a; 脚本&#xff1a; #进入Jenkins工作空间下项目目录 cd /var/lib/jenkins/workspace/你的任务名称 node -v #检测node版本&#xff08;此条命令非必要&#xff09; npm -v #检测npm版本&#x…

电力系统中的交流负载箱

交流负载箱是电力系统中的一种重要设备&#xff0c;主要用于模拟实际的电力负载&#xff0c;以便对电力系统进行各种性能测试和分析。在电力系统的设计和运行过程中&#xff0c;交流负载箱起着至关重要的作用。 交流负载箱的主要功能是模拟实际的电力负载&#xff0c;包括电阻、…

开源内容管理框架Drupal在Docker本地部署并实现公网远程访问

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…