前言
昨天写了一篇关于Java和RabbitMQ使用插件实现延迟队列功能的文章,今天来讲下另外一种方式,不需要RabbitMQ的插件。
前期准备,需要安装好docker、docker-compose的运行环境。
需要安装RabbitMQ的可以看下面这篇文章。
如何使用PHP和RabbitMQ实现消息队列?-CSDN博客
使用RabbitMQ插件实现延迟队列功能的文章,可以查看下面的文章。
如何使用Java和RabbitMQ实现延迟队列?-CSDN博客
一、编写代码
1、使用springboot框架快速搭建一个项目。
2、在 pom.xml
中添加 Spring Boot AMQP 的依赖,内容如下。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、在 application.yml
中配置 RabbitMQ 的连接信息,内容如下。
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
4、在配置类中定义交换机、队列和绑定,内容如下。
package com.ayzen.hello;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 设置死信交换机
args.put("x-dead-letter-routing-key", "dlx_key"); // 设置死信路由
args.put("x-message-ttl", 5000); // 设置默认 TTL(毫秒),也可以动态设置
return new Queue("delay_queue", true, false, false, args);
}
@Bean
public Queue dlxQueue() {
return new Queue("dlx_queue", true);
}
@Bean
public DirectExchange delayExchange() {
return new DirectExchange("delay_exchange");
}
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx_exchange");
}
@Bean
public Binding binding(Queue delayQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("dlx_key");
}
@Bean
public Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx_key");
}
}
5、创建一个生产者,发送一个带有延迟属性的消息,内容如下。
package com.ayzen.hello;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test")
public class TestController {
final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public ResponseEntity<Object> send() {
this.sendDelayMessage("sendDelayMessage", 5000);
return ResponseEntity.ok(ResponseDto.success("ok"));
}
private void sendDelayMessage(String message, long ttlInMilliseconds) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration(Long.toString(ttlInMilliseconds));
Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();
rabbitTemplate.send("delay_exchange", "dlx_key", msg);
logger.info("send message to rabbitmq.");
}
}
6、创建一个消息者,监听接收队列中的消息,内容如下。
package com.ayzen.learnjava.hello.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TestService {
final Logger logger = LoggerFactory.getLogger(getClass());
@RabbitListener(queues = "dlx_queue")
public void process(String message) {
logger.info("process message from rabbitmq,message={}", message);
}
}
7、至此,测试项目代码已完成,下一步将进行验证。
二、测试验证
1、启动服务。
2、调用生产者,执行如下代码。
curl http://127.0.0.1:8080/test/send
3、查看日志,正常情况会返回如下内容。
如上图所示,在2024-04-08T10:23:40.478+08:00接收到生产者的请求,然后在2024-04-08T10:23:45.587+08:00执行消费动作,延迟5秒。
4、至此,使用Java和RabbitMQ实现延迟队列的功能已验证完毕。
总结
用Java和RabbitMQ实现消息队列的延迟功能的另外一种方式,其实依靠的是死信交换机,主要有以下几个步骤。
1、配置死信交换机(Dead-Letter-Exchange)。
2、编写Java测试项目。
3、进行测试验证。
上面的代码只是做个简单的示例,如果运用到实际的项目当中需要做进一步的优化。
最后因本人能力有限,有什么不对的地方望各位大佬指出好让我改进,多多包含,谢谢大家。