SpringBoot整合RabbitMQ实现消息延迟队列(含源码)

环境依赖

SpringBoot 3.1.0

JDK 17

前期准备

安装MQ:  liunx+docker+rabbitmq安装延迟队列插件

实例

实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件,这个插件可以让你在消息发送时设置一个延迟时间,超过这个时间后消息才会被消费者接收到。下面是 SpringBoot 整合 RabbitMQ 实现延迟队列的简单步骤:

1.添加 RabbitMQ 的 Maven 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置 RabbitMQ

在 application.properties 配置文件中添加 RabbitMQ 的连接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtual-host=/
# 手动应答
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次从队列中取一个,轮询分发,默认是公平分发
spring.rabbitmq.listener.simple.prefetch=1
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5

3.配置文件

@Configuration
public class RabbitMQOrderConfig {

    /**
     * 订单交换机
     */
    public static final String ORDER_EXCHANGE = "order_exchange";
    /**
     * 订单队列
     */
    public static final String ORDER_QUEUE = "order_queue";
    /**
     * 订单路由key
     */
    public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";

    /**
     * 死信交换机
     */
    public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
    /**
     * 死信队列 routingKey
     */
    public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";

    /**
     * 死信队列
     */
    public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";

    /**
     * 延迟时间 (单位:ms(毫秒))
     */
    public  static final Integer DELAY_TIME = 10000;


    /**
     * 创建死信交换机
     */
    @Bean("orderDeadLetterExchange")
    public Exchange orderDeadLetterExchange() {
        return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
    }

    /**
     * 创建死信队列
     */
    @Bean("orderDeadLetterQueue")
    public Queue orderDeadLetterQueue() {
        return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
    }

    /**
     * 绑定死信交换机和死信队列
     */
    @Bean("orderDeadLetterBinding")
    public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
    }


    /**
     * 创建订单交换机
     */
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }

    /**
     * 创建订单队列
     */
    @Bean("orderQueue")
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);

        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);

        //过期时间,单位毫秒
        args.put("x-message-ttl", DELAY_TIME);

        return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
    }

    /**
     * 绑定订单交换机和队列
     */
    @Bean("orderBinding")
    public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
    }
}

4.定义消息实体类

定义一个消息体类,用来存储需要发送的消息:

@Slf4j
@Data
@Builder
public class OrderMessage implements Serializable {

    /**
     * 商户订单号
     */
    private String orderId;

    /**
     * 支付宝订单号
     */
    private String tradeNo;
}

5.定义消息发送者

定义一个 RabbitMQ 消息发送者类,用来发送消息到 RabbitMQ:

