或许是全网最全的延迟队列

什么是延迟队列

作用:用来存储延迟消息
延迟消息:生产者发送一个消息给mq,然后mq会经过一段时间(延迟时间),然后在把这个消息发送给消费者

应用场景

  1. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
  2. 推送某些数据的定时任务
  3. 微信公众号文章的延迟发布
  4. 订单超时未支付自动取消订单

实现延迟队列

在rabbitmq中没有提供真正意义上的延迟队列。要实现延迟队列有两套方案

  1. 方案一:基于死信队列中的消息TTL过期模式的进行改造,不监听对应队列,使消息过期后全部进入死信队列以达成延时效果,主要有队列TTL消息TTL两种
  2. 方案二:使用延时队列插件,让交换机管理消息延时时间(常用)

创建工程

创建springBoot工程,勾选需要的依赖
image.png
添加RabbitMQ配置

spring.rabbitmq.host=xxxx
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=DeadQueue

使用TTL+死信队列

队列TTL案例

对队列QA设置过期时间 10S,队列QB设置过期时间 40S,不监听QA、QB队列,使消息进入队列后不被消费导致TTL超时进入QD延迟队列

Y是死信交换机,QD是死信队列

对队列设置TTL
缺点:每增加一个新的时间需求,就要新增一个队列
创建RabbitMQ配置文件

package com.dmbjz.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/* RabbitMQ的交换机、队列配置文件 */
@Configuration
public class ExchangeQueueConfig {

    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "QD";

    /*创建X交换机*/
    @Bean
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    /*创建死信交换机*/
    @Bean
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
    @Bean("queueA")
    public Queue queueA(){
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);      //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-routing-key", "YD");                     //声明当前队列的死信路由 key
        args.put("x-message-ttl", 10000);                                //声明队列的 TTL
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }


    // 声明队列 A 绑定 X 交换机
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }


    //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB(){
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);         //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-routing-key", "YD");                        //声明当前队列的死信路由 key
        args.put("x-message-ttl", 40000);                                   //声明队列的 TTL
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //声明队列 B 绑定 X 交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }

    //声明死信队列 QD
    @Bean("queueD")
    public Queue queueD(){
        return new Queue(DEAD_LETTER_QUEUE);
    }

    //声明死信队列 QD 绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者代码:

package com.dmbjz.controller;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/* 生产者发送消息Controller */
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMessage/{message}")
    public void sendMsg(@PathVariable String message){

        log.info("当前时间:{},发送一条信息给两个TTL队列,消息内容:{}",new Date(),message);
        rabbitTemplate.convertAndSend("X","XA",message.getBytes(StandardCharsets.UTF_8));
        rabbitTemplate.convertAndSend("X","XB",message.getBytes(StandardCharsets.UTF_8));

    }
}

消费者代码:

package com.dmbjz.consumer;


import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/* 队列TTL消费者 */
@Component
@Slf4j
public class DeadLetterQueueConsumer {


    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel)throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);
    }
}

浏览器访问地址测试:

http://localhost:8080/ttl/sendMessage/测试消息TTL

image.png


消息TTL案例

对消息设置过期时间,不监听QC队列,消息超时后自动进入QD延迟队列
缺点:如果积压在队列前面的消息延时时长很长,而后面积压的消息延时时长很短,积压时间短的消息并不会被提前放入死信队列;如果QC恰好又设置了积压上限,无法被积压的消息将直接进入延时队列,达不到延时效果
对消息设置TTL
修改配置文件:

    //声明队列 QC
    @Bean
    public Queue queueC(){
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);      //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-routing-key", "YD");                     //声明当前队列的死信路由 key
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    //声明队列 QC 绑定 X 交换机
    @Bean
    public Binding queuebCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC()).to(xExchange).with("XC");
    }

生产者代码:

    //声明队列 QC
    @Bean
    public Queue queueC(){
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);      //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-routing-key", "YD");                     //声明当前队列的死信路由 key
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    //声明队列 QC 绑定 X 交换机
    @Bean
    public Binding queuebCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC()).to(xExchange).with("XC");
    }

浏览器访问地址进行测试:

http://localhost:8080/ttl/sendMessagExpira/测试消息1/10000
http://localhost:8080/ttl/sendMessagExpira/测试消息2/1000

延时插件

使用延时队列插件实现延时队列功能,原理为交换机管理消息延时时间
插件版本需要兼容 RabbitMQ 版本,具体参考其发布说明**,**延时队列插件下载:github
插件安装步骤

1.将安装目录的延时队列插件拷贝到RabbitMQ插件目录
	cp rabbitmq_delayed_message_exchange-3.8.0.ez /root/rabbitmq_server-3.8.8/plugins
    
    
2.安装延时队列插件   
	rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

3、重启RabbitMQ服务
	systemctl restart rabbitmq-server

