dtm分布式事务框架之SAGA 实战

一.dtm分布式事务框架之SAGA

1.1DTM介绍

DTM是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。

通俗一点说,DTM提供跨服务事务能力,一组服务要么全部成功,要么全部回滚,避免只更新了一部分数据产生的一致性问题。

您可以在为什么选DTM中了解更多DTM的设计初衷。

1.2SAGA介绍

10分钟说透Saga分布式事务

Saga是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由Saga事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

与tcc(try,commit,cancel)不同,saga取消了commit阶段.可以出现中间状态.例如saga分布式事务(saga是dtm框架一部分):
1.从前往后执行事务,执行出错,向前补偿(回滚)
2.没有configm阶段,B可以看见中间状态

img

1.3.各种分布式事务应用场景

img

1.4DTM安装

这里采用的是源码编译安装

git clone https://github.com/dtm-labs/dtm && cd dtm
go build

启动后的界面如下

在这里插入图片描述

1.5HTTP-SAGA转账

这里参考的是DTM的SAGA例子

1.5.1创建我们的用户表
CREATE TABLE `user_account` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `user_id` int(11) NOT NULL,
 `balance` decimal(10,2) NOT NULL DEFAULT '0.00',
 `trading_balance` decimal(10,2) NOT NULL DEFAULT '0.00',
 `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
 `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY (`id`),
 UNIQUE KEY `user_id` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4
1.5.2编写核心业务代码

调整用户的账户余额

func SagaAdjustBalance(db *gorm.DB, uid int, amount float64) error {
   lock.Lock()
   defer lock.Unlock()

   if amount < 0 {
      var userAccount = UserAccount{}
      db.First(&userAccount, "user_id=?", uid)
      if userAccount.Balance < amount {
         return fmt.Errorf("余额不足")
      }
   }
   t := db.Exec("update user_account set balance = ? where user_id = ?", gorm.Expr("balance + ?", amount), uid)
   if t.Error != nil {
      return t.Error
   }
   return nil
}

再来编写具体的正向操作/补偿操作的处理函数

r.POST("/SagaBTransIn", func(c *gin.Context) {
		fmt.Printf("开始转入")
		userID := 1
		err = SagaAdjustBalance(db, userID, 100)
		if err != nil {
			fmt.Printf("转入失败:%s\r\n", err.Error())
		}
		fmt.Println("转入成功")
	})
	r.POST("/SagaBTransInCom", func(c *gin.Context) {
		fmt.Printf("转入失败,开始补偿")
		userID := 1
		err = SagaAdjustBalance(db, userID, -100)
		if err != nil {
			fmt.Printf("转入补偿失败:%s\r\n", err.Error())
		}
		fmt.Println("转入补偿成功")
	})
	r.POST("/SagaBTransOut", func(c *gin.Context) {
		fmt.Printf("开始转出")
		userID := 3
		err = SagaAdjustBalance(db, userID, -100)
		if err != nil {
			if err.Error() == "余额不足" {
				c.JSON(http.StatusConflict, gin.H{})
				return
			}
			fmt.Printf("转出失败:%s\r\n", err.Error())
			c.JSON(500, gin.H{"message": err.Error()})
			return
		}
		fmt.Println("转出成功")
	})

	r.POST("/SagaBTransOutCom", func(c *gin.Context) {
		fmt.Printf("转出补偿")
		userID := 3
		err = SagaAdjustBalance(db, userID, 100)
		if err != nil {
			fmt.Printf("转出补偿失败:%s\r\n", err.Error())
		}
		fmt.Println("转出补偿成功")
	})

到此各个子事务的处理函数已经OK了,然后是开启SAGA事务,进行分支调用

	r.GET("/start", func(c *gin.Context) {
		req := gin.H{}
		dmtServer := "http://127.0.0.1:36789/api/dtmsvr"
		qsBusi := "http://127.0.0.1:8089"
		saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).
			// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
			Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
			// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
			Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
		// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
		saga.WaitResult = true
		err := saga.Submit()
		if err != nil {
			c.JSON(500, gin.H{"message": err.Error()})
		}
		c.JSON(200, gin.H{"message": "ok"})
	})

完整代码如下

package main

import (
   "fmt"
   "github.com/dtm-labs/client/dtmcli"
   "github.com/gin-gonic/gin"
   "github.com/lithammer/shortuuid/v3"
   "gorm.io/driver/mysql"
   "gorm.io/gorm"
   glog "gorm.io/gorm/logger"
   "log"
   "net/http"
   "os"
   "sync"
   "time"
)