@Slf4j
@Component
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrderMessage(OrderMessage message) {
        //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
        rabbitTemplate.setMandatory(true);
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
        rabbitTemplate.setReturnsCallback(returned -> {
            int code = returned.getReplyCode();
            System.out.println("code=" + code);
            System.out.println("returned=" + returned);
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", message);

        log.info("===============延时队列生产消息====================");
        log.info("发送时间:{},发送内容:{}, {}ms后执行", LocalDateTime.now(), message, RabbitMQConfig.DELAY_TIME);
    }
}

6.定义消息消费者

定义一个 RabbitMQ 消息消费者类,用来接收并处理消息:

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {

    @RabbitHandler
    public void consumer(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
        log.info("收到消息:{}",new Date());
        log.info("msgTag:{}", message.getMessageProperties().getDeliveryTag());
        log.info("message:{}", message);
        log.info("content:{}", orderMessage);
    }
}

这里使用了 @RabbitListener 注解来将一个方法标记为一个 RabbitMQ 消息监听器,通过设置 queues 属性来指定监听的队列名称。

7.定义一个controller

@Slf4j
@Api(tags = "延迟消息接口")
@RestController
@RequestMapping("/rabbitmq_order_delay_message")
public class RabbitMQDelayMessageController {

    @Autowired
    private MessageSender sender;

    /**
     * 发送消息
     * @return
     */
    @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
    @ResponseBody
    public void sendMsg() {
        OrderMessage orderMessage = OrderMessage.builder().orderId(UUID.randomUUID().toString()).tradeNo(UUID.randomUUID().toString()).build();
        sender.sendOrderMessage(orderMessage);
    }
}

启动项目,请求运行结果:

image-20230704171525938

总的xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
    <groupId>com.xiaoleilu</groupId>
    <artifactId>hutool-all</artifactId>
    <version>3.0.7</version>
</dependency>

<dependency>
    <groupId>io.swagger</groupId>
    <artifactId>swagger-annotations</artifactId>
    <version>${swagger-annotations.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.73</version>
    <scope>compile</scope>
</dependency>

问题总结

1.Invalid argument, ‘x-delayed-type’ must be an existing exchange type

需要创建一个交换机

image-20230703161834850

2.Connection refused: no further information

请检查配置 application.xml配置的rabbimq不生效,可以将配置放到application.properties

3.Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

这种情况:

1.消费者内部重复签收导致签收异常

​ 解决方案:增加配置手动处理应答

  1. 配置新增
spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动签收
  1. 代码里: 增加channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    public void consumer(String body, Message message, Channel channel) throws IOException {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("收到消息:" + new Date());
            System.out.println("msgTag=" + msgTag);
            System.out.println("message=" + message);
            System.out.println("body=" + body);
            channel.basicAck(msgTag, false);
        }catch (Exception e) {
            log.error("【订单延迟关闭处理异常】 接收到消息为:" + msgTag + " ,消息异常消费 : ", e);
        } finally {
            // 处理完之后手动签收(这里再次签收)
            channel.basicAck(msgTag, false);
        }
    }

2.已经是自动处理了,然后代码里还有手动处理channel.basicAck(msgTag, false)

​ 解决方案:去除channel.basicAck(msgTag, false)

4.Failed to convert message

消息发送和接收的方式不对 比如发送的是对象,则接收的也必须是对象,发送的是string ,接收的也必须是string

image-20230704174011064

image-20230704174057321

如果需要完整源码请关注公众号"架构殿堂" ,回复 "SpringBoot+RabbitMQ实现消息延迟队列"获得

写在最后

如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送达!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/34484.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【JVM内存模型】—— 每天一点小知识

&#x1f4a7; J V M 内存模型 \color{#FF1493}{JVM内存模型} JVM内存模型&#x1f4a7; &#x1f337; 仰望天空&#xff0c;妳我亦是行人.✨ &#x1f984; 个人主页——微风撞见云的博客&#x1f390; &#x1f433; 《数据结构与算法》专栏的文章图文并茂&#x…

智谱AI-算法实习生(知识图谱方向)实习面试记录

岗位描述 没错和我的经历可以说是match得不能再match了&#xff0c;但是还是挂了hh。 面试内容 给我面试的是唐杰老师的博士生&#xff0c;方向是社交网络数据挖掘&#xff0c;知识图谱。不cue名了&#xff0c;态度很友好的 &#xff0c;很赞。 date&#xff1a;6.28 Q1 自…

【Spark】介绍,部署与快速入门

文章目录 介绍核心模块Spark CoreSpark SQLSpark StreamingSpark MLlibSpark GraphX 部署命令行Web UI提交应用Local 模式Standalone配置文件添加 JAVA_HOME 环境变量和集群对应的 master 节点启动集群配置历史服务添加日志存储路径添加日志配置webui 配置高可用 Yarn模式配置文…

使用npm install -g @vue/cli 命令安装最新的脚手架与Vue版本不匹配的问题

使用npm install -g vue/cli 命令安装最新的脚手架 创建项目时不要选择Vue版本&#xff0c;让它默认选择&#xff08;默认选择 Vue2&#xff09;否则会出现 vue版本和脚手架版本vue-cli 不兼容的问题&#xff08;怪哉&#xff09; 脚手架兼容vue2 不兼容vue3 &#xff1f; 不理…

2023 年 10 大前端发展趋势

新技术的出现和老技术的淘汰让前端开发者们需要不断地学习和更新知识。特别是在经济不好的情况下&#xff0c;是否掌握新的技术很大程度决定着你是否被淘汰。 虽然应用程序试图将网站替代&#xff0c;但前端 Web 开发业务仍在快速变化和增长&#xff0c;前端开发人员的功能并没…

配置Jenkins slave agent(通过jnlp)方式连接

上一章&#xff0c;使用ssh的方式添加了两个agent&#xff0c;并都成功完成了构建任务&#xff0c;这一章使用jnlp的方式配置agent&#xff0c;jnlp方式配置agent有个好处&#xff0c;就是agent是主动去找到Master请求连接的&#xff0c;master->agent的通道可以配置一个age…

Leetcode-每日一题【234.回文链表】

题目 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1]输出&#xff1a;true 示例 2&#xff1a; 输入&#xff1a;head…

要从HTML中提取img标签的src属性(图片链接),可以使用正则表达式方式。

1. 定义提取src属性的正则表达式: const srcRegex /<img\s(?:[^>]*?\s)?src\s*\s*(["])((?:[^\1"]|\\\1|.)*?)\1/g 这个正则会匹配类似<img src"http://example.com/1.jpg">中的src属性和括号中的连接。2. 调用字符串的matchAll()方法…

【数据仓库】Apache Doris介绍

Apache Doris介绍 Apache Doris应用场景 Apache Doris核心特性 Apache Doris架构 Doris数据模型三种 Aggregate模型介绍 Uniq模型介绍 在某些多维分析场景下,用户更关注的是如何保证Key的唯一性Key 唯一性约束。因此&#xff0c;我们引入了 Unig 的数据模型。该模型本质上是聚…

