RabbitMQ--延迟队列

(一)延迟队列

1.概念

 延迟队列是一种特殊的队列,消息被发送后,消费者并不会立刻拿到消息,而是等待一段时间后,消费者才可以从这个队列中拿到消息进行消费

2.应用场景

 延迟队列的应用场景很多,就比如大部分定时的场景,我们都可以利用延迟队列例如:闹钟定时,预约会议,空调定时开关等

 但是RabbitMQ是没有直接给我们提供延迟队列的,但是我们可以通过上一篇博客说的ttl和死信来达到延迟队列的效果,具体操作如下

 首先我们有一个交换机和一个队列,然后此队列又指定一个死信交换机,死信交换机绑定一个死信队列,然后我们消费者并不是从正常队列中获取消息,而是从死信队列中获取消息,通过给消息/队列设置过期时间来影响消息到达死信队列的时间,消费者拿到消息就会延迟,这样就可以模拟出延迟的效果。

那接下来就是我们的代码实现

首先我们通过设置队列ttl来实现

 @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();
    }
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000)
                .deadLetterExchange(Constants.DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead")
                .build();
    }
    @Bean("ttlBind")
    public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();
    }
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();
    }
    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(Constants.DEAD_QUEUE).build();
    }
    @Bean("deadBind")
    public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();
    }

然后生产者代码没什么变化

 @RequestMapping("ttl")
    public String TTLPro(){
        String s1="ttl test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));
//        message.getMessageProperties().setExpiration("10000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        return "发送成功";
    }

只不过消费者订阅的是死信队列

@RabbitListener(queues = Constants.DEAD_QUEUE)
    public void ListenerQueue2(Message message,Channel channel) throws IOException {
        long Tag=message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                    +Tag);
            int num=3/0;     //模拟失败
            channel.basicAck(Tag,false);
            System.out.println("处理完成");
        }catch (Exception e){
            channel.basicReject(Tag,false);
        }
    }

这样过了5s后我们就可以从死信队列中获取到延迟消息了

那我们再来通过设置消息的ttl来看一下

首先我们要把队列的ttl给取消掉,记得要删队列

 @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();
    }
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(Constants.TTL_QUEUE)
                .deadLetterExchange(Constants.DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead")
                .build();
    }
    @Bean("ttlBind")
    public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();
    }
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();
    }
    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(Constants.DEAD_QUEUE).build();
    }
    @Bean("deadBind")
    public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();
    }

  然后我们发送一个ttl时间为10s的,再发送一个5s的,我们知道这样两条数据是会发生错误的,因为我们设置消息过期时间,我们RabbitMQ(性能问题)并不会遍历整个消息队列看看谁过没过期,如果过期的消息不在队头,那么只有当使用的时候,才会真正的进行一些过期处理,比如传给死信交换机 

@RequestMapping("ttl")
    public String TTLPro(){
        String s1="ttl test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));
        message.getMessageProperties().setExpiration("10000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        message.getMessageProperties().setExpiration("5000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        return "发送成功";
    }

如果正常,会在5s后接收到第一个消息,在10s后接收到第二个消息,但是此时我们会同一时间(10s)接收到两条消息

  那这个问题在上一篇ttl的时候就说过了,这里依然是个问题,虽然设置队列的ttl不会有这个问题,但是设置队列ttl我们针对不同延迟时间就需要创建多个队列,这是不太合理的,所以针对这个问题,我们有一个延迟队列的插件可以使用

 3.延迟队列插件

延迟队列插件,会给我们提供一个特殊的交换机,来完成我们的延迟功能

这是我们插件的下载地址

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  我们需要找到ez文件并下载,但是注意这里的版本要与你的RabbitMQ版本可以匹配,否则之后会出现问题

那插件下完后,我们要找到对应目录,下载插件

上面两个目录,我们可以任选一个下载即可,没有这个目录,我们手动创建

然后把下载的ez文件,copy到这个目录中即可,然后我们可以使用命令 rabbitmq-plugins list 来查看插件列表,看看我们有没有成功放进去,但是注意,即使我们成功放进去并成功显示了,也可能会出错,这就可能是你们下载的RabbitMQ版本与整个延迟插件的版本不匹配,重新下载其他版本即可

然后我们启动插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange

之后重启服务service rabbitmq-server restart

在没有发生错误的情况下,我们就发现我们会多了一个默认的交换机

此时我们代码中就不需要声明普通交换机了而是直接使用默认交换机即可

我们生产者代码是需要改一下的,我们需要调用一个方法来设置延迟时间

@RequestMapping("/delay2")
public String delay2() {
 //发送带ttl的消息 
 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 20s..."+new Date(), messagePostProcessor -> {
 messagePostProcessor.getMessageProperties().setDelayLong(20000L); 
 return messagePostProcessor;
 });
 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 10s..."+new Date(), messagePostProcessor -> {
 messagePostProcessor.getMessageProperties().setDelayLong(10000L); //设置延迟时间 
 return messagePostProcessor;
 });
 return "发送成功!";
}

