rabbit MQ的延迟队列处理模型示例(基于SpringBoot死信模式)

在这里插入图片描述

说明:
生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。

这里用SpringBoot maven:
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>

在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
在这里插入图片描述

框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:

package com.esint.configs;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * TTL延迟队列配置文件类
 *
 */
@Configuration
public class TtlQueueConfig {
    //
    //普通交换机的名称 X
    public static final String X_EXCHANGE = "X";

    //死信交换机名称 Y
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    //普通队列QA QB
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列名称QD
    public static final String DEAD_LETTER_QUEUE = "QD";
    //
    //声明X_EXCHANGE
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明死信交换Y_DEAD_LETTER_EXCHANGE
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    //声明队列 QA
    @Bean("queueA")
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey (死信后充当了消费者的发送路由)
        arguments.put("x-dead-letter-routing-key","YD");
        //消息过期时间
        arguments.put("x-message-ttl",10000);

        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();

    }

    //声明队列 QB
    @Bean("queueB")
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey (死信后充当了消费者的发送路由)
        arguments.put("x-dead-letter-routing-key","YD");
        //消息过期时间
        arguments.put("x-message-ttl",40000);

        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();

    }
    //声明死信队列QD
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    //捆绑
    //绑定队列QA与交换机X_EXCHANGE
    @Bean
    public Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,
                                      @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    //绑定队列QB与交换机X_EXCHANGE
    @Bean
    public Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,
                                      @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //绑定队列QD与交换机Y_Exchange
    @Bean
    public Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,
                                       @Qualifier("yExchange")DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;

//发送延迟消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/senMsg/{message}")
    public void sendMes(@PathVariable String message){
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);

        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
    }
}

消费者与死信队列创建一个监听者示例:
package com.esint.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 队列TTL消费者
 */

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接受消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{

        String msg = new String(message.getBody());
        log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);
    }
}

rabbitmq的配置文件:

spring:
  rabbitmq:
    host: *.*.*.*
    port: 5672
    username: guest
    password: guest

接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置

队列:
在这里插入图片描述

交换机:
在这里插入图片描述
点开详细后,也能考到他们之间的绑定关系:

在这里插入图片描述

在这里插入图片描述

消息发布测试:

生产者发送消息:

浏览器:
http://127.0.0.1:19092/ttl/senMsg/nice

通过生产者发送:nice

当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice

消费者在10s后和40秒分别收到了消息:
在这里插入图片描述


拓展:是不是有一种可能,如果再队列中不设置过期时间,在生产者发送消息时设置过期时间 来实现过期时间自由设定,而延迟自由?

结论是不能:
rabbitMQ队列只会检查第一个消息是否过期。举例如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。

示例验证:

增加一个无过期时间约束的队列,以routing-key为XC绑定X交换机,过期后以routing-key为YD绑定Y交换机。
过期时间放生产者发送时设定。

