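Seata is an open-source distributed transaction solution dedicated to providing high-performance, easy-to-use distributed transaction services. It offers the AT, TCC, SAGA, and XA transaction modes, giving users a one-stop solution for distributed transactions
Seata website: https://seata.io/zh-cn/
Spring Cloud Alibaba website: https://sca.aliyun.com/zh-cn/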
Versions
Spring Boot version 2.6.5
Spring Cloud version 2021.0.1
Spring Cloud Alibaba version 2021.0.1.0
Components used in this article
Database server version: MySQL 8.0.25
MyBatis-Plus version 3.5.1
Nacos version 1.4.2
Seata client version 1.4.2
Seata server version 1.7.1
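This article covers Seata's SAGA transaction mode. Before reading on, it is recommended to read the author's earlier article "Spring Cloud Alibaba Seata 实现分布式事物" (Implementing Distributed Transactions with Spring Cloud Alibaba Seata), which implements Seata's AT mode. This article, "Spring Cloud Alibaba Seata 实现 SAGA 事物" (Implementing SAGA Transactions with Spring Cloud Alibaba Seata), builds on that one, and the Seata and Nacos material already covered there is not repeated here. If you are already familiar with setting up Seata, you can go straight to the sections below
Saga mode is the long-running transaction solution provided by Seata. In Saga mode, every participant in the business flow commits its own local transaction; when a participant fails, the participants that already succeeded are compensated. Both the phase-one forward service and the phase-two compensation service are implemented by the business developer
Saga documentation: https://seata.io/zh-cn/docs/user/saga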
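Contents
1. Create the projects
1.1 Create the Maven aggregator project cloud-learn
1.2 Create the account service
1.3 Create the order service
2. Add configuration
2.1 Client configuration
2.2 Server configuration
3. Create database tables
3.1 Seata server tables
3.2 Seata client tables
3.3 Saga state machine tables
4. Saga state machine JSON file explained
5. Run and test
6. Project code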
1. Create the projects
1.1 Create the Maven aggregator project cloud-learn
pom.xml of the outermost parent project cloud-learn
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wsjzzcbq</groupId>
<artifactId>cloud-learn</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>gateway-learn</module>
<module>consumer-learn</module>
<module>sentinel-learn</module>
<module>seata-at-account-learn</module>
<module>seata-at-order-learn</module>
<module>seata-tcc-order-learn</module>
<module>seata-tcc-account-learn</module>
<module>seata-saga-account-learn</module>
<module>seata-saga-order-learn</module>
</modules>
<packaging>pom</packaging>
<repositories>
<repository>
<id>naxus-aliyun</id>
<name>naxus-aliyun</name>
<url>https://maven.aliyun.com/repository/public</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.5</version>
<relativePath/>
</parent>
<properties>
<spring-cloud.version>2021.0.1</spring-cloud.version>
<spring-cloud-alibaba.version>2021.0.1.0</spring-cloud-alibaba.version>
<alibaba-nacos-discovery.veriosn>2021.1</alibaba-nacos-discovery.veriosn>
<alibaba-nacos-config.version>2021.1</alibaba-nacos-config.version>
<spring-cloud-starter-bootstrap.version>3.1.1</spring-cloud-starter-bootstrap.version>
<druid.version>1.1.17</druid.version>
<mysql.version>8.0.11</mysql.version>
<mybatis-plus.version>3.5.1</mybatis-plus.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>${alibaba-nacos-discovery.veriosn}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>${alibaba-nacos-config.version}</version>
</dependency>
<!-- Since spring-cloud-dependencies 2020.0.0, bootstrap files are no longer loaded by default; add this dependency manually if you need them -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
<version>${spring-cloud-starter-bootstrap.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.40</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
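Two services, account and order, are created below to simulate deducting the account balance after a user places an order. The services call each other through Feign. Because account and order use different databases, the operation becomes a distributed transaction, which Seata is used to handle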
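Before building the real services, the saga idea can be sketched in plain Java. This is only an illustration under simplifying assumptions: the class `SagaSketch`, its `Step` type, and the in-memory order list and balance are hypothetical stand-ins, and only steps that already succeeded are compensated. It does not reflect how Seata implements the state machine internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of the saga idea; this is not Seata's implementation.
class SagaSketch {

    // One participant: a forward action and the compensation that undoes it.
    static final class Step {
        final String name;
        final BooleanSupplier forward;
        final Runnable compensate;

        Step(String name, BooleanSupplier forward, Runnable compensate) {
            this.name = name;
            this.forward = forward;
            this.compensate = compensate;
        }
    }

    // Runs the steps in order. When a step returns false or throws, the steps
    // that already succeeded are compensated in reverse order.
    static boolean run(Step... steps) {
        Deque<Step> succeeded = new ArrayDeque<>();
        for (Step step : steps) {
            boolean ok;
            try {
                ok = step.forward.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false;
            }
            if (!ok) {
                while (!succeeded.isEmpty()) {
                    Step done = succeeded.pop();
                    System.out.println("compensating " + done.name);
                    done.compensate.run();
                }
                return false;
            }
            succeeded.push(step);
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> orders = new ArrayList<>();
        int[] balance = {100};
        boolean ok = run(
                new Step("order", () -> orders.add("order-1"), () -> orders.remove("order-1")),
                new Step("account", () -> {
                    throw new IllegalStateException("simulated deduction failure");
                }, () -> balance[0] += 10));
        System.out.println("ok=" + ok + ", orders=" + orders + ", balance=" + balance[0]);
    }
}
```

In the article's project the same roles are played by the create/compensateCreate methods of the order service and the deductAccount/compensateDeductAccount methods of the account service, with the order of execution defined by the state machine JSON file rather than by Java code.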
1.2 Create the account service
Create the child module seata-saga-account-learn
pom file of seata-saga-account-learn
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud-learn</artifactId>
<groupId>com.wsjzzcbq</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-saga-account-learn</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Startup class SeataSAGAAccountApplication
package com.wsjzzcbq;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* SeataSAGAAccountApplication
*
* @author wsjz
* @date 2023/10/24
*/
@MapperScan(value = {"com.wsjzzcbq.mapper"})
@SpringBootApplication
public class SeataSAGAAccountApplication {
public static void main(String[] args) {
SpringApplication.run(SeataSAGAAccountApplication.class, args);
}
}
Entity class Account
package com.wsjzzcbq.bean;
import lombok.Data;
/**
* Account
*
* @author wsjz
* @date 2022/07/07
*/
@Data
public class Account {
private Integer id;
private String userId;
private Integer money;
}
AccountMapper
package com.wsjzzcbq.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.wsjzzcbq.bean.Account;
/**
* AccountMapper
*
* @author wsjz
* @date 2023/10/13
*/
public interface AccountMapper extends BaseMapper<Account> {
}
AccountService
package com.wsjzzcbq.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.wsjzzcbq.bean.Account;
/**
* AccountService
*
* @author wsjz
* @date 2023/10/23
*/
public interface AccountService extends IService<Account> {
boolean deductAccount(String userId, int money, boolean rollback);
boolean compensateDeductAccount(String userId, int money);
}
AccountServiceImpl
package com.wsjzzcbq.service.impl;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.wsjzzcbq.bean.Account;
import com.wsjzzcbq.mapper.AccountMapper;
import com.wsjzzcbq.service.AccountService;
import org.springframework.stereotype.Service;
/**
* AccountServiceImpl
*
* @author wsjz
* @date 2023/10/23
*/
@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements AccountService {
@Override
public boolean deductAccount(String userId, int money, boolean rollback) {
System.out.println("deduct");
UpdateWrapper<Account> up = new UpdateWrapper<>();
String sql = "money = money - " + money;
up.setSql(sql);
up.eq("user_id", userId);
this.update(up);
if (rollback) {
//Simulate a failure after the update so that the saga compensation is triggered
int a = 1/0;
}
return true;
}
@Override
public boolean compensateDeductAccount(String userId, int money) {
System.out.println("compensate");
UpdateWrapper<Account> up = new UpdateWrapper<>();
String sql = "money = money + " + money;
up.setSql(sql);
up.eq("user_id", userId);
this.update(up);
return true;
}
}
AccountController
package com.wsjzzcbq.controller;
import com.wsjzzcbq.service.AccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* AccountController
*
* @author wsjz
* @date 2023/10/23
*/
@RequestMapping("/account")
@RestController
public class AccountController {
@Autowired
private AccountService accountService;
@GetMapping("/deduct")
public boolean deductAccount(String userId, int money, boolean rollback) {
return accountService.deductAccount(userId, money, rollback);
}
@GetMapping("/compensate/deduct")
public boolean compensateDeductAccount(String userId, int money) {
return accountService.compensateDeductAccount(userId, money);
}
}
application.yml file
server:
  port: 9001
spring:
  main:
    allow-circular-references: true
  application:
    name: seata-saga-account-learn
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.3.232:3306/pmc-account?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: 123456
  cloud:
    nacos:
      username: nacos
      password: nacos
      server-addr: 192.168.2.140
      discovery:
        namespace: public
#        server-addr: 192.168.2.140
#      config:
#        server-addr:
seata:
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.server-addr}
      username: ${spring.cloud.nacos.username}
      password: ${spring.cloud.nacos.password}
      group: SEATA_GROUP
      data-id: seata-saga.properties
  registry:
    type: nacos
    nacos:
      application: seata-server
      cluster: default
      server-addr: ${spring.cloud.nacos.server-addr}
      username: ${spring.cloud.nacos.username}
      password: ${spring.cloud.nacos.password}
      group: SEATA_GROUP
  enable-auto-data-source-proxy: false
  client:
    rm:
      report-success-enable: true
  # Transaction group; if not configured, it defaults to spring.application.name + '-seata-service-group'
  # tx-service-group:
logging:
  level:
    com.wsjzzcbq.mapper: debug
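For an explanation of the configuration parameters, see the earlier article "Spring Cloud Alibaba Seata 实现分布式事物"; it is not repeated here
1.3 Create the order service
Create the child module seata-saga-order-learn
pom file of seata-saga-order-learn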
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud-learn</artifactId>
<groupId>com.wsjzzcbq</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-saga-order-learn</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Startup class SeataSAGAOrderApplication
package com.wsjzzcbq;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
* SeataSAGAOrderApplication
*
* @author wsjz
* @date 2023/10/24
*/
@MapperScan(value = {"com.wsjzzcbq.mapper"})
@EnableFeignClients
@SpringBootApplication
public class SeataSAGAOrderApplication {
public static void main(String[] args) {
SpringApplication.run(SeataSAGAOrderApplication.class, args);
}
}
Order entity class Order
package com.wsjzzcbq.bean;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
* Order
*
* @author wsjz
* @date 2022/07/07
*/
@TableName("order_tbl")
@Data
public class Order {
@TableId
private Integer id;
private String userId;
private String code;
private Integer count;
private Integer money;
}
OrderMapper
package com.wsjzzcbq.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.wsjzzcbq.bean.Order;
/**
* OrderMapper
*
* @author wsjz
* @date 2022/07/07
*/
public interface OrderMapper extends BaseMapper<Order> {
}
AccountFeign
package com.wsjzzcbq.feign;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
* AccountFeign
*
* @author wsjz
* @date 2023/10/13
*/
@FeignClient(value = "seata-saga-account-learn")
public interface AccountFeign {
@GetMapping("/account/deduct")
boolean deductAccount(@RequestParam("userId") String userId, @RequestParam("money") int money, @RequestParam("rollback") boolean rollback);
@GetMapping("/account/compensate/deduct")
boolean compensateDeductAccount(@RequestParam("userId") String userId, @RequestParam("money") int money);
}
OrderService
package com.wsjzzcbq.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.wsjzzcbq.bean.Order;
/**
* OrderService
*
* @author wsjz
* @date 2023/10/23
*/
public interface OrderService extends IService<Order> {
boolean create(String orderCode, String userId, int money, int count, boolean rollback);
boolean compensateCreate(String orderCode, String userId, int money);
}
OrderServiceImpl
package com.wsjzzcbq.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.wsjzzcbq.bean.Order;
import com.wsjzzcbq.mapper.OrderMapper;
import com.wsjzzcbq.service.OrderService;
import org.springframework.stereotype.Service;
/**
* OrderServiceImpl
*
* @author wsjz
* @date 2023/10/23
*/
@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
@Override
public boolean create(String orderCode, String userId, int money, int count, boolean rollback) {
System.out.println("order: create");
Order order = new Order();
order.setCode(orderCode);
order.setUserId(userId);
order.setMoney(money);
order.setCount(count);
this.save(order);
// if (rollback) {
// int a = 1/0;
// }
return true;
}
@Override
public boolean compensateCreate(String orderCode, String userId, int money) {
System.out.println("order: compensate create");
QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("code", orderCode);
this.remove(queryWrapper);
return true;
}
}
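Saga invokes each node through a state machine. Here seata-saga-order-learn is both a node in the saga transaction and the outermost caller; to save code, the author put the caller and the order transaction node in the same project
Saga transaction node interface AccountService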
package com.wsjzzcbq.saga;
/**
* AccountService
*
* @author wsjz
* @date 2023/10/23
*/
public interface AccountService {
boolean deductAccount(String userId, int money, boolean rollback);
boolean compensateDeductAccount(String userId, int money);
}
Saga transaction node implementation class AccountServiceImpl, which implements AccountService
package com.wsjzzcbq.saga.impl;
import com.wsjzzcbq.feign.AccountFeign;
import com.wsjzzcbq.saga.AccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* AccountServiceImpl
*
* @author wsjz
* @date 2023/10/23
*/
@Service("accountService")
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountFeign accountFeign;
@Override
public boolean deductAccount(String userId, int money, boolean rollback) {
System.out.println("userId: " + userId + ", money: " + money);
try {
return accountFeign.deductAccount(userId, money, rollback);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
@Override
public boolean compensateDeductAccount(String userId, int money) {
System.out.println("userId: " + userId + ", money: " + money);
try {
return accountFeign.compensateDeductAccount(userId, money);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
}
Saga state machine configuration class StateMachineEngineConfig
package com.wsjzzcbq.config;
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;
import io.seata.saga.rm.StateMachineEngineHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sql.DataSource;
import java.util.concurrent.ThreadPoolExecutor;
/**
* StateMachineEngineConfig
*
* @author wsjz
* @date 2023/10/23
*/
@Configuration
public class StateMachineEngineConfig {
@Autowired
private DataSource dataSource;
@Bean
public ThreadPoolExecutor threadExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//Core pool size
executor.setCorePoolSize(1);
//Maximum pool size
executor.setMaxPoolSize(20);
//Name prefix for the threads in the pool
executor.setThreadNamePrefix("SAGA_ASYNC_EXE_");
//Initialize the executor
executor.initialize();
return executor.getThreadPoolExecutor();
}
@Bean
public DbStateMachineConfig dbStateMachineConfig() {
DbStateMachineConfig stateMachineConfig = new DbStateMachineConfig();
//Data source used by the state machine to persist its tables
stateMachineConfig.setDataSource(dataSource);
//Path of the saga state machine json file
ClassPathResource resource = new ClassPathResource("seata/saga_order.json");
stateMachineConfig.setResources(new Resource[]{resource});
stateMachineConfig.setEnableAsync(true);
stateMachineConfig.setThreadPoolExecutor(threadExecutor());
//Application id; the author sets it to the seata server service name
stateMachineConfig.setApplicationId("seata-server");
//Transaction group
stateMachineConfig.setTxServiceGroup("seata-saga-account-learn-seata-service-group");
return stateMachineConfig;
}
@Bean
public ProcessCtrlStateMachineEngine stateMachineEngine() {
ProcessCtrlStateMachineEngine processCtrlStateMachineEngine = new ProcessCtrlStateMachineEngine();
processCtrlStateMachineEngine.setStateMachineConfig(dbStateMachineConfig());
return processCtrlStateMachineEngine;
}
@Bean
public StateMachineEngineHolder stateMachineEngineHolder() {
StateMachineEngineHolder engineHolder = new StateMachineEngineHolder();
engineHolder.setStateMachineEngine(stateMachineEngine());
return engineHolder;
}
}
OrderController
package com.wsjzzcbq.controller;
import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateMachineInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* OrderController
*
* @author wsjz
* @date 2023/10/23
*/
@RequestMapping("/order")
@RestController
public class OrderController {
@Autowired
private StateMachineEngine stateMachineEngine;
/**
* http://localhost:9002/order/create?userId=101&money=10&count=1&rollback=false
* @param userId
* @param money
* @param count
* @param rollback
* @return
*/
@RequestMapping("/create")
public String create(String userId, int money, int count, boolean rollback) {
String orderCode = UUID.randomUUID().toString();
Map<String, Object> startParams = new HashMap<>();
startParams.put("orderCode", orderCode);
startParams.put("userId", userId);
startParams.put("money", money);
startParams.put("count", count);
startParams.put("rollback", rollback);
String businessKey = String.valueOf(System.currentTimeMillis());
StateMachineInstance stateMachineInstance = stateMachineEngine.startWithBusinessKey("order", null, businessKey, startParams);
if (ExecutionStatus.SU.equals(stateMachineInstance.getStatus())) {
return "Order created";
} else {
return "Order failed";
}
}
}
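One detail worth noting in the controller above: the businessKey is System.currentTimeMillis(), while the seata_state_machine_inst table in section 3.3 declares a unique key on (business_key, tenant_id). Two requests arriving in the same millisecond would therefore presumably produce the same key. A minimal sketch of a more collision-resistant key follows; `BusinessKeys` and its `next` method are hypothetical helpers, not part of the article's project, and reusing the generated orderCode as the business key would be another option.

```java
import java.util.UUID;

// Hypothetical helper, not part of the article's project.
class BusinessKeys {

    // business_key is declared as VARCHAR(48) in the saga tables.
    static final int MAX_LENGTH = 48;

    // Timestamp for readability plus a random suffix, so that two requests
    // arriving in the same millisecond still get different keys.
    static String next() {
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 12);
        String key = System.currentTimeMillis() + "-" + suffix;
        if (key.length() > MAX_LENGTH) {
            throw new IllegalStateException("business key too long: " + key);
        }
        return key;
    }

    public static void main(String[] args) {
        System.out.println(next());
    }
}
```

With such a helper, the businessKey line in the controller could call BusinessKeys.next() instead of String.valueOf(System.currentTimeMillis()).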
application.yml file
server:
  port: 9002
spring:
  main:
    allow-circular-references: true
  application:
    name: seata-saga-order-learn
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.3.232:3306/pmc-order?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: 123456
  cloud:
    nacos:
      username: nacos
      password: nacos
      server-addr: 192.168.2.140
      discovery:
        namespace: public
#        server-addr: 192.168.2.140
#      config:
#        server-addr:
seata:
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.server-addr}
      username: ${spring.cloud.nacos.username}
      password: ${spring.cloud.nacos.password}
      group: SEATA_GROUP
      data-id: seata-saga.properties
  registry:
    type: nacos
    nacos:
      application: seata-server
      cluster: default
      server-addr: ${spring.cloud.nacos.server-addr}
      username: ${spring.cloud.nacos.username}
      password: ${spring.cloud.nacos.password}
      group: SEATA_GROUP
  # Transaction group; if not configured, it defaults to spring.application.name + '-seata-service-group'
  tx-service-group: seata-saga-account-learn-seata-service-group
  enabled: true
  client:
    rm:
      report-success-enable: true
  # Whether to enable automatic data source proxying; this option is specific to seata-spring-boot-starter, is on by default, and can be turned off here
  enable-auto-data-source-proxy: false
logging:
  level:
    com.wsjzzcbq.mapper: debug
mybatis-plus:
  global-config:
    db-config:
      id-type: auto
Saga state machine JSON file
Create a seata folder under the resources directory, and create the file saga_order.json in it
Contents of saga_order.json
{
"nodes": [
{
"type": "node",
"size": "80*72",
"shape": "flow-rhombus",
"color": "#13C2C2",
"label": "订单服务结果选择",
"stateId": "OrderService-create-Choice",
"stateType": "Choice",
"x": 467.875,
"y": 286.5,
"id": "c11238b3",
"stateProps": {
"Type": "Choice",
"Choices": [
{
"Expression": "[deductResult] == true",
"Next": "AccountService-deductAccount"
}
],
"Default": "Fail"
},
"index": 6
},
{
"type": "node",
"size": "39*39",
"shape": "flow-circle",
"color": "red",
"label": "账户服务异常捕获",
"stateId": "AccountService-deductAccount-catch",
"stateType": "Catch",
"x": 524.875,
"y": 431.5,
"id": "053ac3ac",
"index": 7
},
{
"type": "node",
"size": "72*72",
"shape": "flow-circle",
"color": "#FA8C16",
"label": "开始",
"stateId": "Start",
"stateType": "Start",
"stateProps": {
"StateMachine": {
"Name": "order",
"Comment": "saga事物调用",
"Version": "0.0.1"
},
"Next": "OrderService-create"
},
"x": 467.875,
"y": 53,
"id": "973bd79e",
"index": 11
},
{
"type": "node",
"size": "110*48",
"shape": "flow-rect",
"color": "#1890FF",
"label": "订单服务",
"stateId": "OrderService-create",
"stateType": "ServiceTask",
"stateProps": {
"Type": "ServiceTask",
"ServiceName": "orderService",
"Next": "AccountService-deduct-Choice",
"ServiceMethod": "create",
"Input": [
"$.[orderCode]",
"$.[userId]",
"$.[money]",
"$.[count]",
"$.[rollback]"
],
"Output": {
"deductResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"CompensateState": "OrderService-compensateCreate",
"Retry": []
},
"x": 467.875,
"y": 172,
"id": "e17372e4",
"index": 12
},
{
"type": "node",
"size": "110*48",
"shape": "flow-rect",
"color": "#1890FF",
"label": "账户服务",
"stateId": "AccountService-deductAccount",
"stateType": "ServiceTask",
"stateProps": {
"Type": "ServiceTask",
"ServiceName": "accountService",
"ServiceMethod": "deductAccount",
"CompensateState": "AccountService-compensateDeductAccount",
"Input": [
"$.[userId]",
"$.[money]",
"$.[rollback]"
],
"Output": {
"deductResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Next": "Succeed"
},
"x": 467.125,
"y": 411,
"id": "a6c40952",
"index": 13
},
{
"type": "node",
"size": "110*48",
"shape": "flow-capsule",
"color": "#722ED1",
"label": "订单服务补偿",
"stateId": "OrderService-compensateCreate",
"stateType": "Compensation",
"stateProps": {
"Type": "Compensation",
"ServiceName": "orderService",
"ServiceMethod": "compensateCreate",
"Input": [
"$.[orderCode]",
"$.[userId]",
"$.[money]"
]
},
"x": 260.625,
"y": 172.5,
"id": "3b348652",
"index": 14
},
{
"type": "node",
"size": "110*48",
"shape": "flow-capsule",
"color": "#722ED1",
"label": "账户服务补偿",
"stateId": "AccountService-compensateDeductAccount",
"stateType": "Compensation",
"stateProps": {
"Type": "Compensation",
"ServiceName": "accountService",
"ServiceMethod": "compensateDeductAccount",
"Input": [
"$.[userId]",
"$.[money]",
"$.[rollback]"
]
},
"x": 262.125,
"y": 411,
"id": "13b600b1",
"index": 15
},
{
"type": "node",
"size": "72*72",
"shape": "flow-circle",
"color": "#05A465",
"label": "成功",
"stateId": "Succeed",
"stateType": "Succeed",
"x": 466.625,
"y": 597.5,
"id": "690e5c5e",
"stateProps": {
"Type": "Succeed"
},
"index": 16
},
{
"type": "node",
"size": "110*48",
"shape": "flow-capsule",
"color": "red",
"label": "补偿触发器",
"stateId": "CompensationTrigger",
"stateType": "CompensationTrigger",
"x": 894.125,
"y": 287,
"id": "757e057f",
"stateProps": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"index": 17
},
{
"type": "node",
"size": "72*72",
"shape": "flow-circle",
"color": "red",
"label": "失败",
"stateId": "Fail",
"stateType": "Fail",
"stateProps": {
"Type": "Fail",
"ErrorCode": "FAILED",
"Message": "buy failed"
},
"x": 684.125,
"y": 287,
"id": "0131fc0c",
"index": 18
},
{
"type": "node",
"size": "39*39",
"shape": "flow-circle",
"color": "red",
"label": "订单服务异常捕获",
"stateId": "OrderService-create-catch",
"stateType": "Catch",
"x": 518.125,
"y": 183,
"id": "0955401d"
}
],
"edges": [
{
"source": "973bd79e",
"sourceAnchor": 2,
"target": "e17372e4",
"targetAnchor": 0,
"id": "f0a9008f",
"index": 0
},
{
"source": "e17372e4",
"sourceAnchor": 2,
"target": "c11238b3",
"targetAnchor": 0,
"id": "cd8c3104",
"index": 2,
"label": "执行结果",
"shape": "flow-smooth"
},
{
"source": "c11238b3",
"sourceAnchor": 2,
"target": "a6c40952",
"targetAnchor": 0,
"id": "e47e49bc",
"stateProps": {},
"label": "执行成功",
"shape": "flow-smooth",
"index": 3
},
{
"source": "c11238b3",
"sourceAnchor": 1,
"target": "0131fc0c",
"targetAnchor": 3,
"id": "e3f9e775",
"stateProps": {},
"label": "执行失败",
"shape": "flow-smooth",
"index": 4
},
{
"source": "053ac3ac",
"sourceAnchor": 1,
"target": "757e057f",
"targetAnchor": 2,
"id": "3f7fe6ad",
"stateProps": {
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
},
"label": "账户服务异常触发补偿",
"shape": "flow-polyline-round",
"index": 5
},
{
"source": "e17372e4",
"sourceAnchor": 3,
"target": "3b348652",
"targetAnchor": 1,
"id": "52a2256e",
"style": {
"lineDash": "4"
},
"index": 8,
"label": "",
"shape": "flow-smooth"
},
{
"source": "a6c40952",
"sourceAnchor": 3,
"target": "13b600b1",
"targetAnchor": 1,
"id": "474512d9",
"style": {
"lineDash": "4"
},
"index": 9
},
{
"source": "0955401d",
"sourceAnchor": 1,
"target": "757e057f",
"targetAnchor": 0,
"id": "654280aa",
"shape": "flow-polyline-round",
"stateProps": {
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
},
"label": "订单服务异常触发补偿"
},
{
"source": "a6c40952",
"sourceAnchor": 2,
"target": "690e5c5e",
"targetAnchor": 0,
"id": "b6bd2f2a",
"shape": "flow-polyline-round"
},
{
"source": "757e057f",
"sourceAnchor": 3,
"target": "0131fc0c",
"targetAnchor": 1,
"id": "7ad2f2b9",
"shape": "flow-polyline-round"
}
]
}
2. Add configuration
2.1 Client configuration
In Nacos, create a configuration whose group is SEATA_GROUP and whose data-id is seata-saga.properties, with the following content
seata-saga.properties
#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
#Transaction routing rules configuration, only for the client
service.vgroupMapping.seata-saga-account-learn-seata-service-group=default
#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h
# You can choose from the following options: fastjson, jackson, gson
tcc.contextJsonParserType=fastjson
#Log rule configuration, for client and server
log.exceptionRate=100
seata-saga.properties is the seata.properties from the earlier article "Spring Cloud Alibaba Seata 实现分布式事物" with only the transaction group changed
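The transaction group matters because it ties three places together: the tx-service-group in application.yml, the value passed to setTxServiceGroup in StateMachineEngineConfig, and the service.vgroupMapping.&lt;group&gt; key in seata-saga.properties, whose value names the cluster (default) to look up in the registry. The following sketch mimics that lookup with java.util.Properties; `VgroupLookup` and its methods are hypothetical names for illustration only, since Seata performs this resolution internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration of the lookup; Seata performs it internally.
class VgroupLookup {

    // Parses properties text such as the content of seata-saga.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Resolves the cluster name mapped to the given transaction group.
    static String clusterFor(Properties config, String txServiceGroup) {
        String cluster = config.getProperty("service.vgroupMapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no service.vgroupMapping entry for " + txServiceGroup);
        }
        return cluster;
    }

    public static void main(String[] args) {
        Properties config = parse(
                "service.vgroupMapping.seata-saga-account-learn-seata-service-group=default\n");
        System.out.println(clusterFor(config, "seata-saga-account-learn-seata-service-group"));
    }
}
```

If the group name configured on the client has no matching service.vgroupMapping entry, the lookup has nothing to resolve, which is why the three values need to agree.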
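2.2 Server configuration
The server configuration is the same as in the earlier article "Spring Cloud Alibaba Seata 实现分布式事物"; no changes are needed
3. Create database tables
3.1 Seata server tables
See the Seata server table section of the earlier article "Spring Cloud Alibaba Seata 实现分布式事物"; the tables are the same and need no changes
3.2 Seata client tables
See the Seata client table section of the earlier article "Spring Cloud Alibaba Seata 实现分布式事物"
The undo_log table is not needed; keeping the account and order_tbl tables is enough
3.3 Saga state machine tables
The Saga state machine tables live in the same database as the outermost caller; in the author's project they are placed in the order service's database
The table creation SQL is in the seata\script\client\saga\db directory of the Seata source code
Table creation SQL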
-- -------------------------------- The script used for saga --------------------------------
CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
`id` VARCHAR(32) NOT NULL COMMENT 'id',
`name` VARCHAR(128) NOT NULL COMMENT 'name',
`tenant_id` VARCHAR(32) NOT NULL COMMENT 'tenant id',
`app_name` VARCHAR(32) NOT NULL COMMENT 'application name',
`type` VARCHAR(20) COMMENT 'state language type',
`comment_` VARCHAR(255) COMMENT 'comment',
`ver` VARCHAR(16) NOT NULL COMMENT 'version',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`status` VARCHAR(2) NOT NULL COMMENT 'status(AC:active|IN:inactive)',
`content` TEXT COMMENT 'content',
`recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
`id` VARCHAR(128) NOT NULL COMMENT 'id',
`machine_id` VARCHAR(32) NOT NULL COMMENT 'state machine definition id',
`tenant_id` VARCHAR(32) NOT NULL COMMENT 'tenant id',
`parent_id` VARCHAR(128) COMMENT 'parent id',
`gmt_started` DATETIME(3) NOT NULL COMMENT 'start time',
`business_key` VARCHAR(48) COMMENT 'business key',
`start_params` TEXT COMMENT 'start parameters',
`gmt_end` DATETIME(3) COMMENT 'end time',
`excep` BLOB COMMENT 'exception',
`end_params` TEXT COMMENT 'end parameters',
`status` VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`is_running` TINYINT(1) COMMENT 'is running(0 no|1 yes)',
`gmt_updated` DATETIME(3) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
`id` VARCHAR(48) NOT NULL COMMENT 'id',
`machine_inst_id` VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
`name` VARCHAR(128) NOT NULL COMMENT 'state name',
`type` VARCHAR(20) COMMENT 'state type',
`service_name` VARCHAR(128) COMMENT 'service name',
`service_method` VARCHAR(128) COMMENT 'method name',
`service_type` VARCHAR(16) COMMENT 'service type',
`business_key` VARCHAR(48) COMMENT 'business key',
`state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
`state_id_retried_for` VARCHAR(50) COMMENT 'state retried for',
`gmt_started` DATETIME(3) NOT NULL COMMENT 'start time',
`is_for_update` TINYINT(1) COMMENT 'is service for update',
`input_params` TEXT COMMENT 'input parameters',
`output_params` TEXT COMMENT 'output parameters',
`status` VARCHAR(2) NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`excep` BLOB COMMENT 'exception',
`gmt_updated` DATETIME(3) COMMENT 'update time',
`gmt_end` DATETIME(3) COMMENT 'end time',
PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
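The tables are created in the order service's pmc-order database
4. Saga state machine JSON file explained
The author uses seata-server 1.7.1, so the Seata console at http://localhost:7091/ can be opened directly; it includes a Saga state machine designer in which the state machine file can be edited by drag and drop. For older seata-server versions, find the seata-saga-statemachine-designer project in the Seata source code, run npm install to install the dependencies, and run npm run start to launch it. seata-saga-statemachine-designer is a standalone Saga state machine designer, the same as the one in the seata-server 1.7.1 console
Copy the author's saga_order.json file into the state machine designer to view the saga transaction call flow
This flow chart defines the saga transaction flow
Next: points to the node executed next
ServiceName: the Spring bean name in the code, here OrderServiceImpl in seata-saga-order-learn, which is annotated with @Service("orderService")
ServiceMethod: the name of the method to call on the ServiceName bean
Input: the arguments passed to the ServiceMethod method
Output: the return value of the ServiceMethod method, assigned to the variable deductResult
Status: the service execution status, SU for success, FA for failure, UN for unknown. The program result has to be mapped to one of these 3 statuses: a return value of true maps to SU, false maps to FA, and a thrown exception maps to UN
CompensateState: the compensation; set it to the stateId of the compensation node
In the compensation node, ServiceName, ServiceMethod and Input work the same way as above; they identify the method invoked when compensation is triggered
Catch node: catches an exception and triggers compensation
Compensation trigger
Choice node: decides on the program execution result
Expression: evaluates the program execution result
Next: points to the next node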
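The Status mapping and the Choice node described above can be sketched in plain Java. This is a simplified illustration only: `StatusSketch`, `statusOf` and `next` are hypothetical names, a null result is treated as FA for brevity, and the real evaluation is done by Seata's state machine engine from the expressions in saga_order.json.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration; Seata evaluates these rules from the JSON file.
class StatusSketch {

    enum Status { SU, FA, UN }

    // Mirrors the "Status" block of a ServiceTask:
    // true -> SU, false -> FA, any Throwable -> UN.
    // Simplification: a null result is treated as FA here.
    static Status statusOf(Callable<Boolean> serviceCall) {
        try {
            return Boolean.TRUE.equals(serviceCall.call()) ? Status.SU : Status.FA;
        } catch (Throwable t) {
            return Status.UN;
        }
    }

    // Mirrors the Choice node: when [deductResult] == true, continue with the
    // account service task; otherwise fall through to the Default state "Fail".
    static String next(Object deductResult) {
        return Boolean.TRUE.equals(deductResult) ? "AccountService-deductAccount" : "Fail";
    }

    public static void main(String[] args) {
        System.out.println(statusOf(() -> true));
        System.out.println(statusOf(() -> {
            throw new ArithmeticException("/ by zero");
        }));
        System.out.println(next(true));
    }
}
```

The state names returned by next are the stateId values used in the author's saga_order.json.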
For more on Seata, see the official documentation: https://seata.io/zh-cn/docs/user/saga
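5. Run and test
Start seata-server-1.7.1
Go to the bin directory and double-click seata-server.bat
Start the account and order services
Nacos services and configuration
Test the normal case
Browser request: http://localhost:9002/order/create?userId=101&money=10&count=1&rollback=false
10 yuan is deducted from the account and a new order is created
Test the rollback case (the same request with rollback=true)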
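The create method of OrderController takes four parameters: userId, money, count and rollback. A small sketch for building both test URLs is shown below; `OrderRequestUrls` is a hypothetical helper, and the base URL assumes the order service is running locally on port 9002 as configured in its application.yml.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for building the test requests used in this section.
class OrderRequestUrls {

    // Builds the /order/create URL with all four controller parameters.
    static String createUrl(String baseUrl, String userId, int money, int count, boolean rollback) {
        return baseUrl + "/order/create"
                + "?userId=" + URLEncoder.encode(userId, StandardCharsets.UTF_8)
                + "&money=" + money
                + "&count=" + count
                + "&rollback=" + rollback;
    }

    public static void main(String[] args) {
        String base = "http://localhost:9002"; // assumption: order service runs locally
        System.out.println("normal:   " + createUrl(base, "101", 10, 1, false));
        System.out.println("rollback: " + createUrl(base, "101", 10, 1, true));
    }
}
```

With rollback=true the account service throws after its update, so if compensation runs as configured in saga_order.json, the order row should be removed again, the account balance should end up unchanged, and the controller should take its failure branch.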
6. Project code
Gitee repository: https://gitee.com/wsjzzcbq/csdn-blog/tree/master/cloud-learn
That's all