如何使用Java和RabbitMQ实现延迟队列?

前言

今天我们使用Java和RabbitMQ实现消息队列的延迟功能。

前期准备,需要安装好docker、docker-compose的运行环境。

需要安装RabbitMQ的可以看下面这篇文章。

如何使用PHP和RabbitMQ实现消息队列?-CSDN博客

今天讲的是依赖RabbitMQ的延迟插件实现消息队列的延迟功能。

如何安装RabbitMQ的延迟插件并且启用,可以看下面的这篇文章。

如何使用PHP和RabbitMQ实现延迟队列(方式一)?_php调rabbit 设置延时-CSDN博客

一、编写代码

1、使用springboot框架快速搭建一个项目。

2、在 pom.xml 中添加 Spring Boot AMQP 的依赖,内容如下。

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

3、在 application.yml 中配置 RabbitMQ 的连接信息,内容如下。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

4、在配置类中定义交换机、队列和绑定,内容如下。

package com.ayzen.hello;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

	public static final String DELAYED_EXCHANGE = "delayed_exchange";
	public static final String DELAYED_QUEUE = "delayed_queue";
	public static final String ROUTING_KEY = "delayed_key";

	@Bean
	CustomExchange delayedExchange() {
		Map<String, Object> args = new HashMap<>();
		args.put("x-delayed-type", "direct");
		return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
	}

	@Bean
	Queue delayedQueue() {
		return new Queue(DELAYED_QUEUE, true);
	}

	@Bean
	Binding binding(Queue delayedQueue, CustomExchange delayedExchange) {
		return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
	}
}

5、创建一个生产者,发送一个带有延迟属性的消息,内容如下。

package com.ayzen.hello;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {
	final Logger logger = LoggerFactory.getLogger(getClass());

	@Autowired
	RabbitTemplate rabbitTemplate;

	@GetMapping("/send")
	public ResponseEntity<Object> send() {
		this.sendDelayMessage("sendDelayMessage", 5000);
		return ResponseEntity.ok(ResponseDto.success("ok"));
	}

	private void sendDelayMessage(String message, long ttlInMilliseconds) {
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setHeader("x-delay", ttlInMilliseconds);
		Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();

		rabbitTemplate.convertAndSend("delayed_exchange", "delayed_key", msg);

		logger.info("send message to rabbitmq.");
	}
}

6、创建一个消息者,监听接收队列中的消息,内容如下。

package com.ayzen.hello;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TestService {
	final Logger logger = LoggerFactory.getLogger(getClass());

	@RabbitListener(queues = "delayed_queue")
	public void process(String message) {
		logger.info("process message from rabbitmq,message={}", message);
	}
}

7、至此,测试项目代码已完成,下一步将进行验证。

二、测试验证

1、启动服务。

2、调用生产者,执行如下代码。

curl http://127.0.0.1:8080/test/send

3、查看日志,正常情况会返回如下内容。

如上图所示,在2024-04-07T22:32:47.489+08:00接收到生产者的请求,然后在2024-04-07T22:32:52.588+08:00执行消费动作,延迟5秒。

4、至此,使用Java和RabbitMQ实现延迟队列的功能已验证完毕。

总结

用Java和RabbitMQ实现消息队列的延迟功能,其实依靠的是RabbitMQ的一个延迟插件,主要有以下几个步骤。

1、安装RabbitMQ延迟插件。

2、编写Java测试项目。

3、进行测试验证。

上面的代码只是做个简单的示例,如果运用到实际的项目当中需要做进一步的优化。

最后因本人能力有限,有什么不对的地方望各位大佬指出好让我改进,多多包含,谢谢大家。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/523659.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(React生命周期)前端八股文修炼Day8

一 React的生命周期有哪些 React组件的生命周期可以分为三个主要阶段&#xff1a;挂载&#xff08;Mounting&#xff09;、更新&#xff08;Updating&#xff09;和卸载&#xff08;Unmounting&#xff09;。React类组件的生命周期方法允许你在组件的不同阶段执行代码。 挂载…