type UserAccount struct {
   ID             int     `gorm:"column:id;primary_key"`
   UserId         int     `gorm:"user_id"`
   Balance        float64 `gorm:"balance"`
   TradingBalance float64 `gorm:"trading_balance"`
}

func (UserAccount) TableName() string {
   return "user_account"
}

var lock sync.Mutex

func SagaAdjustBalance(db *gorm.DB, uid int, amount float64) error {
   lock.Lock()
   defer lock.Unlock()

   if amount < 0 {
      var userAccount = UserAccount{}
      db.First(&userAccount, "user_id=?", uid)
      if userAccount.Balance < amount {
         return fmt.Errorf("余额不足")
      }
   }
   t := db.Exec("update user_account set balance = ? where user_id = ?", gorm.Expr("balance + ?", amount), uid)
   if t.Error != nil {
      return t.Error
   }
   return nil
}

var db *gorm.DB

func InitDB() error {
   var err error
   dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
      "root",
      "123456",
      "127.0.0.1",
      "3306",
      "dtm")

   //希望大家自己可以去封装logger
   newLogger := glog.New(
      log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer(日志输出的目标,前缀和日志包含的内容——译者注)
      glog.Config{
         SlowThreshold:             time.Second, // 慢 SQL 阈值
         LogLevel:                  glog.Info,   // 日志级别
         IgnoreRecordNotFoundError: true,        // 忽略ErrRecordNotFound(记录未找到)错误
         Colorful:                  false,       // 禁用彩色打印
      },
   )
   db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
      Logger: newLogger,
   })
   if err != nil {
      return err
   }
   return nil
}

func main() {
   err := InitDB()
   if err != nil {
      panic(err)
   }
   r := gin.Default()
   r.POST("/SagaBTransIn", func(c *gin.Context) {
      fmt.Printf("开始转入")
      userID := 1
      err = SagaAdjustBalance(db, userID, 100)
      if err != nil {
         fmt.Printf("转入失败:%s\r\n", err.Error())
      }
      fmt.Println("转入成功")
   })
   r.POST("/SagaBTransInCom", func(c *gin.Context) {
      fmt.Printf("转入失败,开始补偿")
      userID := 1
      err = SagaAdjustBalance(db, userID, -100)
      if err != nil {
         fmt.Printf("转入补偿失败:%s\r\n", err.Error())
      }
      fmt.Println("转入补偿成功")
   })
   r.POST("/SagaBTransOut", func(c *gin.Context) {
      fmt.Printf("开始转出")
      userID := 3
      err = SagaAdjustBalance(db, userID, -100)
      if err != nil {
         if err.Error() == "余额不足" {
            c.JSON(http.StatusConflict, gin.H{})
            return
         }
         fmt.Printf("转出失败:%s\r\n", err.Error())
         c.JSON(500, gin.H{"message": err.Error()})
         return
      }
      fmt.Println("转出成功")
   })

   r.POST("/SagaBTransOutCom", func(c *gin.Context) {
      fmt.Printf("转出补偿")
      userID := 3
      err = SagaAdjustBalance(db, userID, 100)
      if err != nil {
         fmt.Printf("转出补偿失败:%s\r\n", err.Error())
      }
      fmt.Println("转出补偿成功")
   })
   r.GET("/start", func(c *gin.Context) {
      req := gin.H{}
      dmtServer := "http://127.0.0.1:36789/api/dtmsvr"
      qsBusi := "http://127.0.0.1:8089"
      saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).
         // 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
         Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
         // 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
         Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
      // 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
      saga.WaitResult = true
      err := saga.Submit()
      if err != nil {
         c.JSON(500, gin.H{"message": err.Error()})
      }
      c.JSON(200, gin.H{"message": "ok"})
   })
   r.Run(":8089")
}
1.5.3测试

启动 main.go,在浏览上运行http://127.0.0.1:8089/start

可以看到如下的运行结果:

[GIN-debug] Listening and serving HTTP on :8089
开始转出
2023/12/07 10:16:06 E:/Linuxshare/GoStart/dtm/main.go:37
[64.070ms] [rows:1] SELECT * FROM `user_account` WHERE user_id=3 ORDER BY `user_account`.`id` LIMIT 1
&gid=3NujmbFwy6caKsX88fkApj&op=action&trans_type=saga"
开始转入
2023/12/07 10:16:06 E:/Linuxshare/GoStart/dtm/main.go:42
[5.984ms] [rows:1] update user_account set balance = balance + 100 where user_id = 1
转入成功
[GIN] 2023/12/07 - 10:16:06 | 200 |     51.7071ms |       127.0.0.1 | POST     "/SagaBTransIn?branch_id=02&
gid=3NujmbFwy6caKsX88fkApj&op=action&trans_type=saga"
[GIN] 2023/12/07 - 10:16:06 | 200 |    667.8659ms |       127.0.0.1 | GET      "/start"                    
[GIN] 2023/12/07 - 10:16:06 | 404 |            0s |       127.0.0.1 | GET      "/favicon.ico"   