此时我们就可以在10s正确接收一个消息,在20s正确接收另一个消息 

  注意我们使用TTL+死信时消息传递给交换机后映射之后一直在正常队列中的,等待TTL时间到了把消息给死信交换机再映射到死信队列再拿到消息,我们使用插件的时候,消息是在RabbitMQ给我们提供的那个特殊的交换机中的,等待时间到了,再映射给队列,然后从队列中拿消息

4.常见面试题

介绍下延迟队列

我们可以这样回答:

 延迟队列是一个特殊的队列,消息发送后,消费者并不会立刻拿到,而是等待一定延迟时间后才发送给消费者进行消费

 并且延迟队列的应用场景很多,比如订单支付,智能家电,以及定时邮箱

 但是延迟队列在RabbitMQ中并没有直接给我们提供,我们可以通过TTL+死信的方式或者使用延迟插件的方式来实现延迟功能

 两者的区别:

1.通过TTL+死信

 优点:比较灵活,不需要我们额外引入插件

 缺点:我们设置消息TTL的时候可能会出现顺序的问题,而且我们需要多创建死信队列和死信交换机,完成一些绑定,增加了系统的复杂性

2.基于插件实现的延迟队列

 优点:通过插件能够简化延迟消息的实现,并且避免了时序问题

 缺点:需要依赖插件,不同版本RabbitMQ需要不同版本插件,有运维工作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/957088.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

3DsMax设置中文界面

按键盘上的“Win”键,直接输入3dsmax,选择Simplified Chinese打开,之后就都是中文了

opencv在图片上添加中文汉字(c++以及python)

opencv在图片上添加中文汉字(c以及python)_c opencv绘制中文 知乎-CSDN博客 环境: ubuntu18.04 desktopopencv 3.4.15 opencv是不支持中文的。 这里C代码是采用替换原图的像素点来实现的,实现之前我们先了解一下汉字点阵字库。…

线程同步与Mutex

梦想是逃离世界… 文章目录 一、什么是线程同步?二、线程同步机制三、互斥锁(Mutex)四、loock 和 unlock五、Mutex的四种类型 一、什么是线程同步? 线程同步(Thread Synchronization)是多线程编程中的一个重要概念,它…

基于SpringBoot和PostGIS的全球首都信息管理设计与实现

目录 前言 一、首都空间表的设计 1、三张空间表的结构 二、SpringBoot后台管理的设计与实现 1、模型层的实现 2、业务层及控制层实现 三、前端的实现与成果可视化 1、新增数据的保存 2、首都的实际管理成果 3、全球首都信息 四、总结 前言 首都,一个国家的…

计算机网络 (50)两类密码体制

前言 计算机网络中的两类密码体制主要包括对称密钥密码体制(也称为私钥密码体制、对称密码体制)和公钥密码体制(也称为非对称密码体制、公开密钥加密技术)。 一、对称密钥密码体制 定义: 对称密钥密码体制是一种传…

【数据结构篇】顺序表 超详细

目录 一.顺序表的定义 1.顺序表的概念及结构 1.1线性表 2.顺序表的分类 2.1静态顺序表 2.2动态顺序表 二.动态顺序表的实现 1.准备工作和注意事项 2.顺序表的基本接口: 2.0 创建一个顺序表 2.1 顺序表的初始化 2.2 顺序表的销毁 2.3 顺序表的打印 3.顺序…

C 语言雏启:擘画代码乾坤,谛观编程奥宇之初瞰

大家好啊,我是小象٩(๑ω๑)۶ 我的博客:Xiao Xiangζั͡ޓއއ 很高兴见到大家,希望能够和大家一起交流学习,共同进步。* 这一课主要是让大家初步了解C语言,了解我们的开发环境,main函数,库…

根据 Web 服务器端的架构相关知识,将PHP改JAVA重构企业网站系统

目录 案例 【题目】 【问题 1】(7 分) 【问题 2】(8 分) 【问题 3】(10 分) 答案 【问题 1】解析 【问题 2】解析 【问题 3】解析 相关推荐 案例 阅读以下关于应用服务器的叙述,在答题纸上回答问题 1 至问题 3。 【题目】 某电子产品制造公司&#xff0c…

