延迟队列的理解与使用

目录

一、场景引入

二、延迟队列的三种场景

1、TTL对队列进行延迟

2、创建通用延时消息对消息延迟

3、使用rabbitmq的延时队列插件

x-delayed-message使用

父pom文件

pom文件

配置文件

config

生产者

消费者

结果


一、场景引入

我们知道可以通过TTL来对队列进行设置过期时间;通过后置处理器MessagePostProcessor对消息进行设置过期时间;

 那么根据TTL及MessagePostProcessor机制可以处理关于延迟方面的问题。

比如:秒杀之后,给30分钟时间进行支付,如果30分钟后,没有支付,订单取消。

比如:餐厅订座,A用户早上8点预定的某家餐厅下午6点的座位,B用户中午12点预定的下午5点的座位;根据场景我们需要的时先让B用户进行消费,然后A用户再消费;这时TTL和MessagePostProcessor延迟就已经不能满足订餐的场景了;

因为TTL是对队列进行延迟,MessagePostProcessor是对消息进行延迟,但是MessagePostProcessor对消息延迟是不能根据订座的时间去排序消费的;

/**
 * 比如当我们发送第一个消息时延迟的时间时50s,而发送的第二个消息延迟时间是30s,虽然延迟30s的消息比延迟50s发送的晚
 * 但按照我们设想的情况,延迟30s的消息应该先消费;可是实际情况却不是这样,而是延迟50s的消息到达时间后 30s的才能消费!(队列先进先出)
 * 那这样此方式的不足就出现了!
 * 场景:
 *      A用户和B用户预定餐厅,A用户先开始预定的,预定的是下午6点。B用户比A用户预定操作晚一些,但是B用户预定的时间是下午5点。通过此场景
 *      我们希望的是B用户先进行用餐(因为他预定的吃饭时间比A早一些,需要先安排吃饭。不能说A用户没到6点B用户预定5点的吃不了。),根据此
 *      场景 之前的队列延迟还是消息延迟都不能满足场景需求了,这样就需要另一种延迟方式进行解决了! ->使用rabbitmq的延时队列插件
 */

/**
 * 注意:
 *      TTL是对队列进行延迟,只要是在此队列中的消息都会按照TTL设置时间进行延迟;
 *      MessagePostProcessor是对消息进行延迟;
 *
 *      如果我们不仅使用了消息延迟,而且还使用了队列延迟,那么延迟的时间就会以小的时间为准!
 *   理解:
 *      如果a消息设置的消息延迟是30s,b消息设置的延迟时间是90s,队列设置的延迟是60s。那么a消息最终的延迟是30s(a的消息延迟与队列延迟
 *      比对以延迟时间小的为准!),b消息最终延迟的时间是60s(b的消息延迟与队列延迟比对以延迟的时间小的为准!)
 */

 二、延迟队列的三种场景

1、TTL对队列进行延迟

   @Bean
    public Queue directQueueLong(){
        return   QueueBuilder.durable("业务队列名称")
                .deadLetterExchange("死信交换机名称")
                .deadLetterRoutingKey("死信队列 RoutingKey")
                .ttl(20000) // 消息停留时间
                //.maxLength(500)
                .build();
    }

监听死信队列,即可处理超时的消息队列

缺点:

上述实现方式中,ttl延时队列中所有的消息超时时间都是一样的,如果不同消息想设置不一样的超时时间,就需要建立多个不同超时时间的消息队列,比较麻烦,且不利于维护。

 2、创建通用延时消息对消息延迟

rabbitTemplate.convertAndSend("交换机名称", "RoutingKey","对象",
message => {

 
        message.getMessageProperties().setExpiration(String.valueOf(5000))
       // 设置消息的持久化属性
 
//这样发送的消息就会被持久化到 RabbitMQ 中,即使 RabbitMQ 重启,消息也不会丢失。                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

        return message;
            }
           );

缺点:

该种方式可以创建一个承载不同超时时间消息的消息队列,但是这种方式有一个问题,如果消息队列中排在前面的消息没有到超时时间,即使后面的消息到了超时时间,先到超时时间的消息也不会进入死信队列,而是先检查排在最前面的消息队列是否到了超时时间,如果到了超时时间才会继续检查后面的消息。

 3、使用rabbitmq的延时队列插件

