目录
引言:
1. TCC事务模式
2. TCC组成
3. TCC执行流程
3.1 TCC正常执行流程
3.2 TCC失败回滚
4. Confirm/Cancel操作异常
5. TCC 设计原则
5.1 TCC如何做到更好的一致性
5.2 为什么只适合短事务
6. 嵌套的TCC
7. .NET CORE结合DTM实现TCC分布式事务
7.1 轮子小卖部(Nuget)引入Dtmcli
7.2 生成转账数据库(EF_CORE)
7.3 数据库持久化
7.4 appsettings.json
7.5 Program.cs
7.6 主程序事务API控制器
7.7 用户1转账事务API控制器
7.8 用户2转账事务API控制器
小结
引言:
紧接上一期.NET CORE 分布式事务(一) DTM实现二阶段提交(.NET CORE 分布式事务(一) DTM实现二阶段提交-CSDN博客)
1. TCC事务模式
什么是TCC,TCC是Try、Confirm、Cancel三个词语的缩写,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。
2. TCC组成
TCC分为3个阶段
- Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
- Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
- Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。
3. TCC执行流程
3.1 TCC正常执行流程
一般情况下,时序图中的9个步骤会正常完成,整个业务按照预期进行。主程序注册全局事务,以及Try尝试事务Api地址、Confirm提交事务Api地址、Cancel回滚事务Api地址。并开始执行Try尝试事务,Try中的事务要对资源进行预算以及锁定,也就是尝试执行,判断资源是否支持提交执行事务。然后进行提交事务。最终完成全局事务。
3.2 TCC失败回滚
当Try尝试事务异常时与上面的正常流程的区别是,现在不会调用Confirm提交事务,而是调用Cancel回滚事务,对Try尝试事务进行的资源锁定进行解锁释放等操作。回退到全局事务开始前。
4. Confirm/Cancel操作异常
假如Confirm/Cancel操作遇见失败会怎么样?按照Tcc模式的协议,Confirm/Cancel操作是要求最终成功的,遇见失败的情况,都是由于临时故障或者程序bug。dtm在Confirm/Cancel操作遇见失败时,会不断进行重试,直到成功。
为了避免程序bug导致补偿操作一直无法成功,建议开发者对全局事务表进行监控,发现重试超过3次的事务,发出报警,由运维人员找开发手动处理。进行人工干预。
5. TCC 设计原则
在设计上,TCC主要用于处理一致性要求较高、需要较多灵活性的短事务。
5.1 TCC如何做到更好的一致性
对于我们的 A 跨行转账给 B 的场景,如果采用SAGA,在正向操作中调余额,在补偿操作中,反向调整余额,那么会出现这种情况:如果A扣款成功,金额转入B失败,最后回滚,把A的余额调整为初始值。整个过程中如果A发现自己的余额被扣减了,但是收款方B迟迟没有收到资金,那么会对A造成非常大的困扰。
上述需求在SAGA中无法解决,但是可以通过TCC来解决,设计技巧如下:
- 在账户中的 balance 字段之外,再引入一个 trading_balance 字段
- Try 阶段检查账户是否被冻结,检查账户余额是否充足,没问题后,调整 trading_balance (即业务上的冻结资金)
- Confirm 阶段,调整 balance ,调整 trading_balance (即业务上的解冻资金)
- Cancel 阶段,调整 trading_balance (即业务上的解冻资金)
这种情况下,终端用户 A 就不会看到自己的余额扣减了,但是 B 又迟迟收不到资金的情况。
5.2 为什么只适合短事务
TCC 的事务编排放在了应用端上,就是事务一共包含多少个分支,每个分支的顺序什么样,这些信息不会像 SAGA 那样,都发送给dtm服务器之后,再去调用实际的事务分支。当应用出现 crash 或退出,编排信息丢失,那么整个全局事务,就没有办法往前重试,只能够进行回滚。如果全局事务持续时间很长,例如一分钟以上,那么当应用进行正常的发布升级时,也会导致全局事务回滚,影响业务。因此 TCC 会更适合短事务。
那么是否可以把TCC的事务编排都保存到服务器,保证应用重启也不受到影响呢?理论上这种做法是可以解决这个问题的,但是存储到服务器会比在应用端更不灵活,无法获取到每个分支的中间结果,无法做嵌套等等。
考虑到一致性要求较高和短事务是高度相关的(一个中间不一致状态持续很长时间的事务,自然不能算一致性较好),这两者跟“应用灵活编排”,也是有较高相关度,所以将 TCC 实现为应用端编排,而 SAGA 实现为服务端编排。
6. 嵌套的TCC
dtm的Tcc事务模式,支持子事务嵌套,流程图如下:
在这个流程图中,Order这个微服务,管理了订单相关的数据修改,同时还管理了一个嵌套的子事务,因此他即扮演了RM的角色,也扮演了AP的角色。
7. .NET CORE结合DTM实现TCC分布式事务
还是以跨行转账作为例子,给大家详解这种架构。业务场景介绍如下:
我们需要跨行从A转给B 30元,我们先进行可能失败的转出操作TccUserTry,即进行A扣减30元。如果A因余额不足扣减失败,那么转账直接失败,返回错误;如果扣减成功,那么进行下一步转入操作,因为转入操作没有余额不足的问题,可以假定转入操作一定会成功。
7.1 轮子小卖部(Nuget)引入Dtmcli
<ItemGroup>
<PackageReference Include="Dtmcli" Version="1.4.0" />
</ItemGroup>
7.2 生成转账数据库(EF_CORE)
数据库模型
//模型
public partial class UserMoney
{
public int id { get; set; }
public int money { get; set; }
public int trading_balance { get; set; }
public int balance { get; set; }
public int trymoney { get; set; }
public string guid { get; set; }
}
DbContext
public class DtmDbContext : DbContext
{
public DtmDbContext() { }
public DtmDbContext(DbContextOptions<DtmDbContext> options) : base(options) { }
public virtual DbSet<UserMoney> UserMoney { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder
.UseMySql("server=localhost;port=3307;user id=root;password=123;database=DTM_Test", ServerVersion.Parse("8.0.23-mysql"))
.UseLoggerFactory(LoggerFactory.Create(option =>
{
option.AddConsole();
}));
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder
.UseCollation("utf8_general_ci")
.HasCharSet("utf8");
modelBuilder.Entity<UserMoney>(entity =>
{
entity.ToTable("UserMoney");
});
}
}
7.3 数据库持久化
CREATE TABLE
IF
NOT EXISTS DTM_Test.barrier (
id BIGINT ( 22 ) PRIMARY KEY AUTO_INCREMENT,
trans_type VARCHAR ( 45 ) DEFAULT '',
gid VARCHAR ( 128 ) DEFAULT '',
branch_id VARCHAR ( 128 ) DEFAULT '',
op VARCHAR ( 45 ) DEFAULT '',
barrier_id VARCHAR ( 45 ) DEFAULT '',
reason VARCHAR ( 45 ) DEFAULT '' COMMENT 'the branch type who insert this record',
create_time datetime DEFAULT now( ),
update_time datetime DEFAULT now( ),
KEY ( create_time ),
KEY ( update_time ),
UNIQUE KEY ( gid, branch_id, op, barrier_id )
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;
数据库最终生成:
7.4 appsettings.json
{
"AllowedHosts": "*",
"ConnectionString": "server=localhost;port=3307;user id=root;password=123;database=test",
"DtmUrl": "http://localhost:36789",
"TransactionUrl": "http://localhost:5016",
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
7.5 Program.cs
// 注册DbContext
builder.Services.AddDbContext<DtmDbContext>(options =>
{
options.UseMySql(builder.Configuration.GetValue<string>("ConnectionString"), ServerVersion.Parse("8.0.23-mysql"));
});
// 注册dtm
builder.Services.AddDtmcli(dtm =>
{
dtm.DtmUrl = builder.Configuration.GetValue<string>("DtmUrl");
dtm.DBType = "mysql";
dtm.BarrierTableName = "dtm_test.barrier";
});
7.6 主程序事务API控制器
using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
namespace Dtm_TCC.Controllers
{
[ApiController]
[Route("[controller]")]
public class DtmTccController : ControllerBase
{
private readonly ILogger<DtmTccController> _logger;
private readonly TccGlobalTransaction _globalTransaction;
private readonly IDtmClient _dtmClient;
private readonly IConfiguration _configuration;
public DtmTccController(ILogger<DtmTccController> logger,
TccGlobalTransaction globalTransaction,
IDtmClient dtmClient,
IConfiguration configuration)
{
_logger = logger;
_globalTransaction = globalTransaction;
_dtmClient = dtmClient;
_configuration = configuration;
}
[HttpPost(Name = "DtmTcc")]
public async Task<IActionResult> DtmTcc()
{
var transactionurl = _configuration.GetValue<string>("TransactionUrl");
// 创建CancellationToken用于取消事务
CancellationToken cancellationToken = new CancellationToken();
// 生成全局事务ID
var gid = await _dtmClient.GenGid(cancellationToken);
UserMoney body = new UserMoney() { id = 1, trymoney = -30, guid = string.Empty };
UserMoney body2 = new UserMoney() { id = 2, trymoney = 30, guid = string.Empty };
await _globalTransaction.Excecute(/*gid,*/ async (tcc) =>
{
// 用户1 转出30元 第一个参数是try检测及冻结阶段,第二个是提交,第三个是回滚
var res1 = await tcc.CallBranch(body,
transactionurl + "/TccUserTry",
transactionurl + "/TccUserConfirm",
transactionurl + "/TccUserCancel", cancellationToken);
// 用户2 转入30元
var res2 = await tcc.CallBranch(body2,
transactionurl + "/TccUser2Try",
transactionurl + "/TccUser2Confirm",
transactionurl + "/TccUser2Cancel", cancellationToken);
}, cancellationToken);
return Ok(TransResponse.BuildSucceedResponse());
}
}
}
7.7 用户1转账事务API控制器
using DTM_EF;
using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using MySqlConnector;
namespace Dtm_TCC.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class UserController : ControllerBase
{
private readonly IBranchBarrierFactory _barrierFactory;
private readonly ILogger<UserController> _Logger;
private readonly DtmDbContext _dtmDbContext;
public UserController(IBranchBarrierFactory barrierFactory,
ILogger<UserController> Logger,
DtmDbContext dtmDbContext)
{
_barrierFactory = barrierFactory;
_Logger = Logger;
_dtmDbContext = dtmDbContext;
}
[HttpPost]
[Route("/TccUserTry")]
public async Task<IActionResult> TccUserTry([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
{
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
var obj = TransResponse.BuildFailureResponse();
using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
{
await branchBarrier.Call(conn, async (tx) =>
{
//获取用户账户信息
var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();
//判断预算--不够回滚
if (UserMoney == null || UserMoney!.money + body.trymoney < 0) obj = TransResponse.BuildFailureResponse();
else
{
//修改信息准备提交
UserMoney!.balance = 1;
UserMoney.trading_balance = 1;
UserMoney.trymoney = body.trymoney;
UserMoney.guid = gid;
_dtmDbContext.SaveChanges();
obj = TransResponse.BuildSucceedResponse();
}
await Task.CompletedTask;
});
}
_Logger.LogInformation($"{gid}--Try成功");
return Ok(obj);
}
[HttpPost]
[Route("/TccUserConfirm")]
public async Task<IActionResult> TccUserConfirm([FromQuery] string gid, [FromQuery] string trans_type,
[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
{
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
var obj = TransResponse.BuildFailureResponse();
using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
{
await branchBarrier.Call(conn, async (tx) =>
{
//获取用户账户信息
var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();
if (UserMoney != null)
{
UserMoney!.balance = 0;
UserMoney!.trading_balance = 0;
UserMoney!.money += UserMoney!.trymoney;
UserMoney!.trymoney = 0;
UserMoney!.guid = string.Empty;
_dtmDbContext.SaveChanges();
obj = TransResponse.BuildSucceedResponse();
}
//修改信息准备提交
await Task.CompletedTask;
});
}
_Logger.LogInformation($"{gid}--Confirm成功");
return Ok(obj);
}
[HttpPost]
[Route("/TccUserCancel")]
public async Task<IActionResult> TccUserCancel([FromQuery] string gid, [FromQuery] string trans_type,
[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
{
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
var obj = TransResponse.BuildFailureResponse();
using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
{
//操作回滚并解锁
await branchBarrier.Call(conn, async (tx) =>
{
//获取用户账户信息
var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();
if (UserMoney != null)
{
UserMoney!.balance = 0;
UserMoney!.trading_balance = 0;
UserMoney!.trymoney = 0;
UserMoney!.guid = string.Empty;
_dtmDbContext.SaveChanges();
obj = TransResponse.BuildSucceedResponse();
}
await Task.CompletedTask;
});
}
_Logger.LogInformation($"{gid}--Cancel成功");
return Ok(obj);
}
}
}
7.8 用户2转账事务API控制器
using DTM_EF;
using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using MySqlConnector;
namespace Dtm_TCC.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class User2Controller : ControllerBase
{
private readonly IBranchBarrierFactory _barrierFactory;
private readonly ILogger<User2Controller> _Logger;
private readonly DtmDbContext _dtmDbContext;
public User2Controller(IBranchBarrierFactory barrierFactory,
ILogger<User2Controller> Logger,
DtmDbContext dtmDbContext)
{
_barrierFactory = barrierFactory;
_Logger = Logger;
_dtmDbContext = dtmDbContext;
}
[HttpPost]
[Route("/TccUser2Try")]
public async Task<IActionResult> TccUserTry([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
{
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
var obj = TransResponse.BuildFailureResponse();
using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
{
await branchBarrier.Call(conn, async (tx) =>
{
//获取用户账户信息
var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();
//判断预算--不够回滚
if (UserMoney == null || UserMoney!.money + body.trymoney < 0) obj = TransResponse.BuildFailureResponse();
else
{
//修改信息准备提交
UserMoney!.balance = 1;
UserMoney.trading_balance = 1;
UserMoney.trymoney = body.trymoney;
UserMoney.guid = gid;
_dtmDbContext.SaveChanges();
obj = TransResponse.BuildSucceedResponse();
}
await Task.CompletedTask;
});
}
obj = TransResponse.BuildFailureResponse();
_Logger.LogInformation($"{gid}--Try成功");
return Ok(obj);
}
[HttpPost]
[Route("/TccUser2Confirm")]
public async Task<IActionResult> TccUserConfirm([FromQuery] string gid, [FromQuery] string trans_type,
[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
{
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
var obj = TransResponse.BuildFailureResponse();
using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
{
await branchBarrier.Call(conn, async (tx) =>
{
//获取用户账户信息
var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();
if (UserMoney != null)
{
UserMoney!.balance = 0;
UserMoney!.trading_balance = 0;
UserMoney!.money += UserMoney!.trymoney;
UserMoney!.trymoney = 0;
UserMoney!.guid = string.Empty;
_dtmDbContext.SaveChanges();
obj = TransResponse.BuildSucceedResponse();
}
//修改信息准备提交
await Task.CompletedTask;
});
}
_Logger.LogInformation($"{gid}--Confirm成功");
return Ok(obj);
}
[HttpPost]
[Route("/TccUser2Cancel")]
public async Task<IActionResult> TccUserCancel([FromQuery] string gid, [FromQuery] string trans_type,
[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
{
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
var obj = TransResponse.BuildFailureResponse();
using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
{
//操作回滚并解锁
await branchBarrier.Call(conn, async (tx) =>
{
//获取用户账户信息
var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();
if (UserMoney != null)
{
UserMoney!.balance = 0;
UserMoney!.trading_balance = 0;
UserMoney!.trymoney = 0;
UserMoney!.guid = string.Empty;
_dtmDbContext.SaveChanges();
obj = TransResponse.BuildSucceedResponse();
}
await Task.CompletedTask;
});
}
_Logger.LogInformation($"{gid}--Cancel成功");
return Ok(obj);
}
}
}
小结
本文给出了一个完整的 TCC 事务方案,是一个可以实际运行的 TCC,您只需要在这个示例的基础上进行简单修改,就能够用于解决您的真实问题