1.6GRPC-SAGA库存服务

1.6.1复制一份conf.sample.yml 改名为conf.yaml

这里面采用的通信协议是kratos,代码如下

MicroService: # gRPC/HTTP based microservice config
  Driver: 'dtm-driver-kratos' # name of the driver to handle register/discover
  Target: 'consul://127.0.0.1:8500/dtmservice' # register dtm server to this url
  EndPoint: 'grpc://127.0.0.1:36790'

修改完后,重新启动DTM,启动的时候要加参数,如下图所示

在这里插入图片描述

启动完后就可以看到已经注册到consul上去了

在这里插入图片描述

1.6.2编写具体的服务

SAGA库存服务的具体代码如下

package main

import (
   proto "GoStart/api/inventory/v1"
   "fmt"
   "github.com/dtm-labs/client/dtmgrpc"
   "github.com/gin-gonic/gin"
   "github.com/google/uuid"
)

func main() {
   r := gin.Default()
   r.GET("/start", func(c *gin.Context) {
      orderSn := uuid.NewString()
      req := &proto.SellInfo{
         GoodsInfo: []*proto.GoodsInvInfo{
            {
               GoodsId: 421,
               Num:     2,
            },
         },
         OrderSn: orderSn,
      }
      dmtServer := "127.0.0.1:36790"
      qsBusi := "discovery:///inventory-srv"
      uid := uuid.NewString()
      fmt.Println("uid", uid)
      saga := dtmgrpc.NewSagaGrpc(dmtServer, orderSn).
         // 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
         Add(qsBusi+"/Inventory/Sell", qsBusi+"/Inventory/Reback", req)
      // 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
      saga.WaitResult = true
      err := saga.Submit()
      if err != nil {
         c.JSON(500, gin.H{"message": err.Error()})
      }
      c.JSON(200, gin.H{"message": "ok"})
   })
   r.Run(":8089")
}

1.6.3启动服务进行测试

商品服务:

在这里插入图片描述

订单服务:

在这里插入图片描述

库存服务:

在这里插入图片描述

库存服务原先的数据如下

在这里插入图片描述

这时候运行SAGA库存服务的代码,然后在浏览器上访问http://127.0.0.1:8089/start,可以在库存服务看到如下运行情况

2023-12-07 10:41:27.639 INFO    v1/inventory.go:56      订单a0ee4972-407e-4625-a478-c8ccbe42c28d扣减库存

2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:94
[4.898ms] [rows:1] SELECT * FROM `inventory` WHERE goods = 421 AND `inventory`.`deleted_at` IS NULL ORDER BY `inventory`.`id` LIMIT 1

2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:58
[5.495ms] [rows:1] UPDATE `inventory` SET `stocks`=stocks - 2 WHERE goods=421 AND stocks >= 2 AND `inventory`.`deleted_at` IS NULL 

