第十六章 RabbitMQ延迟消息之延迟插件优化

目录

一、引言

二、优化方案 

三、核心代码实现

3.1. 生产者代码

3.2. 消息处理器 

3.3. 自定义多延迟消息封装类 

3.4. 订单实体类 

3.5. 消费者代码 

四、运行效果


一、引言

上一章节我们提到,直接使用延迟插件,创建一个延迟指定时间的消息(如10分钟),并不是最好的解决方案,因为假如我们的订单是在5分钟支付的,那么剩余的5分钟时间,RabbitMQ中延迟消息时钟还是一直占用着资源。如果有大量的延迟消息,那么对于服务来说压力是很大的,同时会耗费庞大昂贵的资源。因此,本章节我们就来近一步对延迟插件的消息进行优化。

我们通过下面的流程图来做近一步分析:

1. 用户下单完成后,发送15分钟延迟消息,在15分钟后接收消息,检查支付状态:

2. 已支付:更新订单状态为已支付

3. 未支付:更新订单状态为关闭订单,恢复商品库存

常规延迟插件消息使用的弊端总结:

1. 设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

2. 如果并发较高,30分钟可能堆积消息过多,对MQ压力很大

3. 大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源

二、优化方案 

如下图所示,我们可以将10分钟甚至30分钟拆分成多份零散的较短的时间。

消息初次发送的延迟时间设定为10s,10s过后如果订单还是未支付状态,我们判断延迟时间数组里还有没有剩余延迟时间,如果有则继续发送延迟消息,时间设定为数组中的第二个时间10s,直到订单支付成功终止循环,或是最后一份时间消耗完依然未支付,我们取消订单。

三、核心代码实现

3.1. 生产者代码

package com.example.publisher;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * 生产者
 */
@Slf4j
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void test() {
        Order order = Order.builder().orderId(1L).content("生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!").build();
        MultiDelayMessage<Order> msg = MultiDelayMessage.of(order, 1000L, 5000L, 2000L, 10000L);

        rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelayLong(msg.removeNextDelay());
                return message;
            }
        });
    }
}

3.2. 消息处理器 

package com.example.publisher;

import lombok.AllArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

/**
 * 消息请求处理器
 */
@AllArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {

    private final Long delay;

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelayLong(delay);
        return message;
    }
}

3.3. 自定义多延迟消息封装类 

package com.example.publisher;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.util.CollectionUtils;

import java.io.Serializable;
import java.util.List;

/**
 * 自定义的多延时消息封装类
 * @param <T>
 */
@Data
@NoArgsConstructor
public class MultiDelayMessage<T> implements Serializable {
    /**
     * 消息体
     */
    private T data;

    /**
     * 记录延迟时间的集合
     */
    private List<Long> delayMillis;

    public MultiDelayMessage(T data, List<Long> delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }

    public static <T> MultiDelayMessage<T> of(T data, Long...delayMillis) {
        return new MultiDelayMessage<>(data, (List<Long>) CollectionUtils.arrayToList(delayMillis));
    }

    /**
     * 获取并移除下一个延迟时间
     * @return 队列中的第一个延迟时间
     */
    public Long removeNextDelay() {
        return delayMillis.remove(0);
    }

    /**
     * 是否还有下一个延迟时间
     * @return
     */
    public boolean hasNextDelay() {
        return !delayMillis.isEmpty();
    }
}

3.4. 订单实体类 

package com.example.publisher;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 订单类 
 * 此处为了演示,将真实业务中的订单类做了简化
 * 只包含一个订单ID和自定义消息内容
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order {

    private Long orderId;

    private String content;
}

3.5. 消费者代码 

package com.example.consumer;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * 消费者
 * 因为作为演示,所以商城支付、订单、及扣减库存的业务代码已注释
 * 注释中保留了整个商城下单支付扣减库存的流程步骤
 */
