MQ可靠性策略
- 发送者的可靠性问题
- 生产者的重连
- 生产者确认
- MQ的可靠性
- 数据持久化
- Lazy Queue
- 消费者的可靠性问题
- 消费者确认机制
- 消息失败处理
- 业务幂等性
- 简答问题
发送者的可靠性问题
生产者的重连
可能存在由于网络波动,出现的客户端连接MQ失败,我们可以通过配置文件配置解决
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true #开启超时重试机制
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 #失败后下次的等待时长倍数,下次等待时间= initial-interval * multiplier
max-attempts: 3 #最大重试次数
生产者确认
RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开机确认机制后,在MQ成功收到消息后会返回确认消息给生产者,返回的结果有以下几种情况:
- 消息投递到了MO,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
- 其他情况都会返回NACK,告知投递失败
通过配置文件配置生产者的消息类型:
spring:
rabbitmq:
publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里的publisher-confirm-type 有三种模式可选:
none
:关闭confirm机制
simple
: 同步阻塞等待MQ的回执消息
correlated
:MQ异步调用方式返回回执消息
异步回调方式:
我们完成一个任务将消息交由消息队列中,就进行别的任务了,当消息队列返回异常问题,在过来进行对应的处理
我们需要调用ReturnCallback函数完成消息失败后的操作:
在使用之前需要配置ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback
@Configuration
public class CommonConfig implements ApplicationContextAware{
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException{
//获取RabbitTemplate
RabbitTemplate rabbittemplate =applicationContext.getBean(RabbitTemplate.class);
//设置ReturnCallback
rabbitTemplate.setReturnCallback(message,replyCode,replyText,exchange,routingKey)->{
//处理操作
}
}
}
通过ConfirmCallback来处理消息失败:
每一个消息指定一个ConfirmCallback
void test() throws InterruptedException{
CorrelationData cd= new CorrelationData();
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.confirm>(){
@Override
public void onFailure(Throwable ex){
//Future 发生异常是的逻辑处理,基本不会触发
}
@Override
public void onSuccess(CorrelationData.confirm result){
//Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ //result.isack,boolean类型,true代表ack回执,false表示nack回执
//处理逻辑
}else{
//异常处理
}
}
});
}
rabbittemplate.convertAndSend("","",cd);
MQ的可靠性
在默认情况下,Rabbitmq会将接收到的数据保存在内存中以降低消息收发的延迟,这样会有问题:
- 一旦MQ宕机,内存的消息会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞(消息对队列将消息保存到磁盘,此时MQ阻塞)
数据持久化
RabbitMQ的数据持久化包括:
- 交换机持久化(Durable 永久的,Transient临时的)
- 队列持久化(Durable 永久的,Transient临时的)
- 消息持久化
消息的持久化:
void test(){
Message message =MessageBuilder
.withBody("hello".getBytes(StandardCharsets.UTF-8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
}
会将消息在过程中持久化到磁盘不会导致MQ阻塞
消息的持久化性能不是很高,可以通过Lazy Queue进行消息的持久化
Lazy Queue
惰性队列
特征:
- 接收到消息后直接存入磁盘而非内存(内存只保留最近消息,默认2048条)
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持百万条数据的消息存储
在3.12 版本后,所有的队列都是Lazy Queue模式,无法更改
在java中要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode
属性为lazy:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() //开启lazy模式
.build();
}
通过注解也可以实现
@RabbitListener(queuesToDeclare= @Queue(
name="lazy.queue",
durable="true",//持久化
arguments =@Argument(name="x-queue-mode",value="lazy")))
public void listenLazyQueue(String msg){
//消费处理
}
消费者的可靠性问题
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:
成功处理消息,RabbitMQ从队列中删除该信息
nack:
消息处理失败,RabbitMQ需要再次投递信息
reject:
消息处理失败并拒绝该消息,RabbitMQ从队列中删除该信息
那么如何实现呢:
SprinaAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式
有三种方式:
none:
不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:
手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:
自动模式。SprinGAMOP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时:
如果是业务异常,则会自动返回nack
如果是消息处理或校验异常,自动返回reject
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none #none,关闭
消息失败处理
如果消费者返回nack,那就会重复进行,这样大大影响效率
我们可以利用Spring的retry机制,在消费者出现异常的时利用本地重试:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms #初始的失败等待时间
multiplier: 1 #下次失败的等待时长的倍数
max-attempts: 3 # 最大重试次数
stateless: true # true无状态,false有状态,如果业务中包含事务,这里改为false
在开启重试模式之后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject,丢弃消息,默认这种方式
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
将失败消息重新投递到error交换机中,可以绑定error消息队列,将来发送信息给开发人员等操作消息
将失败策略改为RepublishMessageRecoverer:
- 首先,定义接收失败消息的交换机,队列绑定
- 定义RepublishMessageRecoverer
@Bean
public MessageRecoverer test(RabbitTemplate rabbittemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"交换机名称","key值")
}
业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),在程序开发中,则指同一个业务,执行一次或多次对业务状态的影响是一致的
幂等的使用场景
:防止某一数据被重复进行修改
幂等业务
:根据id的查询业务,根据id的删除业务等
非幂等
:用户下单,扣减库存等
如何实现幂等:
方案一唯一消息id
给每一个消息都设置一个唯一id,利用id区分是否重复消息:
- 每一条消息都生成一个唯一id,与消息一起投递给消费者
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息id保存到数据库
- 如果下次又收到相同的消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
@Bean
public MessageConverter messageConverter(){
//定义消息转换器
Jackson2JsonMessageConverter jjmc= new jackson2JsonMessageConverter();
//配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
但是这样会造成额外的操作冗余,比如还需要写数据库等等
方案二:结合业务逻辑,基于业务本身做判断
简答问题
如何保证支付服务与交易服务之间的订单状态一致性:
- 首先,支付服务会在正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步
- 其次,为了保证消息的可靠性,我们采取了生产者确认机制,消费者确认,消费者失败重试等策略,确保消息投递和处理的可靠性,同时可开启了MQ的持久化,避免因服务宕机导致消息丢失
- 最后,我们还会交易服务更新订单状态时作业业务幂等判断,避免因消息重复导致订单异常
如果交易服务处理失败,还有什么方案:
在交易服务设置定时任务,定期查询订单生产状态,这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性