2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:76
[17.646ms] [rows:1] INSERT INTO `stockselldetail` (`order_sn`,`status`,`detail`) VALUES ('a0ee4972-407e-4625-a478-c8ccbe42c28d',1,'
[{"Goods":421,"Num":2}]')



再看数据库,库存的数据已发生变动,库存明细数据也行插入了

在这里插入图片描述

在这里插入图片描述

1.7事务屏障达到通过gin集成转入转出功能

1.7.1事务屏障介绍

异常与子事务屏障

分布式事务之所以难,主要是因为分布式系统中的各个节点都可能发生各种非预期的情况。本文先介绍分布式系统中的异常问题,然后介绍这些问题带给分布式事务的挑战,接下来指出现有各种常见用法的问题,最后给出正确的方案。

NPC的挑战

分布式系统最大的敌人可能就是NPC了,在这里它是Network Delay, Process Pause, Clock Drift的首字母缩写。我们先看看具体的NPC问题是什么:

  • Network Delay,网络延迟。虽然网络在多数情况下工作的还可以,虽然TCP保证传输顺序和不会丢失,但它无法消除网络延迟问题。
  • Process Pause,进程暂停。有很多种原因可以导致进程暂停:比如编程语言中的GC(垃圾回收机制)会暂停所有正在运行的线程;再比如,我们有时会暂停云服务器,从而可以在不重启的情况下将云服务器从一台主机迁移到另一台主机。我们无法确定性预测进程暂停的时长,你以为持续几百毫秒已经很长了,但实际上持续数分钟之久进程暂停并不罕见。
  • Clock Drift,时钟漂移。现实生活中我们通常认为时间是平稳流逝,单调递增的,但在计算机中不是。计算机使用时钟硬件计时,通常是石英钟,计时精度有限,同时受机器温度影响。为了在一定程度上同步网络上多个机器之间的时间,通常使用NTP协议将本地设备的时间与专门的时间服务器对齐,这样做的一个直接结果是设备的本地时间可能会突然向前或向后跳跃。

分布式事务既然是分布式的系统,自然也有NPC问题。因为没有涉及时间戳,带来的困扰主要是NP。

异常分类

我们以分布式事务中的TCC作为例子,看看NP带来的影响。

一般情况下,一个TCC回滚时的执行顺序是,先执行完Try,再执行Cancel,但是由于N,则有可能Try的网络延迟大,导致先执行Cancel,再执行Try。

这种情况就引入了分布式事务中的两个难题:

  • 空补偿: Cancel执行时,Try未执行,事务分支的Cancel操作需要判断出Try未执行,这时需要忽略Cancel中的业务数据更新,直接返回
  • 悬挂: Try执行时,Cancel已执行完成,事务分支的Try操作需要判断出Cancel已执行,这时需要忽略Try中的业务数据更新,直接返回

分布式事务还有一类需要处理的常见问题,就是重复请求

  • 幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性

因为空补偿、悬挂、重复请求都跟NP有关,我们把他们统称为子事务乱序问题。在业务处理中,需要小心处理好这三种问题,否则会出现错误数据。

异常原因

下面看一个网络异常的时序图,更好的理解上述几种问题

exception

  • 业务处理请求4的时候,Cancel在Try之前执行,需要处理空回滚
  • 业务处理请求6的时候,Cancel重复执行,需要幂等
  • 业务处理请求8的时候,Try在Cancel后执行,需要处理悬挂

现有方案的问题

我们看到开源项目dtm之外,包括各云厂商,各开源项目,他们给出的业务实现建议大多类似如下(这也是大多数用户最容易想到的方案):

  • 空补偿: “针对该问题,在服务设计时,需要允许空补偿,即在没有找到要补偿的业务主键时,返回补偿成功,并将原业务主键记录下来,标记该业务流水已补偿成功。”
  • 防悬挂: “需要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝执行该笔服务,以免造成数据不一致。”

上述的这种实现,能够在大部分情况下正常运行,但是上述做法中的“先查后改”在并发情况下是容易掉坑里的,我们分析以下如下场景:

  • 正常执行顺序下,Try执行时,在查完没有空补偿记录的业务主键之后,事务提交之前,如果发生了进程暂停P,或者事务内部进行网络请求出现了拥塞,导致本地事务等待较久
  • 全局事务超时后,Cancel执行,因为没有查到要补偿的业务主键,因此判断是空补偿,返回
  • Try的进程暂停结束,最后提交本地事务
  • 全局事务回滚完成后,Try分支的业务操作没有被回滚,产生了悬挂

事实上,NPC里的P和C,以及P和C的组合,有很多种的场景,都可以导致上述竞态情况,就不一一赘述了。

虽然这种情况发生的概率不高,但是在金融领域,一旦涉及金钱账目,那么带来的影响可能是巨大的。

PS:幂等控制如果也采用“先查再改”,也是一样很容易出现类似的问题。解决这一类问题的关键点是要利用唯一索引,“以改代查”来避免竞态条件。

子事务屏障

我们在dtm中,首创了子事务屏障技术,使用该技术,能够非常便捷的解决异常问题,极大的降低了分布式事务的使用门槛。

子事务屏障能够达到下面这个效果,看示意图:

barrier

所有这些请求,到了子事务屏障后:不正常的请求,会被过滤;正常请求,通过屏障。开发者使用子事务屏障之后,前面所说的各种异常全部被妥善处理,业务开发人员只需要关注实际的业务逻辑,负担大大降低。 子事务屏障提供了方法BranchBarrier.CallWithDB ,方法的原型为:

func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BusiFunc) error

业务开发人员,在busiCall里面编写自己的相关逻辑,调用 BranchBarrier.CallWithDB 。 BranchBarrier.CallWithDB 保证,在空回滚、悬挂等场景下,busiCall不会被调用;在业务被重复调用时,有幂等控制,保证只被提交一次。

子事务屏障会管理TCC、SAGA、事务消息等,也可以扩展到其他领域

原理

