RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件

RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html

2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明) 

 3、把这个插件传输到服务器上

4、根据官网的指示把插件放到RabbitMQ指定的文件夹下

RabbitMQ官网指示安装插件步骤的网址:https://www.rabbitmq.com/installing-plugins.html

我这里安装RabbitMQ的系统是CentOS,所以放在

5、拷贝插件到指定的目录下

例:

cp rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins/

效果图:

 6、安装延迟队列插件

输入以下命令安装延迟队列插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

效果图:

7、重启RabbitMQ

输入以下命令重启RabbitMQ

systemctl restart rabbitmq-server.service

效果图:

8、查看插件是否安装成功

 进入RabbitMQ的管理页面,进入Exchange的管理页面,新增Exchange,在Type里面可以看到x-delayed-message的选项,证明延迟队列插件安装成功

9、基于插件实现延迟队列的原理示意图

原先我们没下插件之前实现延迟队列是基于图下这种方式实现的

但我们下载插件后就能通过交换机延迟消息的方式来实现消息的延迟了(由步骤8可见,我们验证插件是否安装成功是从Exchange进去的,而不是从Queues进去的)

10、基于插件延迟队列的代码实现

(1)在config包里新建一个名为DelayedQueueConfig的类用于编写配置队列延迟的代码

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig {

    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed_queue";

    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";

    //交换机
    public static final String DELAYED_ROUTING_KEY = "delayed";

    //声明延迟队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //声明延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置延迟类型
        arguments.put("x-delayed-type","direct");
        /**
         * 声明自定义交换机
         * 第一个参数:交换机的名称
         * 第二个参数:交换机的类型
         * 第三个参数:是否需要持久化
         * 第四个参数:是否自动删除
         * 第五个参数:其他参数
         */
        return new CustomExchange(DELAYED_QUEUE_NAME,"x-delayed-message",true,false,arguments);
    }

    //绑定队列和延迟交换机
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") Exchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}

 (2)在SendMsgController类里写一个接口,让其能往延迟队列里发送消息

代码如下:

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 发送延迟消息
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date(),message);
        rabbitTemplate.convertAndSend("normal_exchange","normal01","消息来着ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("normal_exchange","normal02","消息来着ttl为40s的队列:" + message);
    }

    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的TTL消息给normal03队列:{}", new Date(),ttlTime,message);
        rabbitTemplate.convertAndSend("normal_exchange","normal03",message,msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

    /**
     * 给延迟队列发送消息
     * @param message
     * @param delayTime
     */
    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的消息给延迟队列:{}", new Date(),delayTime,message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_QUEUE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

}

(3)在consumer包里新建一个名为DelayQueueConsumer的类用于编写消费延迟队列的消费者代码

效果图:

代码如下:

package com.ken.springbootrqbbitmq.consumer;

import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 延迟队列消费者
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    private void receiveDelayQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间{},收到延迟队列的消息",new Date(),msg);
    }

}

(4)启动项目,往浏览器输入接口地址和参数,从而调用接口

[1]第一条消息

http://localhost:8080/ttl/sendDelayMsg/我是第一条消息/20000

[2]第二条消息

http://localhost:8080/ttl/sendDelayMsg/我是第二条消息/2000

效果图:

结论:基于测试发现在使用延迟插件的情况下,延迟时间短的消息会被先消费,这证明基于插件的延迟消息达到预期效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/35275.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【UE5 Cesium】12-Cesium for Unreal 去除左下角的icon

问题 在视口左下角的icon如何去除&#xff1f; 解决方法 打开“CesiumCreditSystemBP” 将“Credit Widget Class”一项中的“ScreenCredit”替换为“ScreenCreditWidget” 编译之后icon就不显示了。

2023年5月PETS5(WSK)考试经验分享

由于本人明年打算出国联培的缘故&#xff0c;CSC国家留学基金委需要申请人的语言成绩达到一定的要求 英语&#xff08;PETS5&#xff09;&#xff1a;笔试总分55分&#xff08;含&#xff09;以上&#xff0c;其中听力部分18分&#xff08;含&#xff09;以上&#xff0c;口试…

2023最新AI创作系统/ChatGPT商业运营版网站程序源码+支持GPT4+支持ai绘画(MJ)+实时语音识别输入+免费更新版本

2023最新AI创作系统/ChatGPT商业运营版网站程序源码支持ai绘画支持GPT4.0实时语音识别输入文章资讯发布功能用户会员套餐免费更新版本 一、AI创作系统二、系统介绍三、系统程序下载四、安装教程五、主要功能展示六、更新日志 一、AI创作系统 1、提问&#xff1a;程序已经支持G…

“生鲜蔬”APP的设计与实现

1.引言 在这个科技与网络齐头并进的时代&#xff0c;外卖服务正在飞速发展&#xff0c;人们对外卖APP系统功能需求越来越多&#xff0c;开发APP的人员对自己的要求也要越来越高&#xff0c;要从所做APP外卖系统所实现的功能和用户的需求来对系统进行设计&#xff0c;还需要与当…

2023年船舶、海洋与海事工程国际会议(NAOME 2023) | Ei Scopus双检索

会议简介 Brief Introduction 2023年船舶、海洋与海事工程国际会议(NAOME 2023) 会议时间&#xff1a;2023年10月20日-22日 召开地点&#xff1a;中国镇江 大会官网&#xff1a;NAOME 2023-2023 International Conference on Naval Architecture and Ocean & Marine Engine…

腾讯云对象存储联合DataBend云数仓打通数据湖和数据仓库

随着数字化进程不断深入&#xff0c;数据呈大规模、多样性的爆发式增长。为满足更多样、更复杂的业务数据处理分析的诉求&#xff0c;湖仓一体应运而生。在Gartner发布的《Hype Cycle for Data Management 2021》中&#xff0c;湖仓一体&#xff08;Lake house&#xff09;首次…

