RabbitMQ 是如何做延迟消息的 ?——Java全栈知识(15)

RabbitMQ 是如何做延迟消息的 ?

1、什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递
    如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
    死信交换机有什么作用呢?
  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因 TTL(有效期)到期的消息

2、死信队列

架构:
image.png
由于第一个队列没有消费者,所以可以在第一个队列中设置 TTL,当消息过期的时候,这个消息就变成了死信,被丢掉私信交换机中,以此实现延迟任务功能。

3、延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,大家设想一下这样的场景: 如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒: image.png注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。
消息肯定会被投递到 ttl.queue 之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信: image.png 死信被再次投递到死信交换机 hmall.direct,并沿用之前的 RoutingKey,也就是 blueimage.png 由于 direct.queue1hmall.direct 绑定的 key 是 blue,因此最终消息被成功路由到 direct.queue1,如果此时有消费者与 direct.queue1 绑定,也就能成功消费消息了。但此时已经是5秒钟以后了: image.png 也就是说,publisher 发送了一条消息,但最终 consumer 在5秒后才收到消息。我们成功实现了延迟消息

[!info]
而且,RabbitMQ 中的这个 TTL 是可以设置任意时长的,这相比于 RocketMQ 只支持一些固定的时长而显得更加灵活一些。

死信队列消息堆积问题

[!danger] 死信队列消息堆积问题
但是,死信队列的实现方式存在一个问题,那就是可能造成队头阻塞。RabbitMQ 会定期扫描队列的头部检查队首的消息是否过期。如果队首消息过期了,它会被放到死信队列中。然而,RabbitMQ 不会逐个检查队列中的所有消息是否过期,而是仅检查队首消息。这样,如果队列的队头消息未过期,而它后面的消息已过期,这些后续消息将无法被单独移除,直到队头的消息被消费或过期。
因为队列是先进先出的,在普通队列中的消息,每次只会判断邢队头的消息是否过期,那么,如果队头的消息时间很长,一直都不过期,那么就会阻塞整个队列,这时候即使排在他后面的消息过期了,那么也会被一直阻塞。

基于 RabbitMQ 的死信队列,可以实现延迟消息,非常灵活的实现定时关单,并且借助 RabbitMQ 的集群扩展性,可以实现高可用,以及处理大并发量。他的缺点第一是可能存在消息阻塞的问题,还有就是方案比较复杂,不仅要依赖 RabbitMQ, 而目还需要声明很多队列出来,增加系统的复杂度

3、DelayExchange 插件

前面我们提到的基于死信队列的方式,是消息先会投递到一个正常队列,在 TTL 过期后进入死信队列。但是基于插件的这种方式,消息并不会立即进入队列,而是先把他们保存在一个基于 Erlang 开发的 Mnesia 数据库中,然后通过一个定时器去查询需要被投递的消息,再把他们投递到 x-delayed-message 交换机中。
基于 RabbitMQ 插件的方式可以实现延迟消息,并且不存在消息阻塞的问题,但是因为是基于插件的,而这个插件支持的最大延长时间是 (2^32)-1 毫秒,大约 49 天,超过这个时间就会被立即消费。

插件下载地址: GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
由于我们安装的 MQ 是 3.8 版本,因此这里下载 3.8.17 版本:
image.png|600px
附件:![[rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez]]

4.2.2. 安装

因为我们是基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:

[  
    {  
        "CreatedAt": "2024-06-19T09:22:59+08:00",  
        "Driver": "local",  
        "Labels": null,  
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",  
        "Name": "mq-plugins",  
        "Options": null,  
        "Scope": "local"  
    }  
]  

插件目录被挂载到了 /var/lib/docker/volumes/mq-plugins/_data 这个目录,我们上传插件到该目录下

注意上传插件

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:
image.png

4.2.3. 声明延迟交换机

image.png

根据

1、创建交换机:
image.png
2、创建队列
image.png
3、根据 bandingKey 绑定队列:
image.png|500

基于注解方式:

@RabbitListener(bindings = @QueueBinding(  
        value = @Queue(name = "delay.queue", durable = "true"),  
        exchange = @Exchange(name = "delay.direct", delayed = "true"),  
        key = "delay"  
))  
public void listenDelayMessage(String msg){  
    log.info("接收到delay.queue的延迟消息:{}", msg);  
}