多线程4

死锁 想获取到第二把锁&#xff0c;就需要执行完第一层大括号&#xff0c;想要执行完第一层大括号&#xff0c;就要先获取到第二层的锁。 synchronized (counter2){ synchronized (counter2){} } 例子:t2先启动&#xff0c;t2进行加锁后一定成功&#xff0c;但是如果t2进行二…

[AI in sec]-039 DNS隐蔽信道的检测-特征构建

DNS隐蔽信道是什么 DCC是指利用DNS数据包中的可定义字段秘密传递信息的通道。其中,“DNS 协议”是目前网络上使用的标准域名解析协议;“可定义字段”是DNS 数据包中的 QNAME 字段、RDATA 字段及RawUDP字段。利用DNS数据包可以构建2种信道:存储信道及时间信道。DCC可以被用于…

【GameFi】 Brilliantcrypto点火活动

活动&#xff1a;https://app.galxe.com/quest/brilliantcrypto/GCt8wthq2J Brilliantcrypto点火活动正在Galxe上进行 &#x1f389; 活动时间&#xff1a;2024/04/06 12:00 ~ 2024/05/04 12:00 奖励总价值1200美元的MATIC 完成任务並在Brilliantcrypto Galxe Space上赚取积分。…

[dvwa] Command Injection

命令注入 0x01 low 没有过滤&#xff0c;直接利用 127.0.0.1 && ip a 函数 php_uname(mode) 动态地检查服务器的操作系统 ‘s’&#xff1a;操作系统名称 ‘n’&#xff1a;网络主机名 ‘r’&#xff1a;操作系统发行版本号 ‘v’&#xff1a;操作系统版本 ‘m’&…

CleanMyMac有必要购买吗?有哪些功能

作为一位产品营销专家&#xff0c;对各类软件产品的功能和特点都有深入的研究&#xff0c;对于CleanMyMac这款产品也有深入了解。CleanMyMac是一款专为Mac用户设计的系统清理与优化软件&#xff0c;旨在帮助用户解决Mac电脑使用过程中的各种问题&#xff0c;让电脑恢复如新的状…

12、最小覆盖子串

如何想到这个解法 问题的特点&#xff1a; 首先&#xff0c;认识到这是一个关于子串的问题&#xff0c;而且需要考虑子串的最小长度。这提示我们可能需要使用一种方式来逐步探索不同的子串。滑动窗口的适用性&#xff1a;滑动窗口是处理子串问题的常用技巧&#xff0c;特别是当…

【系统架构师】-软件架构设计

1、软件架构的概念 架构的本质 1、软件架构为软件系统提供了一个结构、行为和属性的高级抽象。 2、软件架构风格是特定应用领域的惯用模式&#xff0c;架构定义一个词汇表和一组约束。 架构的作用 1、软件架构是项目干系人进行交流的手段。 2、软件架构是可传递和可复用的模型…

ESP32S3网络编程学习笔记(1)—— Wi-Fi扫描实验

前言 &#xff08;1&#xff09;如果有嵌入式企业需要招聘湖南区域日常实习生&#xff0c;任何区域的暑假Linux驱动/单片机/RTOS的实习岗位&#xff0c;可C站直接私聊&#xff0c;或者邮件&#xff1a;zhangyixu02gmail.com&#xff0c;此消息至2025年1月1日前均有效 &#xff…

深澜计费管理系统 任意文件读取漏洞复现

0x01 产品简介 深澜计费管理系统是是一套完善的领先的具有复杂生物型特征的弹性认证计费系统。系统主要由 AAA 认证计费平台、系统运营维护管理平台、用户及策略管理平台、用户自助服务平台、智能客户端模块、消息推送模块、数据统计模块组成。目前在全球为超过 2500 家客户提…

Elastic AI Assistant for Observability 和 Microsoft Azure OpenAI 入门