ModaHub魔搭社区:基于阿里云 ACK 搭建开源向量数据库 Milvus

目录 一、准备资源 二、集群创建&#xff1a; 本集群基于Terway网络构建 二、连接刚刚创建的ACK集群 三、部署Milvus数据库 四、优化Milvus配置 简介&#xff1a; 生成式 AI&#xff08;Generative AI&#xff09;引爆了向量数据库&#xff08;Vector Database&#xff0…

【链表OJ】删除链表中重复的结点

⭐️ 往期链表相关OJ &#x1f4ab;链接1&#xff1a;链表分割 &#x1f4ab;链接2&#xff1a;链表中倒数第k个结点(快慢指针问题) &#x1f4ab;链接3&#xff1a;leetcode 876.链表的中间结点(快慢指针问题) &#x1f4ab;链接4&#xff1a;leetcode 206.反转链表 &#x1…

【数据结构与算法】内排序算法比较(C\C++)

实践要求 1. 问题描述 各种内部排序算法的时间复杂度分析结果只给出了算法执行时间的阶&#xff0c;或大概执行时间&#xff0c;试通过随机的数据比较各算法的关键字比较次数和关键字移动次数&#xff0c;以取得直观感受。 2. 基本要求 对以下10种常用的内部排序算法进行比较…

【mysql实践】如何查看阿里云RDS的MySQL库中的binlog日志

背景&#xff1a; 工作中我们为了查看MySQL中数据修改的历史记录时&#xff0c;会通过查看binlog日志。但由于binlog日志是二进制文件&#xff0c;需要解析之后&#xff0c;才能用文本查看工具打开。这次笔者使用flink进行实时统计时就多次遇到了这个问题。经常看笔者最近博客…

redhat6安装mysql8.0.33

1、下载mysql 官网地址&#xff1a;https://downloads.mysql.com/archives/community/ 下载步骤&#xff1a; 过滤操作系统版本 下载后&#xff0c;上传到服务器Downloads目录 2、安装mysql8 解压压缩包 tar -xvf mysql-8.0.31-1.el9.x86_64.rpm-bundle.tar [rootrhel64 …

山海鲸Cesium:帮你用更简单的方式升级视效

CesiumJS作为绝大多数人都在用的开源地球可视化引擎&#xff0c;视觉效果并不拔尖&#xff0c;这让很多giser都想着有一天升级一下视效&#xff0c;从众多平庸的项目中脱颖而出。然而&#xff0c;对于一些使用Cesium的项目来说&#xff0c;要想达到Cesium for unreal的视觉效果…

Jetson Orin Nano Developer Kit

Jetson Orin Nano Developer Kit包括Jetson Orin Nano 8GB模块&#xff0c;该模块具有NVIDIA安培GPU(具有1024个CUDA内核和32个第三代张量内核)和6核ARM CPU&#xff0c;能够运行多个并发AI应用程序管道并提供高推断性能。 开发套件载体板支持所有Jetson Orin Nano和Orin NX模块…

多层感知机与深度学习算法概述

多层感知机与深度学习算法概述 读研之前那会儿我们曾纠结于机器学习、深度学习、神经网络这些概念的异同。现在看来深度学习这一算法竟然容易让人和他的爸爸机器学习搞混…可见深度学习技术的影响力之大。深度学习&#xff0c;作为机器学习家族中目前最有价值的一种算法&#…

Java安全——安全提供者

Java安全 安全提供者 在Java中&#xff0c;安全提供者&#xff08;Security Provider&#xff09;是一种实现了特定安全服务的软件模块。它提供了一系列的加密、解密、签名、验证和随机数生成等安全功能。安全提供者基础设施在Java中的作用是为开发人员提供一种扩展和替换标准…

Java性能权威指南-总结26

Java性能权威指南-总结26 数据库性能的最佳实践异常日志 数据库性能的最佳实践 异常 Java的异常处理一直有代价高昂的坏名声。其代价确实比处理正常的控制流高一些&#xff0c;不过在大多数情况下&#xff0c;这种代价并不值得浪费精力去绕过。另一方面&#xff0c;因为异常处…

【面试】美团面试真题和答案

文章目录 前言1.线程池有几种实现方式&#xff1f;2.线程池的参数含义&#xff1f;3.锁升级的过程&#xff1f;4.i 如何保证线程安全&#xff1f;5.HashMap和ConcurrentHashMap有什么区别&#xff1f;6.Autowired和Resource区别&#xff1f;7.说说常用的设计模式8.Redis为什么这…

SpringBoot2+Vue2实战(十二)springboot一对一,一对多查询

新建数据库表 Course Data TableName("t_course") public class Course implements Serializable {private static final long serialVersionUID 1L;/*** id*/TableId(value "id", type IdType.AUTO)private Integer id;/*** 课程名称*/private String…

微信小程序制作 购物商城首页 【内包含源码】

1、实现效果 手机效果预览,这里的首页使用到了轮播图。页面图片数据可以替换成自己的数据。 2、开发者工具效果图 3、项目的目录结构 4、首页核心代码 4.1 index.js 这里用来存放数据,页面的数据。目前是假数据,也可以调用接口接收真实数据 // index.jsimport {request }…

【我的创作纪念日】关于某站的音频爬虫+GUI

文章目录 一、前言&机遇二、爬虫代码三、爬虫GUI四、文件打包五、结果展示未来可期 一、前言&机遇 许久没看私信内容&#xff0c;一上线就看到了官方的私信&#xff0c;我已经来到CSDN1024天啦&#xff01; 想到注册这个号的初衷是学习记录爬虫&#xff0c;后面渐渐变…