RabbitMQ生产者的可靠性

目录

MQ使用时会出现的问题

生产者的可靠性

1、生产者重连

2、生产者确认

3、数据持久化

交换机持久化

队列持久化

消息持久化

LazyQueue懒加载


MQ使用时会出现的问题

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

生产者的可靠性

1、生产者重连

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

在生产者yml文件添加配置开启重试机制

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

关掉RabbitMQ服务:

docker stop mq

然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码 

2、生产者确认

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。 不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。 默认两种机制都是关闭状态,需要通过配置文件来开启。

 在生产者yml文件添加配置开启重试机制

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@Configuration
public class MqConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.error("触发return callback,");
                log.debug("交换机exchange: {}", returnedMessage.getExchange());
                log.debug("routingKey: {}", returnedMessage.getRoutingKey());
                log.debug("消息message: {}", returnedMessage.getMessage());
                log.debug("replyCode: {}", returnedMessage.getReplyCode());
                log.debug("replyText: {}", returnedMessage.getReplyText());
            }
        });
    }
}

定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

 /**
     * 生产者消息确认
     * @throws InterruptedException
     */
    @Test
    public void testPublisherConfirm() throws InterruptedException {
        // 准备消息
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
               log.error("消息回调失败",ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.debug("收到confirm callback回执");
                if (result.isAck()){
                    log.debug("发送消息成功,收到 ack!");
                }else {
                    log.debug("发送消息失败,收到 nack, reason : {}", result.getReason());
                }
            }
        });
        // 发送消息
        rabbitTemplate.convertAndSend("mq.direct","blue","hello",cd);

        Thread.sleep(2000);
    }

 当路由不对时,提示没有这个路由

 当交换机不对时,提示没有这个交换机

总结:

SpringAMQP中生产者消息确认的几种返回值情况

1、消息投递到了MQ,但是路由失败。会return路由异常原因,返回ACK

2、临时消息投递到了MQ,并且入队成功,返回ACK

3、持久消息投递到了MQ,并且入队完成持久化,返回ACK

4、其它情况都会返回NACK,告知投递失败

如何处理生产者的确认消息?
1、生产者确认需要额外的网络和系统资源开销,尽量不要使用
2、如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
3、对于nack消息可以有限次数重试,依然失败则记录异常消息

3、数据持久化

MQ的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题

1、MQ宕机,内存中的消息会丢失
2、内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞小

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

 设置为Durable就是持久化模式,Transient就是临时模式。

队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

 设置为Durable就是持久化模式,Transient就是临时模式。

消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。 不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

 @Test
    public void persistence() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        //MessageDeliveryMode.NON_PERSISTENT 非持久化
        //MessageDeliveryMode.PERSISTENT 持久化
        Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

 消息写入磁盘

LazyQueue懒加载

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

 控制台配置Lazy模式

代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

 通过bean声明

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}
@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

也可以基于注解来声明队列并设置为Lazy模式: 

更新已有队列为lazy模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。 可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列

 也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/107805.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue 3.3.6 ,得益于WeakMap,比之前更快了

追忆往昔&#xff0c;穿越前朝&#xff0c;CSS也是当年前端三剑客之一&#xff0c;风光的很&#xff0c;随着前端跳跃式的变革&#xff0c;CSS在现代前端开发中似乎有点默默无闻起来。 不得不说当看到UnoCss之前&#xff0c;我甚至都还没听过原子化CSS[1]这个概念&#xff08;…

时序预测 | Matlab实现ARIMA-LSTM差分自回归移动模型结合长短期记忆神经网络时间序列预测

时序预测 | Matlab实现ARIMA-LSTM差分自回归移动模型结合长短期记忆神经网络时间序列预测 目录 时序预测 | Matlab实现ARIMA-LSTM差分自回归移动模型结合长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 时序预测 | Matlab实现ARIMA-LSTM差…

springboot配置https

SSL &#xff1a; secure socket layer 是一种加密协议&#xff0c;SSL主要用于保护数据在 客户端和服务器之间的传输&#xff0c;&#xff0c;防止未经授权的访问和窃取敏感信息 在腾讯云申请ssl证书 申请了之后在我的域名中&#xff0c;&#xff0c;解析 解析了之后&…

Jmeter的接口自动化测试

在去年实施了一年的三端&#xff08;PC、无线M站、无线APP【Android、IOS】&#xff09;后&#xff0c;今年7月份开始&#xff0c;我们开始进行接口自动化的实施&#xff0c;目前已完成了整个框架的搭建以及接口的持续测试集成。今天做个简单的分享。 在开始自动化投入前&#…

虚拟化 vs. 裸金属:K8s 部署环境架构与特性对比

伴随着 IT 云化转型的逐步推进&#xff0c;越来越多的用户加入应用容器化改造的行列&#xff0c;并使用 Kubernetes&#xff08;K8s&#xff09;进行容器部署管理。然而&#xff0c;令不少用户感到困惑的是&#xff0c;由于大部分应用此前都部署在虚拟化或超融合环境&#xff0…

轻量封装WebGPU渲染系统示例<7>-材质多pass(源码)

当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/version-1.01/src/voxgpu/sample/MultiMaterialPass.ts 此示例渲染系统实现的特性: 1. 用户态与系统态隔离。 2. 高频调用与低频调用隔离。 3. 面向用户的易用性封装。 4. 渲染数据和渲染机制分离。 …

Mac电脑窗口管理Magnet中文 for mac