子事务屏障技术的原理是,在本地数据库,建立分支操作状态表dtm_barrier,唯一键为全局事务id-分支id-分支操作(try|confirm|cancel)

  1. 开启本地事务
  2. 对于当前操作op(try|confirm|cancel),insert ignore一条数据gid-branchid-op,如果插入不成功,提交事务返回成功(常见的幂等控制方法)
  3. 如果当前操作是cancel,那么在insert ignore一条数据gid-branchid-try,如果插入成功(注意是成功),则提交事务返回成功
  4. 调用屏障内的业务逻辑,如果业务返回成功,则提交事务返回成功;如果业务返回失败,则回滚事务返回失败

在此机制下,解决了乱序相关的问题

  • 空补偿控制–如果Try没有执行,直接执行了Cancel,那么3中Cancel插入gid-branchid-try会成功,不走屏障内的逻辑,保证了空补偿控制
  • 幂等控制–2中任何一个操作都无法重复插入唯一键,保证了不会重复执行
  • 防悬挂控制–Try在Cancel之后执行,那么Cancel会在3中插入gid-branchid-try,导致Try在2中不成功,就不执行屏障内的逻辑,保证了防悬挂控制

对于SAGA、二阶段消息,也是类似的机制。

原理图解

下面我们以图的方式来详解子事务屏障,因为Confirm操作不涉及空补偿和悬挂,所以重点看Try与Cancel,Try对应图中的A,Cancel对应图中的C:

子事务屏障中对应的幂等处理部分:

barrier-idem

这部分就是常规的幂等处理部分,往数据库中插入一个唯一键,如果是重复请求,那么插入失败,直接失败返回。

子事务屏障技术就是在上述的幂等处理部分,添加一个步骤–补偿服务再插入一条A记录,正常流程下,会因为唯一键冲突导致插入失败,往下执行业务。

在这里插入图片描述

当发生乱序,假设C在A前面执行,那么会发生下面的时序图:

在这里插入图片描述

  • 对于C操作,他先于A执行,是一个空补偿;此时C操作插入A记录时,发现插入成功,直接返回
  • 对于A操作,他在C之后执行,是一个悬挂;此时A操作插入A记录时,发现插入失败,直接返回

这两种情况都会被子事务屏障拦截返回,而不执行内部的业务操作。可以看到子事务屏障非常巧妙的解决了幂等、空补偿和悬挂三个问题。

竞态分析

上面分析了Try和Cancel的执行时间没有重叠的情况下,能够解决空补偿和悬挂问题。如果出现了Try和Cancel执行时间重叠的情况,我们看看会发生什么。

假设Try和Cancel并发执行,Cancel和Try都会插入同一条记录gid-branchid-try,由于唯一索引冲突,那么两个操作中只有一个能够成功,而另一个则会等持有锁的事务完成后返回。

  • 情况1,Try插入gid-branchid-try失败,Cancel操作插入gid-branchid-try成功,此时就是典型的空补偿和悬挂场景,按照子事务屏障算法,Try和Cancel都会直接返回
  • 情况2,Try插入gid-branchid-try成功,Cancel操作插入gid-branchid-try失败,按照上述子事务屏障算法,会正常执行业务,而且业务执行的顺序是Try在Cancel前
  • 情况3,Try和Cancel的操作在重叠期间又遇见宕机等情况,那么至少Cancel会被dtm重试,那么最终会走到情况1或2。

综上各种情况的详细论述,子事务屏障能够在各种NP情况下,保证最终结果的正确性。

优点

事实上,子事务屏障有大量优点,包括:

  • 两个insert判断解决空补偿、防悬挂、幂等这三个问题,比其他方案的三种情况分别判断,逻辑复杂度大幅降低
  • dtm的子事务屏障是SDK层解决这三个问题,业务完全不需要关心
  • 性能高,对于正常完成的事务(一般失败的事务不超过1%),子事务屏障的额外开销是每个分支操作一个SQL,比其他方案代价更小。

支持的存储

目前子事务屏障已经支持了

  • 数据库:包括 Mysql, Postgres, 以及与Mysql,Postgres兼容的数据库
  • 缓存 Redis:采用 Lua 脚本事务支持
  • Mongo:采用 Mongo 的事务支持

在子事务屏障的支持下,您可以将Redis、Mongo和数据库的事务组合在一起,形成一个全局事务。相关用法,可以在dtm-examples里面找到

理论上支持事务的各种存储都可以轻松实现子事务屏障,例如 TiKV 等,如果较多用户有这样的需求,我们将会快速支持。

对接orm库

