参考
本文参考https://zhuanlan.zhihu.com/p/650791238从零到一搭建 TCC 分布式事务框架,并在小徐的基础上增加个人见解+笔记。
项目地址:https://github.com/xiaoxuxiansheng/gotcc
图解分布式事务实现原理(一):https://blog.csdn.net/qq_40318498/article/details/134430322?spm=1001.2014.3001.5502
图解分布式事务实现原理(二):https://blog.csdn.net/qq_40318498/article/details/134432420?spm=1001.2014.3001.5502
整体架构
TCC 本质上是一种 2PC(two phase commitment protocal 两阶段提交)的实现:
- 把分布式事务中,负责维护状态数据变更的模块,封装成一个 TCC 组件
- 把数据的变更状态拆分为对应 Try 操作的【冻结】、对应 Confirm 操作的【成功】以及对应 Cancel 操作的【失败回滚】
- 抽出一个统筹全局的事务协调者角色 TXManager. 在执行分布式事务时,分为两个阶段:
- 阶段 I:先对所有组件执行 Try 操作
- 阶段 II:根据上阶段 Try 操作的执行结果,决定本轮执行 Confirm 还是 Cancel 操作
在我们实现 TCC 框架的实战环节中,首先需要明确的事情是:
- 哪部分内容在 TCC 架构中属于通用的流程,这部分内容可以抽取出来放在 sdk 中,以供后续复用
- 哪部分内容需要给使用方预留出足够的自由度,由使用方自行实现,然后和通用 sdk 进行接轨.
这两点也是很好想,最终,这两部分内容明确如下:
- 在 TCC sdk 中实现的通用逻辑包含了和事务协调器 txManager 有关的核心流程
- 事务协调器 TXManager 开启事务以及 try-confirm/cancel 的 2PC 流程串联
- 事务协调器 TXManager 异步轮询任务,用于推进事务从中间态走向终态
- TCC 组件的注册流程
- 需要预定义事务日志存储模块 TXStore 的实现规范(声明 interface)
- 需要预定义 TCC 组件 TCCComponent 的实现规范(声明 interface)
以及使用方
- TCC 组件和 TXStore 两部分内容需要由使用方自行实现:
- 使用方自行实现 TCCComponent 类,包括其 Try、Confirm、Cancel 方法的执行逻辑
- 使用方自行实现具体的 TXStore 日志存储模块. 可以根据实际需要,选型合适的存储组件和存储方式
TCC Component
现在我们来思考一下TCC(Try-Confirm-Cancel)组件的定位和功能要求,以下是对每个要点的解释
-
用户自行实现:TCC组件是由用户自行实现的部分,这意味着开发者需要编写自定义的TCC组件代码来处理事务的Try、Confirm和Cancel操作。用户需要根据业务逻辑和需求来实现这些组件。
-
注册到RegistryCenter:TCC组件需要在TXManager启动时注册到注册中心(RegistryCenter)。这意味着用户需要配置TXManager以连接到注册中心,并将自定义的TCC组件注册到注册中心,以便TXManager能够获取并使用这些组件。
-
TXManager启动时获取组件:当使用方(应用程序)调用TXManager开启事务时,TXManager会通过注册中心获取已注册的TCC组件。这样,TXManager能够识别并使用用户自定义的TCC组件来执行事务的Try、Confirm和Cancel操作。
-
TCC组件的能力:TCC组件需要具备以下能力:
实现Try、Confirm和Cancel操作:TCC组件需要实现事务的Try、Confirm和Cancel三个阶段的操作,以确保事务的一致性。
处理业务逻辑:TCC组件需要能够执行与具体业务相关的操作,包括业务检查、资源预留、业务确认和回滚操作。
与TXManager进行交互:TCC组件需要与TXManager进行通信,以接收来自TXManager的指令,并汇报操作的执行结果(成功或失败)。
那么对应的代码实现可以简单化为:
// tcc 组件
type TCCComponent interface {
// 返回组件唯一 id
ID() string
// 执行第一阶段的 try 操作
Try(ctx context.Context, req *TCCReq) (*TCCResp, error)
// 执行第二阶段的 confirm 操作
Confirm(ctx context.Context, txID string) (*TCCResp, error)
// 执行第二阶段的 cancel 操作
Cancel(ctx context.Context, txID string) (*TCCResp, error)
}
这里我们只是简单定义了TCCComponent 接口。
TX Manager
下面是关于事务协调器 TXManager 的定位.
- TXManager 是整个 TCC 架构中最核心的角色
- TXManager 作为 gotcc 的统一入口,供使用方执行启动事务和注册组件的操作
- TXManager 作为中枢系统分别和 RegisterCenter、TXStore 交互
- TXManager 需要串联起整个 Try-Confirm/Canel 的 2PC 调用流程
- TXManager 需要运行异步轮询任务,推进未完成的事务走向终态
TX Store
TXStore 是用于存储和管理事务日志明细记录的模块:
- 需要支持事务明细数据的 CRUD 能力
- 通常情况下,底层需要应用到实际的存储组件作为支持
- TXStore 在 gotcc 的 sdk 中体现为一个抽象的 interface. 需要由用户完成具体类的实现,并将其注入到 TXManager 当中.
// 事务日志存储模块
type TXStore interface {
// 创建一条事务明细记录
CreateTX(ctx context.Context, components ...component.TCCComponent) (txID string, err error)
// 更新事务进度:实际更新的是每个组件的 try 请求响应结果
TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error
// 提交事务的最终状态, 标识事务执行结果为成功或失败
TXSubmit(ctx context.Context, txID string, success bool) error
// 获取到所有未完成的事务
GetHangingTXs(ctx context.Context) ([]*Transaction, error)
// 获取指定的一笔事务
GetTX(ctx context.Context, txID string) (*Transaction, error)
// 锁住整个 TXStore 模块(要求为分布式锁)
Lock(ctx context.Context, expireDuration time.Duration) error
// 解锁TXStore 模块
Unlock(ctx context.Context) error
}
- CreateTX:创建一条事务明细记录,会在入参中传入本事务涉及的 TCC 组件列表,同时需要在出参中返回全局唯一的事务 id
- TXUpdate:更新一条事务明细记录. 这里指的更新,针对于,事务中某个 TCC 组件 Try 响应状态的更新
- TXSubmit:提交一条事务的执行结果. 要么置为成功,要么置为失败
- GetHangingTXs:获取所有未完成的事务明细记录
- GetTX:根据事务 id,获取指定的一条事务明细记录
- Lock:锁住整个事务日志存储模块(要求为分布式锁)
- Unlock:解锁整个事务日志存储模块
RegistryCenter
最后是 TCC 组件的注册管理中心 RegistryCenter,负责给 txManager 提供注册和查询 TCC 组件的能力.
TXManager 核心源码讲解
理完了基本的流程和概念,下面我们一起开启一线实战环节.
TXManager
下面是关于事务协调器 TXManager 的几个核心字段:
- txStore:内置的事务日志存储模块,需要由使用方实现并完成注入
- registryCenter:TCC 组件的注册管理中心
- opts:内聚了一些 TXManager 的配置项,可以由使用方自定义,并通过 option 注入
- ctx:用于反映 TXManager 运行生命周期的的 context,当 ctx 终止时,异步轮询任务也会随之退出
- stop:用于停止 txManager 的控制器. 当 stop 被调用后,异步轮询任务会被终止
type TXManager struct {
ctx context.Context
stop context.CancelFunc
opts *Options
txStore TXStore
registryCenter *registryCenter
}
func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
ctx, cancel := context.WithCancel(context.Background())
txManager := TXManager{
opts: &Options{},
txStore: txStore,
registryCenter: newRegistryCenter(),
ctx: ctx,
stop: cancel,
}
for _, opt := range opts {
opt(txManager.opts)
}
repair(txManager.opts)
go txManager.run()
return &txManager
}
事务主流程
下面进入最核心的部分,介绍一下整个分布式事务的运行流程.
主流程
用户可以通过 txManager.Transaction 方法,一键启动动一个分布式事务流程,其中包含的几个核心步骤展示如下图:
txManager.Transaction 方法是用户启动分布式事务的入口,需要在入参中声明本次事务涉及到的组件以及需要在 Try 流程中传递给对应组件的请求参数:
type RequestEntity struct {
// 组件名称
ComponentID string `json:"componentName"`
// Try 请求时传递的参数
Request map[string]interface{} `json:"request"`
}
比如可以定义如下的参数:
componentAID := "componentA"
RequestEntity{
ComponentID: componentAID,
Request: map[string]interface{}{
"biz_id": componentAID + "_biz",
}
}
txManager.Transaction 对应源码如下,核心步骤均给出了注释. 核心的 try-confirm/cancel 流程,会在后续的 txManager.twoPhaseCommit 方法中展开.
// 启动事务
func (t *TXManager) Transaction(ctx context.Context, reqs ...*RequestEntity) (bool, error) {
// 1 限制分布式事务执行时长
tctx, cancel := context.WithTimeout(ctx, t.opts.Timeout)
defer cancel()
// 2 获得所有的涉及使用的 tcc 组件
componentEntities, err := t.getComponents(tctx, reqs...)
if err != nil {
return false, err
}
// 3 调用 txStore 模块,创建新的事务明细记录,并取得全局唯一的事务 id
txID, err := t.txStore.CreateTX(tctx, componentEntities.ToComponents()...)
if err != nil {
return false, err
}
// 4. 开启两阶段提交流程:try-confirm/cancel
return t.twoPhaseCommit(ctx, txID, componentEntities)
}
接下来我们看一下twoPhaseCommit函数。
2PC 串联
func (t *TXManager) twoPhaseCommit(ctx context.Context, txID string, componentEntities ComponentEntities) (bool, error) {
// 1 创建子 context 用于管理子 goroutine 生命周期
// 手握 cancel 终止器,能保证在需要的时候终止所有子 goroutine 生命周期
cctx, cancel := context.WithCancel(ctx)
defer cancel()
// 2 创建一个 chan,用于接收子 goroutine 传递的错误
errCh := make(chan error)
// 3 并发启动,批量执行各 tcc 组件的 try 流程
go func() {
// 通过 waitGroup 进行多个子 goroutine 的汇总
var wg sync.WaitGroup
for _, componentEntity := range componentEntities {
// shadow
componentEntity := componentEntity
wg.Add(1)
// 并发执行各组件的 try 流程
go func() {
defer wg.Done()
resp, err := componentEntity.Component.Try(cctx, &component.TCCReq{
ComponentID: componentEntity.Component.ID(),
TXID: txID,
Data: componentEntity.Request,
})
// 出现 tcc 组件执行 try 操作失败,则需要对事务明细记录进行更新,同时把错误通过 chan 抛给父 goroutine
if err != nil || !resp.ACK {
// 对对应的事务进行更新
_ = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), false)
errCh <- fmt.Errorf("component: %s try failed", componentEntity.Component.ID())
return
}
// try 请求成功,则对事务明细记录进行更新. 倘若更新失败,也要视为错误,抛给父 goroutine
if err = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), true); err != nil {
errCh <- err
}
}()
}
// 等待所有子 goroutine 运行完成
wg.Wait()
// 关闭 errCh,告知父 goroutine 所有任务已运行完成的信息
close(errCh)
}()
successful := true
// 4 通过 chan,阻塞子 goroutine 执行完成
// 4.1 但凡出现一个子 goroutine 遇到了错误,则会提前接收到错误,并调用 cancel 方法熔断其他所有子 goroutine 流程
// 4.2 倘若所有子 goroutine 都执行成功,则会通过 chan 的关闭事件推进流程,对应 err 为 nil
if err := <-errCh; err != nil {
// 只要有一笔 try 请求出现问题,其他的都进行终止
cancel()
successful = false
}
// 5 异步执行第二阶段的 confirm/cancel 流程
// 之所以是异步,是因为实际上在第一阶段 try 的响应结果尘埃落定时,对应事务的成败已经有了定论
// 第二阶段能够容忍异步执行的原因在于,执行失败时,还有轮询任务进行兜底
go t.advanceProgressByTXID(txID)
// 6 响应结果
// 6.1 倘若所有 try 请求都成功,则 successful 为 try,事务成功
// 6.2 但凡有一个 try 请求处理出现问题,successful 为 false,事务失败
return successful, nil
}
该函数的入参是ctx,事务id以及所有的组件。
这段代码实现了一个分布式事务管理器中的两阶段提交(Two-Phase Commit, 2PC)过程。整个过程分为几个主要步骤:
- 创建子 Context 用于管理子 Goroutine 生命周期
使用 context.WithCancel 创建一个可取消的子上下文(cctx)和对应的取消函数(cancel)。这样,在需要的时候可以通过调用 cancel 函数来终止所有子 Goroutine。 - 创建错误通道
初始化一个错误通道(errCh),用于从子 Goroutine 接收错误信息。 - 并发启动,执行各组件的 Try 流程
通过一个匿名 Goroutine 并发地启动各组件的 Try 流程。
使用 sync.WaitGroup 来等待所有子 Goroutine 完成。
在子 Goroutine 中:
对每个组件执行 Try 方法。
如果 Try 方法执行失败,或者响应的 ACK 字段为 false,则更新事务状态并通过错误通道发送错误。
如果 Try 请求成功,更新事务状态。如果更新失败,也通过错误通道发送错误。 - 阻塞等待子 Goroutine 完成
从错误通道接收错误。如果接收到错误,调用 cancel 函数终止所有子 Goroutine,并设置 successful 标记为 false。 - 异步执行第二阶段的 Confirm/Cancel 流程
通过一个新的 Goroutine 调用 advanceProgressByTXID 函数来异步处理事务的 Confirm 或 Cancel 阶段。
第二阶段的执行可以是异步的,因为第一阶段的结果已经确定了事务是否成功。 - 返回结果
返回 successful 标记和 nil(无错误)。如果所有 Try 请求成功,successful 为 true,表示事务成功;如果任一 Try 请求失败,successful 为 false,表示事务失败。
现在我们看一下advanceProgressByTXID函数
事务进度推进
当一笔事务在第一阶段中所有的 Try 请求都有了响应后,就需要根据第一阶段的结果,执行第二阶段的 Confirm 或者 Cancel 操作,并且将事务状态推进为成功或失败的终态:
- 倘若所有组件的 Try 响应都是成功,则需要批量调用组件的 Confirm 接口,并在这之后将事务状态更新为成功
- 倘若存在某个组件 Try 响应为失败,则需要批量调用组件的 Cancel 接口,并在这之后将事务状态更新为失败
- 倘若当前事务已执行超时,同样需要批量调用组件的 Cancel 接口,并在这之后将事务状态更新为失败
// 传入一个事务 id 推进其进度
func (t *TXManager) advanceProgressByTXID(txID string) error {
// 获取事务日志明细
tx, err := t.txStore.GetTX(t.ctx, txID)
if err != nil {
return err
}
// 推进进度
return t.advanceProgress(tx)
}
// 传入一个事务 id 推进其进度
func (t *TXManager) advanceProgress(tx *Transaction) error {
// 1 推断出事务当前的状态
// 1.1 倘若所有组件 try 都成功,则为 successful
// 1.2 倘若存在组件 try 失败,则为 failure
// 1.3 倘若事务超时了,则为 failure
// 1.4 否则事务状态为 hanging
txStatus := tx.getStatus(time.Now().Add(-t.opts.Timeout))
// hanging 状态的事务暂时不处理
if txStatus == TXHanging {
return nil
}
// 2 根据事务是否成功,定制不同的处理函数
success := txStatus == TXSuccessful
var confirmOrCancel func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error)
var txAdvanceProgress func(ctx context.Context) error
if success {
// 如果事务成功,则需要对组件进行 confirm
confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
return component.Confirm(ctx, tx.TXID)
}
// 如果事务成功,则需要在最后更新事务日志记录的状态为成功
txAdvanceProgress = func(ctx context.Context) error {
return t.txStore.TXSubmit(ctx, tx.TXID, true)
}
} else {
// 如果事务失败,则需要对组件进行 cancel
confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
return component.Cancel(ctx, tx.TXID)
}
// 如果事务失败,则需要在最后更新事务日志记录的状态为失败
txAdvanceProgress = func(ctx context.Context) error {
return t.txStore.TXSubmit(ctx, tx.TXID, false)
}
}
// 3 批量调用组件,执行第二阶段的 confirm/cancel 操作
for _, component := range tx.Components {
// 获取对应的 tcc component
components, err := t.registryCenter.getComponents(component.ComponentID)
if err != nil || len(components) == 0 {
return errors.New("get tcc component failed")
}
//components是一个数组,这里取一个元素
resp, err := confirmOrCancel(t.ctx, components[0])
if err != nil {
return err
}
if !resp.ACK {
return fmt.Errorf("component: %s ack failed", component.ComponentID)
}
}
// 4 二阶段 confirm/cancel 操作都执行完成后,对事务状态进行提交
return txAdvanceProgress(t.ctx)
}
这段代码是一个事务管理器 (TXManager) 中的 advanceProgress 函数,它用于处理事务的第二阶段操作(即确认或取消)在分布式事务的两阶段提交协议中。以下是代码的详细解读和步骤概述:
- 判断事务当前状态
- 根据事务的情况推断当前状态。状态判断依据包括:
- 成功(TXSuccessful):如果所有组件的 try 都成功。
- 失败(TXFailure):如果任何组件的 try 失败,或者事务超时。
- 挂起(TXHanging):如果事务状态未明确为成功或失败。
- 如果事务处于挂起状态,函数直接返回,不进行进一步处理。
- 根据事务状态选择相应的操作
- 根据事务的成功或失败状态,选择 confirm 或 cancel 操作:
- 成功时:使用 confirm 函数处理每个组件。
- 失败时:使用 cancel 函数处理每个组件。
- 准备一个函数 txAdvanceProgress 来在所有组件处理完毕后更新事务日志的状态。
- 执行第二阶段的 Confirm/Cancel 操作
- 对于事务中的每个组件:
- 从注册中心获取相应的 TCC 组件。
- 执行 confirmOrCancel 函数(根据事务状态决定是确认还是取消)。
- 检查操作的响应,如果响应不成功,则返回错误。
- 提交事务状态
- 在所有组件的第二阶段操作执行完毕后,使用 txAdvanceProgress 函数提交事务的最终状态。
总结:这个函数实现了分布式事务两阶段提交协议的第二阶段。它首先判断事务的当前状态,然后根据这个状态对事务中的每个组件执行确认(Confirm)或取消(Cancel)操作。最后,它更新事务的最终状态。这是分布式事务管理中非常关键的一部分,确保了事务的一致性和完整性。
异步轮询流程
接下来聊聊 txManager 的异步轮询流程. 这个流程同样非常重要,是支撑 txManager 鲁棒性的重要机制.
倘若存在事务已经完成第一阶段 Try 操作的执行,但是第二阶段没执行成功,则需要由异步轮询流程进行兜底处理,为事务补齐第二阶段的操作,并将事务状态更新为终态。
启动时机
异步轮询任务是在 txManager 的初始化流程中启动的,通过异步 goroutine 持久运行:
go txManager.run()
轮询流程
异步轮询任务运行时,基于 for 循环 + select 多路复用的方式,实现定时任务的执行.
轮询的时间间隔会根据一轮任务处理过程中是否出现错误,而进行动态调整. 这里调整规则指的是:当一次处理流程中发生了错误,就需要调大当前节点轮询的时间间隔,让其他节点的异步轮询任务得到更大的执行机会.
func (t *TXManager) run() {
var tick time.Duration
var err error
// 1 for 循环自旋式运行任务
for {
// 如果处理过程中出现了错误,需要增长轮询时间间隔
if err == nil {
tick = t.opts.MonitorTick
} else {
tick = t.backOffTick(tick)
}
// select 多路复用
select {
// 倘若 txManager.ctx 被终止,则异步轮询任务退出
case <-t.ctx.Done():
return
// 2 等待 tick 对应时长后,开始执行任务
case <-time.After(tick):
// 对 txStore 加Redis分布式锁,避免分布式服务下多个服务节点的轮询任务重复执行
if err = t.txStore.Lock(t.ctx, t.opts.MonitorTick); err != nil {
// 取锁失败时(大概率被其他节点占有),不需要增加 tick 时长
err = nil
continue
}
// 3 获取处于 hanging 状态的事务
var txs []*Transaction
if txs, err = t.txStore.GetHangingTXs(t.ctx); err != nil {
_ = t.txStore.Unlock(t.ctx)
continue
}
// 4 批量推进事务进度
err = t.batchAdvanceProgress(txs)
_ = t.txStore.Unlock(t.ctx)
}
}
}
这段 Go 语言代码定义了一个名为 TXManager 的结构体的 run 方法。该方法实现了一个异步任务处理流程,主要用于处理分布式事务。我将按照代码中的逻辑分步骤解读:
- 循环运行:
- 方法使用一个 for 循环,这意味着它会不断地运行,直到满足某个退出条件。
- 错误处理和轮询间隔:
- 在每次循环的开始,根据之前的执行是否出错来决定轮询时间间隔 tick。如果没有错误发生,则使用 t.opts.MonitorTick 作为间隔;如果发生错误,则调用 t.backOffTick(tick) 来增加轮询间隔。
- select 语句多路复用:
- 使用 select 语句来同时处理多种情况。
- 第一种情况是 t.ctx.Done() 通道收到消息,这通常意味着 TXManager 的上下文被取消或终止,此时方法返回,结束运行。
- 第二种情况是等待 tick 时间后执行任务。
- 获取和处理分布式事务:
- 首先尝试对 txStore 加分布式锁。如果锁定失败(可能是因为其他节点已经占用锁),则跳过当前循环迭代,继续等待下一个 tick。
- 如果成功加锁,则继续获取处于挂起(hanging)状态的事务列表。
- 如果获取事务列表时发生错误,则释放锁并继续下一个循环迭代。
- 批量处理事务:
- 对获取到的事务列表进行批量处理,推进事务进度。
- 处理完毕后释放之前获取的锁。
- 错误处理和轮询调整:
- 在整个过程中,如果任何步骤出现错误,该错误会被记录并用于调整下一次的轮询间隔。
有关于轮询时间间隔的退避谦让策略为:每次对时间间隔进行翻倍,封顶为初始时长的 8 倍:
func (t *TXManager) backOffTick(tick time.Duration) time.Duration {
tick <<= 1
if threshold := t.opts.MonitorTick << 3; tick > threshold {
return threshold
}
return tick
}
批量推进事务进度
下面是异步轮询任务批量推进事务第二阶段执行的流程,核心是开启多个 goroutine 并发对多项事务进行处理:
func (t *TXManager) batchAdvanceProgress(txs []*Transaction) error {
// 1 创建一个 chan,用于接收子 goroutine 传输的 err
errCh := make(chan error)
go func() {
// 2 通过 waitGroup 聚合多个子 groutine
var wg sync.WaitGroup
for _, tx := range txs {
// shadow
tx := tx
wg.Add(1)
go func() {
defer wg.Done()
// 3 推进每笔事务的进度
if err := t.advanceProgress(tx); err != nil {
// 遇到错误则投递到 errCh
errCh <- err
}
}()
}
// 4 收口等待所有子 goroutine 执行完成
wg.Wait()
// 5 所有子 goroutine 执行完成后关闭 chan,唤醒阻塞等待的父 goroutine
close(errCh)
}()
// 记录遇到的第一个错误
var firstErr error
// 6 父 goroutine 通过 chan 阻塞在这里,直到所有 goroutine 执行完成,chan 被 close 才能往下
for err := range errCh {
// 记录遇到的第一个错误
if firstErr != nil {
continue
}
firstErr = err
}
// 7 返回错误,核心是标识执行过程中,是否发生过错误
return firstErr
}