【SpringBoot】SpringBoot整合RabbitMQ消息中间件,实现延迟队列和死信队列

  📝个人主页:哈__

期待您的关注 

目录

 一、🔥死信队列

RabbitMQ的工作模式

 死信队列的工作模式

 二、🍉RabbitMQ相关的安装 

三、🍎SpringBoot引入RabbitMQ

1.引入依赖

2.创建队列和交换器

2.1 变量声明 

2.2 创建延迟交换器

2.3 创建延迟队列

2.4 延迟队列绑定延迟交换器

2.5 死信队列配置

3. 添加application.yml

4. 添加RabbitMQListener (消费者)

5. 创建DelayMessageSender 

6. 创建Controller 

7.测试 

四、🍌死信队列的应用场景


 一、🔥死信队列

RabbitMQ的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于接收其他队列中的“死信”消息。所谓“死信”,是指满足一定条件而无法被消费者正确处理的消息,这些条件包括消息被拒绝、消息过期、消息达到最大重试次数等。

当消息成为死信时,RabbitMQ会将其重新发送到指定的死信队列,而不是丢弃它们。这样做的好处是可以对死信进行分析和处理,例如记录日志、重新入队或者进一步处理。

死信队列通常与RabbitMQ的延迟队列(Delayed Message Queue)一起使用,通过延迟队列延迟消息的处理时间,可以更容易地触发消息成为死信的条件,从而进行测试和调试。

死信队列在消息中间件中有许多实际应用场景,主要用于处理无法被正常消费的消息,增强了消息的可靠性和处理能力。以下是一些常见的应用场景:

  1. 延迟消息处理:通过将消息发送到延迟队列,在指定的时间后再将消息发送到目标队列,实现延迟处理消息的功能。

  2. 消息重试:当消费者无法处理消息时,消息可以被重新发送到队列并设置重试次数,达到最大重试次数后转发到死信队列,以便进行进一步处理。

  3. 异常处理:当消息无法被消费者正常处理时(如格式错误、业务异常等),将消息转发到死信队列,用于记录日志、报警或人工处理。

  4. 消息超时处理:当消息在队列中等待时间过长时,可以设置消息的过期时间(TTL),超过时间后将消息转发到死信队列。

  5. 消息路由失败:当消息无法被正确路由到目标队列时,可以将消息发送到死信队列,避免消息丢失。

  6. 消息版本兼容性处理:当消息的格式或内容发生变化时,通过死信队列可以处理老版本消息,确保新版本系统的兼容性。


RabbitMQ的工作模式


 死信队列的工作模式

今天我要实现的就是这个延迟队列和死信队列。生产者首先向延迟队列发送消息,待达到TTL后消息会被转送到死信队列当中,消费者会从死信队列中获取消息进行消费。

 二、🍉RabbitMQ相关的安装 

win10 安装rabbitMQ详细步骤_rabbitmq 安装-CSDN博客

我这里直接引用别人的文章了,下载需要大家去看一看。

RabbitMQ延迟插件的安装。

[超详细]RabbitMQ安装延迟消息插件_rabbitmq安装延迟插件-CSDN博客

三、🍎SpringBoot引入RabbitMQ

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>

2.创建队列和交换器

这一步是很重要的,如果你配置错误了,消息很可能无法正确的传送。要实现延迟队列和死信队列,我们一共要创建以下几个组件:

  1. 延迟队列
  2. 延迟队列的交换器
  3. 死信队列
  4. 死信队列的交换器

在我们创建了这几个组件之后,我们还要干一些事情,我们需要把这些组件进行组装,如果你不了解RabbitMQ的基础,你可以先看看基础教学,我这里简单的说一下。RabbitMQ中有一种绑定方式,这种绑定方式会把BindingKey和RoutingKey完全匹配的进行绑定,如下图所示,生产者发送了一个BindingKey为“warning”的消息,那么这个消息就会被发送到Queue1和Queue2,这并不难理解。

我们要做的就是把队列和交换器通过一个RoutingKey绑定在一起。