多选multiple下拉框el-select回显问题(只显示后端返回id)

首先保证v-model的值对应options数据源里面的id <el-form-item prop"subclass" label"分类" ><el-select v-model"formData.subclass" multiple placeholder"请选择" clearable :disabled"!!formData.id"><e…

java快速导出word文档

点关注不迷路&#xff0c;欢迎再访&#xff01; 精简博客内容&#xff0c;尽量已行业术语来分享。 努力做到对每一位认可自己的读者负责。 帮助别人的同时更是丰富自己的良机。 文章目录 前言一.添加 Apache POI 依赖二.填充文档内容三.导出文档效果测试 前言 在 Java 应用程序…

《MambaIR:一种基于状态空间模型的简单图像修复基线方法》学习笔记

paper&#xff1a;2402.15648 目录 摘要 一、引言 1、模型性能的提升依赖于网络感受野的扩大&#xff1a; 2、全局感受野和高效计算之间存在固有矛盾&#xff1a; 3、改进版 Mamba的巨大潜力 4、Mamba 在图像修复任务中仍面临以下挑战&#xff1a; 5、方法 6、主要贡献…

ngnix上传小文件成功,大文件报错

ngnix错误日志 "/var/tmp/nginx/client//0000001299" failed (2: No such file or directory), client: 10.188.141.160, server: 127.0.0.1, request: "POST /fts/sys/common/biUpload HTTP/1.1", host: "10.20.166.179", referrer: "http…

Word表格批量提取数据到Excel,批量提取,我爱excel

Word表格批量提取数据到Excel&#xff0c;Word导出到Excel - 我爱Excel助你高效办公 在日常办公中&#xff0c;Word表格常常用于记录和整理数据&#xff0c;但将这些数据从Word提取到Excel&#xff0c;特别是当涉及多个文件时&#xff0c;常常让人头疼。如果你经常需要将多个W…

【Zookeeper】Windows下安装Zookeeper(图文记录详细步骤,手把手包安装成功)

【Zookeeper】Windows下安装Zookeeper Zookeeper简介一、下载Zookeeper安装包1.1、官网下载Zookeeper1.2、网盘下载Zookeeper 二、解压Zookeeper安装包到指定目录三、Zookeeper安装目录下创建文件夹四、进入config目录五、复制zoo_sample.cfg文件&#xff0c;改名为zoo.cfg六、…

JDK长期支持版本(LTS)

https://blogs.oracle.com/java/post/the-arrival-of-java-23 jdk长期支持版本&#xff08;LTS&#xff09;&#xff1a;JDK 8、11、17、21&#xff1a;

深度学习J3周:RNN-心脏病预测

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 要求&#xff1a; 1.本地读取并加载数据 2.了解循环神经网络&#xff08;RNN&#xff09;的构建过程 3.测试集accuracy到达87% 拔高&#xff1a; 测试机a…

Linux C\C++方式下的文件I/O编程

【图书推荐】《Linux C与C一线开发实践&#xff08;第2版&#xff09;》_linux c与c一线开发实践pdf-CSDN博客 《Linux C与C一线开发实践&#xff08;第2版&#xff09;&#xff08;Linux技术丛书&#xff09;》(朱文伟&#xff0c;李建英)【摘要 书评 试读】- 京东图书 Lin…

FPGA:Quartus软件与操作系统版本对照表

文章目录 1.软件概述2.软件版本3.设计流程4.支持的设备5.新特性6.版本对照 1.软件概述 Quartus软件是由英特尔&#xff08;Intel&#xff09;公司开发的一款功能强大的FPGA&#xff08;现场可编程逻辑门阵列&#xff09;设计工具&#xff0c;广泛应用于数字电路设计、仿真、综…

ui设计公司分享:浅色 UI 设计

在数字化产品琳琅满目的今天&#xff0c;用户对于界面的要求早已不止于功能的实现&#xff0c;更追求一种舒适、无压的交互体验。而浅色UI设计&#xff0c;凭借其独特的魅力&#xff0c;正逐渐成为众多设计师营造优质体验的首选。 一、浅色UI设计的视觉优势 &#xff08;一&a…

Unity中实现伤害跳字效果(简单好抄)

第一步骤安装并导入Dotween插件&#xff08;也可以不用导入之后直接下载我的安装包&#xff09; 官网DOTween - 下载 第二步&#xff1a; 制作跳字预制体 建议把最佳适应打开&#xff0c;这样就不怕数字太大显示不全了。 第三步&#xff1a;创建一个空对象并编写脚本JumpNumbe…