使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队

1、下载延迟插件

在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。

下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2、安装插件并启用

下载完成后直接把插件放在 /home/rabbitmq 目录,然后拷贝到容器内plugins目录下(my-rabbit是容器的name,也可以使用容器id)

docker cp /home/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins

 进入 Docker 容器

docker exec -it rabbit /bin/bash

 在plugins内启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 退出容器

exit

重启 RabbitMQ

docker restart my-rabbit

 貌似不重启也能生效!

 结果:

 就多了一个x-delayed-message交换机

x-delayed-message使用

父pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
<!--        <version>2.2.5.RELEASE</version>-->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.chensir</groupId>
    <artifactId>spring-boot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq</name>

    <properties>
        <java.version>8</java.version>
        <hutool.version>5.8.3</hutool.version>
        <lombok.version>1.18.24</lombok.version>
    </properties>

    <description>spring-boot-rabbitmq</description>

    <packaging>pom</packaging>

    <modules>
        <module>direct-exchange</module>
        <module>fanout-exchange</module>
        <module>topic-exchange</module>
        <module>game-exchange</module>
        <module>dead-letter-queue</module>
        <module>delay-queue</module>
        <module>delay-queue2</module>
    </modules>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>${hutool.version}</version>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>


</project>

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.chensir</groupId>
        <artifactId>spring-boot-rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml </relativePath>
    </parent>

    <artifactId>delay-queue2</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

配置文件

logging.level.com.chensir = debug
server.port=8086
#host
spring.rabbitmq.host=121.40.100.66
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#每个消费者每次可最大处理的nack消息数量
spring.rabbitmq.listener.simple.prefetch=1
#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#监听重试是否可用
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#最大重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
#第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.multiplier=2
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

config

package com.chensir.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }


    @Bean
    public CustomExchange customExchange(){
        Map<String,Object> args = new HashMap<>();
        //延迟时以direct直连方式绑定
        args.put("x-delayed-type","direct");

        // name:交换机名称 type:类型 固定值x-delayed-message
        return new CustomExchange("chen-x-delayedExchange","x-delayed-message",true,false,args);
    }
    @Bean
    public Queue directQueueLong(){
        return   QueueBuilder.durable("chen-DirectQueue")
                .build();
    }

    @Bean
    public Binding binding(){
        return  BindingBuilder.bind(directQueueLong()).to(customExchange()).with("direct123").noargs();
    }
}

生产者

package com.chensir.provider;

import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Slf4j
public class DirectProvider {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public  void  send(){
            OrderIngOk orderIngOk = new OrderIngOk();
            orderIngOk.setOrderNo("202207149687-1");
            orderIngOk.setId(1);
            orderIngOk.setUserName("倪海杉前-延迟40秒");
            rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk,m->{
                m.getMessageProperties().setDelay(40*1000); //设置延迟时间,对延迟交换机有效
                // m.getMessageProperties().setExpiration(String.valueOf(30*1000)); 设置过期时间,对队列有效
                return m;
            });

        OrderIngOk orderIngOk2 = new OrderIngOk();
        orderIngOk2.setOrderNo("202207149687-2");
        orderIngOk2.setId(2);
        orderIngOk2.setUserName("倪海杉后-延迟30秒");
        rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk2,m->{
            m.getMessageProperties().setDelay(30*1000);
            return m;
        });
            log.debug("消息生产完成");



    }

}

消费者

package com.chensir.consumer;

import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class DirectConsumer {

    @RabbitHandler
    @RabbitListener(queues = "chen-DirectQueue" )
    public void  process(OrderIngOk orderIngOk) throws IOException {
        try {
           // 处理业务开始
            log.debug("接受到消息,并正常处理结束,{}", JSONUtil.toJsonStr(orderIngOk));
            // 处理业务结束
        } catch (Exception ex){
            throw ex;
        }
    }
}

结果

