目录
- 一、死信队列
- 1.过期时间代码实现
- 2.长度限制代码实现
- 3.测试消息拒收
- 4.死信队列小结
- 二、延迟队列
- 1.代码实现
- 1.1 生产者
- 1.2 生产者
一、死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
什么是死信队列
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
消息成为死信的三种情况:
- 1.队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
- 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 3.原队列存在消息过期设置,消息到达超时时间未被消费;
死信的处理方式
死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
① 丢弃,如果不是很重要,可以选择丢弃
② 记录死信入库,然后做后续的业务分析或处理
③ 通过死信队列,由负责监听死信的应用程序进行处理
综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理
队列绑定死信交换机:
给队列设置参数:
x-dead-letter-exchange 和 x-dead-letter-routing-key
1.过期时间代码实现
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
//死信交换机
public static final String EXCHANGE_NAME_DLX = "exchange_dlx6";
//死信队列
public static final String QUEUE_NAME_DLX = "queue_dlx6";
//交换机
public static final String EXCHANGE_NAME = "test_exchange_dlx6";
//队列
public static final String QUEUE_NAME = "test_queue_dlx6";
// 1 交换机
@Bean("test_exchange_dlx")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 1 死信交换机
@Bean("exchange_dlx")
public Exchange dlxExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
}
//2.Queue 队列
@Bean("test_queue_dlx")
public Queue bootQueue(){
Map<String, Object> args = new HashMap<>();
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "dlx.#");
//设置ttl
args.put("x-message-ttl",10000);
//最大长度为10
args.put("x-max-length",10);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
}
//2.死信 队列
@Bean("queue_dlx")
public Queue dlxQueue(){
return QueueBuilder.durable(QUEUE_NAME_DLX).build();
}
//3. 死信队列和死信交互机绑定关系 Binding
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
noargs():表示不指定参数
*/
@Bean
public Binding bindQueueExchange(@Qualifier("queue_dlx") Queue queue,
@Qualifier("exchange_dlx") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
}
//3. 队列和交互机绑定关系 Binding
@Bean
public Binding bindQueueExchange1(@Qualifier("test_queue_dlx") Queue queue,
@Qualifier("test_exchange_dlx") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
}
}
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?");
}
test开头是正常队列,不是test开头的队列是死信
超过了10秒,没有被消费就进入死信队列
2.长度限制代码实现
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
for (int i = 0; i < 10; i++) {
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?"+i);
}
}
test开头是正常队列,不是test开头的队列是死信
发队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
隔10s,没有被消费,会进入死信队列
3.测试消息拒收
在消费者端进行消息拒收
yml
spring:
rabbitmq:
host: 192.168.121.140
port: 5672
username: admin
password: admin
virtual-host: /
listener:
simple:
#表示手动确认
acknowledge-mode: manual
监听
拒绝签收,不重回队列 requeue=false
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
//4.拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}
修改生产者测试代码
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?拒绝");
}
进入了死信队列
4.死信队列小结
1.死信交换机和死信队列和普通的没有区别
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
3.消息成为死信的三种情况:
- 队列消息长度(数量)到达限制;
- 消费者拒接消费消息,并且不重回队列;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
二、延迟队列
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
场景:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行取消处理。这时就可以使用延时队列将订单信息发送到延时队列。
需求:
- 1.下单后,30分钟未支付,取消订单,回滚库存。
- 2.新用户注册成功30分钟后,发送短信问候。
实现方式:
很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
1.代码实现
其实和死信队列差不多,加一个ttl时间就可以了
1.1 生产者
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
//死信交换机
public static final String EXCHANGE_NAME_DLX = "exchange_dlx";
//死信队列
public static final String QUEUE_NAME_DLX = "order_que_dlx";
//交换机
public static final String EXCHANGE_NAME = "test_exchange_dlx";
//队列
public static final String QUEUE_NAME = "order_queue";
// 1 交换机
@Bean("test_exchange_dlx")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 1 死信交换机
@Bean("exchange_dlx")
public Exchange dlxExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
}
//2.Queue 队列
@Bean("test_queue_dlx")
public Queue bootQueue(){
Map<String, Object> args = new HashMap<>();
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "dlx.order.#");
//设置ttl
args.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
}
//2.死信 队列
@Bean("queue_dlx")
public Queue dlxQueue(){
return QueueBuilder.durable(QUEUE_NAME_DLX).build();
}
//3. 死信队列和死信交互机绑定关系 Binding
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
noargs():表示不指定参数
*/
@Bean
public Binding bindQueueExchange(@Qualifier("queue_dlx") Queue queue,
@Qualifier("exchange_dlx") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();
}
//3. 队列和交互机绑定关系 Binding
@Bean
public Binding bindQueueExchange1(@Qualifier("test_queue_dlx") Queue queue,
@Qualifier("test_exchange_dlx") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("test.order.#").noargs();
}
}
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
"test.order.msg","订单信息:id=1,time=2022年03月30日11:41:47");
//2.打印倒计时10秒
for (int i = 10; i > 0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
}
运行程序创建订单延时队列
1.2 生产者
OrderListener
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "order_que_dlx")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
System.out.println("根据订单id查询其状态...");
System.out.println("判断状态是否为支付成功");
System.out.println("取消订单,回滚库存....");
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
//4.拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}