barrier提供了sql标准接口,但大家的应用通常都会引入更高级的orm库,而不是裸用sql接口,因此需要进行转化. 相关的对接参考对接ORM

1.7.2建表
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
  id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default '',
  gid varchar(128) default '',
  branch_id varchar(128) default '',
  op varchar(45) default '',
  barrier_id varchar(45) default '',
  reason varchar(45) default '' comment 'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  UNIQUE key(gid, branch_id, op, barrier_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
17.3代码编写
package main

import (
	"database/sql"
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/lithammer/shortuuid/v3"

	"github.com/dtm-labs/client/dtmcli"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	glog "gorm.io/gorm/logger"
)

type UserAccount struct {
	ID             int     `gorm:"column:id;primary_key"`
	UserId         int     `gorm:"user_id"`
	Balance        float64 `gorm:"balance"`
	TradingBalance float64 `gorm:"trading_balance"`
}

func (UserAccount) TableName() string {
	return "user_account"
}

var lock sync.Mutex

// 转入和转出的时候,都要加锁,否则会出现并发问题
func SagaAdjustBalance(db *sql.Tx, uid int, amount float64) error {
	lock.Lock()
	defer lock.Unlock()

	if amount < 0 {
		var balance float64
		db.QueryRow("select balance from dtm.user_account where user_id = ?", uid).Scan(&balance)
		if balance < -amount {
			return fmt.Errorf("余额不足")
		}
	}
	_, err := db.Exec("update dtm.user_account set balance = balance + ? where user_id = ?", amount, uid)
	if err != nil {
		return err
	}
	return nil
}

var db *gorm.DB

func initDB() error {
	dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		"root",
		"root",
		"192.168.2.13",
		"3306",
		"dtm")
	newLogger := glog.New(
		log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer(日志输出的目标,前缀和日志包含的内容——译者注)
		glog.Config{
			SlowThreshold:             time.Second, // 慢 SQL 阈值
			LogLevel:                  glog.Info,   // 日志级别
			IgnoreRecordNotFoundError: true,        // 忽略ErrRecordNotFound(记录未找到)错误
			Colorful:                  false,       // 禁用彩色打印
		},
	)

	var err error
	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
		Logger: newLogger,
	})
	if err != nil {
		return err
	}
	return nil
}

// 获取屏障
// MustBarrierFromGin 1
func MustBarrierFromGin(c *gin.Context) *dtmcli.BranchBarrier {
	ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
	fmt.Println(err)
	return ti
}

// 服务发现, 库存服务有5个
func main() {
	err := initDB()
	if err != nil {
		panic(err)
	}

	r := gin.Default()
	r.POST("/SagaBTransIn", func(c *gin.Context) {
		barrier := MustBarrierFromGin(c) //1.生成一个屏障
		tx := db.Begin()                 //2.开启事务
		sourceTx := tx.Statement.ConnPool.(*sql.Tx)
		err := barrier.Call(sourceTx, func(tx1 *sql.Tx) error { //3.将业务逻辑翻到Call方法执行
			fmt.Println("开始转入")
			userID := 1
			err := SagaAdjustBalance(sourceTx, userID, 100) //4.修改gorm为 sql.Tx并使用原生sql查询(gorm支持不全)
			if err != nil {
				fmt.Printf("转入失败:%s\r\n", err.Error())
				return err
			}
			return nil
		})
		if err != nil {
			c.JSON(http.StatusOK, gin.H{"code": 1, "msg": err.Error()})
			return
		}

		return
	})

	r.POST("/SagaBTransInCom", func(c *gin.Context) {
		fmt.Println("转入失败, 开始补偿")
		//userID := 1
		//err := SagaAdjustBalance(db, userID, -100)
		//if err != nil {
		//	fmt.Printf("转入补偿失败:%s\r\n", err.Error())
		//	return
		//}
		fmt.Println("转入补偿成功")
	})

	r.POST("/SagaBTransOut", func(c *gin.Context) {
		barrier := MustBarrierFromGin(c)
		tx := db.Begin()
		sourceTx := tx.Statement.ConnPool.(*sql.Tx)

		err := barrier.Call(sourceTx, func(tx1 *sql.Tx) error {
			fmt.Println("开始转出")
			userID := 3
			err := SagaAdjustBalance(sourceTx, userID, -100)
			if err != nil {
				if err.Error() == "余额不足" {
					c.JSON(http.StatusConflict, gin.H{})
				}
				fmt.Printf("转出失败:%s\r\n", err.Error())
				c.JSON(500, gin.H{"msg": err.Error()})
			}
			fmt.Println("转出成功")
			return nil
		})
		if err != nil {
			c.JSON(http.StatusOK, gin.H{"code": 1, "msg": err.Error()})
			return
		}
		return
	})

	r.POST("/SagaBTransOutCom", func(c *gin.Context) {
		fmt.Println("转出失败, 开始补偿")
		//userID := 3
		//err := SagaAdjustBalance(db, userID, 100)
		//if err != nil {
		//	fmt.Printf("转出补偿失败:%s\r\n", err.Error())
		//	return
		//}
		fmt.Println("转出补偿成功")
	})

	r.GET("start", func(c *gin.Context) {
		req := gin.H{}
		dmtServer := "http://127.0.0.1:36789/api/dtmsvr"
		qsBusi := "http://127.0.0.1:8089"
		saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).
			// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
			Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
			// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
			Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
		// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
		saga.WaitResult = true
		err := saga.Submit()
		if err != nil {
			c.JSON(500, gin.H{"message": err.Error()})
		}
		c.JSON(200, gin.H{"message": "ok"})
	})

	r.Run(":8089")
}


