一.dtm分布式事务框架之SAGA
1.1DTM介绍
DTM是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。
通俗一点说,DTM提供跨服务事务能力,一组服务要么全部成功,要么全部回滚,避免只更新了一部分数据产生的一致性问题。
您可以在为什么选DTM中了解更多DTM的设计初衷。
1.2SAGA介绍
10分钟说透Saga分布式事务
Saga是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由Saga事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。
与tcc(try,commit,cancel)不同,saga取消了commit阶段.可以出现中间状态.例如saga分布式事务(saga是dtm框架一部分):
1.从前往后执行事务,执行出错,向前补偿(回滚)
2.没有configm阶段,B可以看见中间状态
1.3.各种分布式事务应用场景
1.4DTM安装
这里采用的是源码编译安装
git clone https://github.com/dtm-labs/dtm && cd dtm
go build
启动后的界面如下
1.5HTTP-SAGA转账
这里参考的是DTM的SAGA例子
1.5.1创建我们的用户表
CREATE TABLE `user_account` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL,
`balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`trading_balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `user_id` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4
1.5.2编写核心业务代码
调整用户的账户余额
func SagaAdjustBalance(db *gorm.DB, uid int, amount float64) error {
lock.Lock()
defer lock.Unlock()
if amount < 0 {
var userAccount = UserAccount{}
db.First(&userAccount, "user_id=?", uid)
if userAccount.Balance < amount {
return fmt.Errorf("余额不足")
}
}
t := db.Exec("update user_account set balance = ? where user_id = ?", gorm.Expr("balance + ?", amount), uid)
if t.Error != nil {
return t.Error
}
return nil
}
再来编写具体的正向操作/补偿操作的处理函数
r.POST("/SagaBTransIn", func(c *gin.Context) {
fmt.Printf("开始转入")
userID := 1
err = SagaAdjustBalance(db, userID, 100)
if err != nil {
fmt.Printf("转入失败:%s\r\n", err.Error())
}
fmt.Println("转入成功")
})
r.POST("/SagaBTransInCom", func(c *gin.Context) {
fmt.Printf("转入失败,开始补偿")
userID := 1
err = SagaAdjustBalance(db, userID, -100)
if err != nil {
fmt.Printf("转入补偿失败:%s\r\n", err.Error())
}
fmt.Println("转入补偿成功")
})
r.POST("/SagaBTransOut", func(c *gin.Context) {
fmt.Printf("开始转出")
userID := 3
err = SagaAdjustBalance(db, userID, -100)
if err != nil {
if err.Error() == "余额不足" {
c.JSON(http.StatusConflict, gin.H{})
return
}
fmt.Printf("转出失败:%s\r\n", err.Error())
c.JSON(500, gin.H{"message": err.Error()})
return
}
fmt.Println("转出成功")
})
r.POST("/SagaBTransOutCom", func(c *gin.Context) {
fmt.Printf("转出补偿")
userID := 3
err = SagaAdjustBalance(db, userID, 100)
if err != nil {
fmt.Printf("转出补偿失败:%s\r\n", err.Error())
}
fmt.Println("转出补偿成功")
})
到此各个子事务的处理函数已经OK了,然后是开启SAGA事务,进行分支调用
r.GET("/start", func(c *gin.Context) {
req := gin.H{}
dmtServer := "http://127.0.0.1:36789/api/dtmsvr"
qsBusi := "http://127.0.0.1:8089"
saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
saga.WaitResult = true
err := saga.Submit()
if err != nil {
c.JSON(500, gin.H{"message": err.Error()})
}
c.JSON(200, gin.H{"message": "ok"})
})
完整代码如下
package main
import (
"fmt"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"github.com/lithammer/shortuuid/v3"
"gorm.io/driver/mysql"
"gorm.io/gorm"
glog "gorm.io/gorm/logger"
"log"
"net/http"
"os"
"sync"
"time"
)
type UserAccount struct {
ID int `gorm:"column:id;primary_key"`
UserId int `gorm:"user_id"`
Balance float64 `gorm:"balance"`
TradingBalance float64 `gorm:"trading_balance"`
}
func (UserAccount) TableName() string {
return "user_account"
}
var lock sync.Mutex
func SagaAdjustBalance(db *gorm.DB, uid int, amount float64) error {
lock.Lock()
defer lock.Unlock()
if amount < 0 {
var userAccount = UserAccount{}
db.First(&userAccount, "user_id=?", uid)
if userAccount.Balance < amount {
return fmt.Errorf("余额不足")
}
}
t := db.Exec("update user_account set balance = ? where user_id = ?", gorm.Expr("balance + ?", amount), uid)
if t.Error != nil {
return t.Error
}
return nil
}
var db *gorm.DB
func InitDB() error {
var err error
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
"root",
"123456",
"127.0.0.1",
"3306",
"dtm")
//希望大家自己可以去封装logger
newLogger := glog.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer(日志输出的目标,前缀和日志包含的内容——译者注)
glog.Config{
SlowThreshold: time.Second, // 慢 SQL 阈值
LogLevel: glog.Info, // 日志级别
IgnoreRecordNotFoundError: true, // 忽略ErrRecordNotFound(记录未找到)错误
Colorful: false, // 禁用彩色打印
},
)
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: newLogger,
})
if err != nil {
return err
}
return nil
}
func main() {
err := InitDB()
if err != nil {
panic(err)
}
r := gin.Default()
r.POST("/SagaBTransIn", func(c *gin.Context) {
fmt.Printf("开始转入")
userID := 1
err = SagaAdjustBalance(db, userID, 100)
if err != nil {
fmt.Printf("转入失败:%s\r\n", err.Error())
}
fmt.Println("转入成功")
})
r.POST("/SagaBTransInCom", func(c *gin.Context) {
fmt.Printf("转入失败,开始补偿")
userID := 1
err = SagaAdjustBalance(db, userID, -100)
if err != nil {
fmt.Printf("转入补偿失败:%s\r\n", err.Error())
}
fmt.Println("转入补偿成功")
})
r.POST("/SagaBTransOut", func(c *gin.Context) {
fmt.Printf("开始转出")
userID := 3
err = SagaAdjustBalance(db, userID, -100)
if err != nil {
if err.Error() == "余额不足" {
c.JSON(http.StatusConflict, gin.H{})
return
}
fmt.Printf("转出失败:%s\r\n", err.Error())
c.JSON(500, gin.H{"message": err.Error()})
return
}
fmt.Println("转出成功")
})
r.POST("/SagaBTransOutCom", func(c *gin.Context) {
fmt.Printf("转出补偿")
userID := 3
err = SagaAdjustBalance(db, userID, 100)
if err != nil {
fmt.Printf("转出补偿失败:%s\r\n", err.Error())
}
fmt.Println("转出补偿成功")
})
r.GET("/start", func(c *gin.Context) {
req := gin.H{}
dmtServer := "http://127.0.0.1:36789/api/dtmsvr"
qsBusi := "http://127.0.0.1:8089"
saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
saga.WaitResult = true
err := saga.Submit()
if err != nil {
c.JSON(500, gin.H{"message": err.Error()})
}
c.JSON(200, gin.H{"message": "ok"})
})
r.Run(":8089")
}
1.5.3测试
启动 main.go,在浏览上运行http://127.0.0.1:8089/start
可以看到如下的运行结果:
[GIN-debug] Listening and serving HTTP on :8089
开始转出
2023/12/07 10:16:06 E:/Linuxshare/GoStart/dtm/main.go:37
[64.070ms] [rows:1] SELECT * FROM `user_account` WHERE user_id=3 ORDER BY `user_account`.`id` LIMIT 1
&gid=3NujmbFwy6caKsX88fkApj&op=action&trans_type=saga"
开始转入
2023/12/07 10:16:06 E:/Linuxshare/GoStart/dtm/main.go:42
[5.984ms] [rows:1] update user_account set balance = balance + 100 where user_id = 1
转入成功
[GIN] 2023/12/07 - 10:16:06 | 200 | 51.7071ms | 127.0.0.1 | POST "/SagaBTransIn?branch_id=02&
gid=3NujmbFwy6caKsX88fkApj&op=action&trans_type=saga"
[GIN] 2023/12/07 - 10:16:06 | 200 | 667.8659ms | 127.0.0.1 | GET "/start"
[GIN] 2023/12/07 - 10:16:06 | 404 | 0s | 127.0.0.1 | GET "/favicon.ico"
1.6GRPC-SAGA库存服务
1.6.1复制一份conf.sample.yml 改名为conf.yaml
这里面采用的通信协议是kratos,代码如下
MicroService: # gRPC/HTTP based microservice config
Driver: 'dtm-driver-kratos' # name of the driver to handle register/discover
Target: 'consul://127.0.0.1:8500/dtmservice' # register dtm server to this url
EndPoint: 'grpc://127.0.0.1:36790'
修改完后,重新启动DTM,启动的时候要加参数,如下图所示
启动完后就可以看到已经注册到consul上去了
1.6.2编写具体的服务
SAGA库存服务的具体代码如下
package main
import (
proto "GoStart/api/inventory/v1"
"fmt"
"github.com/dtm-labs/client/dtmgrpc"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
func main() {
r := gin.Default()
r.GET("/start", func(c *gin.Context) {
orderSn := uuid.NewString()
req := &proto.SellInfo{
GoodsInfo: []*proto.GoodsInvInfo{
{
GoodsId: 421,
Num: 2,
},
},
OrderSn: orderSn,
}
dmtServer := "127.0.0.1:36790"
qsBusi := "discovery:///inventory-srv"
uid := uuid.NewString()
fmt.Println("uid", uid)
saga := dtmgrpc.NewSagaGrpc(dmtServer, orderSn).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
Add(qsBusi+"/Inventory/Sell", qsBusi+"/Inventory/Reback", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
saga.WaitResult = true
err := saga.Submit()
if err != nil {
c.JSON(500, gin.H{"message": err.Error()})
}
c.JSON(200, gin.H{"message": "ok"})
})
r.Run(":8089")
}
1.6.3启动服务进行测试
商品服务:
订单服务:
库存服务:
库存服务原先的数据如下
这时候运行SAGA库存服务的代码,然后在浏览器上访问http://127.0.0.1:8089/start,可以在库存服务看到如下运行情况
2023-12-07 10:41:27.639 INFO v1/inventory.go:56 订单a0ee4972-407e-4625-a478-c8ccbe42c28d扣减库存
2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:94
[4.898ms] [rows:1] SELECT * FROM `inventory` WHERE goods = 421 AND `inventory`.`deleted_at` IS NULL ORDER BY `inventory`.`id` LIMIT 1
2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:58
[5.495ms] [rows:1] UPDATE `inventory` SET `stocks`=stocks - 2 WHERE goods=421 AND stocks >= 2 AND `inventory`.`deleted_at` IS NULL
2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:76
[17.646ms] [rows:1] INSERT INTO `stockselldetail` (`order_sn`,`status`,`detail`) VALUES ('a0ee4972-407e-4625-a478-c8ccbe42c28d',1,'
[{"Goods":421,"Num":2}]')
再看数据库,库存的数据已发生变动,库存明细数据也行插入了
1.7事务屏障达到通过gin集成转入转出功能
1.7.1事务屏障介绍
异常与子事务屏障
分布式事务之所以难,主要是因为分布式系统中的各个节点都可能发生各种非预期的情况。本文先介绍分布式系统中的异常问题,然后介绍这些问题带给分布式事务的挑战,接下来指出现有各种常见用法的问题,最后给出正确的方案。
NPC的挑战
分布式系统最大的敌人可能就是NPC了,在这里它是Network Delay, Process Pause, Clock Drift的首字母缩写。我们先看看具体的NPC问题是什么:
- Network Delay,网络延迟。虽然网络在多数情况下工作的还可以,虽然TCP保证传输顺序和不会丢失,但它无法消除网络延迟问题。
- Process Pause,进程暂停。有很多种原因可以导致进程暂停:比如编程语言中的GC(垃圾回收机制)会暂停所有正在运行的线程;再比如,我们有时会暂停云服务器,从而可以在不重启的情况下将云服务器从一台主机迁移到另一台主机。我们无法确定性预测进程暂停的时长,你以为持续几百毫秒已经很长了,但实际上持续数分钟之久进程暂停并不罕见。
- Clock Drift,时钟漂移。现实生活中我们通常认为时间是平稳流逝,单调递增的,但在计算机中不是。计算机使用时钟硬件计时,通常是石英钟,计时精度有限,同时受机器温度影响。为了在一定程度上同步网络上多个机器之间的时间,通常使用NTP协议将本地设备的时间与专门的时间服务器对齐,这样做的一个直接结果是设备的本地时间可能会突然向前或向后跳跃。
分布式事务既然是分布式的系统,自然也有NPC问题。因为没有涉及时间戳,带来的困扰主要是NP。
异常分类
我们以分布式事务中的TCC作为例子,看看NP带来的影响。
一般情况下,一个TCC回滚时的执行顺序是,先执行完Try,再执行Cancel,但是由于N,则有可能Try的网络延迟大,导致先执行Cancel,再执行Try。
这种情况就引入了分布式事务中的两个难题:
- 空补偿: Cancel执行时,Try未执行,事务分支的Cancel操作需要判断出Try未执行,这时需要忽略Cancel中的业务数据更新,直接返回
- 悬挂: Try执行时,Cancel已执行完成,事务分支的Try操作需要判断出Cancel已执行,这时需要忽略Try中的业务数据更新,直接返回
分布式事务还有一类需要处理的常见问题,就是重复请求
- 幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性
因为空补偿、悬挂、重复请求都跟NP有关,我们把他们统称为子事务乱序问题。在业务处理中,需要小心处理好这三种问题,否则会出现错误数据。
异常原因
下面看一个网络异常的时序图,更好的理解上述几种问题
- 业务处理请求4的时候,Cancel在Try之前执行,需要处理空回滚
- 业务处理请求6的时候,Cancel重复执行,需要幂等
- 业务处理请求8的时候,Try在Cancel后执行,需要处理悬挂
现有方案的问题
我们看到开源项目dtm之外,包括各云厂商,各开源项目,他们给出的业务实现建议大多类似如下(这也是大多数用户最容易想到的方案):
- 空补偿: “针对该问题,在服务设计时,需要允许空补偿,即在没有找到要补偿的业务主键时,返回补偿成功,并将原业务主键记录下来,标记该业务流水已补偿成功。”
- 防悬挂: “需要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝执行该笔服务,以免造成数据不一致。”
上述的这种实现,能够在大部分情况下正常运行,但是上述做法中的“先查后改”在并发情况下是容易掉坑里的,我们分析以下如下场景:
- 正常执行顺序下,Try执行时,在查完没有空补偿记录的业务主键之后,事务提交之前,如果发生了进程暂停P,或者事务内部进行网络请求出现了拥塞,导致本地事务等待较久
- 全局事务超时后,Cancel执行,因为没有查到要补偿的业务主键,因此判断是空补偿,返回
- Try的进程暂停结束,最后提交本地事务
- 全局事务回滚完成后,Try分支的业务操作没有被回滚,产生了悬挂
事实上,NPC里的P和C,以及P和C的组合,有很多种的场景,都可以导致上述竞态情况,就不一一赘述了。
虽然这种情况发生的概率不高,但是在金融领域,一旦涉及金钱账目,那么带来的影响可能是巨大的。
PS:幂等控制如果也采用“先查再改”,也是一样很容易出现类似的问题。解决这一类问题的关键点是要利用唯一索引,“以改代查”来避免竞态条件。
子事务屏障
我们在dtm中,首创了子事务屏障技术,使用该技术,能够非常便捷的解决异常问题,极大的降低了分布式事务的使用门槛。
子事务屏障能够达到下面这个效果,看示意图:
所有这些请求,到了子事务屏障后:不正常的请求,会被过滤;正常请求,通过屏障。开发者使用子事务屏障之后,前面所说的各种异常全部被妥善处理,业务开发人员只需要关注实际的业务逻辑,负担大大降低。 子事务屏障提供了方法BranchBarrier.CallWithDB ,方法的原型为:
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BusiFunc) error
业务开发人员,在busiCall里面编写自己的相关逻辑,调用 BranchBarrier.CallWithDB 。 BranchBarrier.CallWithDB 保证,在空回滚、悬挂等场景下,busiCall不会被调用;在业务被重复调用时,有幂等控制,保证只被提交一次。
子事务屏障会管理TCC、SAGA、事务消息等,也可以扩展到其他领域
原理
子事务屏障技术的原理是,在本地数据库,建立分支操作状态表dtm_barrier,唯一键为全局事务id-分支id-分支操作(try|confirm|cancel)
- 开启本地事务
- 对于当前操作op(try|confirm|cancel),insert ignore一条数据gid-branchid-op,如果插入不成功,提交事务返回成功(常见的幂等控制方法)
- 如果当前操作是cancel,那么在insert ignore一条数据gid-branchid-try,如果插入成功(注意是成功),则提交事务返回成功
- 调用屏障内的业务逻辑,如果业务返回成功,则提交事务返回成功;如果业务返回失败,则回滚事务返回失败
在此机制下,解决了乱序相关的问题
- 空补偿控制–如果Try没有执行,直接执行了Cancel,那么3中Cancel插入gid-branchid-try会成功,不走屏障内的逻辑,保证了空补偿控制
- 幂等控制–2中任何一个操作都无法重复插入唯一键,保证了不会重复执行
- 防悬挂控制–Try在Cancel之后执行,那么Cancel会在3中插入gid-branchid-try,导致Try在2中不成功,就不执行屏障内的逻辑,保证了防悬挂控制
对于SAGA、二阶段消息,也是类似的机制。
原理图解
下面我们以图的方式来详解子事务屏障,因为Confirm操作不涉及空补偿和悬挂,所以重点看Try与Cancel,Try对应图中的A,Cancel对应图中的C:
子事务屏障中对应的幂等处理部分:
这部分就是常规的幂等处理部分,往数据库中插入一个唯一键,如果是重复请求,那么插入失败,直接失败返回。
子事务屏障技术就是在上述的幂等处理部分,添加一个步骤–补偿服务再插入一条A记录,正常流程下,会因为唯一键冲突导致插入失败,往下执行业务。
当发生乱序,假设C在A前面执行,那么会发生下面的时序图:
- 对于C操作,他先于A执行,是一个空补偿;此时C操作插入A记录时,发现插入成功,直接返回
- 对于A操作,他在C之后执行,是一个悬挂;此时A操作插入A记录时,发现插入失败,直接返回
这两种情况都会被子事务屏障拦截返回,而不执行内部的业务操作。可以看到子事务屏障非常巧妙的解决了幂等、空补偿和悬挂三个问题。
竞态分析
上面分析了Try和Cancel的执行时间没有重叠的情况下,能够解决空补偿和悬挂问题。如果出现了Try和Cancel执行时间重叠的情况,我们看看会发生什么。
假设Try和Cancel并发执行,Cancel和Try都会插入同一条记录gid-branchid-try,由于唯一索引冲突,那么两个操作中只有一个能够成功,而另一个则会等持有锁的事务完成后返回。
- 情况1,Try插入gid-branchid-try失败,Cancel操作插入gid-branchid-try成功,此时就是典型的空补偿和悬挂场景,按照子事务屏障算法,Try和Cancel都会直接返回
- 情况2,Try插入gid-branchid-try成功,Cancel操作插入gid-branchid-try失败,按照上述子事务屏障算法,会正常执行业务,而且业务执行的顺序是Try在Cancel前
- 情况3,Try和Cancel的操作在重叠期间又遇见宕机等情况,那么至少Cancel会被dtm重试,那么最终会走到情况1或2。
综上各种情况的详细论述,子事务屏障能够在各种NP情况下,保证最终结果的正确性。
优点
事实上,子事务屏障有大量优点,包括:
- 两个insert判断解决空补偿、防悬挂、幂等这三个问题,比其他方案的三种情况分别判断,逻辑复杂度大幅降低
- dtm的子事务屏障是SDK层解决这三个问题,业务完全不需要关心
- 性能高,对于正常完成的事务(一般失败的事务不超过1%),子事务屏障的额外开销是每个分支操作一个SQL,比其他方案代价更小。
支持的存储
目前子事务屏障已经支持了
- 数据库:包括 Mysql, Postgres, 以及与Mysql,Postgres兼容的数据库
- 缓存 Redis:采用 Lua 脚本事务支持
- Mongo:采用 Mongo 的事务支持
在子事务屏障的支持下,您可以将Redis、Mongo和数据库的事务组合在一起,形成一个全局事务。相关用法,可以在dtm-examples里面找到
理论上支持事务的各种存储都可以轻松实现子事务屏障,例如 TiKV 等,如果较多用户有这样的需求,我们将会快速支持。
对接orm库
barrier提供了sql标准接口,但大家的应用通常都会引入更高级的orm库,而不是裸用sql接口,因此需要进行转化. 相关的对接参考对接ORM
1.7.2建表
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
id bigint(22) PRIMARY KEY AUTO_INCREMENT,
trans_type varchar(45) default '',
gid varchar(128) default '',
branch_id varchar(128) default '',
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '' comment 'the branch type who insert this record',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
UNIQUE key(gid, branch_id, op, barrier_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
17.3代码编写
package main
import (
"database/sql"
"fmt"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/lithammer/shortuuid/v3"
"github.com/dtm-labs/client/dtmcli"
"gorm.io/driver/mysql"
"gorm.io/gorm"
glog "gorm.io/gorm/logger"
)
type UserAccount struct {
ID int `gorm:"column:id;primary_key"`
UserId int `gorm:"user_id"`
Balance float64 `gorm:"balance"`
TradingBalance float64 `gorm:"trading_balance"`
}
func (UserAccount) TableName() string {
return "user_account"
}
var lock sync.Mutex
// 转入和转出的时候,都要加锁,否则会出现并发问题
func SagaAdjustBalance(db *sql.Tx, uid int, amount float64) error {
lock.Lock()
defer lock.Unlock()
if amount < 0 {
var balance float64
db.QueryRow("select balance from dtm.user_account where user_id = ?", uid).Scan(&balance)
if balance < -amount {
return fmt.Errorf("余额不足")
}
}
_, err := db.Exec("update dtm.user_account set balance = balance + ? where user_id = ?", amount, uid)
if err != nil {
return err
}
return nil
}
var db *gorm.DB
func initDB() error {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
"root",
"root",
"192.168.2.13",
"3306",
"dtm")
newLogger := glog.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer(日志输出的目标,前缀和日志包含的内容——译者注)
glog.Config{
SlowThreshold: time.Second, // 慢 SQL 阈值
LogLevel: glog.Info, // 日志级别
IgnoreRecordNotFoundError: true, // 忽略ErrRecordNotFound(记录未找到)错误
Colorful: false, // 禁用彩色打印
},
)
var err error
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: newLogger,
})
if err != nil {
return err
}
return nil
}
// 获取屏障
// MustBarrierFromGin 1
func MustBarrierFromGin(c *gin.Context) *dtmcli.BranchBarrier {
ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
fmt.Println(err)
return ti
}
// 服务发现, 库存服务有5个
func main() {
err := initDB()
if err != nil {
panic(err)
}
r := gin.Default()
r.POST("/SagaBTransIn", func(c *gin.Context) {
barrier := MustBarrierFromGin(c) //1.生成一个屏障
tx := db.Begin() //2.开启事务
sourceTx := tx.Statement.ConnPool.(*sql.Tx)
err := barrier.Call(sourceTx, func(tx1 *sql.Tx) error { //3.将业务逻辑翻到Call方法执行
fmt.Println("开始转入")
userID := 1
err := SagaAdjustBalance(sourceTx, userID, 100) //4.修改gorm为 sql.Tx并使用原生sql查询(gorm支持不全)
if err != nil {
fmt.Printf("转入失败:%s\r\n", err.Error())
return err
}
return nil
})
if err != nil {
c.JSON(http.StatusOK, gin.H{"code": 1, "msg": err.Error()})
return
}
return
})
r.POST("/SagaBTransInCom", func(c *gin.Context) {
fmt.Println("转入失败, 开始补偿")
//userID := 1
//err := SagaAdjustBalance(db, userID, -100)
//if err != nil {
// fmt.Printf("转入补偿失败:%s\r\n", err.Error())
// return
//}
fmt.Println("转入补偿成功")
})
r.POST("/SagaBTransOut", func(c *gin.Context) {
barrier := MustBarrierFromGin(c)
tx := db.Begin()
sourceTx := tx.Statement.ConnPool.(*sql.Tx)
err := barrier.Call(sourceTx, func(tx1 *sql.Tx) error {
fmt.Println("开始转出")
userID := 3
err := SagaAdjustBalance(sourceTx, userID, -100)
if err != nil {
if err.Error() == "余额不足" {
c.JSON(http.StatusConflict, gin.H{})
}
fmt.Printf("转出失败:%s\r\n", err.Error())
c.JSON(500, gin.H{"msg": err.Error()})
}
fmt.Println("转出成功")
return nil
})
if err != nil {
c.JSON(http.StatusOK, gin.H{"code": 1, "msg": err.Error()})
return
}
return
})
r.POST("/SagaBTransOutCom", func(c *gin.Context) {
fmt.Println("转出失败, 开始补偿")
//userID := 3
//err := SagaAdjustBalance(db, userID, 100)
//if err != nil {
// fmt.Printf("转出补偿失败:%s\r\n", err.Error())
// return
//}
fmt.Println("转出补偿成功")
})
r.GET("start", func(c *gin.Context) {
req := gin.H{}
dmtServer := "http://127.0.0.1:36789/api/dtmsvr"
qsBusi := "http://127.0.0.1:8089"
saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
saga.WaitResult = true
err := saga.Submit()
if err != nil {
c.JSON(500, gin.H{"message": err.Error()})
}
c.JSON(200, gin.H{"message": "ok"})
})
r.Run(":8089")
}
s\r\n", err.Error())
// return
//}
fmt.Println(“转出补偿成功”)
})
r.GET("start", func(c *gin.Context) {
req := gin.H{}
dmtServer := "http://127.0.0.1:36789/api/dtmsvr"
qsBusi := "http://127.0.0.1:8089"
saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
saga.WaitResult = true
err := saga.Submit()
if err != nil {
c.JSON(500, gin.H{"message": err.Error()})
}
c.JSON(200, gin.H{"message": "ok"})
})
r.Run(":8089")
}