DTM分布式事务
从内网看到了关于事务在业务中的讨论,评论区大佬有提及DTM开源项目[https://dtm.pub/],开学开学
基础理论
一、Why DTM
项目产生于实际生产中的问题,涉及订单支付的服务会将所有业务相关逻辑放到一个大的本地事务,导致大量耦合,复杂度大幅提升
java成熟的分布式事务解决方案,使用代价过高:大量业务用java重写
DTM,Distributed Transaction Manager, 其是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。
DTM提供了Saga、TCC、XA和二阶段消息模式以满足不同应用场景的需求,同时首创的子事务屏障技术有效解决幂等
、悬挂
和空补偿
等异常问题。
DTM的优点:
- 提供简单易用的接口,拆分具体业务接入分布式事务
- 支持多语言栈
- 核心技术子事务屏蔽,降低处理子事务乱的难度
二、 快速上手
1、Demo
了解DTM的发展和特点,quick start一下8⃣️
// 运行dtm
git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
// 运行一个saga示例
go run qs/main.go
上述Saga示例实现一个类似跨行转账的功能,包括两个事务分支:资金转出(TransOut)、资金转入(TransIn)。DTM保证TransIn和TransOut要么全成功,要么全回滚,保证最终金额的正确性。
// 具体业务微服务地址
const qsBusi = "http://localhost:8081/api/busi_saga"
req := &gin.H{"amount": 30} // 微服务的载荷
// DtmServer为DTM服务的地址,是一个url
DtmServer := "http://localhost:36789/api/dtmsvr"
saga := dtmcli.NewSaga(DtmServer, shortuuid.New()).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 补偿操作为url: qsBusi+"/TransOutCompensate"
Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransIn", 补偿操作为url: qsBusi+"/TransInCompensate"
Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
err := saga.Submit()
2、时序图
从以上时序图可以看出,DTM整个全局事务分为如下几步:
- 用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回
- DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回
- DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回
- DTM已完成所有的事务分支,将全局事务的状态修改为已完成
失败情况:
在实际业务中,子事务可以出现失败,例如转入的子账号被冻结导致转账失败。因此可以对业务代码进行修改来模拟TransIn正向操作失败
func qsAddRoute(app *gin.Engine) {
app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
log.Printf("TransIn")
c.JSON(200, "")
// c.JSON(409, "") // Status 409 for Failure. Won't be retried
})
app.POST(qsBusiAPI+"/TransInCompensate", func(c *gin.Context) {
log.Printf("TransInCompensate")
c.JSON(200, "")
})
app.POST(qsBusiAPI+"/TransOut", func(c *gin.Context) {
log.Printf("TransOut")
c.JSON(200, "")
})
app.POST(qsBusiAPI+"/TransOutCompensate", func(c *gin.Context) {
log.Printf("TransOutCompensate")
c.JSON(200, "")
})
}
再次运行,整个事务最终失败,时序图:
在转入操作失败的情况下,TransIn和TransOut的补偿操作被执行,保证了最终的余额和转账前是一样的
三、二阶段消息Demo
业务场景:
跨行转账是典型的分布式事务场景,在这里A需要跨行转账给B
假设需求场景:
只有转出A可能失败,转入B是能够最终成功的
二阶段消息是DTM首创的事务模式,用于替换本地事务表和事务消息这两种现有的方案
二阶段消息能够保证本地事务
的提交和全局事务
的提交是原子性的,适合解决不需要回滚的分布式事务场景
1、核心代码
// SagaAdjustBalance 1
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
if strings.Contains(result, dtmcli.ResultFailure) {
return dtmcli.ErrFailure
}
_, err := dtmimp.DBExec(BusiConf.Driver, db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
return err
}
调整用户的账号余额
app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
barrier := MustBarrierFromGin(c)
return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
})
}))
barrier.Call主要用于处理幂等,保证重复调用不会多次调整余额
开启事务,进行分支调用
gid := dtmimp.GetFuncName()
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/SagaBTransIn", req)
err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
})
该代码保证了DoAndSubmitDB的业务提交和全局事务提交是原子性的,保证TransOut和TransIn同时成功or失败。 其中DoAndSubmitDB第一个参数为回查URL
app.GET(BusiAPI+"/QueryPrepared", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
logger.Debugf("%s QueryPrepared", c.Query("gid"))
return string2DtmError(dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))
}))
app.GET(BusiAPI+"/QueryPreparedB", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
logger.Debugf("%s QueryPreparedB", c.Query("gid"))
bb := MustBarrierFromGin(c)
db := dbGet().ToSQLDB()
return bb.QueryPrepared(db)
}))
2、原子性
四、SAGA型事务Demo
业务场景:
A需要跨行转账给B
假设场景:
转出A和转入B都可能成功和失败,需要最终转入转出都成功or失败
核心思想:
将长事务拆分为多个本地短事务,有Saga事务协调器来协调,如果各个本地事务成功完成则正常完成,如果某个步骤失败,则根据相反顺序依次调用补偿操作。
1、核心代码
调整用户的账户余额
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
if strings.Contains(result, dtmcli.ResultFailure) {
return dtmcli.ErrFailure
}
_, err := dtmimp.DBExec(BusiConf.Driver, db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
return err
}
正向操作/补偿操作的处理函数(源码新增的Demo并未在此作展示
app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
barrier := MustBarrierFromGin(c)
return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
})
}))
app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
barrier := MustBarrierFromGin(c)
return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")
})
}))
app.POST(BusiAPI+"/SagaBTransOut", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
barrier := MustBarrierFromGin(c)
return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult)
})
}))
app.POST(BusiAPI+"/SagaBTransOutCom", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
barrier := MustBarrierFromGin(c)
return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransOutUID, reqFrom(c).Amount, "")
})
}))
开启事务,进行分支调用
req := &busi.ReqHTTP{Amount: 30}
// DtmServer为DTM服务的地址
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, shortuuid.New()).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
Add(busi.Busi+"/SagaBTransOut", busi.Busi+"/SagaBTransOutCom", req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
Add(busi.Busi+"/SagaBTransIn", busi.Busi+"/SagaBTransInCom", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
logger.Debugf("busi trans submit")
err := saga.Submit()
2、时序图
与快速上手
demo一致
3、处理网络异常
假设提交给dtm的事务中, 调用转入操作时,出现短暂的故障——>dtm会重试未完成的操作,此时要求全局事务中的各个子事务时幂等的。
子事务屏障技术,提供了BranchBarrier
工具类,提供Call函数来保证函数内部的业务最多被调用一次。
4、处理回滚
事务失败交互的时序图:
- TransIn的正向操作发生在提交之前,则补偿为空操作
- TransIn的操作如果发生在提交后,则补偿操作会将数据提交一次
五、TCC型事务Demo
业务场景:
A需要跨行转账给B
假设场景:
转出A和转入B都可能成功和失败,需要最终转入转出都成功or失败
还有一个要求,假如发生回滚,SAGA 模式下会发生A发现自己的余额被扣减了,但是收款方B迟迟没有收到余额,那么会对A造成很大的困扰。业务上面希望不要出现这种情况
TCC分为3个阶段
- Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
- Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
- Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。
1、核心代码
冻结/解冻资金操作,会检查约束balance+trading_balance >= 0,如果约束不成立,执行失败(trading_balance 表示被冻结的金额
func tccAdjustTrading(db dtmcli.DB, uid int, amount int) error {
affected, err := dtmimp.DBExec(BusiConf.Driver, db, `update dtm_busi.user_account
set trading_balance=trading_balance+?
where user_id=? and trading_balance + ? + balance >= 0`, amount, uid, amount)
if err == nil && affected == 0 {
return fmt.Errorf("update error, maybe balance not enough")
}
return err
}
func tccAdjustBalance(db dtmcli.DB, uid int, amount int) error {
affected, err := dtmimp.DBExec(BusiConf.Driver, db, `update dtm_busi.user_account
set trading_balance=trading_balance-?,
balance=balance+? where user_id=?`, amount, amount, uid)
if err == nil && affected == 0 {
return fmt.Errorf("update user_account 0 rows")
}
return err
}
Try/Confirm/Cancel处理函数:
app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
req := reqFrom(c)
if req.TransOutResult != "" {
return string2DtmError(req.TransOutResult)
}
bb := MustBarrierFromGin(c)
if req.Store == Redis {
return bb.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)
} else if req.Store == Mongo {
return bb.MongoCall(MongoGet(), func(sc mongo.SessionContext) error {
return SagaMongoAdjustBalance(sc, sc.Client(), TransOutUID, -req.Amount, "")
})
}
return bb.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, TransOutUID, -req.Amount)
})
}))
app.POST(BusiAPI+"/TccBTransOutConfirm", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
if reqFrom(c).Store == Redis || reqFrom(c).Store == Mongo {
return nil
}
return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return tccAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount)
})
}))
app.POST(BusiAPI+"/TccBTransOutCancel", dtmutil.WrapHandler(TccBarrierTransOutCancel))
app.POST(BusiAPI+"/TccBTransInTry", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
req := reqFrom(c)
if req.TransInResult != "" {
return string2DtmError(req.TransInResult)
}
return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, TransInUID, req.Amount)
})
}))
app.POST(BusiAPI+"/TccBTransInConfirm", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return tccAdjustBalance(tx, TransInUID, reqFrom(c).Amount)
})
}))
app.POST(BusiAPI+"/TccBTransInCancel", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, TransInUID, -reqFrom(c).Amount)
})
}))
开启事务,进行分支调用
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
// TccGlobalTransaction 会开启一个全局事务
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
// CallBranch 会将事务分支的Confirm/Cancel注册到全局事务上,然后直接调用Try
_, err := tcc.CallBranch(req, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
assert.Nil(t, err)
return tcc.CallBranch(req, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
})
2、处理网络异常
同SAGA部分
3、处理回滚
事务失败交互的时序图:
跟成功的TCC差别就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的Cancel操作,保证全局事务全部回滚
- TransIn的正向操作发生在提交之前,则补偿为空操作
- TransIn的操作如果发生在提交后,则补偿操作会将数据提交一次