支付模块-基于消息队列发送支付通知消息

消息队列发送支付通知消息

需求分析

订单服务作为通用服务,在订单支付成功后需要将支付结果异步通知给其他对接的微服务,微服务收到支付结果根据订单的类型去更新自己的业务数据

在这里插入图片描述

技术方案

使用消息队列进行异步通知需要保证消息的可靠性即生产端将消息成功通知到服务端: 消息发送到交换机 --> 由交换机发送到队列 --> 消费者监听队列,收到消息进行处理,参考文章02- 使用Docker安装RabbitMQ-CSDN博客

  • 生产者确认机制: 发送消息前使用数据库事务将消息保证到数据库表中,成功发送到交换机将消息从数据库中删除

  • 配置MQ持久化(交换机、队列、发送消息):MQ收到消息持久化,当MQ重启时即使消息没有消费完也不会丢失

  • 消费者确认机制: 消费者消费成功,自动发送ACK,负责重试消费

发布订阅模式: 订单服务接收支付成功结果通知后创建一条消息发送给Fanout广播类型的交换机,学习中心服务绑定队列到交换机接收消息,参考文章04- 基于SpringAMQP封装RabbitMQ,消息队列的Work模型和发布订阅模型-CSDN博客

环境搭建

第一步: 在订单服务和学习中心服务中添加消息队列依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:在Nacos的dev环境下添加RabbitMQ的配置信息rabbitmq-dev.yaml,设置group为xuecheng-plus-common

spring:
  rabbitmq:
    host: 192.168.101.128 # 主机
    port: 5672 # 端口名
    username: root # 用户名
    password: root # 密码
    virtual-host: / # 虚拟主机
    publisher-confirm-type: correlated # 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-returns: true # 开启publish-return功能,同样是基于callback机制调用回调函数ReturnCallback
    template:
      mandatory: true # 定义消息路由失败时的策略,true表示调用ReturnCallback;false表示直接丢弃消息
    listener:
      simple:
      	# 每次只能获取一条消息,处理完成才能获取下一个消息
        prefetch: 1  
        # auto:出现异常时返回unack且消息回滚到mq,如果没有异常直接返回ack
        # manual:手动控制
        # none:丢弃消息不回滚到mq
        acknowledge-mode: auto 
        retry:
          enabled: false # 开启消费者失败重试
          initial-interval: 5000ms # 初始的失败等待时长为几秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态;如果业务中包含事务需要改为false

第三步:在订单服务和学习中心服务的接口工程中引入rabbitmq-dev.yaml配置文件

- data-id: rabbitmq-${spring.profiles.active}.yaml
  group: xuecheng-plus-common
  refresh: true

第四步: 在订单服务的service工程编写MQ配置类PayNotifyConfig创建交换机和队列

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {

    // 交换机
    public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
    // 支付结果通知消息类型
    public static final String MESSAGE_TYPE = "payresult_notify";
    // 支付通知队列
    public static final String PAYNOTIFY_QUEUE = "paynotify_queue";

    // 声明交换机且持久化
    @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
    public FanoutExchange paynotify_exchange_fanout() {
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
    }

    //支付通知队列且持久化
    @Bean(PAYNOTIFY_QUEUE)
    public Queue course_publish_queue() {
        return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
    }

    // 交换机和支付通知队列绑定
    @Bean
    public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
	
    // 交换机路由消息到队列的时候如果失败执行回调函数
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 消息处理service
        MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 消息发送失败记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 解析消息内容,将消息再添加到消息表
            MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);
            mqMessageService.addMessage(mqMessage.getMessageType(), mqMessage.getBusinessKey1(), mqMessage.getBusinessKey2(), mqMessage.getBusinessKey3());

        });
    }
}

第五步: 在学习中心服务编写MQ配置类PayNotifyConfig创建交换机和队列,避免学习中心服务启动的时候监听的队列还没有创建,如果生产端已经创建就不再创建

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {
    // 声明交换机,支付通知队列,交换机和支付通知队列绑定关系
    
    // 不用设置回调函数,只有生产者才需要确认 
}

