一、背景
Netflix Conductor是Netflix开源的一个微服务编排引擎。它旨在简化和自动化微服务架构中复杂的业务流程和工作流处理。Conductor允许开发人员使用声明性的方式定义工作流,将多个服务和任务组合成一个完整的业务流程。它提供了一个用户友好的UI界面,可以可视化地创建和编辑工作流。同时,Conductor还提供了一组强大的API,可以通过编程方式创建和管理工作流。
由于Netflix Conductor本身并不直接支持分布式事务,因此无法满足对可靠性和事务一致性要求较高的业务场景。
Apache Seata是阿里巴巴开源的分布式事务解决方案。在Spring cloud,Dubbo等微服务框架上可以直接使用。但Conductor不在官方支持列表中,因此需要设计一套Seata与Conductor的集成的解决方案。
二、Seata 基本原理
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
在一个完整的Seata事务中,有以下关键步骤:
- 在全局事务开始时开启一个全局事务;
- 全局事务XID在事务参与者中传递;
- 全局事务结束时提交/回滚事务。
三、事务管理器TM
在 Spring 项目中可以在需要全局事务的方法上增加@GlobalTransaction。该方法可以称为TM。它在方法开始前开启全局事务,并把xid加到上下文中传递 ,在方法执行完成后(或异常结束)提交或回滚全局事务。
在 Conductor 中,由于 worker 是独立执行的没有一个固定的顺序,也不存在一个统一的方法内被一起执行(任务驱动),因此无法简单通过增加@GlobalTransaction来开始全局事务、提交或回滚全局事务。因此需要显式地手动在 Conductor 中通过 API 方式完成 TM角色所需要做的事。
开启全局事务
在 Conductor 开始一个工作流时增加以下代码
public String startWorkflow(
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
//开启全局事务
tx.begin(timeout, name);
//do something
// 获取 XID
String xid = RootContext.getXID();
//通过 InputData 传递 XID 到 worker
taskModle.setInputData(RootContext.KEY_XID,xid)
}
}
提交全局事务
在 Conductor 结束一个工作流时增加以下代码
if (outcome.isComplete) {
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
//提交事务
tx.commit();
}
回滚全局事务
所有 worker 在发生异常时需要回滚事务则需要 catch 异常并设置状态为FAILED。
TaskResult result = new TaskResult(task);
result.setStatus(TaskResult.Status.FAILED);
public boolean decide(String workflowId) {
try{
//do something
} catch (TerminateWorkflowException twe) {
LOGGER.info("Execution terminated of workflow: {}", workflowId, twe);
terminate(workflow, twe);
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
//回滚事务
tx.rollback();
return true;
}
四、事务XID传递
一、在 conductor 开始全局事务或设置下一个任务时,会把 XID 设置到 task 的 InputData 中,由此把 XID 传到 worker
// 获取 XID
String xid = RootContext.getXID();
//通过 InputData 传递 XID 到 worker
taskModle.setInputData(RootContext.KEY_XID,xid)
public class someWorker implements Worker {
@Override
public TaskResult execute(Task task) {
String xid = (String) task.getInputData().get(RootContext.KEY_XID);
// 绑定 XID
RootContext.bind(xid);
}
}
public class someWorker implements Worker {
public TaskResult execute(Task task) {
@Override
public TaskResult execute(Task task) {
result.setOutputData(new HashMap<String,Object>(){{
put(RootContext.KEY_XID,xid);
}});
}
}
public void updateTask(TaskResult taskResult) {
String xid = (String) taskResult.getOutputData().get(RootContext.KEY_XID);
// 绑定 XID
RootContext.bind(xid);
}
作者介绍:
道一云,成立于2004年,是中国低代码领域的领导厂商、腾讯战略投资企业、腾讯生态核心合作伙伴。拥有自主知识产权管理软件产品百余项,涵盖数字化应用构建低代码平台-七巧、全场景智能业务分析BI-七析、千人千面、数智化办公企业级门户-七星以及30多款开箱即用的场景应用。
欢迎关注:
官网:道一云七巧 - 可视化、智能化、数字化应用构建
免费体验:道一云产品免费试用
公众号:道一云低代码(do1info)