问题描述:
先说一下流程:后端保存前端提交的图表信息,然后发送异步消息到消息队列,由下游服务去处理图表信息。
部署项目到服务器,验证项目功能的时候,出现了以下错误:数据库存在数据。下游服务查不到数据库的数据
// service代码
@Override
@Retryable
@Transactional
public ChartVo genChartByAiAsyncMq(Long uid, MultipartFile multipartFile, GenChartByAiRequest genChartByAiRequest) {
// ...省略
Chart chart = Chart.builder().name(name).goal(goal)
.chartData(data).chartType(chartType)
.uid(uid).status("wait").build();
boolean save = this.save(chart);
if(!save){
log.info("保存表单失败");
throw new RuntimeException("保存表单失败");
}
List<Chart> list = this.list();
log.info("chart长度:{}", list.size());
// 发送消息,触发异步处理
biMessageProducer.sendMessage(String.valueOf(chart.getId()));
log.info("发送消息成功");
// 省略
}
下游服务处理代码
@SneakyThrows
@RabbitListener(queues = {MQConstants.BI_QUEUE_NAME}, ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
log.info("receive message: {}", message);
// 检查消息是否为空
if(StringUtils.isBlank(message)){
log.info("消息不能为空");
channel.basicNack(deliveryTag, false, false);
throw new RuntimeException("消息不能为空");
}
List<Chart> list = biService.list();
log.info("图表列表长度:{}", list.size());
// 尝试将消息解析为图表ID,并查询图表信息
Long chartId = Long.parseLong(message);
Chart chart = biService.getById(chartId);
log.info("图表信息:{}", chart);
// 图表不存在时的处理
if(chart == null){
log.info("图表不存在");
channel.basicNack(deliveryTag, false, false);
throw new RuntimeException("图表不存在");
}
// 省略
}
日志输出如下:
数据库信息:
解决过程
首先说明一下,这个错误之前没有出现过,下午出错,再次测试的时候,也会出现正常的情况,只不过错误占比有点高(10次有6次获取不到数据库消息)。
分析的过程:
步骤1、首先在上游服务和下游服务打印日志,查看数据库有多少条数据,上游服务显示有2条数据,下游服务显示有1条数据
步骤2、找错的时候,看见方法加了事务注解@Transactional,这个时候想到可能是事务影响(后面分析原因),然后取掉注解,重新验证,发现没有出错
原因分析
为什么会出现这个错误呢?
我觉得是由MySQL的事务和网络引起的,
MySQL事务+网络
我们都知道MySQL(8.x版本)的事务的隔离级别默认是可重复读(RR),那么一个事务在操作完成之前,对其他事务是不看见的,所以就说,方法中先保存图表信息到数据库,然后发送消息到消息队列,再执行方法的后续过程。发送消息到队列之后,可能数据库事务还没有提交,但是消息发送成功了,就立刻被消费者端消费,此时,消费者端查询数据库中的图表信息,当然查不到,因为生产者端的事务还没有提交。
之前没有出错,这次验证出错分析
服务端将消息发送消息队列,由于网络有延迟,导致事务提交之后,消息才被消费端消费。
解决方法
1、手动提交事务,不使用注解
2、设置延迟队列,但是这个延迟的时间具体是多少,我们无法确定,所以最后采用第一种方法解决此问题。
@Slf4j
@Service
public class BiServiceImpl extends ServiceImpl<ChartMapper, Chart> implements BiService {
@Resource
private PlatformTransactionManager transactionManager;
@Override
@Retryable
public ChartVo genChartByAiAsyncMq(Long uid, MultipartFile multipartFile, GenChartByAiRequest genChartByAiRequest) {
TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
try{
// 省略...
Chart chart = Chart.builder().name(name).goal(goal)
.chartData(data).chartType(chartType)
.uid(uid).status("wait").build();
boolean save = this.save(chart);
if(!save){
log.info("保存表单失败");
throw new RuntimeException("保存表单失败");
}
transactionManager.commit(transactionStatus);
List<Chart> list = this.list();
log.info("chart长度:{}", list.size());
// 发送消息,触发异步处理
biMessageProducer.sendMessage(String.valueOf(chart.getId()));
log.info("发送消息成功");
// 省略...
}catch (Exception e){
log.error("AI 异步调用失败", e);
transactionManager.rollback(transactionStatus);
throw new RuntimeException("AI 异步调用失败");
}
}
}