在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况:
1 队列消息长度到达限制。
2 消费者拒签消息,并且不把消息重新放入原队列。
3 消息到达存活时间未被消费。
创建死信队列
@Configuration
public class RabbitConfig4 {
private final String DEAD_EXCHANGE = "dead_exchange";
private final String DEAD_QUEUE = "dead_queue";
private final String NORMAL_EXCHANGE = "normal_exchange";
private final String NORMAL_QUEUE = "normal_queue";
// 死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)
.durable(true)
.build();
}
// 死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
// 死信交换机绑定死信队列
@Bean
public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dead_routing")
.noargs();
}
// 普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)
.durable(true)
.build();
}
// 普通队列
@Bean(NORMAL_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
.ttl(10000) // 消息存活10s
.maxLength(10) // 队列最大长度为10
.build();
}
// 普通交换机绑定普通队列
@Bean
public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
测试死信队列
生产者发送消息
@Test
public void testDlx(){
// 存活时间过期后变成死信
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// 超过队列长度后变成死信
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// }
// 消息拒签但不返回原队列后变成死信
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}
消费者拒收消息
@Component
public class DlxConsumer {
@RabbitListener(queues = "normal_queue")
public void listenMessage(Message message, Channel channel) throws IOException {
// 拒签消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
}
}