RabbitMQ使用场景:
- 异步发送(验证码、短信、邮件)
- MySQL和Redis,ES之间的数据同步
- 分布式事务
- 削峰填谷
什么情况下消息容易丢失:
- 消息未到达交换机
- 消息未到达队列
- 队列中消息丢失
- 消费者未接收到消息
解决消息丢失的方法:
1、生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
消息失败之后如何处理呢?
- 回调方法即时重发
- 记录日志
- 保存到数据库然后定时重发,成功发送后即刻删除表中的数据
2、消息持久化
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
1. 交换机持久化:
@Bean
public DirectExchange simpleExchange(){
//三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct",true,false);
}
2. 队列持久化:
@Bean
public Queue simpleQueue(){
//使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
3. 消息持久化,SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
.build();
3、消费者确认
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由Spring检测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。
我们可以利用Spring的retry(重试)机制,再消费者出现异常时利用本地重试,设置重试次数,当次数达到了以后,如果消息依然失败,将消息投递到异常交换机,交由人工处理。
面试题:RabbitMQ如何保证消息不丢失
候选人:我们当时MySQL和Redis的数据双写一致性就是采用RabbitMQ实现同步的,这里面就要求了消息的高可用性,我们要保证消息不丢失。主要从三个层面考虑:
第一个是开启生产者确认机制,确保生产者的消息能到达队列,如果报错可以先记录到日志中,再去修复数据。
第二个是开启持久化功能,确保消息未消费前在队列中不会丢失,其中的交换机、队列和消息都要做持久化。
第三个是开启消费者确认机制为auto,由Spring确认消息处理成功后完成ack,当然也需要设置一定的重试次数,我们当时设置了3次,如果重试3次还没有收到消息,就将失败后的消息投递到异常交换机中,交由人工处理。