2.1 变量声明 

 接下来的代码要好好看了,首先我们把我们后边要用到的名称变量全部定义出来。因为这个名称起的很长,我们不方便直接使用。创建DeadRabbitConfig。在类中定义如下变量,延迟队列交换器名称、延迟队列名称、延迟队列Routing名称。除此之外还有死信队列交换器名称、死信队列名称和死信Routing名称。

  // 延迟队列交换器名称
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    // 延迟队列A名称
    public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a";
    // 延迟队列B名称
    public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b";
    // 延迟队列routingA名称
    public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key";
    // 延迟队列routingB名称
    public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key";

    // 死信队列
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key";
    public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key";
    public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a";
    public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";

2.2 创建延迟交换器

// 注册延迟交换器delayExchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return  new DirectExchange(DELAY_EXCHANGE_NAME);
    }

2.3 创建延迟队列

这里的延迟队列需要我们额外的配置一些参数,用于和死信队列进行信息发送。这里我是用了两种不同的方式构建延迟队列A和延迟队列B,在延迟队列A种我没有设置TTL参数,而是通过RabbitMQ的延迟插件实现的,而延迟队列B我设置了TTL为10000ms,也就是十秒,十秒内消息如果没有被消费掉就会发送到死信队列。

// 注册延迟队列A   还要绑定死信交换器和死信routingA
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);
        //args.put("x-message-ttl",6000);
        return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();
    }
    // 注册延迟队列B   还要绑定死信交换器和死信routingB
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY);
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();
    }

2.4 延迟队列绑定延迟交换器

 // 延迟队列A绑定交换器
    @Bean
    public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);
    }
    // 延迟队列B绑定交换器
    @Bean
    public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME);
    }

2.5 死信队列配置

与延迟队列不同的是,死信队列并没有配置延迟参数。

// 注册死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A_NAME);
    }
    // 注册死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B_NAME);
    }
    // 注册死信交换器
    @Bean
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 死信队列A绑定死信交换器
    @Bean
    public Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
    }
    // 死信队列B绑定死信交换器
    @Bean
    public Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
    }

到此为止,RabbitMQ的组件配置完成。


3. 添加application.yml

server:
  port: 8081
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4. 添加RabbitMQListener (消费者)

下方的代码一共有两个消费者,一个消费者获取死信队列A中的消息,另一个消费者获取死信队列B中的消息。

@Component
public class DeadLetterQueueConsumer {
    public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);

    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL")
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL")
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5. 创建DelayMessageSender 

这里采用的就是两种不同的方式,一种方式是使用插件来延迟消息的发送,另一种是通过TTL参数。

@Component
public class DelayMessageSender {
    @Resource
    RabbitTemplate rabbitTemplate;


    public void sendMessage(String msg,Integer delayTimes){
        switch (delayTimes){
            case 6:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration(String.valueOf(6000));
                        return message;
                    }
                });
                break;
            case 10:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg);
                break;
        }
    }
}

6. 创建Controller 

@RestController
@RequestMapping("/student")
public class StudentController {
    @Autowired
    DelayMessageSender messageSender;
    @RequestMapping("/send-message")
    public String sendMessage(String msg,Integer delayTimes){
        System.out.println(new Date());
        messageSender.sendMessage(msg,delayTimes);
        return "发送成功";
    }
}

7.测试 

在浏览器中输入以下地址进入RabbitMQ界面。账号密码都是guest。

http://localhost:15672/

 先来看看我们的初始队列。这里是什么都没有的。


然后我们启动项目后在看。我们刚才创建出来的四个队列全部都被加载了出来。


 使用PostMan发送一次请求。


 我们的请求在17s的时候发送到后端,消息打印在23s,说明我们的延迟队列有效果。


接下来我们测试10s的延迟队列。


 10s后死信队列B成功的接收到了消息。

四、🍌死信队列的应用场景

