一、RabbitMQ下载并使用插件
1、查看RabbitMQ插件的文件路径
docker inspect rabbitmq
找到Mounts下面Name:rabbitmq_plugin的Source即为插件路径
使用 cd 进入到该目录
2、下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
3、使用插件
1、进入到容器内部
docker exec -it rabbitmq /bin/bash
2、启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3、查看插件是否使用成功
rabbitmq-plugins list
4、重启容器
docker restart rabbitmq
二、使用延时队列
1、生产者
@Test
public void test01(){
//创建消息后置处理器对象
MessagePostProcessor postProcessor = message -> {
//设置消息过期时间/毫秒
message.getMessageProperties().setHeader("x-delay","10000");
return message;
};
rabbitTemplate.convertAndSend("exchange.test.delay",
"routing.key.test.delay",
"hello delay message by plug" + new SimpleDateFormat("HH:mm:ss").format(new Date()),
postProcessor);
}
2、消费者
@RabbitListener(queues={"queue.test.delay"})
public void lisenter03(String data, Message message, Channel channel) throws IOException {
log.info("接收到 : " + data);
log.info("当前时间 : " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
三、其他(消息可靠性-生产者确认)
1、配置application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: /
publisher-confirm-type: correlated #交换机确认
publisher-returns: true #队列确认
2、编写配置类
@Component
@Slf4j
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//构造后执行
@PostConstruct
public void InitRabbit(){
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
//发送到交换机执行的回调函数
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm 回调 data : " + correlationData);
log.info("confirm 回调 ack : " + ack);
log.info("confirm 回调 cause : " + cause);
}
//发送到队列失败才执行的回调函数
//使用延时队列时成功也会执行
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("returnedMessage 回调 消息 : " + returnedMessage.getMessage().getBody());
log.info("returnedMessage 回调 状态码 : " + returnedMessage.getReplyCode());
log.info("returnedMessage 回调 描述 : " + returnedMessage.getReplyText());
log.info("returnedMessage 回调 交换机 : " + returnedMessage.getExchange());
log.info("returnedMessage 回调 路由键 : " + returnedMessage.getRoutingKey());
}
}
四、延时队列的其他实现思路
可以通过配置消息的过期时间和死信队列,消费者监听死信队列,同样可以实现延时消息的效果