目录
数据导入
MQ的常见问题
消息可靠性问题
生产者确认机制
SpringAMQP实现生产者确认
消息持久化
消费者消息确认
失败重试机制
消费者失败消息处理策略
死信交换机
TTL
延时队列
待更
数据导入
资料下载地址:day05MQ高级
MQ的常见问题
- 消息可靠性:如何确保消息至少被消费一次
- 延迟消息问题:如何实现消息的延迟投递
- 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
- 高可用问题:如何避免单点的MQ故障而导致的不可用问题
消息可靠性问题
消息丢失的三大类:
- 发送时丢失:
- 生产者发送的消息未送达到exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack。
- 消息未投递到交换机,返回nack。
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
需要注意的是,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。
SpringAMQP实现生产者确认
在publisher模块中配置如下内容
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
- publish-confirm-type:开启publisher-confirm,这里支持两种类型:
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
- publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
- template.mandatory:定义消息路由失败时的策略。
- true:则调用ReturnCallback;
- false:则直接丢弃消点
在生产者模块中配置全局ReturnCallback(一个RabbitTemplate只能配置一个ReturnCallback)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate template = applicationContext.getBean(RabbitTemplate.class);
template.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode,replyText,exchange,routingKey,message.toString());
}));
}
}
进行测试
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String routingKey = "simple";
String message = "hello, spring amqp!";
//准备消息id
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(
result->{
if (result.isAck()){
log.debug("消息发送成功,ID:{}",correlationData.getId());
}else {
log.error("消息发送失败,ID:{},原因{}",correlationData.getId(),result.getReason());
}
},
ex->{
log.error("消息发送异常,ID:{},原因:{}",correlationData.getId(),ex.getMessage());
}
);
rabbitTemplate.convertAndSend("amq.topic", routingKey, message,correlationData);
}
}
运行观察控制台
测试一种路由失败的情况,这种情况可以正常发送到交换机,但是不能发送到Queue
消息持久化
MQ默认是内存存储,当服务重启后,数据就会丢失。因此我们需要对交换机与队列进行持久化操作。在消费者模块添加如下代码
@Configuration
public class CommonConfig {
@Bean
public DirectExchange directExchange(){
/**
* name:交换机名称
* durable:是否持久化
* autoDelete:当没有队列绑定时是否删除
*/
return new DirectExchange("direct.exchange",true,false);
}
@Bean
public Queue simpleQueue(){
/**
* 使用Builder创建持久化队列
* 使用 new Queue("名称")创建也可以,默认就是持久化的
*/
return QueueBuilder.durable("simple.queue").build();
}
}
启动消费者,就可以看到交换机与队列被持久到磁盘中,但需要注意的时,消息并没有持久化,当重启服务器消息还是会丢失。之前我们发送的消息是String类型,现在,我们使用AMQP的Message对消息进行持久化。
@Test
public void testDurableMessage() throws Exception {
Message msg = MessageBuilder.withBody("hello spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend("simple.queue",msg);
}
消费者消息确认
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。(业务处理成功后,调用channel.basicAck()手动签收,如果出现异常,则调用channle.basicNack()方法。)
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。
manual模式对代码有一定入侵,需要添加发送ack的代码。因此不推荐使用
auto模式是通过Spring的AOP机制,来对消息进行自动确认。推荐使用
none模式不对消息进行确认,不使用
在消费者模块的配置文件中配置如下内容
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
进行测试,在监听器处添加错误代码
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1/0);
}
}
进行debug观察Rabbit控制台
对断点放行,会发现控制台抛出错误后立即再进入断点,那么就可以确定,MQ会再次投递失败的消息。取消断点放行,会发现控制台无休止进行打印错误,这种处理方式并不友好,因此我们可以自定义失败重试机制。
失败重试机制
当消费者消费消息抛出异常后,会将消息投递给MQ。而MQ又会立即投递给消费者。这样循环往复会导致MQ的消息处理飙升,带来不必要的压力。因此我们可以采用Spring的重试机制(在本地重试,不返回ack也不返回nack),来避免这种情况。
消费者模块的配置文件添加如下内容
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000 #初始的失败等待时长为1s
multiplier: 2 #下次失败的等待时长倍数,下次灯带时长 = multiplier * last-interval
max-attempts: 3 #最大重试次数
stateless: true # true无状态;false有状态,如果业务中包含事务,这里改为false
接下来进行测试
首先是重试时间分别为1,2对应着配置中的1s与1s*2,如果还有下次重试次数那么重试时间就是1s*2*2。其次是在RabbitMQ中找不到这条错误的消息了。具体原因如下
消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。流程图如下
添加一个新的Config
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.exchange");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue",true);
}
@Bean
public Binding errorBinging(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");
}
}
重启发送一条消息测试
观察Rabbit的控制台
死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信 (dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false。
- 消息是一个过期消息,超时无人消费。
- 要投递的队列消息堆积满了,最早的消息可能成为死信。
如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)
与RepublishRecoverer的区别在于,该种方式是通过MQ进行转发,而RepublishRecoverer是通过消费者进行转发。如果只是保存失败的消息,那么推荐使用RepublishRecoverer。
TTL
TTL(time to live)超时时间分为两种情况:
- 消息本身设置了超时时间
- 消息所在的队列设置了超时时间
当消息到达存活时间后还没有被消费会被自动清除。如果同时设置了消息过期时间和队列过期时间,以时间短的为准,队列过期会将所有消息移除,如果一个已经过期的消息不在队列顶端时并不会立即移除,一旦它到了队列顶端则会进行判断是否移除。
延时队列
我们可以通过TTL来实现一个延时队列,对消息设置过期时间存放在ttl.queue,但是没有消费者监听该队列,等到过期之后,放入死信队列,而消费者监听死信队列,对过期消息进行消费,从而实现延时队列。具体流程如下
接下来实现延时队列
编写ttl部分
@Slf4j
@Configuration
public class TTLMessageConfig {
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue")
.ttl(10000)//超时时间
.deadLetterExchange("dl.exchange")//指定死信队列
.deadLetterRoutingKey("dl")//死信队列的路由key
.build();
}
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.exchange");
}
@Bean
public Binding simpleBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
编写消费者方的监听
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue",durable = "true"),
exchange = @Exchange(name = "dl.exchange"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("消费者接收到了延时消息:{}",msg);
}
编写测试方法
@Test
public void testTTLMessage() throws Exception {
Message msg = MessageBuilder.withBody("hello TTL".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend("ttl.exchange","ttl",msg);
log.info("消息成功发送!");
}
至此实现了延时处理消息。
待更
明天在更新后续部分