因为这里主要是讲解分布式事务,关于什么是事务,以及事务的特性,单个事务的使用方式,以及在Spring框架下,事务的传播方式,这里就不再赘述了。但是我这里要补充一点就是,一提到事务大家脑子里第一闪念就是传统意义上的事务,就是指数据库上的事务,实则在我们正常的开发中,更多的关注的是数据的一致性。
但是又因为我们的数据不仅仅是存储在数据库上,也可能是别的组件上,或者是别的不支持事务的数据库上。时至今日,我们的“事务”的概念已经逐渐泛化掉了,在很多场景下,如果只是讲我们的数据库的事务显得过于狭隘,终极目的就是为了保持数据的一致性。因为数据的事务比较有代表性,而且每一位开发的同学都是从这里接触的事务,所以我下面也主要以数据库来讲,但不仅仅是指数据库的事务。
数据库的事务的生效范围是数据库连接级别的。我们先说一下这里的连接是什么意思,无论是我们通过navicat或者别的方式,比如JDBC操作数据库,其实都是基于连接。我想大家一定要理解“事务基于连接而不是基于数据库”这句话的意思,就是说即便我操作的是同一个数据库,不同连接上的事务仍然是两个事务,而不是说我只有都对两个不同的数据库的操作才是两个事务。
- 当我们使用navicat的时候,我们连上数据库开的窗口那就是一个数据库连接
- JDBC中先获取数据库连接才能开启并操作事务
try {
Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
// 禁用自动提交,开始手动管理事务
connection.setAutoCommit(false);
// 执行DML
String deductSql = "UPDATE account SET balance = balance - ? WHERE account_id = ?";
deductStmt = connection.prepareStatement(deductSql);
deductStmt.setDouble(1, 100); // 扣减金额
deductStmt.setInt(2, 1); // 账户A的ID
deductStmt.executeUpdate();
// 提交事务
connection.commit();
} catch (Exception e) {
// 出现异常回滚
connection.rollback();
}
从上面大家已经知道事务是基于连接级别的。也就是说在一个连接上某一瞬间同时只能有一个事务存在。当你在两个不同的连接上同时操作两个事务,两个事务是相互独立的,但是效果都会体现在数据库上。如果是两个事务同时开启并操作相同的数据,这种冲突当前的数据库会根据设置的隔离级别来进行处理,由数据库本身保证。
上面已经说了我们的事务都是基于连接的。如果在一次请求进来,在处理这一个请求期间,我们多次执行数据库操作,如果每次操作数据库的时候每次都会重新获取新的连接(现在大家都会配置数据库连接池,所以存在多次获取同一个连接的可能性),无论获取的是不是同一个数据库连接,只要是设置的是自动提交,数据库连接默认都是自动提交,就没办法保证整个请求生命周期中的数据符合事务一致性。
想必大家已经明白事务是基于连接的意义了,而连接又是面向数据库的。如果我们在一次请求中操作了同一个数据库,其中的事务我们叫做集中式事务,这个是比较好保证的,只需要我们在整个请求过程中保持使用的是同一个数据库连接,如果全部执行成功就commit,所有的数据库操作全部生效。如果失败就rollback,所有的数据库操作全部失效。本质可以参考上面的jdbc demo,其实Spring框架的事务就是基于这个原理做的。详情可以看大白话讲解Spring对数据源和事务管理以及多数据源配置
众所周知(你真知道吗?评论区告诉我),Spring的事务是基于事务管理器 @Transactional(transactionManager = "primaryTransactionManager")
,事务管理器是基于数据源。
// 配置主数据源的事务管理器
@Primary
@Bean(name = "primaryTransactionManager")
public DataSourceTransactionManager primaryTransactionManager(
@Qualifier("primaryDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
Spring支持多个数据源的配置,也就支持多个事务管理器,也就支持多个不同数据源的事务,但是没办法保证不同数据源的事务保持一致,不同的数据源是在不同的事务上下文处理的,这种情况它们已经是分布式事务了。
如果是操作了不同的数据库,要么对所有数据库的操作全部成功,要么对所有数据库的操作全部回滚,这种场景的事务,我们叫做分布式事务。解释下,分布式事务是我们需要保证全局的一致性,无论是不是数据库,有可能我们把数据更新到Redis、Mongo或者别的保存数据状态的组件上。所以分布式事务只是一种对数据状态全局一致性的保证。
分布式事务,目前市场上比较成熟的阿里的开源的 SEATA分布式事务框架。
分布式事务的四种模式:
- Seata AT 模式
- Seata TCC 模式
- Seata Saga 模式
- Seata XA 模式
四种模式对比
模式 | 一致性 | 适用场景 | 性能 | 开发复杂度 | 数据库支持 |
---|---|---|---|---|---|
AT | 最终一致性 | 数据库为主的简单场景 | 高 | 低 | 关系型数据库 |
TCC | 强一致性 | 复杂业务逻辑 | 较高 | 高 | 多种存储类型 |
SAGA | 最终一致性 | 长事务、异步场景 | 较高 | 较高 | 不限 |
XA | 强一致性 | 金融类强一致性需求 | 较低 | 低 | 支持 XA 的数据库 |
AT模式
AT 模式是 Seata 中的核心模式,适用于关系型数据库的分布式事务处理。以下是一个使用 AT 模式的简单示例,演示如何在 Spring Boot 中集成 Seata 来处理分布式事务。假设有两个微服务 order-service
和 account-service
,它们分别负责订单处理和账户扣款。
1. 环境准备
- Spring Boot
- MySQL
- Seata Server
- Nacos(用于服务发现和配置管理)
2. 项目结构
假设我们有两个模块:
order-service
:负责生成订单account-service
:负责扣减账户余额
每个服务都使用 Spring Boot 和 MySQL,并且通过 Seata 来保证分布式事务的一致性。
seata-demo/
├── account-service
├── order-service
└── seata-server (单独运行的 Seata Server)
3. 依赖引入
每个微服务都需要引入以下依赖:
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
4. 配置 Seata
在 order-service
和 account-service
的 application.yml
中添加 Seata 和 Nacos 的配置。
spring:
application:
name: order-service
cloud:
nacos:
discovery:
server-addr: localhost:8848
datasource:
url: jdbc:mysql://localhost:3306/order_db
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.zaxxer.hikari.HikariDataSource
seata:
tx-service-group: my_test_tx_group
feign:
discovery:
enabled: true
server:
port: 8081
seata:
enabled: true
service:
vgroup-mapping:
my_test_tx_group: "default"
disable-global-transaction: false
5. 数据库表结构
order
表(订单服务中的订单表)
CREATE TABLE `order_tbl` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`user_id` VARCHAR(255) DEFAULT NULL,
`commodity_code` VARCHAR(255) DEFAULT NULL,
`count` INT(11) DEFAULT NULL,
`money` DECIMAL(11, 0) DEFAULT NULL,
PRIMARY KEY (`id`)
);
account
表(账户服务中的账户表)
CREATE TABLE `account_tbl` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`user_id` VARCHAR(255) DEFAULT NULL,
`money` DECIMAL(11, 0) DEFAULT NULL,
PRIMARY KEY (`id`)
);
6. Order Service 实现
在 order-service
中实现订单创建逻辑,并使用 Seata 的全局事务。
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/create")
public String create(@RequestParam("userId") String userId,
@RequestParam("commodityCode") String commodityCode,
@RequestParam("count") Integer count,
@RequestParam("money") BigDecimal money) {
orderService.createOrder(userId, commodityCode, count, money);
return "Order created successfully!";
}
}
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AccountFeignClient accountFeignClient;
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void createOrder(String userId, String commodityCode, Integer count, BigDecimal money) {
// 1. 创建订单
Order order = new Order();
order.setUserId(userId);
order.setCommodityCode(commodityCode);
order.setCount(count);
order.setMoney(money);
orderRepository.save(order);
// 2. 调用账户服务扣减余额
accountFeignClient.debit(userId, money);
// 3. 模拟异常
if (money.compareTo(BigDecimal.TEN) > 0) {
throw new RuntimeException("Money exceeds limit, transaction rollback!");
}
}
}
7. Account Service 实现
在 account-service
中实现扣减账户余额的逻辑。
@RestController
@RequestMapping("/account")
public class AccountController {
@Autowired
private AccountService accountService;
@PostMapping("/debit")
public String debit(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money) {
accountService.debit(userId, money);
return "Account debited successfully!";
}
}
@Service
public class AccountService {
@Autowired
private AccountRepository accountRepository;
public void debit(String userId, BigDecimal money) {
Account account = accountRepository.findByUserId(userId);
account.setMoney(account.getMoney().subtract(money));
accountRepository.save(account);
}
}
8. Feign Client
在 order-service
中通过 Feign 客户端调用 account-service
。
@FeignClient(name = "account-service")
public interface AccountFeignClient {
@PostMapping("/account/debit")
String debit(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money);
}
9. 启动 Seata Server
启动 Seata Server,并确保配置文件 registry.conf
中的注册中心指向 Nacos。
sh ./seata-server.sh -p 8091 -h 127.0.0.1
10. 测试分布式事务
通过调用 order-service
的接口 /order/create
创建订单并触发分布式事务。Seata 会确保在 order-service
和 account-service
之间的一致性。如果中间出现异常,Seata 会自动回滚两个服务中的事务。
curl -X POST "http://localhost:8081/order/create?userId=U1001&commodityCode=C00321&count=2&money=20"
总结
以上是一个简单的 Seata AT 模式的分布式事务处理示例。在这个示例中,order-service
和 account-service
分别管理订单和账户余额,Seata 确保了它们之间的事务一致性。
TCC模式
TCC(Try-Confirm-Cancel)模式是 Seata 中的一种分布式事务解决方案,通过分离业务操作的三个步骤(Try、Confirm、Cancel)来保证分布式事务的一致性。以下是一个 TCC 模式的简单示例,展示如何使用 Seata 的 TCC 模式在 Spring Boot 中处理分布式事务。
场景
假设有两个微服务:
order-service
: 负责创建订单。account-service
: 负责扣减账户余额。
TCC 模式下,每个服务都需要实现三种逻辑:
- Try: 尝试执行业务,预留资源。
- Confirm: 确认操作,提交业务。
- Cancel: 取消操作,回滚预留的资源。
1. 项目结构
seata-tcc-demo/
├── account-service
├── order-service
└── seata-server
2. 依赖引入
为两个服务引入必要的依赖(TCC 模式专用)。
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
3. TCC Resource 接口定义
TCC 模式中,每个服务都需要实现 Try、Confirm 和 Cancel 逻辑。在 account-service
和 order-service
中实现 TCC 资源接口。
以 account-service
为例:
public interface AccountTccAction {
/**
* Try: 预留资源(扣减余额)
*/
@TwoPhaseBusinessAction(name = "accountTccAction", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepareDecreaseAccount(BusinessActionContext context, String userId, BigDecimal money);
/**
* Confirm: 提交
*/
boolean commit(BusinessActionContext context);
/**
* Cancel: 回滚
*/
boolean rollback(BusinessActionContext context);
}
4. AccountTccActionImpl 实现
@Service
public class AccountTccActionImpl implements AccountTccAction {
@Autowired
private AccountRepository accountRepository;
// Try 阶段:预留资源,减少账户余额
@Override
public boolean prepareDecreaseAccount(BusinessActionContext context, String userId, BigDecimal money) {
Account account = accountRepository.findByUserId(userId);
if (account.getMoney().compareTo(money) < 0) {
throw new RuntimeException("Insufficient balance");
}
// 扣减可用余额,标记冻结金额
account.setFrozenMoney(account.getFrozenMoney().add(money));
account.setMoney(account.getMoney().subtract(money));
accountRepository.save(account);
return true;
}
// Confirm 阶段:确认提交,正式扣减余额
@Override
public boolean commit(BusinessActionContext context) {
String userId = context.getActionContext("userId").toString();
BigDecimal money = new BigDecimal(context.getActionContext("money").toString());
Account account = accountRepository.findByUserId(userId);
// 解除冻结金额
account.setFrozenMoney(account.getFrozenMoney().subtract(money));
accountRepository.save(account);
return true;
}
// Cancel 阶段:取消操作,回滚冻结的余额
@Override
public boolean rollback(BusinessActionContext context) {
String userId = context.getActionContext("userId").toString();
BigDecimal money = new BigDecimal(context.getActionContext("money").toString());
Account account = accountRepository.findByUserId(userId);
// 恢复余额
account.setFrozenMoney(account.getFrozenMoney().subtract(money));
account.setMoney(account.getMoney().add(money));
accountRepository.save(account);
return true;
}
}
5. Order Service 实现
OrderTccAction
定义:
public interface OrderTccAction {
@TwoPhaseBusinessAction(name = "orderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepareCreateOrder(BusinessActionContext context, String userId, String commodityCode, Integer count, BigDecimal money);
boolean commit(BusinessActionContext context);
boolean rollback(BusinessActionContext context);
}
OrderTccActionImpl
实现:
@Service
public class OrderTccActionImpl implements OrderTccAction {
@Autowired
private OrderRepository orderRepository;
@Override
public boolean prepareCreateOrder(BusinessActionContext context, String userId, String commodityCode, Integer count, BigDecimal money) {
// 预留订单(只写入订单状态为“处理中”)
Order order = new Order();
order.setUserId(userId);
order.setCommodityCode(commodityCode);
order.setCount(count);
order.setMoney(money);
order.setStatus("processing");
orderRepository.save(order);
return true;
}
@Override
public boolean commit(BusinessActionContext context) {
// 提交事务,将订单状态改为“已完成”
Long orderId = Long.valueOf(context.getActionContext("orderId").toString());
Order order = orderRepository.findById(orderId).orElse(null);
if (order != null) {
order.setStatus("completed");
orderRepository.save(order);
}
return true;
}
@Override
public boolean rollback(BusinessActionContext context) {
// 回滚事务,删除订单或恢复订单状态
Long orderId = Long.valueOf(context.getActionContext("orderId").toString());
Order order = orderRepository.findById(orderId).orElse(null);
if (order != null) {
orderRepository.delete(order);
}
return true;
}
}
6. Feign Client 实现
order-service
调用 account-service
来执行账户扣款。
@FeignClient(name = "account-service")
public interface AccountFeignClient {
@PostMapping("/account/tcc/prepare")
boolean prepareDecreaseAccount(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money);
}
7. Global Transaction(全局事务控制)
在 order-service
中通过全局事务控制进行 TCC 操作:
@Service
public class OrderService {
@Autowired
private OrderTccAction orderTccAction;
@Autowired
private AccountFeignClient accountFeignClient;
@GlobalTransactional(name = "create-order-tcc", rollbackFor = Exception.class)
public void createOrder(String userId, String commodityCode, Integer count, BigDecimal money) {
// 调用订单服务的 TCC Try 阶段
orderTccAction.prepareCreateOrder(null, userId, commodityCode, count, money);
// 调用账户服务的 TCC Try 阶段
accountFeignClient.prepareDecreaseAccount(userId, money);
// 模拟异常情况,触发事务回滚
if (money.compareTo(BigDecimal.TEN) > 0) {
throw new RuntimeException("Money exceeds limit, transaction rollback!");
}
}
}
8. 测试
启动 Seata Server 并运行服务后,可以通过调用 order-service
的接口 /order/create
创建订单并触发 TCC 模式的全局事务。
curl -X POST "http://localhost:8081/order/create?userId=U1001&commodityCode=C00321&count=2&money=5"
总结
这个简单的 TCC 模式示例展示了如何在分布式系统中通过 Try-Confirm-Cancel 三阶段控制分布式事务的一致性。在这个示例中,order-service
和 account-service
通过 TCC 模式的接口实现,确保了分布式事务在各个服务中的一致性。
XA模式
XA 模式是分布式事务中的一种标准化协议,由 X/Open 组织提出,支持两阶段提交协议(2PC, Two-Phase Commit)。它通过协调器协调各参与者的提交和回滚操作,以保证分布式事务的一致性。Seata 的 XA 模式利用数据库本身的 XA 接口来实现分布式事务管理。下面是一个基于 Seata 的 XA 模式的简单示例。
场景
假设有两个微服务:
order-service
:负责订单的创建。account-service
:负责扣减账户余额。
XA 模式通过数据库的本地事务实现分布式事务的提交和回滚,Seata 作为事务协调器,协调这些本地事务的执行。
1. 项目结构
seata-xa-demo/
├── account-service
├── order-service
└── seata-server
2. 依赖引入
在每个服务中都需要引入 Seata 和 Spring Boot 的相关依赖:
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
3. 数据库配置
XA 模式依赖于数据库的本地事务支持。数据库需要开启 XA 支持。常见的关系型数据库(如 MySQL、PostgreSQL 等)大部分都支持 XA 事务。
配置数据源(MySQL):
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/order_db?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
username: root
password: root
type: com.zaxxer.hikari.HikariDataSource
xa:
enabled: true
4. 配置 Seata
在 order-service
和 account-service
中的 application.yml
中配置 Seata XA 模式。
seata:
tx-service-group: my_test_tx_group
mode: XA
5. 数据库表结构
order_tbl
(订单表):
CREATE TABLE `order_tbl` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`user_id` VARCHAR(255) DEFAULT NULL,
`commodity_code` VARCHAR(255) DEFAULT NULL,
`count` INT(11) DEFAULT NULL,
`money` DECIMAL(11, 0) DEFAULT NULL,
PRIMARY KEY (`id`)
);
account_tbl
(账户表):
CREATE TABLE `account_tbl` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`user_id` VARCHAR(255) DEFAULT NULL,
`money` DECIMAL(11, 0) DEFAULT NULL,
PRIMARY KEY (`id`)
);
6. Order Service 实现
OrderController
:
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/create")
public String create(@RequestParam("userId") String userId,
@RequestParam("commodityCode") String commodityCode,
@RequestParam("count") Integer count,
@RequestParam("money") BigDecimal money) {
orderService.createOrder(userId, commodityCode, count, money);
return "Order created successfully!";
}
}
OrderService
:
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AccountFeignClient accountFeignClient;
@GlobalTransactional(name = "create-order-xa", rollbackFor = Exception.class)
public void createOrder(String userId, String commodityCode, Integer count, BigDecimal money) {
// 1. 创建订单
Order order = new Order();
order.setUserId(userId);
order.setCommodityCode(commodityCode);
order.setCount(count);
order.setMoney(money);
orderRepository.save(order);
// 2. 调用账户服务扣减余额
accountFeignClient.debit(userId, money);
// 3. 模拟异常
if (money.compareTo(BigDecimal.TEN) > 0) {
throw new RuntimeException("Money exceeds limit, transaction rollback!");
}
}
}
7. Account Service 实现
AccountController
:
@RestController
@RequestMapping("/account")
public class AccountController {
@Autowired
private AccountService accountService;
@PostMapping("/debit")
public String debit(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money) {
accountService.debit(userId, money);
return "Account debited successfully!";
}
}
AccountService
:
@Service
public class AccountService {
@Autowired
private AccountRepository accountRepository;
@Transactional
public void debit(String userId, BigDecimal money) {
Account account = accountRepository.findByUserId(userId);
if (account.getMoney().compareTo(money) < 0) {
throw new RuntimeException("Insufficient balance");
}
account.setMoney(account.getMoney().subtract(money));
accountRepository.save(account);
}
}
8. Feign Client
order-service
使用 Feign Client 调用 account-service
的扣减余额操作。
@FeignClient(name = "account-service")
public interface AccountFeignClient {
@PostMapping("/account/debit")
String debit(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money);
}
9. 启动 Seata Server
启动 Seata Server,确保它能够正确注册到 Nacos 或者其他注册中心,并将事务日志写入数据库。
sh ./seata-server.sh -p 8091 -h 127.0.0.1
10. 测试
通过调用 order-service
的接口 /order/create
创建订单并触发 XA 模式的分布式事务。
curl -X POST "http://localhost:8081/order/create?userId=U1001&commodityCode=C00321&count=2&money=5"
这将会触发订单创建和账户扣款的操作,Seata 的 XA 模式会确保这两个操作在不同的数据库上保持一致性。如果在任何服务中发生异常,Seata 会回滚事务,确保数据的一致性。
11. 总结
Seata 的 XA 模式依赖于数据库本地事务接口,通过两阶段提交(2PC)协议保证多个数据库实例间的事务一致性。在该示例中,order-service
和 account-service
都通过 Seata 协调执行分布式事务操作,确保了订单创建和账户扣款操作的原子性。
SEATA模式
以下是一个完整的 Seata 分布式事务的示例,展示如何在 Spring Boot 项目中使用 Seata 进行分布式事务管理。
场景
假设有两个微服务:
order-service
: 负责创建订单。account-service
: 负责扣减账户余额。
当创建订单的同时,账户余额也需要扣除,这些操作需要在一个分布式事务中进行,以确保数据一致性。
1. 项目结构
seata-demo/
├── account-service
├── order-service
└── seata-server
2. 依赖引入
在两个服务中都需要引入以下 Seata 相关依赖:
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
3. 配置 Seata
在 application.yml
中配置 Seata,并确保 Seata 使用的注册中心和配置中心,比如 Nacos:
seata:
tx-service-group: my_test_tx_group
service:
vgroup-mapping:
my_test_tx_group: "default"
enable-auto-data-source-proxy: true
data-source-proxy-mode: AT
4. 数据库表结构
在两个服务的数据库中创建以下表:
order_tbl
(订单表):
CREATE TABLE `order_tbl` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`user_id` VARCHAR(255) DEFAULT NULL,
`commodity_code` VARCHAR(255) DEFAULT NULL,
`count` INT(11) DEFAULT NULL,
`money` DECIMAL(11, 0) DEFAULT NULL,
PRIMARY KEY (`id`)
);
account_tbl
(账户表):
CREATE TABLE `account_tbl` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`user_id` VARCHAR(255) DEFAULT NULL,
`money` DECIMAL(11, 0) DEFAULT NULL,
PRIMARY KEY (`id`)
);
5. Account Service 实现
account-service
负责扣减账户余额。
AccountController
:
@RestController
@RequestMapping("/account")
public class AccountController {
@Autowired
private AccountService accountService;
@PostMapping("/debit")
public String debit(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money) {
accountService.debit(userId, money);
return "Account debited successfully!";
}
}
AccountService
:
@Service
public class AccountService {
@Autowired
private AccountRepository accountRepository;
@Transactional
public void debit(String userId, BigDecimal money) {
Account account = accountRepository.findByUserId(userId);
if (account.getMoney().compareTo(money) < 0) {
throw new RuntimeException("Insufficient balance");
}
account.setMoney(account.getMoney().subtract(money));
accountRepository.save(account);
}
}
AccountRepository
:
@Repository
public interface AccountRepository extends JpaRepository<Account, Long> {
Account findByUserId(String userId);
}
6. Order Service 实现
order-service
负责创建订单。
OrderController
:
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/create")
public String create(@RequestParam("userId") String userId,
@RequestParam("commodityCode") String commodityCode,
@RequestParam("count") Integer count,
@RequestParam("money") BigDecimal money) {
orderService.createOrder(userId, commodityCode, count, money);
return "Order created successfully!";
}
}
OrderService
:
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AccountFeignClient accountFeignClient;
@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class)
public void createOrder(String userId, String commodityCode, Integer count, BigDecimal money) {
// 1. 创建订单
Order order = new Order();
order.setUserId(userId);
order.setCommodityCode(commodityCode);
order.setCount(count);
order.setMoney(money);
orderRepository.save(order);
// 2. 调用账户服务扣减余额
accountFeignClient.debit(userId, money);
// 3. 模拟异常
if (money.compareTo(BigDecimal.TEN) > 0) {
throw new RuntimeException("Money exceeds limit, transaction rollback!");
}
}
}
OrderRepository
:
@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
}
7. Feign Client
order-service
使用 Feign Client 调用 account-service
的扣减余额操作。
@FeignClient(name = "account-service")
public interface AccountFeignClient {
@PostMapping("/account/debit")
String debit(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money);
}
8. 启动 Seata Server
确保 Seata Server 已启动,并注册到 Nacos 或者其他服务注册中心。
sh ./seata-server.sh -p 8091 -h 127.0.0.1
9. 测试
通过调用 order-service
的接口 /order/create
创建订单并触发分布式事务:
curl -X POST "http://localhost:8081/order/create?userId=U1001&commodityCode=C00321&count=2&money=5"
10. 总结
在这个简单的 Seata Demo 中,我们通过 order-service
和 account-service
演示了 Seata 的 AT 模式如何保证分布式事务的一致性。在 order-service
中,通过 Seata 的 @GlobalTransactional
注解,将订单创建和账户扣款的操作放在同一个全局事务中,确保了跨服务操作的原子性。如果任一操作失败,事务将回滚,保证数据一致性。