1.官网下载
在添加链接描述下载rabbitmq_delayed_message_exchange 插件,本文以v3.10.0为例
1.1.上传安装包
scp /Users/hong/资料/rabbitmq_delayed_message_exchange-3.10.0.ez root@10.211.55.4:/usr/local/software
1.2.将文件移入RabbitMQ的安装目录下的plugins目录
mv rabbitmq_delayed_message_exchange-3.10.0.ez /usr/local/software/rabbitmq_server-3.10.0/plugins
1.3.安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1.4 重启后验证
rabbitmq-server start
2.两种实现方式图解
3.基于插件的延迟队列配置类
package com.hong.springboot.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import java.util.HashMap;
import java.util.Map;
/**
* @Description: 延迟队列配置类
* @Author: hong
* @Date: 2024-02-25 20:19
* @Version: 1.0
**/
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
/**
* 基于延迟插件声明自定义交换机
* @return
*/
@Bean
public CustomExchange delayedExchange(){
Map<String,Object> map = new HashMap<>();
map.put("x-delayed-type","direct");
/**
* 声明自定义交换机
* 第1个参数:交换机名称
* 第2个参数:交换机类型
* 第3个参数:是否需要持久化
* 第4个参数:是否需要自动删除
* 第5个参数:其他参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,map);
}
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
4.生产者发送消息
/**
* 基于延迟插件的发送消息
* @param message
* @param delayTime 延迟时间
*/
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给延迟队列delayed.queue:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , delayTime, message);
}
5.消费者端代码
package com.hong.springboot.rabbitmq.consumer;
import com.hong.springboot.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Description: 基于延迟插件的延迟消费者
* @Author: hong
* @Date: 2024-02-25 21:27
* @Version: 1.0
**/
@Slf4j
@Component
public class DelayQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayMessage(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);
}
}
http://localhost:8080/ttl/sendDelayMsg/hello rabbitmq 1/20000
http://localhost:8080/ttl/sendDelayMsg/hello rabbitmq 2/2000
基于插件的延迟与基于死信队列的结果恰好相反更符合预期,因此在实际项目中通常采用延迟插件方式来实现rabbitMQ的延迟队列