Lison
<dreamlison@163.com>
, v1.0.0
, 2023.06.23
RabbitMQ-进阶 死信队列、延迟队列、防丢失机制
文章目录
- RabbitMQ-进阶 死信队列、延迟队列、防丢失机制
- 死信队列
- 延迟队列
- 延迟队列介绍
- **延迟队列_死信队列_的实现**
- 延迟队列_插件实现
- 下载插件
- RabbitMQ 配置类
- RabbitMQ 生产者
- RabbitMQ 消费者
- 测试
- RabbitMQ防止消息丢失
- 消息丢失场景
- 生产者发送消息没有发送到rabbit交换机
- 交换机没有发送到队列
- 交换机、队列、消息没有设置持久化
- 消费者接收到消息没有执行业务逻辑,导致消息丢失
死信队列
概念
在MQ中,当消息成为死信(Dead message)后,消息中间件可以 将其从当前队列发送到另一个队列中,这个队列就是死信队列。而 在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死 信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况
- 队列消息长度到达限制
- 消费者拒签消息,并且不把消息重新放入原队列
- 消息到达存活时间未被消费
代码实现
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig2 {
private final String DEAD_EXCHANGE = "dead_exchange";
private final String DEAD_QUEUE = "dead_queue";
private final String NORMAL_EXCHANGE = "normal_exchange";
private final String NORMAL_QUEUE = "normal_queue";
// 死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)
.durable(true)
.build();
}
// 死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
// 死信交换机绑定死信队列
@Bean
public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dead_routing")
.noargs();
}
// 普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)
.durable(true)
.build();
}
// 普通队列
@Bean(NORMAL_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
.ttl(10000) // 消息存活10s
.maxLength(10) // 队列最大长度为10
.build();
}
// 普通交换机绑定普通队列
@Bean
public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
测试
1、生产者发送消息
@Test
public void testDlx(){
// 存活时间过期后变成死信
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// 超过队列长度后变成死信
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// }
// 消息拒签但不返回原队列后变成死信
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}
2、
@Component
public class DlxConsumer {
@RabbitListener(queues = "normal_queue")
public void listenMessage(Message message, Channel channel) throws IOException {
// 拒签消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
}
}
延迟队列
延迟队列介绍
什么是延时队列?
延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费
但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果
延迟交换机主要帮我们解决什么问题
(1)当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, name必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;
(2)就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的
适用场景
(1)商城订单超时未支付,取消订单
(2)使用权限到期前十分钟提醒用户
(3)收益项目,投入后一段时间后产生收益
延迟队列_死信队列_的实现
1、创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、 lombok依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2、编写配置文件
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 123456
virtual-host: /
# 日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
3、创建队列和交换机
@Configuration
public class RabbitConfig {
// 订单交换机和队列
private final String ORDER_EXCHANGE = "order_exchange";
private final String ORDER_QUEUE = "order_queue";
// 过期订单交换机和队列
private final String EXPIRE_EXCHANGE = "expire_exchange";
private final String EXPIRE_QUEUE = "expire_queue";
// 过期订单交换机
@Bean(EXPIRE_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(EXPIRE_EXCHANGE)
.durable(true)
.build();
}
// 过期订单队列
@Bean(EXPIRE_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(EXPIRE_QUEUE)
.build();
}
// 将过期订单队列绑定到交换机
@Bean
public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("expire_routing")
.noargs();
}
// 订单交换机
@Bean(ORDER_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(ORDER_EXCHANGE)
.durable(true)
.build();
}
// 订单队列
@Bean(ORDER_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(ORDER_QUEUE)
.ttl(10000) // 存活时间为10s,模拟30min
.deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("expire_routing") //死信交换机的路由关键字
.build();
}
// 将订单队列绑定到交换机
@Bean
public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("order_routing")
.noargs();
}
}
4、编写下单的控制器方法,下单后向订单交换机发送消息
@Test
public String placeOrder(String orderId){
System.out.println("处理订单数据...");
// 将订单id发送到订单队列
rabbitTemplate.convertAndSend("order_exchange", "order_routing", orderId);
return "下单成功,修改库存";
}
5、编写监听死信队列的消费者
// 过期订单消费者
@Component
public class ExpireOrderConsumer {
// 监听队列
@RabbitListener(queues = "expire_queue")
public void listenMessage(String orderId){
System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
}
}
延迟队列_插件实现
在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列
下载插件
RabbitMQ 实现了一个插件 x-delay-message 来实现延时队列,我们可以从 官网下载到它
https://www.rabbitmq.com/community-plugins.html
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
选择 .ez 格式的文件下载,下载后放置 RabbitMQ 的安装目录下的 plugins 目录下,如我的路径为
docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez rabbitmq1:/plugins
docker exec rabbitmq1 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
RabbitMQ 配置类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class RabbitConfig3 {
/**
* 交换机
*/
public static final String DELAY_EXCHANGE = "delay_exchange";
/**
* 队列
*/
public static final String DELAY_QUEUE = "delay_queue";
/**
* 路由
*/
public static final String DELAY_KEY = "delay_key";
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
/**
* 直接模式队列1
*/
@Bean
public Queue directOneQueue() {
return new Queue("cundream");
}
/**
* 延时队列交换机
*
* @return
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
/**
* 延时队列
*
* @return
*/
@Bean
public Queue delayQueue() {
return new Queue(DELAY_QUEUE, true);
}
/**
* 给延时队列绑定交换机
*
* @return
*/
@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_KEY).noargs();
}
}
RabbitMQ 生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayMessage(Object object, long millisecond) {
this.rabbitTemplate.convertAndSend("delay_exchange",
"delay_key",
object.toString(),
message -> {
message.getMessageProperties().setHeader("x-delay", millisecond);
return message;
}
);
}
}
RabbitMQ 消费者
import cn.hutool.json.JSONUtil;
import com.github.cundream.springbootbuilding.common.rabbitmq.RabbitConst;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @className: com.github.cundream.springbootbuilding.common.rabbitmq.consumer-> ReceiveDealyConsumer
* @description:
* @author: 李村
* @createDate:
*/
@Slf4j
@RabbitListener(queuesToDeclare = @Queue(RabbitConst.DELAY_QUEUE))
@Component
public class ReceiveDealyHandler {
@RabbitHandler
public void directHandlerManualAck(Object object, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("直接队列1,手动ACK,接收消息:{}", object.toString());
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
测试
通过测试,第一条消息在 5s后接收到,第二条消息在 10s后接收到,说明我们的延时队列已经成功
@RequestMapping(value = "/delayMessage",method = RequestMethod.GET)
public void delayMessage() {
String message1 = "这是第一条消息";
String message2 = "这是第二条消息";
rabbitMqService.sendDelayMessage(message1, 5000);
rabbitMqService.sendDelayMessage(message2, 10000);
}
RabbitMQ防止消息丢失
消息丢失场景
MQ消息丢失场景主要有三个:
- 消息生产者,发送消息后,rabbitMq服务器没有收到;导致消息丢失
- rabbitmq收到消息后,没有持久化保存,导致消息丢失
- 消费者收到消息后,没来得及处理,消费者宕机,导致消息丢失
生产者发送消息没有发送到rabbit交换机
解决方案:消息异步确认机制(confirm机制)
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 123456
virtual-host: /
publisher-confirms: true # 消息异步确认机制(confirm机制)
开启confirm机制后,在生产者每次发送消息,都会调用回调代码;开发人员,需要写回调函数的逻辑,处理发送失败的消息
@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
* confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue
* @param correlationData 发送的消息的信息(交换机,路由,消息体等)
* @param ack true成功,false失败
* @param cause 发生错误的信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 失败,一般解决方案,是将发送失败消息,存入定时任务队列;尝试重新发送消息;再多次失败,
// 就不再发送,转为人工处理
if (!ack) {
log.error("rabbitmq confirm fail,cause:{}", cause);
// ...... 失败处理逻辑
}
}
}
交换机没有发送到队列
解决方案:Return模式,确保消息从交换机发送到队列。
1、开启return模式
#开启 return 机制
spring:
rabbitmq:
publisher-returns: true
2、开发回调函数
@Component
public class Sender implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
}
//通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息主体message: " + message);
System.out.println("消息replyCode: " + replyCode);
System.out.println("描述: " + replyText);
System.out.println("消息使用的交换器exchange: " + exchange);
System.out.println("消息使用的路由键routing: " + routingKey);
}
}
交换机、队列、消息没有设置持久化
交换机、队列、消息没有持久化,当rabbitmq的服务重启之后,这些信息就会丢失。
交换机持久化
在声明交换机的时候,设置持久化属性
/**
* 构造参数说明:
* 参数1:交换机名称
* 参数2:durable:true表示持久化,false表示不持久化
* 参数3:autoDelete:true自动删除,false不自动删除
*/
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchangeName", true, false);
}
队列持久化
在声明队列的时候,设置持久化属性
public Queue queue() {
/**
* @param queueName 队列名称
* @param durable 队列持久化,true持久化,false不持久化
* @param exclusive 是否排他, true不排他,false排他;此处配置一般false
* @param autoDelete 是否自动删除,无生产者,队列自动删除
* @param args 队列参数
*/
return new Queue("queueName", true, false, false, args);
}
消息持久化
消息的持久化是默认持久的。无需配置
消费者接收到消息没有执行业务逻辑,导致消息丢失
解决方案:手动确认消息机制
配置文件配置
**spring.rabbitmq.listener.simple.acknowledge-mode=manual**
spring:
rabbitmq:
host: 127.0.0.1
#host: 10.106.10.91
port: 5672
username: admin
password: 123456
virtual-host: pub
publisher-confirms: true # 开启发送确认
publisher-returns: true # 开启发送失败回退
#开启ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual #采取手动应答
#concurrency: 1 # 指定最小的消费者数量
#max-concurrency: 1 #指定最大的消费者数量
retry:
enabled: true # 是否支持重试
@Component
public class Consumer {
@RabbitHandler
public void consumeMsg(String msg, Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
// .... 消费消息业务逻辑
/**
* deliveryTag 消息的随机标签信息
* multiple 是否批量;true表示一次性的将小于deliveryTag的值进行ack
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
/**
* deliveryTag 消息的随机标签信息
* multiple 是否批量;true表示一次性的将小于deliveryTag的值进行ack
* requeue 被拒绝的消息是否重新入队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
当业务出现意料之外的一场;消息就会重新回到队列中;会分发到其他正常consumer中进行消费