@Slf4j
@Component
public class SimpleListener {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listener(MultiDelayMessage<Order> msg) throws Exception {
        System.out.println(((Order)msg.getData()).getContent());
        // 1. 查询订单状态
        // Order order = orderService.getById(msg.getData())
        // 2. 判断是否已支付
//        if (Order == null || order.status == 2) {
//            订单不存在或者已处理则直接返回
//            return;
//        }
        // 主动去支付服务查询真正的支付状态
//        PayOrder payOrder = payService.getById(order.getId());
        // 2.1. 已支付,则标记订单为已支付
//        if (payOrder.isPay()) {
//            orderService.markOrderPaySuccess(order.getId());
//            return;
//        }
        // 2.2. 未支付,获取下次订单延迟时间
        // 3. 判断是否存在延迟时间
        if (msg.hasNextDelay()) {
            // 3.1 存在,重发延迟消息
            Long nextDelay = msg.removeNextDelay();

            rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setDelayLong(nextDelay);
                    return message;
                }
            });
            return;
        }
        // 3.2 不存在,取消订单
//        orderService.lambdaUpdate()
//                .set(Order::getStatus, 5);
//                .set(Order::getCloseTime, LocalDateTime.now());
//                .eq(Order::getId, order.getId())
//                .update();
        // 4. 恢复库存
    }
}

四、运行效果

最终我们会看到每间隔一段时间消费者就会消费一条消息,这个间隔时间就是我们设定的分段时间数组,这么做就能极大地减少资源消耗和服务的压力:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/890663.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++算法】双指针

目录 一、快乐数&#xff1a; 二、有效三角形的个数&#xff1a; 三、盛最多水的容器&#xff1a; 四、复写0&#xff1a; 五、三数之和&#xff1a; 总结&#xff1a; 一、快乐数&#xff1a; 题目出处&#xff1a; 202. 快乐数 - 力扣&#xff08;LeetCode&#xff09…

ROS2 通信三大件之动作 -- Action

通信最后一个&#xff0c;也是不太容易理解的方式action&#xff0c;复杂且重要 1、创建action数据结构 创建工作空间和模块就不多说了 在模块 src/action_moudle/action/Counter.action 下创建文件 Counter.action int32 target # Goal: 目标 --- int32 current_value…

智能健康顾问:基于SpringBoot的系统

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

Qt:图片文字转base64程序

目录 一.Base64 1.编码原理 2.应用场景 3.优点 4.限制 5.变种 二.文字与Base64互转 1.ui设计 2.文字转Base64 3.Base64转文字 三.图片与Base64互转 1.ui设计 2.选择图片与图片路径 3.图片转Base64 4.Base64转图片 四.清空设置 五.效果 六.代码 base64conver…

PDF编辑不求人!4款高效工具,内容修改从此变得简单又快捷

咱们现在生活在一个数字时代&#xff0c;PDF文件可不就是工作、学习还有日常生活中经常要用的东西嘛。但遇到那些需要改动的PDF文件&#xff0c;是不是就觉得有点头疼啊&#xff1f; 因为传统的PDF文件真的不好编辑&#xff0c;这确实挺烦人的。不过呢&#xff0c;我今天要给你…

【北京迅为】《STM32MP157开发板嵌入式开发指南》- 第三十九章 Linux Misc驱动

iTOP-STM32MP157开发板采用ST推出的双核cortex-A7单核cortex-M4异构处理器&#xff0c;既可用Linux、又可以用于STM32单片机开发。开发板采用核心板底板结构&#xff0c;主频650M、1G内存、8G存储&#xff0c;核心板采用工业级板对板连接器&#xff0c;高可靠&#xff0c;牢固耐…

SpringBoot下的智能健康推荐引擎

3系统分析 3.1可行性分析 通过对本基于智能推荐的卫生健康系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于智能推荐的卫生健康系统采用SSM框架&#…

24秋面试笔记

文章目录 一、专业技能1.1 具备扎实的Java基础&#xff0c;熟练掌握面向对象编码规范、集合、反射以及Java8特性等。1.1.1 Java基础1.1.2 集合1.1.3 Java8新特性 1.2 熟悉常用的数据结构(链表、栈、队列、二叉树等)&#xff0c;熟练使用排序、动态规划、DPS等算法。1.2.1 数据结…

CountUp.js 实现数字增长动画 Vue

效果&#xff1a; 官网介绍 1. 安装 npm install --save countup.js2. 基本使用 // template <span ref"number1Ref"></span>// script const number1Ref ref<HTMLElement>() onMounted(() > {new CountUp(number1Ref.value!, 9999999).sta…

Centos7 搭建单机elasticsearch

以下是在 CentOS 7 上安装 Elasticsearch 7.17.7 的完整步骤&#xff1a;&#xff08;数据默认保存在/var/lib/elasticsearch下&#xff0c;自行更改&#xff09; 一、装 Java 环境 Elasticsearch 是用 Java 编写的&#xff0c;所以需要先安装 Java 运行环境。 检查系统中是…

