MQ高级
发送者可靠性,MQ的可靠性,消费者可靠性。
发送者可靠性
发送者重连
连接重试的配置文件:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
关闭MQ。
运行测试用例结果,进行了三次重连。
11-12 10:15:41:975 INFO 23824 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:43:997 INFO 23824 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:46:002 INFO 23824 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.21.101:5672]
发送者确认
总结如下:
-
当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
-
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
-
持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
-
其它情况都会返回NACK,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启
开启发送者确认:
在publisher模块的application.yaml
中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type
有三种模式可选:
-
none
:关闭confirm机制 -
simple
:同步阻塞等待MQ的回执 -
correlated
:MQ异步回调返回回执
一般我们推荐使用correlated
,回调机制。
定义ReturnCallback
每个RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
package com.itheima.publisher.config;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MQconfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数
这里的CorrelationData中包含两个核心的东西:
-
id
:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆 -
SettableListenableFuture
:回执结果的Future对象
将来MQ的回执就会通过这个Future
来返回,我们可以提前给CorrelationData
中的Future
添加回调函数来处理消息回执:
我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback
@Test
public void testObgect1() throws InterruptedException {
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
//failure出现异常时的处理逻辑,基本不会触发。
log.error("发送失败",ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
//判断是否成功发送到服务器
if (result.isAck()){
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
//准备Map数据
Map map = new HashMap();
map.put("name","jack");
map.put("age",21);
rabbitTemplate.convertAndSend("obgect.queue1",map,cd);
//用来接受返回的值
Thread.sleep(2000);
}
可以看到,由于传递的RoutingKey
是错误的,路由失败后,触发了return callback
,同时也收到了ack。
当我们修改为正确的RoutingKey
以后,就不会触发return callback
了,只收到ack。
而如果连交换机都是错误的,则只会收到nack。
注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
路由失败:一般是因为RoutingKey错误导致,往往是编程导致
交换机名称错误:同样是编程错误导致
MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。
MQ的可靠性
数据持久化
默认情况下springamqp发送的消息就是持久化的,不需要做特殊处理。
@Test
public void testTopic3(){
//自定义消息为非持久化
Message build = MessageBuilder.withBody("这是一个大新闻啊".getBytes())
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
//修改交换机的名字为hm.topic
String name = "hm.topic";
//修改路由键为blue
rabbitTemplate.convertAndSend(name,"china.goods",build);
}
注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
Lazy Queue
MQ可靠性小结
消费者可靠性
消费者确认机制
消费者失败重试策略
相关文档:
哈哈哈
业务幂等性
修改发送者代码,修改消息转换器:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
修改消费者代码来获取消息id:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(Message message) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + message.getBody().toString() + "】");
log.info("spring 消费者接收到消息id为:【" + message.getMessageProperties().getMessageId().toString() + "】");
if (true) {
throw new RuntimeException("故意的");
}
log.info("消息处理完成");
}
运行结果: 由于前面的步骤设置了报错,因此重发了3次。
11-12 16:20:12:977 INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:12:977 INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:13:978 INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:13:978 INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:979 INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:14:979 INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:985 WARN 27400 --- [ntContainer#1-1] o.s.a.r.retry.RepublishMessageRecoverer : Republishing failed message to exchange 'error.direct' with routing key error
只需要修改消费者的处理逻辑部分即可:
package com.hmall.trade.listener;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class Orderlisten {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(name = "pay.topic"),
key = "pay.success"
))
public void listenPaySuccess(Long orderId){
//1、根据id获取订单信息
Order order = orderService.getById(orderId);
//判断订单状态,只有待支付才修改
if (order == null || order.getStatus() != 1){
//条件错误直接结束
return;
}
orderService.markOrderPaySuccess(orderId);
}
}
小结,小小面试题
延迟消息
死信交换机
消息的消费者代码:
创建普通交换机和信息队列,将消息队列的死信绑定死信交换机
dleConfigtasion.class
package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class dleConfigrasion {
//交给Bean注解来进行处理
//创建交换机
@Bean
public DirectExchange ttlExchange(){
//参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)
return ExchangeBuilder.directExchange("ttl.direct").build();
}
//创建消息队列
@Bean
public Queue ttltQueue(){
// return new Queue("fanout.queue1");
//使用build来创建消息队列
//指定该队列的死信交换机
return QueueBuilder.durable("ttl.queue").deadLetterExchange("dlt.direct").build();
}
// 绑定队列和交换机
@Bean
public Binding bindingfanoutQueue1red(Queue ttltQueue, DirectExchange ttlExchange){
return BindingBuilder.bind(ttltQueue).to(ttlExchange).with("hi");
}
}
死信交换机和消息队列:
leatinMq.class
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dlt.queue"),
exchange = @Exchange(name = "dlt.direct",type = ExchangeTypes.DIRECT),
key = {"hi"}
))
public void Directlisten1dlt(String msg){
System.err.println("消费者接收到队列dlt.direct的消息:"+msg+"_"+ LocalTime.now());
}
消息发送方:需要指定消息的存活时间:
@Test
public void testTopic(){
//修改路由键为blue
rabbitTemplate.convertAndSend("ttl.direct", "hi", "这是一个大新闻啊",
new MessagePostProcessor() {
//可以对生成的message进行处理
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置过期时间为10秒钟
message.getMessageProperties().setExpiration("1000");
return message;
}
});
}
代码结束。
延时插件:延时插件下载地址
插件官方文档:延时插件官方文档
详细操作文档: 黑马文档