在的rabbitMQ配置类中增加QC 绑定前(X routing-key=XC)后(Y routing-key=YD)交换机:

    // 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
    public static final  String QUEUE_C ="QC";
    
    //声明队列 QC 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
    @Bean("queueC")
    public Queue queueC(){

        Map<String,Object> arguments = new HashMap<>(2);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置routing-key
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
//绑定队列QC与交换机X_EXCHANGE   优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
    @Bean
    public Binding queueCBindXExchange(@Qualifier("queueC") Queue queueC,
                                       @Qualifier("xExchange")DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

生产者:


    @GetMapping("/sendttl/{message}/{ttlTime}")
    public void sendMes(@PathVariable String message,@PathVariable String ttlTime){
/**
 * 死信队列做延迟时的缺陷:
 * rabbitMQ只会检查第一个消息是否过期带来的问题就是,如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。
 */
        log.info("当前时间:{},发送一条ttl为{}ms的消息给QC队列:{}",new Date().toString(),ttlTime,message);
        rabbitTemplate.convertAndSend("X","XC",message,mes->{
            mes.getMessageProperties().setExpiration(ttlTime);
            return mes;
        });
    }

消费者不变,启动服务!

生产者发送消息:第一条 3000ms
http://127.0.0.1:19092/ttl/sendttl/第一条30000ms消息/30000
http://127.0.0.1:19092/ttl/sendttl/第二条3000ms消息/3000

在这里插入图片描述
结论:第二条虽然早早过期,它依然需要等待第一条过期后,才能排到他。rabbitMQ的队列过期检查机制。

总结:
阻塞层在队列。
只能满足固定延迟时段的消息,如果延迟时间不一致,及时后来消息的延迟短,也会等待它的上一条出去后才能被检测到是否到期才被消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/177391.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

笔记59:序列到序列学习Seq2seq

本地笔记地址&#xff1a;D:\work_file\&#xff08;4&#xff09;DeepLearning_Learning\03_个人笔记\3.循环神经网络\第9章&#xff1a;动手学深度学习~现代循环神经网络 a a a a a a a a a a a a a a a

YOLO目标检测——卫星遥感多类别检测数据集下载分享【含对应voc、coco和yolo三种格式标签】

实际项目应用&#xff1a;卫星遥感目标检测数据集说明&#xff1a;卫星遥感多类别检测数据集&#xff0c;真实场景的高质量图片数据&#xff0c;数据场景丰富&#xff0c;含网球场、棒球场、篮球场、田径场、储罐、车辆、桥、飞机、船等类别标签说明&#xff1a;使用lableimg标…

Mysql中自增主键是如何工作的

自增主键的特点是当表中每新增一条记录时&#xff0c;主键值会根据自增步长自动叠加&#xff0c;通常会将自增步长设置1&#xff0c;也就是说自增主键值是连续的。那么MySQL自增主键值一定会连续吗&#xff1f;今天这篇文章就来说说这个问题&#xff0c;看看什么情况下自增主键…

MATLAB实现灰色预测

久违了&#xff0c;前段时间由于学习压力大&#xff0c;就没怎么更新MATLAB相关的内容&#xff0c;今天实在学不进去了&#xff0c;换个内容更新一下~ 本贴介绍灰色预测模型&#xff0c;这也是数学建模竞赛常见算法中的一员&#xff0c;和许多预测模型一样——底层原理是根据已…

笔试题之指针结合数组的精讲2

&#x1d649;&#x1d65e;&#x1d658;&#x1d65a;!!&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦ &#x1f44f;&#x1f3fb;‧✧̣̥̇:Solitary-walk ⸝⋆ ━━━┓ - 个性标签 - &#xff1a;来于“云”的“羽球人”。…

接口测试 —— requests 的基本了解

● requests介绍及安装 ● requests原理及源码介绍 ● 使用requests发送请求 ● 使用requests处理响应 ● get请求参数 ● 发送post请求参数 ● 请求header设置 ● cookie的处理 ● https证书的处理 ● 文件上传、下载 requests介绍 ● requests是python第三方的HTTP…

python -opencv 中值滤波 ,均值滤波,高斯滤波实战

python -opencv 中值滤波 &#xff0c;均值滤波&#xff0c;高斯滤波实战 cv2.blur-均值滤波 cv2.medianBlur-中值滤波 cv2.GaussianBlur-高斯滤波 直接看代码吧&#xff0c;代码很简单&#xff1a; import copy import math import matplotlib.pyplot as plt import matp…

对线程的创建

一&#xff0c;概括 二&#xff0c;线程构建方式一&#xff08;继承Thread类&#xff09; 三&#xff0c;案例 父类&#xff1a; package Duoxiancheng;public abstract class Name {public static void main(String[] args) {//3&#xff0c;创建一个Thread线程类对象Thr…

【小黑送书—第九期】>>重磅!这本30w人都在看的Python数据分析畅销书:更新了!

想学习python进行数据分析&#xff0c;这本《利用python进行数据分析》是绕不开的一本书。目前该书根据Python3.10已经更新到第三版。 Python 语言极具吸引力。自从 1991 年诞生以来&#xff0c;Python 如今已经成为最受欢迎的解释型编程语言。 pandas 诞生于2008年。它是由韦…

__int128类型movaps指令crash

结论 在使用__int128时&#xff0c;如果__int128类型的内存起始地址不是按16字节对齐的话&#xff0c;有些汇编指令会抛出SIGSEGV使程序crash。 malloc在64位系统中申请的内存地址&#xff0c;是按16字节对齐的&#xff0c;但一般使用时经常会申请一块内存自己切割使用&#…

Selenium浏览器自动化测试框架

介绍 Selenium [1] 是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中&#xff0c;就像真正的用户在操作一样。支持的浏览器包括IE&#xff08;7, 8, 9, 10, 11&#xff09;&#xff0c;Mozilla Firefox&#xff0c;Safari&#xff0c;Google Chrome&#xff…

vs调试输出,不显示线程已退出

如题&#xff1a;一堆线程退出的信息&#xff0c;招人烦。 其实在vs设置里可以关闭&#xff1a; 工具-->选项-->调试-->输出窗口&#xff1a;

动态跳过测试用例

动态跳过测试用例 说明 我们可以通过指定环境变量来动态判断是否执行指定的测试用例设置环境变量有很多种方法&#xff0c;例如命令行方式&#xff0c;格式&#xff1a;--env keyval1,key2val2 &#xff0c;若需要指定多个环境变量则需要逗号来隔开&#xff0c;而不是空格 t…

innoDB的缓冲池(Buffer Pool)的工作原理

数据存在磁盘了&#xff0c;总不能次次和磁盘交互吧&#xff0c;所以innoDB有一个缓冲池&#xff08;Buffer Pool&#xff09;&#xff0c;有了缓冲池后&#xff0c;读写就优先在缓冲池了。读先在缓冲池读&#xff0c;没有再去磁盘加载进缓冲池&#xff1b;写也是先写缓冲池&am…

调试接口速度,打印毫秒数,找出慢的地方,优化

方法的最开头写上 $start_time microtime(true); 然后代码行里 dump(All 1: time ’ . (microtime(true) - $start_time)); dump(All 1.2: time ’ . (microtime(true) - $start_time)); 类似这样的最终打印

[C++] STL_stack queue接口的模拟实现

文章目录 1、stack1.1 stack的介绍1.2.1 stack的构造1.2.2 进、出栈等接口的模拟实现 2、queue2.1 queue的介绍2.2 queue的使用2.2.1 queue构造2.2.2 入、出队等接口的模拟实现 1、stack 1.1 stack的介绍 stack的文档介绍 1. stack是一种容器适配器&#xff0c;专门用在具有…

Linux程序之可变参数选项那些事!

一、linux应用程序如何接收参数&#xff1f; 1. argc、argv Linux应用程序执行时&#xff0c;我们往往通过命令行带入参数给程序&#xff0c;比如 ls /dev/ -l 其中参数 /dev/ 、-l都是作为参数传递给命令 ls 应用程序又是如何接收这些参数的&#xff1f; 通常应用程序都…

thinkphp文件夹生成zip压缩包

一、准备工作&#xff0c;使用phpinfo()查看有没有zip扩展 <?php echo phpinfo(); ?>Thinkphp使用PHP自带的ZipArchive压缩文件或文件夹 显示enabled 说明已经配置好 如果没有安装扩展的&#xff0c;请参照以下方法&#xff1a; 1、下载对应版本的扩展包&#xff1a…

还不知道指针和引用的区别,一篇文章教会你

1、引用的概念 1.引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名 2.编译器不会为引用变量开辟内存空间&#xff0c;它和它引用的变量共用同一块内存空间 比如:孙悟空&#xff0c;可以叫他孙悟空&#xff0c;也可以叫齐天大圣。本质他们就是一个人 2、引用的定…

opencv- CLAHE 有限对比适应性直方图均衡化

CLAHE&#xff08;Contrast Limited Adaptive Histogram Equalization&#xff09;是一种对比度有限的自适应直方图均衡化技术&#xff0c;它能够提高图像的对比度而又避免过度增强噪声。 在OpenCV中&#xff0c;cv2.createCLAHE() 函数用于创建CLAHE对象&#xff0c;然后可以…