2023-08-29 18:27:45.983 DEBUG 15568 --- [           main] com.chensir.provider.DirectProvider      : 消息生产完成
2023-08-29 18:28:16.143 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer      : 接受到消息,并正常处理结束,{"id":2,"OrderNo":"202207149687-2","userName":"倪海杉后-延迟30秒"}
2023-08-29 18:28:26.057 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer      : 接受到消息,并正常处理结束,{"id":1,"OrderNo":"202207149687-1","userName":"倪海杉前-延迟40秒"}

可见 延迟40s的是先发送的,但是最终结果是先消费延迟30s的。这样就能达到我们订座的场景需求了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/95845.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【OpenCV入门】第一部分——图像处理基础

本文结构 图像处理的基本操作读取图像imread() 显示图像imshow()waitKey()destroyAllWindows() 保存图像imwrite() 复制图像copy() 获取图像属性 像素确定像素的位置获取像素的BGR值修改像素的BGR值 色彩空间GRAY色彩空间cvtColor()——从BGR色彩空间转换到GRAY色彩空间 HSV色彩…

Spring boot使用Kafka Java反序列化漏洞 CVE-2023-34040

文章目录 0.前言漏洞spring-kafka 介绍 1.参考文档2.基础介绍3.解决方案3.1. 升级版本3.2. 替代方案 4.Spring kafka 使用教程代码示例 0.前言 背景&#xff1a;公司项目扫描到 Spring-Kafka上使用通配符模式匹配进行的安全绕过漏洞 CVE-2023-20873 漏洞 中等风险 | 2023年8月…

Python框架【模板继承 、继承模板实战、类视图 、类视图的好处 、类视图使用场景、基于调度方法的类视图】(四)

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱敲代码的小王&#xff0c;CSDN博客博主,Python小白 &#x1f4d5;系列专栏&#xff1a;python入门到实战、Python爬虫开发、Python办公自动化、Python数据分析、Python前后端开发 &#x1f4e7;如果文章知识点有错误…

C 实现Window/DOS 键盘监听事件

今天是重新复习C语言实现的第一天&#xff0c;今天想编写C 对Windwos/Dos 键盘事件的学习。但是我在安装Visual Studio 2022 没有安装MFC 框架&#xff0c;今天记录下VS追加 MFC框架。 Visual Studio 2022 追加MFC 1、打开vs&#xff0c;点击创建新项目&#xff0c;右侧滑动框…

vue可编辑表格

内容包含:校验。下拉框。输入框。日期控件 效果图 1.代码目录 2.index.js import SjjEditable from ./src/editable.vue // import Vue from vueSjjEditable.install = function (Vue) {Vue.component(SjjEditable.name, SjjEditable) }export default SjjEditable 3.util…

C#基础知识点记录

目录 课程一、C#基础1.C#编译环境、基础语法2.Winform-后续未学完 课程二、Timothy C#底层讲解一、类成员0常量1字段2属性3索引器5方法5.1值参数&#xff08;创建副本&#xff0c;方法内对值的操作&#xff0c;不会影响原来变量的值&#xff09;5.2引用参数&#xff08;传的是地…

Tomcat安装及基本使用

1. 什么是Web服务器 Web服务器是一种应用程序&#xff08;软件&#xff09;&#xff0c;它封装了对HTTP协议的操作&#xff0c;使得开发人员无需直接操作协议&#xff0c;从而简化了Web开发。其主要功能是提供网上信息浏览服务。 Web服务器安装在服务器端&#xff0c;我们可以…

Microsoft Edge 主页启动diy以及常用的扩展、收藏夹的网站

一、Microsoft Edge 主页启动diy 二、常用的扩展 1、去广告&#xff1a;uBlock Origin 2、翻译&#xff1a; 页面翻译&#xff1a;右键就有了&#xff0c;已经内置了划词翻译 3、超级复制 三、收藏夹的网站

[C/C++]指针详讲-让你不在害怕指针

个人主页&#xff1a;北海 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏✨收录专栏&#xff1a;C/C&#x1f91d;希望作者的文章能对你有所帮助&#xff0c;有不足的地方请在评论区留言指正&#xff0c;大家一起学习交流&#xff01;&#x1f9…

spring cloud、gradle、父子项目、微服务框架搭建---spring secuity oauth2、mysql 授权(九)