延时队列插件安装完成
重启服务后交换机多了延迟类型
案例演示:
延时队列插件实际落地固定为图中架构模式
延时队列插件架构图
创建配置文件:

package com.dmbjz.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/* 延时队列插件案例 RabbitMQ配置类 */
@Configuration
public class DelayedQueueConfig {


    private static final String delayed_queue_name = "delayed.queue";
    private static final String delayed_exchange_name = "delayed.exchange";
    private static final String delayed_routingkey = "delayed.routingkey";


    /*创建延时插件的交换机,需要使用自定义方法进行创建
    *   插件版非死信队列,不需要路由到不同的交换机进行指定过期时间,所以固定为 direct 类型交换机
    * */
    @Bean
    public CustomExchange delayedExchange(){

        Map<String,Object> map = new HashMap<>(1);
        map.put("x-delayed-type","direct");       //延迟队列类型,固定值

        return new CustomExchange(delayed_exchange_name,"x-delayed-message",
                true,false,map);

    }

    /*队列*/
    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable(delayed_queue_name).build();
    }

    /*绑定,自定义交换机绑定多一个 noargs方法 */
    @Bean
    public Binding delayBing(@Qualifier("delayQueue") Queue delayQueue,
                             @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(delayQueue).to(delayedExchange)
                .with(delayed_routingkey)
                .noargs();
    }
}

生产者代码:

    /*延时插件案例*/
    @RequestMapping("/sendMessagPlugin/{message}/{time}")
    public void sendMsgPlugin(@PathVariable String message,
                              @PathVariable Integer time){

        MessageProperties properties = new MessageProperties();
        properties.setDelay(time);      //设置延时时间
        Message msg = new Message(message.getBytes(StandardCharsets.UTF_8),properties);

        log.info("当前时间:{},发送具有过期时间为{}毫秒的信息给延时插件队列,消息内容:{}",new Date(),time,message);
        rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey",msg);

    }

消费者代码:

package com.dmbjz.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/* 延时队列插件 消费者 */
@Component
@Slf4j
public class DelayQueueConsumer {

    @RabbitListener(queues = "delayed.queue")
    public void receiveDelayQueue(Message message, Channel channel)throws Exception{

        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);

    }
}

浏览器访问地址进行测试:

http://localhost:8080/ttl/sendMessagPlugin/测试消息1/10000
http://localhost:8080/ttl/sendMessagPlugin/测试消息2/1000

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/247846.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深拷贝、浅拷贝 react的“不可变值”

知识获取源–晨哥&#xff08;现实中的人 嘿嘿&#xff09; react中如果你想让一个值始终不变 或者说其他操作不影响该值 它只是作用初始化的时候 使用了浅拷贝–改变了初始值 会改变初始值(selectList1) 因为使用浅拷贝都指向同一个地址 const selectList1 { title: 大大, …

RabbitMQ入门案例

RabbitMQ 是目前比较主流的MQ消息队列中间件&#xff0c;下面简单总结RabbitMQ入门时所做的一些笔记 1.RabbitMQ 入门案例 需求&#xff1a;用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者 1.1 添加依赖 <!--rabbitmq 依赖客户端--> <de…

开具实习证明:在线实习项目介绍

大数据在线实习项目&#xff0c;是在线上为学生提供实习经验的项目。我们希望能够帮助想要在毕业后从事数据科学类工作的学生更加顺利地适应从教室到职场的转换&#xff1b;也帮助那些在工作中需要处理数据、实现数据价值的其他职能的从业者高效快速地掌握每天都能用起来的数据…

Vue.js 使用基础知识

Vue.js 是一款用于构建用户界面的渐进式框架&#xff0c;它专注于视图层。Vue.js 不同于传统的 JavaScript 框架&#xff0c;它采用了组件化的开发方式&#xff0c;使得开发者可以更加高效和灵活地构建交互式的 Web 应用程序。 目录 什么是 Vue.js安装 Vue.jsVue 实例模板语法插…

2024年天津体育学院专升本专业课网上报名确认缴费安排

天津体育学院2024年高职升本科专业考试报名安排 一、时间安排 1.报名时间&#xff1a;2023年12月19日9&#xff1a;00-12月21日17&#xff1a;00 2.缴费时间&#xff1a;2023年12月26日-27日 &#xff08;考试考务费&#xff1a;体育教育专业&#xff1a;160元/人&#xff…

深入分析ClassLocader工作机制

文章目录 一、ClassLoader简介1. 概念2. ClassLoader类结构分析 二、ClassLoader的双亲委派机制三、Class文件的加载流程1. 简介2. 加载字节码到内存3. 验证与解析4. 初始化Class对象 四、常见加载类错误分析1. ClassNotFoundException2. NoClassDefFoundError3. UnsatisfiledL…

用Excel绘制柱形图

