文章目录
- 一.消息可靠性
- 1.生产者消息确认
- 2.消息持久化
- 3.消费者确认
- 4.消费者失败重试
MQ的一些常见问题
1.消息可靠性问题:如何确保发送的消息至少被消费一次
2.延迟消息问题:如何实现消息的延迟投递
3.高可用问题:如何避免单点的MQ故障而导致的不可用问题
4.消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
一.消息可靠性
消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
-
-发送时丢失:
-
生产者发送的消息未送达exchange
-
消息到达exchange后未到达queue
-
-
MQ宕机,queue将消息丢失
-
consumer接收到消息后未消费就宕机
1.生产者消息确认
生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
- publisher-confirm,发送者确认
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
- publisher-return,发送者回执
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
简单来说:在publisher-confirm下的nack是消息投递到交换机失败返回的信息;在publisher-confirm下的ack是消息成功到达了消费者;在publisher-return下的ack是消息到达了交换机但是路由失败的返回信息
SpringAMQP实现生产者确认
一.想要实现生产者消息确认机制,需要在配置文件编写开启代码,即在微服务的application.yml中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
配置说明:
-
publish-confirm-type:开启publisher-confirm,这里支持两种类型:
-
simple:同步等待confirm结果,直到超时
-
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback(推荐使用)
-
-
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
-
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
二.配置ReturnCallback
ReturnCallback是消息到达交换机但是没有成功进行路由的回调函数(作用于全局)
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
});
}
}
对于代码中的ApplicationContext是负责管理和组织Spring应用中的各个组件,如bean、配置文件等.
通过实现ApplicationContextAware这个接口,bean可以获取对ApplicationContext的引用,并因此获得访问应用上下文中的其他bean、资源和容器特性的能力,所以说实现了接口等同于获取到了bean容器,就可以获取到 rabbitTemplate并进行设置唯一的ReturnCallback
在回调函数中,消息路由失败会返回很多信息,其中使用路由键,消息的交换机的名称可以实现重发消息
三.在生产者类中发送消息并同时实现ConfirmCallback
ConfirmCallback同样是回调函数,与ReturnCallback不同的是ConfirmCallback可以创建多次
ConfirmCallback是对消息还没有进入到交换机就丢失的一种消息返回策略,当丢失后,执行回调函数并可以记录消息的失败原因和UUID,成功也是同理
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 消息体
String message = "hello, spring amqp!";
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
// nack,消息失败
log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 发送消息
rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}
需要注意的是:在手动添加交换机的过程中,想要使用通配符"#"的话,应该设置交换机为topic类型!
总结:
SpringAMQP中处理消息确认的几种情况:
-
publisher-comfirm:
-
消息成功发送到exchange,返回ack
-
消息发送失败,没有到达交换机,返回nack
-
消息发送过程中出现异常,没有收到回执
-
-
消息成功发送到exchange,但没有路由到queue,调用ReturnCallback
2.消息持久化
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
1.交换机持久化:
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
2.队列持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
3.消息持久化,SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();
但在springamqp中,其实已经在声明交换机和队列的时候将其持久化了,发送消息的方法convertAndSend()内部也将消息做了持久化,了解消息持久化的设置方法可以将以后不是很重要的交换机,队列,消息设置为非持久化
3.消费者确认
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
-
manual:手动ack,需要在业务代码结束后,调用api发送ack。
-
auto(推荐):自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,抛出异常后,会不断重新发送消息即失败重试机制
-
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
配置方式是修改application.yml文件,添加下面配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack
4.消费者失败重试
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
-
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
-
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)
第三种处理策略是将重试失败的消息重新投递到指定的交换机,然后在投递到指定的队列中,形成了一个交换机-队列的错误消息存放容器,在这个容器中存放不仅有错误消息,还有错误消息头的异常栈信息
实现方式:
1.定义接收失败消息的交换机、队列及其绑定关系:
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
2.定义RepublishMessageRecoverer:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
总结:如何确保RabbitMQ消息的可靠性?
-
开启生产者确认机制,确保生产者的消息能到达队列
-
开启持久化功能,确保消息未消费前在队列中不会丢失
-
开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
-
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理