延迟队列通常用于需要延迟执行某些任务或触发某些事件的场景。例如,在电子商务中,可以使用延迟队列实现订单超时未支付自动取消功能。

  • 1.订单创建

    • 用户下单后,系统生成订单,并将订单信息发送到一个普通队列,同时设置一个TTL(Time-To-Live)为30分钟。
    • 这个队列配置了死信交换机(Dead Letter Exchange, DLX),当消息过期后会被转发到死信队列。
  • 2.等待支付

    • 在30分钟内,用户可以完成支付。如果用户在30分钟内支付完成,系统会从普通队列中移除对应的消息并正常处理订单。
  • 3.订单超时处理

    • 如果用户未在30分钟内完成支付,消息会自动过期并转发到死信交换机,进而转发到死信队列。
  • 4.取消订单

    • 系统有一个专门的消费者监听死信队列。当有消息进入死信队列时,消费者会自动处理这些消息,即取消订单、释放库存,并通知用户订单已取消。
  • 5.定时任务(可选):

    • 虽然死信队列已经提供了超时订单的处理,但为了防止消息丢失或处理延迟,可以设置一个定时任务定期检查订单状态,确保所有超时未支付的订单都得到了处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/690560.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[经验] 昆山教育网(昆山教育网中小学报名) #媒体#职场发展#微信

昆山教育网&#xff08;昆山教育网中小学报名&#xff09; 昆山教育局网站 网站&#xff1a;昆山市教育局 昆山市教育局全面贯彻执行党和国家的教育方针、政策&#xff0c;落实有关教育工作的法律、法规&#xff1b;负责制定本市教育工作的实施意见和措施&#xff0c;并监督…

WEB漏洞服务能提供哪些帮助

在数字化浪潮的推动下&#xff0c;Web应用程序已成为企业展示形象、提供服务、与用户进行交互的重要平台。然而&#xff0c;随着技术的飞速发展&#xff0c;Web应用程序中的安全漏洞也日益显现&#xff0c;成为网络安全的重大隐患。这些漏洞一旦被恶意攻击者利用&#xff0c;可…

C++笔记之一个函数多个返回值的方法、std::pair、std::tuple、std::tie的用法