在需要将数据用柱状图表示的时候&#xff0c;可以用Excel进行绘制。不单绘制柱形图&#xff0c;其他数据图也可以用Excel绘制。 接下来用绘制一个销售表的示例演示。 1.将数据输入Excel 数学书 语文书 英语书 一月 80 94 77 二月 95 86 84 三月 130 93 79 四月 …

测出Bug就完了?从4个方面教你Bug根因分析

一现状及场景 1、缺失bug根因分析环节 工作10年&#xff0c;虽然不是一线城市&#xff0c;也经历过几家公司&#xff0c;规模大的、规模小的都有&#xff0c;针对于测试行业很少有Bug根因环节&#xff0c;主流程基本上都是测试提交bug-开发修改-测试验证-发送报告&#xff0c…

2023版本QT学习记录 -2- 标准文件对话框

头文件的使用 #include "QFileDialog"函数原型 getOpenFileName效果 参数 未完待续

工业固体废物智能化综合管控平台

工业固体废物智能化综合管控平台&#xff0c;涵盖产废企业、运输企业、固废处置企 业等不同群体应用&#xff0c;根据不同群体设计不同的业务应用子系统功能&#xff0c;以及各个不 同群体的环保物联网平台子系统功能模块&#xff0c;同时具有移动端的应用APP。 建立产废企业端…

SQL错题集4

1.注意格式 %Y是指date的年&#xff0c;%m是指date的月 %Y-%m ’ 即为2004-01 2.查询在2025-10-15以后&#xff0c;同一个用户下单1个以上状态为购买成功的C课程或Java课程或Python课程的user_id C或Java或Python --> 缩写 in ( C,Java,Python ) in ( ) 含义为 或or 3. ca…

用“价值”的视角来看安全:《构建新型网络形态下的网络空间安全体系》

作者简介&#xff1a; 懒大王敲代码&#xff0c;正在学习嵌入式方向有关课程stm32&#xff0c;网络编程&#xff0c;数据结构C/C等 今天给大家介绍《构建新型网络形态下的网络空间安全体系》这本书&#xff0c;希望大家能觉得实用&#xff01; 欢迎大家点赞 &#x1f44d; 收藏…

分库分表以后,如何实现扩容?

在实际开发中&#xff0c;数据库的扩容和不同的分库分表规则直接相关&#xff0c;今天我们从系统设计的角度&#xff0c;抽象了一个项目开发中出现的业务场景&#xff0c;从数据库设计、路由规则&#xff0c;以及数据迁移方案的角度进行讨论。 从业务场景出发进行讨论 假设这…

新产品创设过程理念:转变新产品创设的思维和过程

很多企业的新产品创设的过程属于预定义过程&#xff0c;即预先定义好客户群体 和产品的定位&#xff0c;然后列出产品的功能清单&#xff0c;技术可行性分析完成后开始研发。在产品发布给客户使用前&#xff0c;与客户的交互很少。在这个过程中存在大量 的没有验证的假设。预定…

【电路笔记】-电容器

电容器 文章目录 电容器1、概述2、电容器的电容单位3、电容4、电容器示例15、电介质6、额定电压7、总结 电容器是简单的无源器件&#xff0c;当连接到电压源时&#xff0c;可以在极板上存储电荷。 1、概述 在本电容器简介文章中&#xff0c;我们将看到电容器是无源电子元件&am…

Leetcode—1502.判断能否形成等差数列【简单】

2023每日刷题&#xff08;五十九&#xff09; Leetcode—1502.判断能否形成等差数列 实现代码 class Solution { public:bool canMakeArithmeticProgression(vector<int>& arr) {sort(arr.begin(), arr.end());int diff abs(arr[1] - arr[0]);for(int i 1; i <…

Amazon SageMaker测评

Amazon SageMaker测评 1 前言2 功能体验2.1 构建域2.2 上传数据集2.3 设置 SageMaker Canvas2.4 构建、训练与分析 ML 模型2.5 生成预测模型 3 评价及建议 &#xff08;声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技…

国标级联/流媒体音视频平台EasyCVR设备录像下载异常该如何解决?

视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;在视频监控播放上&#xff0c;视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放&#xff0c;可同时播放多路视频流&#xff0c;也能支…

【渗透测试】常用的8种火狐插件

1、Max HacKBar 推荐理由&#xff1a;免费的hackbar插件&#xff0c;可快速使用SQL注入、XSS和Bypass等payload进行测试&#xff0c;可进行多种编码和解码&#xff0c;安装后F12即可使用。 2、FoxyProxy Standard 推荐理由&#xff1a;FoxyProxy是一个高级的代理管理工具&am…

世界5G大会

会议名称:世界 5G 大会 时间:2023 年 12 月 5 日-12 月 8 日 地点:河南郑州 一、会议简介 世界 5G 大会,是由国务院批准,国家发展改革委、科技部、工 信部与地方政府共同主办,未来移动通信论坛联合属地主管厅局联合 承办,邀请全球友好伙伴共同打造的全球首个 5G 领域…