消息的TTL(Time To Live)就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
对队列设置存活时间,就是队列没有消费者连着的保留时间。
对每一个单独的消息单独的设置存活时间。超过了这个时间,我们认为这个消息就死了,称之为死信。
如果队列设置了TTL,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者xmessage-ttl属性来设置时间,两者是一样的效果。
死信路由
一个消息在成为死信后,会进死信路由。
记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)
普通消息成为死信的条件:死信将被路由到死信交换机、再路由到普通队列,普通队列发送消息给消费者。
被拒收的消息:一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
TTL到期的消息:
消息的TTL到了,消息过期了。被队列丢弃的消息:
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
死信交换机
死信交换机Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
延时队列
先设置消息在TTL后变成死信,再设置队列里死信统一被路由到一个指定的交换机,那个指定的交换机专门处理死信,达成延时队列的效果。
手动ack&异常消息统一放在一个队列处理建议的两种方式
catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
/**
* 创建队列,交换机,延迟队列,绑定关系 的configuration
* 不会重复创建覆盖
* 1、第一次使用队列【监听】的时候才会创建
* 2、Broker没有队列、交换机才会创建
*/
@Configuration
public class MyRabbitMQConfig {
// @RabbitHandler
// public void listen(Message message){
// System.out.println("收到消息:------>"+message);
// }
/**
* 延时队列
* @return
*/
@Bean
public Queue orderDelayQueue(){
HashMap<String, Object> arguments = new HashMap<>();
/**
* x-dead-letter-exchange :order-event-exchange 设置死信路由
* x-dead-letter-routing-key : order.release.order 设置死信路由键
* x-message-ttl :60000
*/
arguments.put("x-dead-letter-exchange","order-event-exchange");
arguments.put("x-dead-letter-routing-key","order.release.order");
arguments.put("x-message-ttl",30000);
Queue queue = new Queue("order.delay.queue", true, false, false,arguments);
return queue;
}
/**
* 死信队列
* @return
*/
@Bean
public Queue orderReleaseQueue(){
Queue queue = new Queue("order.release.order.queue",true,false,false);
return queue;
}
/**
* 死信路由[普通路由]
* @return
*/
@Bean
public Exchange orderEventExchange(){
/*
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
* */
TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);
return topicExchange;
}
/**
* 交换机与延时队列的绑定
* @return
*/
@Bean
public Binding orderCreateOrderBinding(){
/*
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */
Binding binding = new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
return binding;
}
/**
* 死信路由与普通死信队列的绑定
* @return
*/
@Bean
public Binding orderReleaseOrderBinding(){
Binding binding = new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
return binding;
}
}