基于 @Bean 的方式:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;  
import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;@Slf4j  
@Configuration  
public class DelayExchangeConfig {@Bean  
    public DirectExchange delayExchange(){  
        return ExchangeBuilder  
                .directExchange("delay.direct") // 指定交换机类型和名称  
                .delayed() // 设置delay的属性为true  
                .durable(true) // 持久化  
                .build();  
    }@Bean  
    public Queue delayedQueue(){  
        return new Queue("delay.queue");  
    }  
      
    @Bean  
    public Binding delayQueueBinding(){  
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");  
    }  
}  

4.2.4. 发送延迟消息

发送消息时,必须通过 x-delay 属性设定延迟时间:

@Test  
void testPublisherDelayMessage() {  
    // 1.创建消息  
    String message = "hello, delayed message";  
    // 2.发送消息,利用消息后置处理器添加消息头  
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {  
        @Override  
        public Message postProcessMessage(Message message) throws AmqpException {  
            // 添加延迟消息属性  
            message.getMessageProperties().setDelay(5000);  
            return message;  
        }  
    });  
}

warning 注意: 延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/596278.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

5-在Linux上部署各类软件

1. MySQL 数据库安装部署 1.1 MySQL 5.7 版本在 CentOS 系统安装 注意:安装操作需要 root 权限 MySQL 的安装我们可以通过前面学习的 yum 命令进行。 1.1.1 安装 配置 yum 仓库 # 更新密钥 rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022# 安装Mysql…

rk3588局域网推流

最近无意间看见在网上有使用MediaMtx插件配合ffmpeg在Windows来进行推流,然后在使用其他软件进行拉流显示数据图像的,既然windows都可以使用 ,我想linux应该也可以,正好手上也有一块RK3588的开发板,就测试了一下&#…

iOS ------ JSONModel源码

一,JSONModel的基本使用 1,基本使用方法 - (instancetype)initWithDictionary:(NSDictionary *)dict error:(NSError **)err; - (instancetype)initWithData:(NSData *)data error:(NSError **)error; - (instancetype)initWithString:(NSString *)str…

Linux网络-部署YUM仓库及NFS共享服务

目录 一.YUM仓库服务 1.YUM概述 1.1.YUM(Yellow dog Updater Modified) 2.准备安装源 2.1.软件仓库的提供方式 2.2.RPM软件包的来源 2.3.构建CentOS 7 软件仓库 2.4.在软件仓库中加入非官方RPM包组 3.一键安装软件包的工具: 好处&a…

申请Sectigo证书流程详解

Sectigo(前身为Comodo CA),是目前主流SSL证书的一种,目前全球范围内应用度也非常广泛,是目前众多品牌中市场份额最大的一个品牌了,在全球证书市场份额占比约为40%。 其超高的市场份额占比主要还是基于其超…

021、Python+fastapi,第一个Python项目走向第21步:ubuntu 24.04 docker 安装mysql8集群、redis集群(二)

系列文章目录 pythonvue3fastapiai 学习_浪淘沙jkp的博客-CSDN博客https://blog.csdn.net/jiangkp/category_12623996.html 前言 安装redis 我会以三种方式安装,在5月4号修改完成 第一、直接最简单安装,适用于测试环境玩玩 第二、conf配置安装 第三…

【Leetcode 42】 接雨水