redis高可用(二)

redis高可用&#xff08;二&#xff09; 一、主从复制 1.概念 主从复制&#xff0c;是指将一台Redis服务器的数据&#xff0c;复制到其他的Redis服务器。前者称为主节点(Master)&#xff0c;后者称为从节点(Slave)&#xff1b;数据的复制是单向的&#xff0c;只能由主节点到…

爬虫入门指南(5): 分布式爬虫与并发控制 【提高爬取效率与请求合理性控制的实现方法】

文章目录 前言多线程与多进程多线程多进程多线程和多进程的选择 使用Scrapy框架实现分布式爬虫1. 创建Scrapy项目2. 配置Scrapy-Redis3. 创建爬虫4. 启动爬虫节点5. 添加任务到队列 并发控制与限制请求频率并发控制限制请求频率 未完待续... 前言 在进行爬虫任务时&#xff0c;…

开源 sysgrok — 用于分析、理解和优化系统的人工智能助手

作者&#xff1a;Sean Heelan 在这篇文章中&#xff0c;我将介绍 sysgrok&#xff0c;这是一个研究原型&#xff0c;我们正在研究大型语言模型 (LLM)&#xff08;例如 OpenAI 的 GPT 模型&#xff09;如何应用于性能优化、根本原因分析和系统工程领域的问题。 你可以在 GitHub …

Scrapy框架--settings配置 (详解)

目录 settings配置 官网-参考配置 配置文档 Scrapy默认BASE设置 settings配置 Scrapy框架中的配置文件&#xff08;settings.py&#xff09;是用来管理爬虫行为和功能的关键部分。它是一个Python模块&#xff0c;提供了各种配置选项&#xff0c;可以自定义和控制爬虫的行为。…

Excel实用技巧 如何将EXCEL中在同个单元格中的汉字和数字分开

右边字符串&#xff0c;左边数字 RIGHT(A1,LENB(A1)-LEN(A1)) LEFT(A1,2*LEN(A1)-LENB(A1)) 左边字符串&#xff0c;右边数字 LEFT(A1,LENB(A1)-LEN(A1)) RIGHT(A1,2*LEN(A1)-LENB(A1))

关注个人信息安全

近日&#xff0c;某高校毕业生在校期间窃取学校内网数据&#xff0c;收集全校学生个人隐私信息的新闻引发了人们对互联网生活中个人信息安全问题的再度关注。在大数据时代&#xff0c;算法分发带来了隐私侵犯&#xff0c;在享受消费生活等便捷权利的同时&#xff0c;似乎又有不…

jupyter-notebook使用指南

jupyter-notebook使用指南 jupyter-notebook安装[python版][anaconda版] jupyter-notebook如何导出PDF&#xff1f;【没解决&#xff0c;直接看最后&#xff0c;不要跟着操作&#xff01;】正常导出步骤安装Pandoc安装Xelatex问题没解决&#xff0c;懒得安装了&#xff0c;放弃…

【数据可视化】大作业(意向考研高校的数据可视化)

文章目录 前言一、数据介绍1.1 基本信息1.2 考研信息1.3 导师信息 二、预处理及分析2.1 数据预处理2.1.1 考研信息预处理2.1.2 导师信息预处理 2.2 数据分析 三、可视化方法及结果3.1 可视化方法3.2 可视化结果展示3.2.1 基本信息3.2.2 考研信息3.2.3 导师信息 四、总结五、附录…

WWDC2023 Metal swift 头显ARKit支持c c++ 开发

1 今年WWDC&#xff0c;我们看见了苹果的空间计算设备&#xff0c;visionOS也支持了c c API. 这有什么好处呢&#xff0c;不是说能够吸引更多c c开发者加入苹果开发者阵营&#xff0c;而是我们过去的很多软件&#xff0c;可以轻松对接到苹果的头显设备&#xff0c;让我们的软件…

AI 语音 - 人物音色训练

前情提要 2023-07-02 周日 杭州 阴晴不定 AI 入门三大项&#xff0c;AI 绘画基础学习&#xff0c;AI 语音合成&#xff0c;AI 智能对话训练&#xff0c;进入 AI 语音合成阶段了&#xff0c;搓搓小手很激动的&#xff0c;对于一个五音不全的我来说&#xff0c;这个简直了(摆脱…

DatenLord前沿技术分享 No.29

达坦科技专注于打造新一代开源跨云存储平台DatenLord&#xff0c;通过软硬件深度融合的方式打通云云壁垒&#xff0c;致力于解决多云架构、多数据中心场景下异构存储、数据统一管理需求等问题&#xff0c;以满足不同行业客户对海量数据跨云、跨数据中心高性能访问的需求。BSV的…