重启订单服务,登录rabbitmq查看交换机自动创建成功

在这里插入图片描述

生产者发送信息

在订单服务的OrderService中定义接口接收支付宝响应的通知消息结果并发送给学习中心服务

public interface OrderService {
    /**
 	* 接收通知结果并发送给学习中心服务
 	* @param mq	Message 消息
 	*/
    void notifyPayResult(MqMessage mqMessage);
}
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    MqMessageService mqMessageService;
    
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void notifyPayResult(MqMessage mqMessage) {
        // 1. 将消息体转为Json
        String jsonMsg = JSON.toJSONString(mqMessage);
        // 2. 设置消息的持久化方式为PERSISTENT,即消息会被持久化到磁盘上,确保即使在RabbitMQ服务器重启后也能够恢复消息
        Message msgObj = MessageBuilder.withBody(jsonMsg.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        // 3. 封装CorrelationData,用于跟踪指定Id消息的相关信息
        CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString());
        // 3.1 使用CorrelationData添加一个Callback对象指定回调方法,该对象用于在消息确认时处理消息的结果
        correlationData.getFuture().addCallback(result -> {
            if (result.isAck()) {
                // 3.2 消息成功发送到交换机,删除消息表中的记录
                log.debug("消息发送成功:{}", jsonMsg);
                mqMessageService.completed(mqMessage.getId());
            } else {
                // 3.3 消息发送失败
                log.error("消息发送失败,id:{},原因:{}", mqMessage.getId(), result.getReason());
            }
        }, ex -> {
            // 3.4 消息异常可能是网络问题
            log.error("消息发送异常,id:{},原因:{}", mqMessage.getId(), ex.getMessage());
        });
        // 4. 发送消息
        rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj, correlationData);
    }
}

订单服务收到第三方平台的支付结果时,在saveAliPayStatus方法中除了保存支付宝响应的结果信息还需要向数据库消息表添加消息记录将消息封装好后发送给消费端

	/**
     * 保存支付结果信息,向数据库中的消息表添加消息并发送给消费端
     * @param payStatusDto 支付结果信息
     */
@Transactional
@Override
public void saveAlipayStatus(PayStatusDto payStatusDto) {
    // 1. 获取支付流水号
    String payNo = payStatusDto.getOut_trade_no();
    // 2. 查询数据库订单状态
    XcPayRecord payRecord = getPayRecordByPayNo(payNo);
    if (payRecord == null) {
        XueChengPlusException.cast("未找到支付记录");
    }
    XcOrders order = xcOrdersMapper.selectById(payRecord.getOrderId());
    if (order == null) {
        XueChengPlusException.cast("找不到相关联的订单");
    }
    String statusFromDB = payRecord.getStatus();
    // 2.1 已支付,直接返回
    if ("600002".equals(statusFromDB)) {
        return;
    }
    // 3. 查询支付宝交易状态
    String tradeStatus = payStatusDto.getTrade_status();
    // 3.1 支付宝交易已成功,保存订单表和交易记录表,更新交易状态
    if ("TRADE_SUCCESS".equals(tradeStatus)) {
        // 更新支付交易表
        payRecord.setStatus("601002");
        payRecord.setOutPayNo(payStatusDto.getTrade_no());
        payRecord.setOutPayChannel("Alipay");
        payRecord.setPaySuccessTime(LocalDateTime.now());
        int updateRecord = xcPayRecordMapper.updateById(payRecord);
        if (updateRecord <= 0) {
            XueChengPlusException.cast("更新支付交易表失败");
        }
        // 更新订单表
        order.setStatus("600002");
        int updateOrder = xcOrdersMapper.updateById(order);
        if (updateOrder <= 0) {
            log.debug("更新订单表失败");
            XueChengPlusException.cast("更新订单表失败");
        }
    }
    // 4. 创建消息记录并保存到消息表中,参数1:支付结果类型通知;参数2:业务id;参数3:业务类型
    MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", order.getOutBusinessId(), order.getOrderType(), null);
    // 5. 封装消息记录并发送给消费端
    notifyPayResult(mqMessage);
}