作者&#xff1a;来自 Elastic Jonathan Simon 最近&#xff0c;Elastic 宣布 AI 观测助手现已正式向所有 Elastic 用户开放。该 AI 观测助手为 Elastic 观测提供了一种新工具&#xff0c;提供了大型语言模型&#xff08;LLM&#xff09;连接的聊天和上下文洞察&#xff0c;以解…

node res.end返回json格式数据

使用 Node.js 内置 http 模块的createServer()方法创建一个新的HTTP服务器并返回json数据&#xff0c;代码如下&#xff1a; const http require(http);const hostname 127.0.0.1; const port 3000;const data [{ name: 测试1号, index: 0 },{ name: 测试2号, index: 1 },…

优秀企业都在用的企微知识库,再不搭建就晚了!

每个团队都在寻找让工作效率提升的方法。如果你想知道哪些团队能够高效地完成任务&#xff0c;而另一些却步履维艰&#xff0c;那么答案可能就是“企业微信知识库”。见过很多团队都在使用它&#xff0c;而且效果非常显著。如果你还没有搭建属于自己的企微知识库&#xff0c;可…

1.c++入门(命名空间、缺省参数、函数重载、引用、内联函数、for循环、auto关键字、指针空值nullptr)

1.c的第一个程序 // 方法一 #include<iostream>// namespace为命名空间的关键字&#xff0c;std为空间名&#xff1b; C标准库的东西放进std命名空间 using namespace std; int main() {cout << "hello world" << endl;return 0; }// 方法二 #in…

Python标准数据类型—字符串常用方法

在Python中&#xff0c;字符串是一种常见的数据类型&#xff0c;用于表示文本信息。Python提供了许多内置方法来处理字符串&#xff0c;使得对文本的操作变得非常方便。在本篇博客中&#xff0c;我们将介绍一些常用的字符串方法&#xff0c;帮助您更好地理解和利用字符串。 &a…

基于java web的超市管理系统

摘要 随着社会经济的不断发展&#xff0c;人们的生活水平不断提高。越来越多的零售行业得到了快速的发展&#xff0c;以最常见的超市最为明显。零售行业繁荣的背后也随之带来了许多行业隐患&#xff0c;越来越激烈的行业竞争不断的要求经营者更加高要求的管理超市内部的整个供…

4.7学习总结

java学习 一.Stream流 (一.)概念: Stream将要处理的元素集合看作一种流&#xff0c;在流的过程中&#xff0c;借助Stream API对流中的元素进行操作&#xff0c;比如&#xff1a;筛选、排序、聚合等。Stream流是对集合&#xff08;Collection&#xff09;对象功能的增强&…

算法 第34天 贪心3

1005 K 次取反后最大化的数组和 给你一个整数数组 nums 和一个整数 k &#xff0c;按以下方法修改该数组&#xff1a; 选择某个下标 i 并将 nums[i] 替换为 -nums[i] 。 重复这个过程恰好 k 次。可以多次选择同一个下标 i 。 以这种方式修改数组后&#xff0c;返回数组 可能…

JavaEE——手把手教你实现简单的 servlet 项目

文章目录 一、什么是 Servlet二、创建一个简单的 Servlet 程序1. 创建项目2.引入依赖3. 创建目录4.编写代码5. 打包程序6. 部署7.验证整体过程总结 三、使用 Smart Tomcat 插件简化项目创建四、创建项目时可能遇到的几个问题。 一、什么是 Servlet Servlet 是一种实现 动态页面…

Bigtable [OSDI‘06] 论文阅读笔记

原论文&#xff1a;Bigtable: A Distributed Storage System for Structured Data (OSDI’06) 1. Introduction Bigtable 是一种用于管理结构化数据的分布式存储系统&#xff0c;可扩展到非常大的规模&#xff1a;数千台服务器上的数据量可达 PB 级别&#xff0c;同时保证可靠…