Spring RabbitMQ 实现消息队列延迟

1.概述

要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。

2.安装RabbitMQ延迟插件

  • 检查插件 使用rabbitmq-plugins list命令用于查看RabbitMQ安装的插件。

rabbitmq-plugins list

检查RabbitMQ插件安装情况

a57655d2aca32ef7138f7e33f48f86d5.png
  • 下载插件

如果没有安装插件,则直接访问官网进行下载

https://www.rabbitmq.com/community-plugins.html
a66bfbcf3c1c9602b0f8b2a6fc82c7b3.png d896c1601fcf8339dc9720288c5f93b7.png
  • 安装插件

下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:

E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins
b0c8e53d6f10563e253c63a9f4345214.png

打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
11c3b67b5d724326d0817f624d3ecc95.png

3.实现RabbitMQ消息队列延迟功能

  • pom.xml配置信息文件中,添加相关依赖文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.olive</groupId>
 <artifactId>rabbitmq-spring-demo</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.7.7</version>
  <relativePath />
 </parent>
 <dependencies>
  <!--rabbitmq-->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  
 <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.5</version>
  </dependency>

 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.8</source>
     <target>1.8</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
  • application.yml配置文件中配置RabbitMQ信息

server:
  port: 8080
spring:
  #给项目来个名字
  application:
    name: rabbitmq-spring-demo
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin123
    #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的
    virtual-host: /
  • RabbitMQ配置类

package com.olive.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ配置类
 **/
@Configuration
public class RabbitMqConfig {
 
 public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";
 
 public static final String DELAY_QUEUE_NAME = "delay_queue_name";
 
 public static final String DELAY_ROUTING_KEY = "delay_routing_key";

 @Bean
 public CustomExchange delayExchange() {
  Map<String, Object> args = new HashMap<>();
  args.put("x-delayed-type", "direct");
  return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
 }

 @Bean
 public Queue queue() {
  Queue queue = new Queue(DELAY_QUEUE_NAME, true);
  return queue;
 }

 @Bean
 public Binding binding(Queue queue, CustomExchange delayExchange) {
  return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
 }
}
  • 发送消息

实现消息发送,设置消息延迟5s。

package com.olive.service;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.olive.config.RabbitMqConfig;

/**
 * 消息发送者
 **/
@Service
public class CustomMessageSender {
 
 @Autowired
 private RabbitTemplate rabbitTemplate;

 public void sendMsg(String msg) {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  System.out.println("消息发送时间:" + sdf.format(new Date()));
  rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, 
    RabbitMqConfig.DELAY_ROUTING_KEY, 
    msg, 
    new MessagePostProcessor() {
     @Override
     public Message postProcessMessage(Message message) throws AmqpException {
      // 消息延迟5秒
      message.getMessageProperties().setHeader("x-delay", 5000);
      return message;
     }
    });
 }
}
  • 接收消息

package com.olive.service;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.olive.config.RabbitMqConfig;

/**
 * 消息接收者
 **/
@Component
public class CustomMessageReceiver {
 
 @RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)
 public void receive(String msg) {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  System.out.println(sdf.format(new Date()) + msg);
  System.out.println("Receiver:执行取消订单");
 }
}
  • 测试验证

package com.olive.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.olive.service.CustomMessageSender;

@RestController
public class DelayMessageController {
 
 @Autowired
 private CustomMessageSender customMessageSender;
 
 @GetMapping("/sendMessage")
 public String sendMessage() {
  // 发送消息
  customMessageSender.sendMsg("你已经支付超时,取消订单通知!");
  return "success";
 }

}

发送消息,访问

http://127.0.0.1:8080/sendMessage

查看控制台打印的信息

67b0bd789d8dc7b5b3b6073c2d9241c9.png

3a05b0b00bb8222770378bbcf8c9fe4b.gif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/15431.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

workerman开发者必须知道的几个问题

1、windows环境限制 windows系统下workerman单个进程仅支持200个连接。 windows系统下无法使用count参数设置多进程。 windows系统下无法使用status、stop、reload、restart等命令。 windows系统下无法守护进程&#xff0c;cmd窗口关掉后服务即停止。 windows系统下无法在一个…

appuploader 常规使用登录方法

转载&#xff1a;登录appuploader 登录appuploader 常规使用登录方法 双击appuploader.exe 启动appuploader 点击底部的未登录&#xff0c;弹出登录框 在登录框内输入apple开发者账号 如果没有apple开发者账号&#xff0c;只是普通的apple账号&#xff0c;请勾选上未支付688…

本地运行 minigpt-4

1.环境部署 参考官方自带的README.MD&#xff0c;如果不想看官方的&#xff0c;也可参考MiniGPT-4&#xff5c;开源免费可本地进行图像对话交互的国产高级大语言增强视觉语言理解模型安装部署教程 - openAI 当然&#xff0c;所有的都要按照作者说明来&#xff0c;特别是版本号…

什么是3D渲染,3D渲染在CG项目中为何如此重要?

随着科技的发展&#xff0c;现如今任何人都可以使用免费软件在个人计算机上创作 3D 图像&#xff0c;当然也有人对于专业 3D 艺术的创作方式及其相关工作流程存在一些误解&#xff0c;认为创建一个模型后&#xff0c;在上面放上材料和纹理&#xff0c;就可以立马得到一个漂亮的…

SpringCloud源码之OpenFeign

OpenFeign 基于 OpenFeign 2.2.6.RELEASE版本进行源码阅读 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><version>2.2.6.RELEASE</version> </dependen…

【细读Spring Boot源码】监听器合集-持续更新中

前言 监听器汇总 归属监听器名称作用cloudBootstrapApplicationListenercloudLoggingSystemShutdownListenercloudRestartListenercloudLoggingSystemShutdownListenerspringbootEnvironmentPostProcessorApplicationListener用于触发在spring.factories文件中注册的Environm…