消费者接收消息

在学习中心服务定义impl/ReceivePayNotifyService

  • 监听消息队列接收支付结果, 当接收到消息后更新选课记录表的选课状态为选课成功,同时向我的课程表中插入一条课程记录
@Slf4j
@Service
public class ReceivePayNotifyService {

    @Autowired
    MyCourseTablesService tablesService;

    @RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)
    public void receive(Message message) {
        // 1. 获取消息
        MqMessage mqMessage = JSON.parseObject(message.getBody(), MqMessage.class);
        // 2. 根据消息内容,更新选课记录,向我的课程表插入记录
        // 2.1 消息类型,学习中心只处理支付结果的通知
        String messageType = mqMessage.getMessageType();
        // 2.2 选课id
        String chooseCourseId = mqMessage.getBusinessKey1();
        // 2.3 订单类型,60201表示购买课程
        String orderType = mqMessage.getBusinessKey2();
        // 3. 学习中心只负责处理支付结果的通知
        if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType)){
            // 3.1 学习中心只负责购买课程类订单的结果
            if ("60201".equals(orderType)){
                // 3.2 保存选课记录
                boolean flag = tablesService.saveChooseCourseStatus(chooseCourseId);
                if (!flag){
                    XueChengPlusException.cast("保存选课记录失败");
                }
            }
        }
    }
}

MyCourseTablesService接口中定义方法更新选课记录的选课状态,同时向我的课程表添加选课记录(之前添加免费课程的时候已经实现过了)

public interface MyCourseTablesService {
	/**
     * 保存选课成功状态
     * @param chooseCourseId
     * @return
     */
    public boolean saveChooseCourseSuccess(String chooseCourseId);
}

@Slf4j
@Service
public class MyCourseTablesServiceImpl implements MyCourseTablesService {
    @Override
    @Transactional
    public boolean saveChooseCourseStatus(String chooseCourseId) {
        // 1. 根据选课id,查询对应的选课记录
        XcChooseCourse chooseCourse = chooseCourseMapper.selectById(chooseCourseId);
        if (chooseCourse == null) {
            log.error("接收到购买课程的消息,根据选课id未查询到课程,选课id:{}", chooseCourseId);
            return false;
        }
        // 2. 选课状态为未支付时,更新选课状态为选课成功
        if ("701002".equals(chooseCourse.getStatus())) {
            chooseCourse.setStatus("701001");
            int update = chooseCourseMapper.updateById(chooseCourse);
            if (update <= 0) {
                log.error("更新选课记录失败:{}", chooseCourse);
            }
        }
        // 3. 向我的课程表添加记录
        addCourseTables(chooseCourse);
        return true;
    }
}
public XcCourseTables addCourseTabls(XcChooseCourse xcChooseCourse){
    //选课成功了才可以向我的课程表添加
    String status = xcChooseCourse.getStatus();
    if(!"701001".equals(status)){
        XueChengPlusException.cast("选课没有成功无法添加到课程表");
    }
    XcCourseTables xcCourseTables = getXcCourseTables(xcChooseCourse.getUserId(), xcChooseCourse.getCourseId());
    if(xcCourseTables!=null){
        return xcCourseTables;
    }

    xcCourseTables = new XcCourseTables();
    BeanUtils.copyProperties(xcChooseCourse,xcCourseTables);
    xcCourseTables.setChooseCourseId(xcChooseCourse.getId());//记录选课表的逐渐
    xcCourseTables.setCourseType(xcChooseCourse.getOrderType());//选课类型
    xcCourseTables.setUpdateDate(LocalDateTime.now());
    int insert = courseTablesMapper.insert(xcCourseTables);
    if(insert<=0){
        XueChengPlusException.cast("添加我的课程表失败");
    }

    return xcCourseTables;
}

通知支付结果测试