s\r\n", err.Error())
// return
//}
fmt.Println(“转出补偿成功”)
})

r.GET("start", func(c *gin.Context) {
	req := gin.H{}
	dmtServer := "http://127.0.0.1:36789/api/dtmsvr"
	qsBusi := "http://127.0.0.1:8089"
	saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).
		// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"
		Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
		// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"
		Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
	// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
	saga.WaitResult = true
	err := saga.Submit()
	if err != nil {
		c.JSON(500, gin.H{"message": err.Error()})
	}
	c.JSON(200, gin.H{"message": "ok"})
})

r.Run(":8089")

}




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/227359.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

javaTCP协议实现一对一聊天

我们首先要完成服务端&#xff0c;不然出错&#xff0c;运行也要先运行服务端&#xff0c;如果不先连接服务端&#xff0c;就不监听&#xff0c;那客户端不知道连接谁 服务端 import java.awt.BorderLayout; import java.awt.event.ActionEvent; import java.awt.event.Actio…

ncnn模型部署——使用VS2019把项目打包成DLL文件

一、项目打包成DLL文件 1.创建动态链接库DLL项目 创建完成&#xff0c;项目中包含源文件dllmain.cpp, pch.cpp&#xff0c;头文件framework.h, pch.h 2.编写和配置DLL项目 &#xff08;1&#xff09;配置pch.h文件&#xff0c;在头文件pch.h中定义宏&#xff0c;宏的作用的是…

gma 空间绘图实战(1):绘制多个子图,连接并展示局部放大区域

安装 gma&#xff1a;pip install gma 本文基于&#xff1a;gma 2.0.3&#xff0c;Python 3.10 本文用到的矢量数据为&#xff1a;CTAmap 1.12。来源于 https://www.shengshixian.com/ 。&#xff08;感谢锐多宝&#xff09; 绘图目标 参考代码 import matplotlib.pyplot as p…

电子秤ADC芯片CS1237技术资料问题合集

问题11&#xff1a;实际应用中&#xff0c;多个称重传感器应该怎么与ADC连接&#xff1f; 解答&#xff1a;如果传感器是测量同一物体&#xff08;例如&#xff1a;厨房垃圾处理器&#xff09;&#xff0c;一般建议使用并联的方式。则相同类型的信号线连接在一起。对于传感器的…

MySQL - 并发控制与事务的隔离级别

目录 第1关&#xff1a;并发控制与事务的隔离级别 第2关&#xff1a;读脏 第3关&#xff1a;不可重复读 第4关&#xff1a;幻读 第5关&#xff1a;主动加锁保证可重复读 第6关&#xff1a;可串行化 第1关&#xff1a;并发控制与事务的隔离级别 任务描述 本关任务&#…

Java第二十一章 :网络通信

网络程序设计基础 网络程序设计编写的是与其他计算机进行通信的程序。Java 已经将网络程序所需要的元素封装成不同的类&#xff0c;用户只要创建这些类的对象&#xff0c;使用相应的方法&#xff0c;即使不具备有关的网络支持&#xff0c;也可以编写出高质量的网络通信程…

十六、FreeRTOS之FreeRTOS队列集

本节需要掌握以下内容&#xff1a; 1&#xff0c;队列集简介&#xff08;了解&#xff09; 2&#xff0c;队列集相关API函数介绍&#xff08;熟悉&#xff09; 3&#xff0c;队列集操作实验&#xff08;掌握&#xff09; 一、队列集简介&#xff08;了解&#xff09; 一个…

硬件基础:差模和共模