弘景光电:以创新为翼,翱翔光学科技新蓝海

在科技日新月异的今天&#xff0c;光学镜头及模组作为智能设备的核心组件&#xff0c;其重要性日益凸显。广东弘景光电科技股份有限公司&#xff08;以下简称“弘景光电”&#xff09;正是在这一领域中&#xff0c;凭借其卓越的研发实力和市场洞察力&#xff0c;即将在创业板上…

001 Qt_从零开始创建项目

文章目录 前言什么是QtQt的优点Qt的应用场景创建项目小结 前言 本文是Qt专栏的第一篇文章&#xff0c;该文将会向你介绍如何创建一个Qt项目 什么是Qt Qt 是⼀个 跨平台的 C 图形⽤⼾界⾯应⽤程序框架 。它为应⽤程序开发者提供了建⽴艺术级图形界⾯所需的所有功能。它是完全…

英特尔新旗舰 CPU 将运行更凉爽、更高效,适合 PC 游戏

英特尔终于解决了台式机 CPU 发热和耗电的问题。英特尔的新旗舰 Core Ultra 200S 系列处理器将于 10 月 24 日上市&#xff0c;该系列专注于每瓦性能&#xff0c;比之前的第 14 代芯片运行更凉爽、更高效。这些代号为 Arrow Lake S 的处理器也是英特尔首款内置 NPU&#xff08;…

Unity3D 观察者模式

Unity3D 泛型事件系统 观察者模式 观察者模式是一种行为设计模式&#xff0c;通过订阅机制&#xff0c;可以让对象触发事件时&#xff0c;通知多个其他对象。 在游戏逻辑中&#xff0c;UI 界面通常会监听一些事件&#xff0c;当数据层发生变化时&#xff0c;通过触发事件&am…

LabVIEW提高开发效率技巧----状态保存与恢复

在LabVIEW开发中&#xff0c;保存和恢复程序运行时的状态是一个关键技巧&#xff0c;特别是在涉及需要暂停或恢复操作的应用中。通过使用 Flatten To String 和 Unflatten From String 函数&#xff0c;开发人员可以将程序当前的状态转换为字符串并保存&#xff0c;再在需要时恢…

决策树随机森林-笔记

决策树 1. 什么是决策树&#xff1f; 决策树是一种基于树结构的监督学习算法&#xff0c;适用于分类和回归任务。 根据数据集构建一棵树&#xff08;二叉树或多叉树&#xff09;。 先选哪个属性作为向下分裂的依据&#xff08;越接近根节点越关键&#xff09;&#xff1f;…

人工智能和机器学习之线性代数(一)

人工智能和机器学习之线性代数&#xff08;一&#xff09; 人工智能和机器学习之线性代数一将介绍向量和矩阵的基础知识以及开源的机器学习框架PyTorch。 文章目录 人工智能和机器学习之线性代数&#xff08;一&#xff09;基本定义标量&#xff08;Scalar&#xff09;向量&a…

机器视觉AI场景为什么用Python比C++多?

好多开发者在讨论机在机器视觉人工智能领域的时候&#xff0c;纠结到底是用Python还是C&#xff0c;实际上&#xff0c;Python 和 C 都有广泛的应用&#xff0c;选择 Python而不是 C 可能有以下一些原因&#xff1a; 语言易学性和开发效率 语法简洁&#xff1a; Python 语法简…

软考系统分析师知识点十:软件工程

前言 今年报考了11月份的软考高级&#xff1a;系统分析师。 考试时间为&#xff1a;11月9日。 倒计时&#xff1a;27天。 目标&#xff1a;优先应试&#xff0c;其次学习&#xff0c;再次实践。 复习计划第一阶段&#xff1a;扫平基础知识点&#xff0c;仅抽取有用信息&am…

【消息队列】Kafka从入门到面试学习总结

国科大学习生活&#xff08;期末复习资料、课程大作业解析、大厂实习经验心得等&#xff09;: 文章专栏&#xff08;点击跳转&#xff09; 大数据开发学习文档&#xff08;分布式文件系统的实现&#xff0c;大数据生态圈学习文档等&#xff09;: 文章专栏&#xff08;点击跳转&…