消息队列发送支付通知消息
需求分析
订单服务作为通用服务,在订单支付成功后需要将支付结果异步通知
给其他对接的微服务,微服务收到支付结果根据订单的类型去更新自己的业务数据
技术方案
使用消息队列进行异步通知需要保证消息的可靠性即生产端将消息成功通知到服务端: 消息发送到交换机 --> 由交换机发送到队列 --> 消费者监听队列,收到消息进行处理,参考文章02- 使用Docker安装RabbitMQ-CSDN博客
-
生产者确认机制
: 发送消息前使用数据库事务将消息保证到数据库表中,成功发送到交换机将消息从数据库中删除 -
配置MQ持久化(交换机、队列、发送消息)
:MQ收到消息持久化,当MQ重启时即使消息没有消费完也不会丢失 -
消费者确认机制
: 消费者消费成功,自动发送ACK,负责重试消费
发布订阅模式
: 订单服务接收支付成功结果通知后创建一条消息发送给Fanout广播类型的交换机
,学习中心服务
绑定队列到交换机接收消息,参考文章04- 基于SpringAMQP封装RabbitMQ,消息队列的Work模型和发布订阅模型-CSDN博客
环境搭建
第一步: 在订单服务和学习中心服务
中添加消息队列依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步:在Nacos的dev环境下添加RabbitMQ的配置信息rabbitmq-dev.yaml
,设置group为xuecheng-plus-common
spring:
rabbitmq:
host: 192.168.101.128 # 主机
port: 5672 # 端口名
username: root # 用户名
password: root # 密码
virtual-host: / # 虚拟主机
publisher-confirm-type: correlated # 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: true # 开启publish-return功能,同样是基于callback机制调用回调函数ReturnCallback
template:
mandatory: true # 定义消息路由失败时的策略,true表示调用ReturnCallback;false表示直接丢弃消息
listener:
simple:
# 每次只能获取一条消息,处理完成才能获取下一个消息
prefetch: 1
# auto:出现异常时返回unack且消息回滚到mq,如果没有异常直接返回ack
# manual:手动控制
# none:丢弃消息不回滚到mq
acknowledge-mode: auto
retry:
enabled: false # 开启消费者失败重试
initial-interval: 5000ms # 初始的失败等待时长为几秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态;如果业务中包含事务需要改为false
第三步:在订单服务和学习中心服务的接口工程中引入rabbitmq-dev.yaml
配置文件
- data-id: rabbitmq-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
第四步: 在订单服务的service工程编写MQ配置类PayNotifyConfig
创建交换机和队列
@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {
// 交换机
public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
// 支付结果通知消息类型
public static final String MESSAGE_TYPE = "payresult_notify";
// 支付通知队列
public static final String PAYNOTIFY_QUEUE = "paynotify_queue";
// 声明交换机且持久化
@Bean(PAYNOTIFY_EXCHANGE_FANOUT)
public FanoutExchange paynotify_exchange_fanout() {
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
}
//支付通知队列且持久化
@Bean(PAYNOTIFY_QUEUE)
public Queue course_publish_queue() {
return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
}
// 交换机和支付通知队列绑定
@Bean
public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
// 交换机路由消息到队列的时候如果失败执行回调函数
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 消息处理service
MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 消息发送失败记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 解析消息内容,将消息再添加到消息表
MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);
mqMessageService.addMessage(mqMessage.getMessageType(), mqMessage.getBusinessKey1(), mqMessage.getBusinessKey2(), mqMessage.getBusinessKey3());
});
}
}
第五步: 在学习中心服务编写MQ配置类PayNotifyConfig
创建交换机和队列,避免学习中心服务启动的时候监听的队列还没有创建,如果生产端已经创建就不再创建
@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {
// 声明交换机,支付通知队列,交换机和支付通知队列绑定关系
// 不用设置回调函数,只有生产者才需要确认
}
重启订单服务,登录rabbitmq查看交换机自动创建成功
生产者发送信息
在订单服务的OrderService
中定义接口接收支付宝响应的通知消息结果并发送给学习中心服务
public interface OrderService {
/**
* 接收通知结果并发送给学习中心服务
* @param mq Message 消息
*/
void notifyPayResult(MqMessage mqMessage);
}
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
MqMessageService mqMessageService;
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void notifyPayResult(MqMessage mqMessage) {
// 1. 将消息体转为Json
String jsonMsg = JSON.toJSONString(mqMessage);
// 2. 设置消息的持久化方式为PERSISTENT,即消息会被持久化到磁盘上,确保即使在RabbitMQ服务器重启后也能够恢复消息
Message msgObj = MessageBuilder.withBody(jsonMsg.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 3. 封装CorrelationData,用于跟踪指定Id消息的相关信息
CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString());
// 3.1 使用CorrelationData添加一个Callback对象指定回调方法,该对象用于在消息确认时处理消息的结果
correlationData.getFuture().addCallback(result -> {
if (result.isAck()) {
// 3.2 消息成功发送到交换机,删除消息表中的记录
log.debug("消息发送成功:{}", jsonMsg);
mqMessageService.completed(mqMessage.getId());
} else {
// 3.3 消息发送失败
log.error("消息发送失败,id:{},原因:{}", mqMessage.getId(), result.getReason());
}
}, ex -> {
// 3.4 消息异常可能是网络问题
log.error("消息发送异常,id:{},原因:{}", mqMessage.getId(), ex.getMessage());
});
// 4. 发送消息
rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj, correlationData);
}
}
订单服务收到第三方平台的支付结果时,在saveAliPayStatus
方法中除了保存支付宝响应的结果信息
还需要向数据库消息表添加消息记录
并将消息封装好后发送给消费端
/**
* 保存支付结果信息,向数据库中的消息表添加消息并发送给消费端
* @param payStatusDto 支付结果信息
*/
@Transactional
@Override
public void saveAlipayStatus(PayStatusDto payStatusDto) {
// 1. 获取支付流水号
String payNo = payStatusDto.getOut_trade_no();
// 2. 查询数据库订单状态
XcPayRecord payRecord = getPayRecordByPayNo(payNo);
if (payRecord == null) {
XueChengPlusException.cast("未找到支付记录");
}
XcOrders order = xcOrdersMapper.selectById(payRecord.getOrderId());
if (order == null) {
XueChengPlusException.cast("找不到相关联的订单");
}
String statusFromDB = payRecord.getStatus();
// 2.1 已支付,直接返回
if ("600002".equals(statusFromDB)) {
return;
}
// 3. 查询支付宝交易状态
String tradeStatus = payStatusDto.getTrade_status();
// 3.1 支付宝交易已成功,保存订单表和交易记录表,更新交易状态
if ("TRADE_SUCCESS".equals(tradeStatus)) {
// 更新支付交易表
payRecord.setStatus("601002");
payRecord.setOutPayNo(payStatusDto.getTrade_no());
payRecord.setOutPayChannel("Alipay");
payRecord.setPaySuccessTime(LocalDateTime.now());
int updateRecord = xcPayRecordMapper.updateById(payRecord);
if (updateRecord <= 0) {
XueChengPlusException.cast("更新支付交易表失败");
}
// 更新订单表
order.setStatus("600002");
int updateOrder = xcOrdersMapper.updateById(order);
if (updateOrder <= 0) {
log.debug("更新订单表失败");
XueChengPlusException.cast("更新订单表失败");
}
}
// 4. 创建消息记录并保存到消息表中,参数1:支付结果类型通知;参数2:业务id;参数3:业务类型
MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", order.getOutBusinessId(), order.getOrderType(), null);
// 5. 封装消息记录并发送给消费端
notifyPayResult(mqMessage);
}
消费者接收消息
在学习中心服务定义impl/ReceivePayNotifyService
类
- 监听消息队列接收支付结果, 当接收到消息后
更新选课记录表
的选课状态为选课成功,同时向我的课程表中
插入一条课程记录
@Slf4j
@Service
public class ReceivePayNotifyService {
@Autowired
MyCourseTablesService tablesService;
@RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)
public void receive(Message message) {
// 1. 获取消息
MqMessage mqMessage = JSON.parseObject(message.getBody(), MqMessage.class);
// 2. 根据消息内容,更新选课记录,向我的课程表插入记录
// 2.1 消息类型,学习中心只处理支付结果的通知
String messageType = mqMessage.getMessageType();
// 2.2 选课id
String chooseCourseId = mqMessage.getBusinessKey1();
// 2.3 订单类型,60201表示购买课程
String orderType = mqMessage.getBusinessKey2();
// 3. 学习中心只负责处理支付结果的通知
if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType)){
// 3.1 学习中心只负责购买课程类订单的结果
if ("60201".equals(orderType)){
// 3.2 保存选课记录
boolean flag = tablesService.saveChooseCourseStatus(chooseCourseId);
if (!flag){
XueChengPlusException.cast("保存选课记录失败");
}
}
}
}
}
在MyCourseTablesService
接口中定义方法更新选课记录的选课状态,同时向我的课程表添加选课记录(之前添加免费课程的时候已经实现过了)
public interface MyCourseTablesService {
/**
* 保存选课成功状态
* @param chooseCourseId
* @return
*/
public boolean saveChooseCourseSuccess(String chooseCourseId);
}
@Slf4j
@Service
public class MyCourseTablesServiceImpl implements MyCourseTablesService {
@Override
@Transactional
public boolean saveChooseCourseStatus(String chooseCourseId) {
// 1. 根据选课id,查询对应的选课记录
XcChooseCourse chooseCourse = chooseCourseMapper.selectById(chooseCourseId);
if (chooseCourse == null) {
log.error("接收到购买课程的消息,根据选课id未查询到课程,选课id:{}", chooseCourseId);
return false;
}
// 2. 选课状态为未支付时,更新选课状态为选课成功
if ("701002".equals(chooseCourse.getStatus())) {
chooseCourse.setStatus("701001");
int update = chooseCourseMapper.updateById(chooseCourse);
if (update <= 0) {
log.error("更新选课记录失败:{}", chooseCourse);
}
}
// 3. 向我的课程表添加记录
addCourseTables(chooseCourse);
return true;
}
}
public XcCourseTables addCourseTabls(XcChooseCourse xcChooseCourse){
//选课成功了才可以向我的课程表添加
String status = xcChooseCourse.getStatus();
if(!"701001".equals(status)){
XueChengPlusException.cast("选课没有成功无法添加到课程表");
}
XcCourseTables xcCourseTables = getXcCourseTables(xcChooseCourse.getUserId(), xcChooseCourse.getCourseId());
if(xcCourseTables!=null){
return xcCourseTables;
}
xcCourseTables = new XcCourseTables();
BeanUtils.copyProperties(xcChooseCourse,xcCourseTables);
xcCourseTables.setChooseCourseId(xcChooseCourse.getId());//记录选课表的逐渐
xcCourseTables.setCourseType(xcChooseCourse.getOrderType());//选课类型
xcCourseTables.setUpdateDate(LocalDateTime.now());
int insert = courseTablesMapper.insert(xcCourseTables);
if(insert<=0){
XueChengPlusException.cast("添加我的课程表失败");
}
return xcCourseTables;
}
通知支付结果测试
选择一门已发布的收费课程,如果在我的课程表存储则删除记录及其相关的选课记录及订单记录信息
- 进入课程详细页面,点击马上学习生成二维码进行支付
- 支付完成点击“支付完成”,观察订单服务控制台是否发送消息(使用内网穿透工具)
- 观察学习中心服务控制台是否接收到消息
- 观察数据库中的消息表的相应记录是否已删除,我的选课表中是否有对应的选课记录