一直以来&#xff0c;都难以理解差模和共模这两个概念&#xff0c;什么差分信号、差模信号、共模信号&#xff0c;差模干扰、共模干扰……虽然看了一些资料&#xff0c;但貌似说法还挺多的&#xff0c;理解起来仍然是一头雾水。所以&#xff0c;专门用一篇文章来好好研究下这个…

Anisble中剧本的应用

1.什么是playbook及playbook的组成 1. Playbook 的功能 playbook 是由一个或多个 play 组成的列表 Playboot 文件使用 YAML 来写的 2. YAML 简介&#xff1a; 是一种表达资料序列的格式 &#xff0c; 类似 XML Yet Another Markup Language 3. 特点 可读性好 和脚本语言…

Java+Swing: 登录和重置按钮的点击事件 整理6

1. 在Login类中给按钮添加事件 // 按钮添加鼠标点击事件loginButton.addActionListener();resetButton.addActionListener(); 2. 创建一个事件处理的类&#xff0c; 该类实现了ActionListener package com.handler;/*** Author&#xff1a;xiexu* Date&#xff1a;2023/12/7 13…

Python面向对象③:封装【侯小啾Python基础领航计划 系列(二十一)】

Python面向对象③:封装【侯小啾Python基础领航计划 系列(二十一)】 大家好,我是博主侯小啾, 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔…

simulink enable模块——使能子系统案例仿真分析

1.案例分析 仍以一个简单的乘法增益案例分析 分析&#xff1a;可以看到&#xff0c;在满足条件性才条用使能子系统&#xff0c;在t1s和3s时刻&#xff0c;进行增益操作&#xff0c;这和上篇博客中的触发trigger子系统相同的作用。 simulink trigger模块使用——多种调用案例分…

Pixyz Studio 和 Pixyz Scenario Processor 使用入门

介绍 Pixyz产品官网 下载、安装与技术文档 官网介绍&#xff1a;Pixyz 支持超过 45 种工业文件格式&#xff0c;包括 CATIA、JT、STEP、IFC、PVZ、NWD、USD 及 glTF。包括 CAD、曲面细分/网格模型、点云等。Unity 中的资产将实时关联到原始数据&#xff0c;可自动更新文件的修…

HL7/FHIR 是什么

如果你对上面 2 个单词不熟悉的话&#xff0c;那就需要先脑补下了。 HL7 HL7 可以认为是一个标准化的组织&#xff0c;这个组织主要对标准进行控制。 如果你希望在医疗系统中对数据进行交换&#xff0c;通常 HL7 现在就是事实上的标准了。 FHIR FHIR – Fast Health Inter…

华清远见嵌入式学习——QT——作业1

作业要求&#xff1a; 代码&#xff1a; ①&#xff1a;头文件 #ifndef LOGIN_H #define LOGIN_H#include <QWidget> #include <QLineEdit> //行编辑器类 #include <QPushButton> //按钮类 #include <QLabel> //标签类 #include <QM…

Java 实现TCP一对一聊天,UDP协议实现群聊

用TCP编程实现一对一式聊天&#xff0c;并用多线程解决了处于同一线程中的问题。 客户端代码&#xff1a;mport java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.ut…

c++ - 警告 : treating ‘c-header‘ input as ‘c++-header‘ when in C++ mode, 此行为已弃用

一、问题出现 在进行多文件编译的时候报错 二、原因 我们多文件编译的时候加了头文件 三、解决办法 去掉头文件&#xff0c;只编译源文件

流星雨效果

文章目录 html css实现jscanvas实现 html css实现 对于 HTML 来说&#xff0c;:root 表示 元素&#xff0c;除了优先级更高之外&#xff0c;与 html 选择器相同。带有前缀 – 的属性名&#xff0c;比如 --example–name&#xff0c;表示的是带有值的自定义属性&#xff0c;其…

Threejs项目实战之一:汽车外观换肤效果三维展示

目录 最终效果1 创建项目2 安装插件3 编写代码3.1 准备工作3.2 代码编写3.2.1 在template标签中构建html页面3.2.2 在style标签中构建页面样式文件3.2.3 在script标签中编写js代码 最终效果 先看下最终实现的效果 接下来&#xff0c;我们就从创建项目开始&#xff0c;一步一步…

ChatGPT/GPT4科研实践篇: AI绘图+论文写作+编程

1、熟练掌握ChatGPT提示词技巧及各种应用方法&#xff0c;并成为工作中的助手。 2、通过案例掌握ChatGPT撰写、修改论文及工作报告&#xff0c;提供写作能力及优化工作 3、熟练掌握ChatGPT融合相关插件的应用&#xff0c;完成数据分析、编程以及深度学习等相关科研项目。 4、…