选择一门已发布的收费课程,如果在我的课程表存储则删除记录及其相关的选课记录及订单记录信息

  • 进入课程详细页面,点击马上学习生成二维码进行支付
  • 支付完成点击“支付完成”,观察订单服务控制台是否发送消息(使用内网穿透工具)
  • 观察学习中心服务控制台是否接收到消息
  • 观察数据库中的消息表的相应记录是否已删除,我的选课表中是否有对应的选课记录

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/462453.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【echarts中解决适配窗口大小的问题】

适配窗口大小 一、基础适配方案&#xff1a;remflexible.js布局二、echart图表适配1.resize函数2.使用resize的前提&#xff01;重点&#xff01;不然resize极有可能失效 一、基础适配方案&#xff1a;remflexible.js布局 vscode插件&#xff1a;cssrem 编写好的less文件保存后…

什么又是线程呢??

线程&#xff1a; 线程可以并发的执行&#xff0c;但是线程的地址是可以共享的 进程与线程的比较&#xff1a; 进程>线程 线程分三种&#xff1a; 用户线程 只有用户程序的库函数来 用户线程 因为操作系统感知不到 线程&#xff0c;如果有线程在运行&#xff0c;然后不交…

Docker使用(二)Docker安装和常见典型操作

Docker使用(二)Docker安装和常见典型操作 二、软件安装 1、Docker安装 &#xff08;1&#xff09;环境准备 [rootlocalhost ~]# uname -r 3.10.0-327.el7.x86_64 # cat /etc/os-release &#xff08;2&#xff09;卸载旧版本 $ sudo yum remove docker \ ​ docker-cli…

内衣洗衣机怎么选?推荐四大表现突出的宝藏内衣洗衣机

近年来&#xff0c;家庭洗衣机的标准容量有了很大的提高&#xff0c;从5公斤、6公斤发展到9公斤和10公斤。大容量的洗衣机可以在家里清洗大尺寸的衣服和床单被子。不过&#xff0c;因为洗衣机里的所洗的衣服都是比较混杂的&#xff0c;所以很多时候&#xff0c;由于内衣袜子和婴…

HTML

什么是HTML&#xff1f; HTML是一门语言&#xff0c;所有的网页都是用HTML这门语言编写出来的 HTML&#xff08;HyperText Markup Language&#xff09;&#xff1a;超文本标记语言 超文本&#xff1a;超越了文本的限制&#xff0c;比普通文本更强大。除了文字信息&#xff…

Registry dubbo的注册中心

1. 架构说 注册中心。注册中心&#xff08;Registry&#xff09;在微服务架构中的作用举足轻重&#xff0c;有了它&#xff0c;服务提供者&#xff08;Provider&#xff09; 和消费者&#xff08;Consumer&#xff09; 就能感知彼此。从下面的 Dubbo 架构图中可知&#xff1a;…

零信任SDP是什么,有什么作用

物理边界曾经是可信网络和不可信网络之间的有效分割。防火墙通常部署于网络的边缘&#xff0c;基于静态策略来限制网路流量。位于防火墙内部的用户会被授予较高信任等级来访问企业的敏感资源&#xff0c;因为他们被默认是可信的。 然而随着云计算、移动互联、物联网、人工智能…

【Java探索之旅】数据类型与变量 字面常量 整型变量

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; Java编程秘籍 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一、字面常量二、数据类型三、变量3.1 变量概念3.2 语法格式 四、整型变量4.1 整型变量…

HTTP服务器简单编译测试

目录 登入页面测试 登入测试 根目录请求测试 功能性请求访问测试 GET方法请求测试 POST请求方法测试 PUT方法请求测试 DELETE方法请求测试 本文为承接上文进行简单的测试 登入页面测试 登入测试 根目录请求测试 功能性请求访问测试 GET方法请求测试 POST请求方法测试 P…

python的小技巧一

文章目录 python的小技巧系列1、变量相关变量交换三元运算符一个数值的范围比较有的场景下使用 try...exception 代替if...else 2、字符串相关格式化连接字符串的分割字符串的连接 3、生成器4、列表相关取最后一个元素判断列表是否为空列表合并去除列表中的重复值判断某个值是包…