Magnet是一款Mac窗口管理工具&#xff0c;它可以帮助用户轻松管理打开的窗口&#xff0c;提高多任务处理效率。以下是Magnet的一些主要特点和功能&#xff1a; 分屏模式支持&#xff1a;Magnet支持多种分屏模式&#xff0c;包括左/右/顶部/底部 1/2 分屏、左/中/右 1/3 分屏、…

基于51单片机的温度测量报警系统的设计与制作

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、实习目的二、实习任务2.1 设计温度测量报警系统硬件电路2.2 温度测量报警系统软件编程、仿真与调试&#xff1b;2.3 完成温度测量报警系统的实物制作与调试…

基于定容积法标准容器容积标定中的电动针阀自动化解决方案

摘要&#xff1a;在目前的六氟化硫气体精密计量中普遍采用重量法和定容法两种技术&#xff0c;本文分析了重量法中存在的问题以及定容法的优势&#xff0c;同时也指出定容法在实际应用中还存在自动化水平较低的问题。为了提高定容法精密计量过程中的自动化水平&#xff0c;本文…

从工厂到社会:探索如何应用设计模式工厂模式

文章目录 &#x1f31f; 将设计模式工厂模式运用到社会当中&#x1f34a; 工厂模式在社会中的应用&#x1f389; 工厂&#x1f389; 餐厅&#x1f389; 运输 &#x1f34a; 工厂模式的优势&#x1f389; 代码简洁&#x1f389; 扩展性强&#x1f389; 便于维护和管理 &#x1f…

信钰证券:华为汽车概念股持续活跃 圣龙股份斩获12连板

近期&#xff0c;华为轿车概念股在A股商场遭到热捧&#xff0c;多只股票迭创前史新高。10月23日&#xff0c;华为轿车概念股再度走强&#xff0c;到收盘&#xff0c;板块内圣龙股份、银宝山新涨停&#xff0c;轿车ETF在重仓股提振下盘中一度上涨近2%。业界人士认为&#xff0c;…

Day13力扣打卡

打卡记录 奖励最顶尖的 k 名学生(哈希表排序) 用哈希表对所有的positive与negative词条进行映射&#xff0c;然后遍历求解。tip&#xff1a;常用的分割字符串的操作&#xff1a;1.stringstream配合getline() [格式buf, string, char]2.string.find()[find未找到目标会返回npos…

javaEE -9(7000字详解TCP/IP协议)

一&#xff1a; IP 地址 IP地址&#xff08;Internet Protocol Address&#xff09;是指互联网协议地址&#xff0c;又译为网际协议地址。 IP地址是IP协议提供的一种统一的地址格式&#xff0c;它为互联网上的每一个网络和每一台主机分配一个逻辑地址&#xff0c;以此来屏蔽物…

AOP 笔记

AOP【面向切面编程】 作用&#xff1a;在不惊动原始设计的基础上进行功能增强。 无侵入式编程 连接点&#xff1a;程序执行的任意位置&#xff0c;SpringAOP中&#xff0c;理解为方法的执行。 切入点&#xff1a;匹配连接点的式子,要追加功能的方法 通知&#xff08;写在通…

服务运营 |论文解读: 住院病人“溢出”:一种近似动态规划方法

摘要 在住院床位管理中&#xff0c;医院通常会将住院病人分配到相对应的专科病房&#xff0c;但随着病人的入院和出院&#xff0c;可能会出现病人所需的专科病房满员&#xff0c;而其他病房却有空余床位的情况。于是就有了 "溢出 "策略&#xff0c;即当病人等候时间…

【目标检测】Visdrone数据集和CARPK数据集预处理

之前的博文【目标检测】YOLOv5跑通VisDrone数据集对Visdrone数据集简介过&#xff0c;这里不作复述&#xff0c;本文主要对Visdrone数据集和CARPK数据集进行目标提取和过滤。 需求描述 本文需要将Visdrone数据集中有关车和人的数据集进行提取和合并&#xff0c;车标记为类别0&…

记录--vue3实现excel文件预览和打印

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 前言 在前端开发中&#xff0c;有时候一些业务场景中&#xff0c;我们有需求要去实现excel的预览和打印功能&#xff0c;本文在vue3中如何实现Excel文件的预览和打印。 预览excel 关于实现excel文档在…

M1本地部署Stable Diffusion

下载安装 参考博客: 在Mac上部署Stable Diffusion&#xff08;超详细&#xff0c;AI 绘画入门保姆级教程&#xff09; 安装需要的依赖库 brew install cmake protobuf rust python3.10 git wget 可能中途会存在下载报错或者下载卡主的问题,需要切国内源 brew进行替换源: …

UE4/5 竖排文字文本

方法一、使用多行文本组件 新建一个Widget Blueprint 添加Text 或者 Editable Text(Multi-Line) 、TextBox(Multi-Line) 组件。 添加文字&#xff0c;调整字号&#xff0c;调整成竖排文字。 在Wrapping &#xff08;换行&#xff09;面板中 &#xff1a; 勾选 Auto Wrap te…

iphone备份后怎么转到新手机,iphone备份在哪里查看

iphone备份会备份哪些东西&#xff1f;iphone可根据需要备份设备数据、应用数据、苹果系统等。根据不同的备份数据&#xff0c;可备份的数据类型不同&#xff0c;有些工具可整机备份&#xff0c;有些工具可单项数据备份。本文会详细讲解苹果手机备份可以备份哪些东西。 一、ip…