分布式事务seata使用示例及注意事项
- 示例说明
- 代码
- 调用方(微服务A)
- 服务方(微服务B)
- 测试
- 测试一 ,seata发挥作用,成功回滚!
- 测试二:处理feignclient接口的返回类型从Integer变成String,其他的环境及代码完全一样,但是结果是“数据没有回滚:一致性未能保证”
- ```结论一:要保证事务自动回滚,则需要有对应的异常抛出,才会触发自动回滚机制```
- 测试三,针对如下场景:服务端(微服务B)抛出了异常,但是业务上又不能直接抛出,需要try...catch...捕获后并处理再返回。 在这种场景下如何保证分布式事务的回滚?—手动回滚
- ```结论二: 业务上或交互上不允许抛出异常时,可采用上面的手动回滚的方式实现分布式事务```
- 测试三 考虑这样一种场景:服务A调用服务B,服务B调用服务C ; 即是 A—>B—>C的情况;B即是服务端也是调用端
- ```结论三: A——>B——>C,当C抛出异常时,可依次回滚! 注意,在调用端需加上@GlobalTransactional,如果对抛出的异常做了捕获处理则需要手动进行回滚!```
- 延伸 关于openfeign调用:服务端返回异常信息,调用端接收异常的说明
- 前面“测试一”中调用端接收到的异常是 服务端抛出的异常吗??
- openfeign调用如何接收到服务端返回的真实的异常信息呢?
- 关于feign.codec.ErrorDecoder的使用及注意
示例说明
- 有两个微服务A、B。微服务A连接本地数据库ai_vs_remote_demo,并向表xl中插入一条数据;微服务B连接远程数据库hrm_db,并向表job_inf中插入一条数据;数据库ai_vs_remote_demo与hrm_db在不同的物理机、不同的地方。
- 示例模拟这样一种情况:微服务A成功插入一条数据,微服务B在插入数据的业务过程中抛出异常,以此来验证分布式事务的回滚:微服务B的数据插入失败,
同时微服务A插入的数据会被回滚!
代码
调用方(客户端):微服务A , 服务提供方(服务器端):微服务B
调用方(微服务A)
- controller
@RestController
public class ClassForTest {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private XlServiceImpl xlServiceImpl;
@GetMapping("/dosth")
public CheckListPo doSth() throws Exception {
String ne = xlServiceImpl.doInsert();
return null;
}
}
- service
@Service
@Transactional(rollbackFor = Exception.class)
public class XlServiceImpl {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private XlMapper xlMapper;
@Autowired
private XlFeignClient xlFeignClient;
@GlobalTransactional(rollbackFor = Exception.class)
public String doInsert() throws Exception {
// 演示如何获取seata的xid
String xid = RootContext.getXID();
logger.info("++++++++xid="+xid);
// 本地数据库插入一条数据(向数据库ai_vs_remote_demo的表xl中插入一条数据)
xlMapper.insert2();
// openfeign调用远程微服务B(在微服务B中会向数据库hrm_db的表job_inf中插入一条数据)
Integer ne = xlFeignClient.invokeBusi();
return "";
}
}
- feigclient
@FeignClient(contextId = "xl20231108", name = "checklist-service")
public interface XlFeignClient {
@PostMapping("/checklistservice/xl/test")
Integer invokeBusi() throws Exception;
}
服务方(微服务B)
- controller
@RestController
public class XlContoller {
@Autowired
private XlBusiServiceImpl XlBusiServiceImpl;
@PostMapping("/checklistservice/xl/test")
public Integer invokeBusi() throws Exception {
XlBusiServiceImpl.test();
return 3;
}
}
- service ,
这里会抛出异常:以测试分布式事务的回滚
@Service
@Transactional(rollbackFor = Exception.class)
public class XlBusiServiceImpl {
@Autowired
private XlBusiMapper xlBusiMapper;
public String test() {
xlBusiMapper.insert1();
int x = 0;
int y = 3 / x ;
return "";
}
}
测试
测试一 ,seata发挥作用,成功回滚!
- 先看下测试前数据库的数据
- 运行项目:http://localhost:8088/dosth
-
微服务A中的数据库操作语句
xlMapper.insert2();
执行成功!
-
微服务B中抛出异常!
以上两点符合预期
-
微服务A中同样也抛出了异常,
但是异常信息并不是微服务A传递过来的
, 这是个疑点!后面会解释!
feign.codec.DecodeException: Could not extract response: no suitable HttpMessageConverter found for response type [class java.lang.Integer] and content type [text/html;charset=UTF-8]
at feign.AsyncResponseHandler.decode(AsyncResponseHandler.java:119) ~[feign-core-10.12.jar:?]
at feign.AsyncResponseHandler.handleResponse(AsyncResponseHandler.java:87) ~[feign-core-10.12.jar:?]
at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:138) ~[feign-core-10.12.jar:?]
at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89) ~[feign-core-10.12.jar:?]
at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100) ~[feign-core-10.12.jar:?]
at com.sun.proxy.$Proxy236.invokeBusi(Unknown Source) ~[?:?]
at com.omomcam.service.impl.xl.XlServiceImpl.doInsert(XlServiceImpl.java:43) ~[classes/:?]
- 最终数据库的结果,因为微服务A抛出了异常,所以自然会回滚插入的数据!
测试二:处理feignclient接口的返回类型从Integer变成String,其他的环境及代码完全一样,但是结果是“数据没有回滚:一致性未能保证”
- 同样运行项目并访问:http://localhost:8088/dosth。
- 微服务B抛出同样的异常
- 但是,
微服务A没有抛出异常!
也就是说,在微服务A中的如下代码一切正常执行完毕!
@GlobalTransactional(rollbackFor = Exception.class)
public String doInsert() throws Exception {
// 演示如何获取seata的xid
String xid = RootContext.getXID();
logger.info("++++++++xid="+xid);
// 本地数据库插入一条数据(向数据库ai_vs_remote_demo的表xl中插入一条数据)
xlMapper.insert2();
// openfeign调用远程微服务B(在微服务B中会向数据库hrm_db的表job_inf中插入一条数据)
String ne = xlFeignClient.invokeBusi();
return "";
}
既然,程序正常执行完毕,那当然也不会存在数据的回滚了
- 数据库验证:数据是否回滚
刚刚访问http://localhost:8088/dosth 了两次
- 未回滚原因分析
上面已经提到了,因为微服务A中代码正常执行完成并没有抛出异常,所以是不会触发回滚机制的:
那为什么(对比“测试一”)将feignclient的中接口的返回类型从Integer改为String,调用方(微服务A)就不报错了呢?
原因参考:openfeign客户端A调用服务B,服务B抛出异常时,客户端A接收的几种情况
结论一:要保证事务自动回滚,则需要有对应的异常抛出,才会触发自动回滚机制
测试三,针对如下场景:服务端(微服务B)抛出了异常,但是业务上又不能直接抛出,需要try…catch…捕获后并处理再返回。 在这种场景下如何保证分布式事务的回滚?—手动回滚
-
服务端抛出的异常被捕获了,而调用端又必须要有异常抛出才会回滚,这是相互矛盾的。那么要解决这个矛盾就只有手动进行回滚!
-
手动回滚代码
try {
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
} catch (TransactionException e1) {
e1.printStackTrace();
}
手动回滚的时机/位置:是在服务端还是调用端呢?
经测试手动回滚的代码,只能放在调用端,放在服务端无效!
- 调用端示例代码
特别说明:手动回滚时,不能同时使用io.seata.spring.annotation.GlobalTransactional注解和org.springframework.transaction.annotation.Transactional注解,因为这里使用的是全局事务@GlobalTransactional的手动回滚功能,所以不能有@Transactional注解,至少要保证在@GlobalTransactional的方法上不能有@Transactional注解的作用(注意:这个时候也不能在类上加@Transactional了,因为其会作用于所有的方法)
package com.omomcam.service.impl.xl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.omomcam.dao.xl.XlMapper;
import com.omomcam.entity.common.ResponseData;
import com.omomcam.feign.XlFeignClient;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;
@Service
//注释掉本注解,如果其他非@GlobalTransactional的方法上需要@Transactional注解,可单独加在方法上
//@Transactional(rollbackFor = Exception.class)
public class XlServiceImpl {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private XlMapper xlMapper;
@Autowired
private XlFeignClient xlFeignClient;
public void fb() {
xlMapper.insert1();
int y = 0;
int x = 3/y;
}
@GlobalTransactional(rollbackFor = Exception.class)
public String doInsert() throws Exception {
// 本地数据库插入一条数据(向数据库ai_vs_remote_demo的表xl中插入一条数据)
xlMapper.insert2();
// openfeign调用远程微服务B(在微服务B中会向数据库hrm_db的表job_inf中插入一条数据)
ResponseData<String> rd = xlFeignClient.invokeBusi();
if (rd.getRespCode() != 200) {// 非200(失败)时,手动回滚事务,可根据具体业务自己灵活定义
try {
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
} catch (TransactionException e1) {
e1.printStackTrace();
}
}
return "";
}
}
- 服务端代码
抛出异常,但是会捕获并处理返回业务信息
抛出异常的代码:
package com.omomcam.checklistservice.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.omomcam.checklistservice.dao.XlBusiMapper;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;
@Service
@Transactional(rollbackFor = Exception.class)
public class XlBusiServiceImpl {
@Autowired
private XlBusiMapper xlBusiMapper;
public String test() {
xlBusiMapper.insert1();
int x = 0;
int y = 3 / x ;
return "";
}
}
捕获异常并返回自定义信息给调用端,代码如下:
package com.omomcam.checklistservice.controller;
import javax.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import com.omomcam.checklistservice.service.impl.XlBusiServiceImpl;
import com.omomcam.entity.common.ResponseData;
@RestController
public class XlContoller {
@Autowired
private XlBusiServiceImpl XlBusiServiceImpl;
@PostMapping("/checklistservice/xl/test")
public ResponseData<String> invokeBusi(HttpServletResponse response) throws Exception {
@SuppressWarnings("unchecked")
ResponseData<String> rd = (ResponseData<String>) ResponseData.init();
try {
XlBusiServiceImpl.test();
} catch (Exception e) {
rd.setRespCode(400);
rd.setRespMsg("服务端发生了异常");
rd.setData("其他信息。。。");
}
return rd;
}
}
- 测试
说明:尽管服务端的异常被自己捕获了,返回了自定的相关信息,即是调用端没有收到任何异常。但是,调用端采用了手动回滚的方式,所以,调用端插入的数据仍然会被回滚!
运行前数据库中的数据:
执行过程
执行后数据库中的数据:
对比前后数据库及执行过程可知:数据回滚了!
结论二: 业务上或交互上不允许抛出异常时,可采用上面的手动回滚的方式实现分布式事务
测试三 考虑这样一种场景:服务A调用服务B,服务B调用服务C ; 即是 A—>B—>C的情况;B即是服务端也是调用端
如果B发生异常,则需要回滚A!上面的 测试二 已经测试了此种情况
如果C发生异常,则需要回滚B以及A!下面测试此种情况
- 测试 (在上面“测试二”的基础上修改代码)
因为这里只有两个微服务A和B ,所以A处理是自己以外还扮演C的角色。即是调用链为 A——>B——>A。详细解释即是:调用端A——>服务端B 调用端B——>服务端A
- 修改服务B的代码
因为服务B既作为服务端有作为调用端,故需要保证服务端B中的数据库操作正常,以验证服务端A抛出异常后服务端B的数据可以回滚并且调用端A的数据库操作同样能够回滚!
保证XlBusiServiceImpl.test();调用正常:注释掉一下两行代码
package com.omomcam.checklistservice.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.omomcam.checklistservice.dao.XlBusiMapper;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;
@Service
@Transactional(rollbackFor = Exception.class)
public class XlBusiServiceImpl {
@Autowired
private XlBusiMapper xlBusiMapper;
public String test() {
xlBusiMapper.insert1();
// int x = 0;
// int y = 3 / x ;
return "";
}
}
服务端B及调用端B的完整代码如下,因为B这里又作为调用端所以需要加上 手动回滚 的代码以及注解 @GlobalTransactional(rollbackFor = Exception.class)
package com.omomcam.checklistservice.controller;
import javax.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import com.omomcam.checklistservice.feign.OnlineReadingRestFeignClient;
import com.omomcam.checklistservice.service.impl.XlBusiServiceImpl;
import com.omomcam.entity.common.ResponseData;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;
@RestController
public class XlContoller {
@Autowired
private XlBusiServiceImpl XlBusiServiceImpl;
@Autowired
private OnlineReadingRestFeignClient onlineReadingRestFeignClient;
@GlobalTransactional(rollbackFor = Exception.class) // 需要加上 本注解
@PostMapping("/checklistservice/xl/test")
public ResponseData<String> invokeBusi(HttpServletResponse response) throws Exception {
@SuppressWarnings("unchecked")
ResponseData<String> rd = (ResponseData<String>) ResponseData.init();
try {
XlBusiServiceImpl.test();
} catch (Exception e) {
rd.setRespCode(400);
rd.setRespMsg("服务端发生了异常");
rd.setData("其他信息。。。");
return rd; // 抛出异常后,直接返回
}
@SuppressWarnings("unchecked")
ResponseData<Integer> resd = (ResponseData<Integer>) ResponseData.init();
try {
// 本"服务端"又反过去调用 "调用端" 即:本"服务端"现在扮演的是一个新的"调用端",原"调用端"变成了新的"服务端"
resd = onlineReadingRestFeignClient.xlSeataTest();
if (resd.getRespCode() != 200) { // 不等于200,说明发生了异常,则手动回滚
try {
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
} catch (TransactionException e1) {
e1.printStackTrace();
}
}
} catch (Exception e) {
rd.setRespCode(resd.getRespCode());
rd.setRespMsg(resd.getRespMsg());
return rd;
}
return rd;
}
}
- 修改服务A的代码
增加提供服务的接口方法,同时要保证返回的转态码不等于200:使xlServiceImpl.fb();抛出异常;详细见如下代码:
@PostMapping("/seata/xl/test")
public ResponseData<Integer> xlSeataTest() {
@SuppressWarnings("unchecked")
ResponseData<Integer> rd = (ResponseData<Integer>) ResponseData.init();
try {
xlServiceImpl.fb();
} catch (Exception e) {
rd.setRespCode(500);
}
return rd;
}
完整controller代码:
package com.omomcam.controller.xl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import com.omomcam.entity.CheckListPo;
import com.omomcam.entity.common.ResponseData;
import com.omomcam.service.impl.xl.XlServiceImpl;
@RestController
public class ClassForTest {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private XlServiceImpl xlServiceImpl;
@GetMapping("/dosth")
public CheckListPo doSth() throws Exception {
String ne = xlServiceImpl.doInsert();
return null;
}
@PostMapping("/seata/xl/test")
public ResponseData<Integer> xlSeataTest() {
@SuppressWarnings("unchecked")
ResponseData<Integer> rd = (ResponseData<Integer>) ResponseData.init();
try {
xlServiceImpl.fb();
} catch (Exception e) {
rd.setRespCode(500);
}
return rd;
}
}
完整service代码:
package com.omomcam.service.impl.xl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.omomcam.dao.xl.XlMapper;
import com.omomcam.entity.common.ResponseData;
import com.omomcam.feign.XlFeignClient;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;
@Service
//注释掉本注解,如果其他非@GlobalTransactional的方法上需要@Transactional注解,可单独加在方法上
//@Transactional(rollbackFor = Exception.class)
public class XlServiceImpl {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private XlMapper xlMapper;
@Autowired
private XlFeignClient xlFeignClient;
@Transactional(rollbackFor = Exception.class)
public void fb() {
xlMapper.insert1();
int y = 0;
int x = 3/y;
}
@GlobalTransactional(rollbackFor = Exception.class)
public String doInsert() throws Exception {
// 本地数据库插入一条数据(向数据库ai_vs_remote_demo的表xl中插入一条数据)
xlMapper.insert2();
// openfeign调用远程微服务B(在微服务B中会向数据库hrm_db的表job_inf中插入一条数据)
ResponseData<String> rd = xlFeignClient.invokeBusi();
if (rd.getRespCode() != 200) { // 非200(失败)时,手动回滚事务
try {
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
} catch (TransactionException e1) {
e1.printStackTrace();
}
}
return "";
}
}
- 测试
先看调用端A的执行情况:
再看服务端B执行情况
再看调用端B的执行情况:
执行前的数据库中数据的情况
再执行一遍
执行后的数据库中数据情况
截图说明,数据成功回滚: C异常回滚B以及回滚A
结论三: A——>B——>C,当C抛出异常时,可依次回滚! 注意,在调用端需加上@GlobalTransactional,如果对抛出的异常做了捕获处理则需要手动进行回滚!
延伸 关于openfeign调用:服务端返回异常信息,调用端接收异常的说明
前面“测试一”中调用端接收到的异常是 服务端抛出的异常吗??
显示不是的!
服务端返回的是
/ by zero 异常
调用端接收到的是
HttpMessageConverter 转换异常
feign.codec.DecodeException: Could not extract response: no suitable HttpMessageConverter found for response type [class java.lang.Integer] and content type [text/html;charset=UTF-8]
at feign.AsyncResponseHandler.decode(AsyncResponseHandler.java:119) ~[feign-core-10.12.jar:?]
at feign.AsyncResponseHandler.handleResponse(AsyncResponseHandler.java:87) ~[feign-core-10.12.jar:?]
at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:138) ~[feign-core-10.12.jar:?]
at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89) ~[feign-core-10.12.jar:?]
at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100) ~[feign-core-10.12.jar:?]
at com.sun.proxy.$Proxy236.invokeBusi(Unknown Source) ~[?:?]
at com.omomcam.service.impl.xl.XlServiceImpl.doInsert(XlServiceImpl.java:43) ~[classes/:?]
参考:openfeign客户端A调用服务B,服务B抛出异常时,客户端A接收的几种情况
openfeign调用如何接收到服务端返回的真实的异常信息呢?
参考:
摘自: openfeign集成sentinel实现服务降级
关于feign.codec.ErrorDecoder的使用及注意
1、实现ErrorDecoder接口
package com.omomcam.config;
import feign.Response;
import feign.Response.Body;
import feign.codec.ErrorDecoder;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.tm.api.GlobalTransactionContext;
public class CustomErrorDecoder implements ErrorDecoder {
private final ErrorDecoder defaultErrorDecoder = new Default();
@Override
public Exception decode(String methodKey, Response response) {
Body body = response.body();
String bodyStr = body.toString();
// 在这里进行自定义的异常处理逻辑
// 例如,你可以根据 HTTP 状态码区分不同的异常类型
if (response.status() == 404) {
// 处理 404 错误
// return new NotFoundException("Not Found");
}
// try {
// GlobalTransactionContext.reload(RootContext.getXID()).rollback();
// } catch (TransactionException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// 如果没有匹配到特定的异常,使用默认的 ErrorDecoder
return defaultErrorDecoder.decode(methodKey, response);
}
}
2、将实现类配置进spring环境
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import feign.codec.ErrorDecoder;
@Configuration
public class MyConfig {
@Bean
public ErrorDecoder errorDecoder() {
return new CustomErrorDecoder();
}
}
3, 特别注意!!并不是服务器端只要抛出了异常就会触发下面的方法执行,只有response的status不是200的时候才会触发
@Override
public Exception decode(String methodKey, Response response) {
Body body = response.body();
String bodyStr = body.toString();
// 在这里进行自定义的异常处理逻辑
// 例如,你可以根据 HTTP 状态码区分不同的异常类型
if (response.status() == 404) {
// 处理 404 错误
// return new NotFoundException("Not Found");
}
// try {
// GlobalTransactionContext.reload(RootContext.getXID()).rollback();
// } catch (TransactionException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// 如果没有匹配到特定的异常,使用默认的 ErrorDecoder
return defaultErrorDecoder.decode(methodKey, response);
}
什么情况response的status不是200?
- openfeign自动返回非200的情况
- 在程序中手动设置response的status的值
import javax.servlet.http.HttpServletResponse;
public ResponseData<String> invokeBusi(HttpServletResponse response) throws Exception {
try {
XlBusiServiceImpl.test();
} catch (Exception e) {
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
// 或者直接设任意非200类型的值
response.setStatus(404);
ne.setSs("报错了,报错信息:"+e.getMessage());
}
}