天天说微服务,天天开发RESTful API,那你知道RESTful API是什么东东吗?

RESTful API&#xff08;Representational State Transfer&#xff09;是一种基于网络的架构风格&#xff0c;用于设计和构建Web服务。它是一种轻量级的架构&#xff0c;可以通过HTTP协议进行通信&#xff0c;并支持各种数据格式&#xff0c;例如JSON和XML。 在现代的Web应用程…

[Linux_IMX6ULL应用开发]-hello程序的交叉编译

目录 【开发板、虚拟机和PC的三者联通】 使用串口连接到开发板 连接Ubuntu虚拟机 互ping测试 【交叉编译hello.c文件】 Ubuntu编译无法在板子运行问题 使用交叉编译链编译hello.c 【开发板、虚拟机和PC的三者联通】 在这里我们使用IMX6ULL-PRO开发板进行学习&#xff0c;…

软件工程(Software Engineering)

一、软件工程概述 1.软件生存周期 软件&#xff1a; 包含程序、数据及相关文档 软件工程&#xff1a; 涉及到软件开发、维护、管理等多方面的原理、工具与环境。最终的目的是开发高质量的软件。 目的&#xff1a; 提高软件生产率、提高软件质量、降低软件成本。 文档的作用&…

TinyEMU之Linux Kernel编译

TinyEMU之Linux Kernel编译 1 准备工作2 安装RISC-V交叉编译器3 编译Linux Kernel4 镜像格式转换 本文属于《 TinyEMU模拟器基础系列教程》之一&#xff0c;欢迎查看其它文章。 1 准备工作 我们需要&#xff0c;下载以下内容。 编译好的RISC-V交叉编译器&#xff1a;riscv64-…

判断语句-Python

师从黑马程序员 if判断语句 if 要判断的条件: 条件成立时&#xff0c;要做的事&#xff08;四个空格&#xff09; age 15 if age>18:print("已经成年了")print("即将步入大学生活") print("时间过得真快") 成年人判断 ageint(input(&quo…

2024年高处安装、维护、拆除证模拟考试题库及高处安装、维护、拆除理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年高处安装、维护、拆除证模拟考试题库及高处安装、维护、拆除理论考试试题是由安全生产模拟考试一点通提供&#xff0c;高处安装、维护、拆除证模拟考试题库是根据高处安装、维护、拆除最新版教材&#xff0c;高…

【MASM汇编语言快速入门】8086MASM汇编深入理解指令对标志位的影响

8086MASM汇编深入理解指令对标志位的影响 文章目录 8086MASM汇编深入理解指令对标志位的影响0. 指令对标志位影响1. 指令对标志位影响速查表2. flags标志寄存器: 标志位含义解读flags1. 状态标志cf, pf, af, zf, sf, of2. 控制标志df, if, tf 详解&#xff1a;1. 传送指令2. 算…

EOS 与ESD 区别

ESD: 英文&#xff1a;Electrical Static Discharge&#xff1b; 定义&#xff1a;不同静电电位的两个物体之间的电荷转移&#xff1b;中文释为静电放电。静电是一种客观的自然现象&#xff1b; EOS&#xff1a; 英文&#xff1a;Electrical Over Stress 定义&#xf…

Avalonia学习1:下载通用皮肤SukiUI,并在windows上启动成功

目录 1、引言 2、碰到的问题 1、下载下拉VS2022老版本的用不了。 2、升级后&#xff0c;发现没有装wsl&#xff0c;导致启动不了&#xff0c;但wsl又由于国内的关系安装不了&#xff0c;怎么办呢&#xff0c; 1、引言 最近在想有没有什么可以开发在Linux下运行…

【C++】了解一下编码

个人主页 &#xff1a; zxctscl 如有转载请先通知 文章目录 1. 前言2. ASCII编码3. unicode4. GBK5. 类型转换 1. 前言 看到string里面还有Template instantiations&#xff1a; string其实是basic_string<char>&#xff0c;它还是一个模板。 再看看wstring&#xff1…