C++笔记之一个函数多个返回值的方法、std::pair、std::tuple、std::tie的用法 —— 2024-06-08 杭州 code review! 文章目录 C++笔记之一个函数多个返回值的方法、std::pair、std::tuple、std::tie的用法一.从一个函数中获取多个返回值的方法1. 使用结构体或类2. 使用`std::t…

C# MES通信从入门到精通(11)——C#如何使用Json字符串

前言 我们在开发上位机软件的过程中,经常需要和Mes系统进行数据交互,并且最常用的数据格式是Json,本文就是详细介绍Json格式的类型,以及我们在与mes系统进行交互时如何组织Json数据。 1、在C#中如何调用Json 在C#中调用Json相关的对象的话,需要引用Newtonsoft.Json的dl…

三十六篇:未来架构师之道:掌握现代信息系统典型架构

未来架构师之道&#xff1a;掌握现代信息系统典型架构 1. 引言 在企业的数字化转型浪潮中&#xff0c;信息系统架构的角色变得日益重要。它不仅承载了企业的IT战略&#xff0c;更是确保企业在复杂、动态的市场环境中稳定运行的关键。作为信息系统的骨架&#xff0c;一个精心设…

贪心算法学习二

例题一 解法&#xff08;贪⼼&#xff09;&#xff1a; 贪⼼策略&#xff1a; 由于只能交易⼀次&#xff0c;所以对于某⼀个位置 i &#xff0c;要想获得最⼤利润&#xff0c;仅需知道前⾯所有元素的最⼩ 值。然后在最⼩值的位置「买⼊」股票&#xff0c;在当前位置「卖出」…

Spring进阶技巧:利用AOP提前介入的巧妙实践

Spring框架中的面向切面编程&#xff08;AOP&#xff09;是一种强大的机制&#xff0c;它允许开发者在不修改原有代码的情况下&#xff0c;对程序进行横向切面的功能扩展。AOP提供了一种方式&#xff0c;可以在目标Bean的生命周期早期阶段就实施切面逻辑&#xff0c;这为我们在…

JMS VS AMQP

JMS&#xff08;Java Message Service&#xff09;是一个为Java平台设计的API&#xff0c;主要针对Java开发者提供了一套用于企业级消息服务的标准接口。而AMQP&#xff08;Advanced Message Queuing Protocol&#xff09;是一个应用层协议&#xff0c;它提供了一个开放的、标准…

代码随想录算法训练营第十六天| 找树左下角的值、路径总和、 从中序与后序遍历序列构造二叉树

找树左下角的值 题目链接&#xff1a;找树左下角的值 文档讲解&#xff1a;代码随想录 状态&#xff1a;递归没想到中右左的遍历顺序&#xff0c;迭代想出来了 思路&#xff1a;需要找最大深度&#xff0c;然后使用中右左的遍历顺序找最左节点 题解&#xff1a; int res 0;int…

【RK3568】制作Android11开机动画

Android 开机 logo 分为两种&#xff1a;静态显示和动态显示。静态显示就是循环显示一张图片&#xff1b;动态显示就是以特定帧率顺序显示多张图片 1.准备 android logo 图片 Android logo最好是png格式的&#xff0c;因为同一张图片的情况下&#xff0c;png 格式的比 jpg和b…

【ARM Cache 与 MMU 系列文章 7.6 -- ARMv8 MMU 配置 寄存器使用介绍】

请阅读【ARM Cache 及 MMU/MPU 系列文章专栏导读】 及【嵌入式开发学习必备专栏】 文章目录 MMU 转换控制寄存器 TCR_ELxTCR_ELx 概览TCR_ELx 寄存器字段详解TCR 使用示例Normal MemoryCacheableShareability MMU 内存属性寄存器 MAIR_ELxMAIR_ELx 寄存器结构内存属性字段Devic…

汇编:数组定义数据填充

数组的定义 在32位汇编语言中&#xff0c;定义数组时&#xff0c;通常使用定义数据指令&#xff08;如 DB, DW, DD,DQ &#xff09;和标签来指定数组的名称和内容。DB定义字节数组&#xff08;每个元素占1字节&#xff09;、DW定义字数组&#xff08;每个元素占2字节&#xff…

学习Canvas过程中2D的方法、注释及感悟一(通俗易懂)

1.了解Canvas&#xff1a; Canvas是前端一个很重要的知识点&#xff0c;<canvas>标签用于创建画布绘制图形&#xff0c;通过JavaScript进行操作。它为开发者提供一个动态绘制图形的区域&#xff0c;用于创建图标、游戏动画、图像处理等。 对于能够熟练使用Canvas的开发者…

计算机网络--计算机网络概念

计算机网络--计算机网络概念 计算机网络--物理层 计算机网络--数据链路层 计算机网络--网络层 计算机网络--传输层 计算机网络--应用层 0.计算机网络简介 0.2 计算机网络的功能简介 数据通信(连通性)资源共享&#xff1a; 软件硬件数据 分布式处理 多台计算机各自承担同…

Ant Design+react 表单只读

表单禁用&#xff0c;样式不好看&#xff0c;不符合甲方标准&#xff0c;看了一下文档&#xff0c;select、radio等都不支持只读状态。 解决方法&#xff1a; 利用css3的point-events属性&#xff0c;设置为none 在查看弹窗时&#xff0c;传入一个变量&#xff0c;当变量为true…

React实战(一)初始化项目、配置router、redux、axios

(一)初始化项目 1.安装项目 npx create-react-app 项目名 编译报错&#xff1a; 解决办法&#xff1a;安装最新的babel-preset-react-app npm install babel-preset-react-applatest 2.配置项目 (1)配置文件目录 (2)使用craco配置webpack.config npm install craco/crac…

wx 生命周期

以下内容你不需要立马完全弄明白&#xff0c;不过以后它会有帮助。 下图说明了页面 Page 实例的生命周期。

spring源码解析-(1)关于Bean

什么是Bean&#xff1f; 是spring对所有注入到IoC容器中的类的统称。 我们要注册进入spirng的bean千奇百怪&#xff0c;所以spring必须需要使用一个统一的定义来标识bean&#xff0c;就有了接下来的BeandDefinition&#xff0c;通过名称我们就可以知道&#xff0c;他是对bean…

IEDA 默认依赖概述

IEDA 默认依赖概述 目录概述需求&#xff1a; 设计思路实现思路分析1.AI Azure OpenAlAzure Al SearchAmazon BedrockChroma Vector DatabaseMilvus Vector DatabaseMistral AlNeo4J Vector DatabaseOllamaOpenAlPGvector Vector DatabasePinecone Vector DatabasePostgresMLRe…

IM即时通信技术

本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl IM&#xff0c;即Instant Messaging&#xff0c;是指即时通讯技术&#xff0c;它允许用户通过互联网实时交换文本、语音、视频、文件等多种形式的信息。这种技术打破了传统通信方式的时…