基础思路: (1)需要将问题最小化,首先计算第i个位置最多容纳多少雨水(细长的一条水柱),然后求和就是总的雨水量; (2)第i个位置容纳雨水量 min(左侧最高, 右…

​《MATLAB科研绘图与学术图表绘制从入门到精通》示例:绘制德国每日风能和太阳能产量3D线图

在MATLAB中,要绘制3D线图,可以使用 plot3 函数。 在《MATLAB科研绘图与学术图表绘制从入门到精通》书中通过绘制德国每日风能和太阳能产量3D线图解释了如何在MATLAB中绘制3D线图。 购书地址:https://item.jd.com/14102657.html

牛客热题:单链表排序

📟作者主页:慢热的陕西人 🌴专栏链接:力扣刷题日记 📣欢迎各位大佬👍点赞🔥关注🚓收藏,🍉留言 文章目录 牛客热题:单链表排序题目链接方法一&…

【XR806开发板试用】基于MQTT与Cjson库的花式点灯

一、项目介绍 久闻openharmony大名,一直没有机会接触,感谢极术社区和全志社区的这次活动,让我能够了解并上手这个系统。 openhamony 1.1的内核是基于liteos内核系统进行构建的,liteos作为物联网系统,结合xr806小型开…

美团KV存储squirrel和Celler学习

文章目录 美团在KV存储squirrel优化和改进在水平方向1、对Gossip协议进行优化 在垂直扩展方面1、forkless RDB数据复制优化2、使用多线程,充分利用机器的多核能力 在高可用方面 美团持久化kv存储celler优化和改进水平扩展优化1、使用bulkload进行数据导入2、线程模型…

Adobe系列软件安装

双击解压 先运行Creative_Cloud_Set_Up.exe。 完毕后,运行AdobeGenP.exe 先Path,选路径,如 C:\Program Files\Adobe 后Search 最后Patch。 关闭软件,修图!

电力能源箱3D可视化:开启智慧能源管理新篇章

随着科技的不断进步,电力能源箱的管理与维护逐渐向着智能化、可视化的方向发展。3D可视化技术的崛起,不仅极大地提升了能源管理的效率,更以其直观、生动的特点,引领着电力能源管理领域迈入了一个全新的时代。 电力能源箱作为电力系…

解决一个朋友的nbcio-boot的mysql数据库问题

1、原先安装mysql5.7数据库,导入我的项目里的带数据有报错信息 原因不明 2、只能建议用docker进行msyql5.7的安装 如下,可以修改成自己需要的信息 docker run -p 3306:3306 --name mastermysql -v /home/mydata/mysql/data:/var/lib/mysql -e MYSQL_R…

为什么感觉没有效果

以前在辅导小儿作业的时候,我会在常用的搜索引擎里去寻找答案,一般情况下都能解决问题。 但是最近一段时间,我发现,搜索引擎搜出来的结果还没有利用短视频搜出来的答案更全面,短视频软件不仅可以显示AI整理出来的答案…

js api part4

其他事件 页面加载事件 外部资源(如图片、外联CSS和JavaScript等)加载完毕时触发的事件 原因:有些时候需要等页面资源全部处理完了做一些事情,老代码喜欢把 script 写在 head 中,这时候直接找 dom 元素找不到。 事件…

2010-2022年上市公司彭博ESG披露评分、分项得分数据

2010-2022年上市公司彭博ESG披露评分、分项得分数据 1、时间:2010-2022年 2、来源:Bloomberg ESG 指数 3、指标:股票代码、股票简称、年份、ESG披露评分、环境披露评分、社会信息披露评分、治理披露评分 4、范围:上市公司 5、…

OpenNJet:下一代云原生应用引擎

OpenNJet:下一代云原生应用引擎 前言一、技术架构二、新增特性1. 透明流量劫持2. 熔断机制3. 遥测与故障注入 三、Ubuntu 发行版安装 OpentNJet1. 添加gpg 文件2. 添加APT 源3. 安装及启动4. 验证 总结 前言 OpenNJet,是一款基于强大的 NGINX 技术栈构建…

Java苍穹外卖04-

一、缓存菜品 1.问题说明 2.实现思路 就是点击到这个分类的时候就可以展示相应的菜品数据 3.代码实现 在user的菜品的contoller中&#xff1a;增加判断redis中是否存在所需数据&#xff0c;不存在添加&#xff0c;存在直接取得 这里注意&#xff1a;你放进去用的是List<Di…

【Osek网络管理测试】[TG3_TC3]tSleepRequestMin_L

&#x1f64b;‍♂️ 【Osek网络管理测试】系列&#x1f481;‍♂️点击跳转 文章目录 1.环境搭建2.测试目的3.测试步骤4.预期结果5.测试结果 1.环境搭建 硬件&#xff1a;VN1630 软件&#xff1a;CANoe 2.测试目的 验证DUT进入NMLimpHome状态后请求睡眠的最短时间是否正确…