文章目录 一、二、授权服务2.1 初始化表结构2.2 引入依赖2.3 自定义 用户详情类 UserDetailsService2.4 授权配置 AuthorizationServerConfiguration2.5 Web安全配置 WebSecurityConfiguration2.6 默认生成接口 三、资源服务3.1 引入依赖3.2 资源服务 ResourceServerConfig 四、…

本地镜像管理

查看 用户可以通过docker images命令查看本地所有镜像&#xff0c;如下&#xff1a; 这里一共有五个参数&#xff0c;含义分别如下&#xff1a; REPOSITORY 仓库名称&#xff0c;仓库一般用来存放同一类型的镜像。仓库的名称由其创建者指定。如果没有指定则为<none>。…

爬虫逆向实战(二十七)--某某招标投标网站招标公告

一、数据接口分析 主页地址&#xff1a;某网站 1、抓包 通过抓包可以发现数据接口是page 2、判断是否有加密参数 请求参数是否加密&#xff1f; 通过查看“载荷”模块可以发现&#xff0c;请求参数是一整个密文 请求头是否加密&#xff1f; 无响应是否加密&#xff1f; 通…

Python之动态规划

序言 最近在学习python语言&#xff0c;语言有通用性&#xff0c;此文记录复习动态规划并练习python语言。 动态规划&#xff08;Dynamic Programming&#xff09; 动态规划是运筹学的一个分支&#xff0c;是求解决策过程最优化的过程。20世纪50年代初&#xff0c;美国数学家…

【DevOps视频笔记】8. Jenkins 配置

一、Jenkins 入门配置 1. 工具 / 插件 介绍 二、插件和工具配置 1. 配置 JDK 和 Maven Stage 1&#xff1a;将服务器中 JDK 和 Maven 映射到 jenkins 容器中 Stage 2&#xff1a;jenkins 全局配置中 -- 指定JAVA_HOME目录 Stage 3&#xff1a;jenkins 全局配置中 -- 指定…

Vue 项目性能优化 — 实践指南

前言 Vue 框架通过数据双向绑定和虚拟 DOM 技术&#xff0c;帮我们处理了前端开发中最脏最累的 DOM 操作部分&#xff0c; 我们不再需要去考虑如何操作 DOM 以及如何最高效地操作 DOM&#xff1b;但 Vue 项目中仍然存在项目首屏优化、Webpack 编译配置优化等问题&#xff0c;所…

深度图相关评测网站

文章目录 1 单目/Stereo相关测评网站介绍12 单目/Stereo相关测评网站介绍23 单目/Stereo相关测评网站介绍3 1 单目/Stereo相关测评网站介绍1 https://vision.middlebury.edu/stereo/eval3/ 2 单目/Stereo相关测评网站介绍2 http://www.cvlibs.net/datasets/kitti/eval_stereo…

maven推包The environment variable JAVA_HOME is not correctly set

解决办法&#xff1a; 打开idea查看jdk安装位置 1.在/etc下面创建&#xff08;如果存在就是更新&#xff09;launchd.conf。里面添加一行&#xff1a; setenv JAVA_HOME /Library/Java/JavaVirtualMachines/jdk1.8.0_351.jdk/Contents/Home #JAVA_HOME后面是我的java安装路径…

vscode GDB 调试linux内核 head.S

遇到的问题 此前参考如下文章 https://zhuanlan.zhihu.com/p/510289859 已经完成了在ubuntu 虚拟机用vscode 调试linux 内核。但是美中不足的是&#xff0c;断点最早只能加在__primary_switched() 函数。无法停在更早的断点上&#xff0c;比如ENTRY(stext) 位置。参考《奔跑吧…

数据库的备份与分类以及日志

目录 1、数据库的概念 1.1、数据备份的重要性 1.2、造成数据丢失的原因 1.3、 数据库备份的分类 1.3.1、从物理与逻辑的角度&#xff0c; 1.3.2、原理图 1.3.3.1 完全备份&#xff1a; 1.3.2.2 差异备份 1.2.3.3、 增量备份 1.3.3、 备份方式比较 1.4、常见的备份方…

全新抖音快手小红书去水印系统网站源码 | 支持几十种平台

全新抖音快手小红书去水印系统网站源码 | 支持几十种平台