市级大数据中心大数据资源平台概要设计方案(ppt可编辑)

本资料来源公开网络&#xff0c;仅供个人学习&#xff0c;请勿商用&#xff0c;如有侵权请联系删除。 大数据管理中心发展背景 为建设卓越全球城市&#xff0c;实现政府治理能力现代化目标&#xff0c;由市大数据中心牵头&#xff0c;在政务公共数据管理和互联网政务服务方面…

numpy的下载、数据类型、属性、数组创建

下载numpy 因为numpy不依赖于任何一个包所以numpy可以直接使用pip命令直接下载 下载命令&#xff1a; pip install numpy # 默认从https://pypi.org/simple 下载 pip install numpy -i https://pypi.tuna.tsinghua.edu.cn/simple/ # 从清华大学资源站点下载 pip install nump…

UG NX二次开发(C#)-显示-更改对象颜色

文章目录 1、前言2、UG NX中的更换对象颜色的功能3、采用UG NX二次开发实现颜色修改3.1 采用直接赋值对象颜色不能直接更改对象颜色3.2 采用NewDisplayModification的方法如下:1、前言 当一个三维模型展现在我们面前时,总会有颜色赋予三维模型的对象上,比如红色、蓝色、银灰…

if条件语句

if条件语句 条件测试 test 测试表达式是否成立&#xff0c;若成立返回0&#xff0c;否则返回其他数值 格式1 &#xff1a;test 条件表达式&#xff1b;格式2 &#xff1a;[ 条件表达式 ] echo $?参数作用-d测试是否为目录 (Directory)-e测试目录或文件是否存在(Exist)-f测…

直线导轨水平仪零位调整方法

对于直线导轨的使用&#xff0c;相信很多人都知道&#xff0c;这主要是因为直线导轨的使用范围非常广泛&#xff0c;小到抽屉&#xff0c;大到机械设备&#xff0c;我们都能看到他的身影&#xff0c;接触得多自然就熟悉了。 事实上&#xff0c;大家对直线导轨的了解可能就仅限于…

Cortex-A7中断详解(一)

STM32中断系统回顾 中断向量表NVIC&#xff08;内嵌向量中断控制器&#xff09;中断使能中断服务函数 中断向量表 中断向量表是一个表&#xff0c;表里面存放的是中断向量。 中断服务程序的入口地址或存放中断服务程序的首地址成为中断向量&#xff0c;因此中断向量表是一系…

【Linux入门】linux指令(1)

【Linux入门】linux指令&#xff08;1&#xff09; 目录 【Linux入门】linux指令&#xff08;1&#xff09;操作系统登录服务器Linux下的基本指令ls指令pwd指令Linux路径分割符 /cd指令touch指令mkdir指令&#xff08;重要&#xff09;rmdir指令&&rm指令&#xff08;重…

linux实现网络程序

1️⃣ 在linux下&#xff0c;通过套接字实现服务器和客户端的通信。 2️⃣ 实现单线程、多线程通信。或者实现线程池来通信。 3️⃣ 优化通信&#xff0c;增加守护进程。 有情提醒&#xff0c;类里面默认的函数是内联。内联函数在调用的地方展开&#xff0c;没有函数地址&…

Mac使用命令行工具解压和压缩rar文件

目前在Mac电脑里支持解压缩的格式主要有&#xff1a;zip、gz等&#xff0c;但是还不支持rar格式的文件&#xff0c;接下来带着大家学习一下如何解压缩rar格式文件。 1.下载rar工具 打开&#xff1a;https://www.rarlab.com/download.htm 根据自己电脑的芯片要求选择自己的安装…

【计算机基本原理-数据结构】数据结构中树的详解

【计算机基本原理-数据结构】数据结构中树的详解 1&#xff09;总览2&#xff09;树的相关概念3&#xff09;二叉树、满二叉树、完全二叉树4&#xff09;二叉查找树 - BST5&#xff09;平衡二叉树 - AVL6&#xff09;红黑树7&#xff09;哈弗曼树8&#xff09;B 树9&#xff09…

TCP流量控制与拥塞控制

什么是流量控制 一条TCP连接的每一侧主机都为该连接设置了接收缓存。当该TCP连接接收到正确的、有序的报文段&#xff0c;就会将数据放入接收缓存。相关联的应用会从缓存中读取数据。 如果发送者发送数据过快、过多&#xff0c;而接收方的应用程序从缓冲区读取的速度较慢&…

机器学习实战教程(十):逻辑回归

概述 逻辑回归&#xff08;Logistic Regression&#xff09;是一种用于解决二分类或多分类问题的统计学习方法。它以自变量线性组合的形式进行建模&#xff0c;并使用Sigmoid函数将结果映射到[0, 1]的值域内&#xff0c;表示样本属于某个类别的概率。 Logistic Regression是最…

Stable Diffusion-生式AI的新范式

! 扩散模型&#xff08;Stable Diffusion)现在是生成图像的首选模型。由于扩散模型允许我们以提示( prompts)为条件生成图像&#xff0c;我们可以生成我们所选择的图像。在这些文本条件的扩散模型中&#xff0c;稳定扩散模型由于其开源性而最为著名。 在这篇文章中&#xff0…

STM32平衡小车 TB6612电机驱动学习

TB6612FNG简介 单片机引脚的电流一般只有几十个毫安&#xff0c;无法驱动电机&#xff0c;因此一般是通过单片机控制电机驱动芯片进而控制电机。TB6612是比较常用的电机驱动芯片之一。 TB6612FNG可以同时控制两个电机&#xff0c;工作电流1.2A&#xff0c;最大电流3.2A。 VM电…