RabbitMQ 能保证消息可靠性吗

系列文章目录

消息队列选型——为什么选择RabbitMQ
RabbitMQ 五种消息模型


RabbitMQ 能保证消息可靠性吗

  • 系列文章目录
  • 前言
  • 一、消息可靠性的定义
  • 二、几种不可靠的场景
  • 三、防意外丢失
    • 1. 消息持久化
    • 2. 队列持久化
    • 3. 发布确认
      • 3.1 简单发布确认
      • 3.2 批量发布确认
      • 3.3 异步发布确认
    • 4. 手动接收确认
    • 5. 死信队列
  • 四、防重复传递
    • 1. 消息确认机制
    • 2. 幂等性校验(需代码实现)
  • 五、不可靠场景的对策
  • 六、总结


前言

前面我们在做MQ组件选型时,提到了rabbitMQ的消息可靠性,那么它到底可靠到什么程度?又是如何保证消息可靠性的呢?今天我们就一起来看一下


一、消息可靠性的定义

消息可靠性是指在消息传递过程中,确保消息能够被完整、准确、可靠地传递到目的地。更具体的说分为两个角度:

  1. 不会意外丢失
  2. 不会重复传递

因此,我们必须保证消息不会因为网络故障、系统故障或其他异常原因而丢失或重复传递,否则可能导致业务逻辑错误、数据损坏或系统崩溃等问题

二、几种不可靠的场景

  1. 消息漏发送:生产者在发送消息时,如果不观察RabbitMQ服务器的确认消息,可能导致有些消息在网络中丢失而不自知
  2. 消息重复发送:如果生产者在发送消息时,由于网络抖动或者其他原因,生产者无法从RabbitMQ收到消息确认,此时生产者会重发同样一条消息,从而导致消息重复
  3. 消息未储存:rabbitMQ服务器宕机,导致已经在rabbit服务器内的消息直接丢失
  4. 消费者重复消费:如果消费者和MQ都不记得曾经消费过的消息,主动拉取或推送了旧的消息,导致重复消费,

三、防意外丢失

在这里,必须提前声明一点:即消息意外丢失因为rabbitMQ经由转换机,如果匹配不到任何队列,是会主动丢弃该消息的,这种丢失属于业务配置上的主动丢弃,不记在意外丢失中

1. 消息持久化

消息持久化需要在消息生产者修改代码

   String MESSAGE = "Hello, RabbitMQ!";
   // 设置消息持久化
   AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
      .contentType("text/plain")
      .deliveryMode(2) // deliveryMode=1代表不持久化,deliveryMode=2代表持久化
      .build();
   channel.basicPublish("", MESSAGE_QUEUE, properties, MESSAGE.getBytes("UTF-8"));

也可以直接使用内置的properties

   channel.basicPublish("", MESSAGE_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes("UTF-8"));

2. 队列持久化

尽管我们上面已经使用了消息持久化,但是这是不够的,消息本身不会作为一个实体存在硬盘上,真正落在硬盘上的是队列,及队列中的消息。所以,要想保存消息,还得把消息所在的队列持久化,因此需要在声明队列时,将其 durable 属性设置为true

    // 设置队列持久化
    boolean durable = true;
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

注意,该属性不可修改,如果要把一个队列改成持久化,得先删除,再创建才行


3. 发布确认

我们上面已经成功把消息做了持久化,不过这并不能彻底避免消息丢失,比如在消息发布者发布消息的过程中,在消息成功持久化之前,rabbitMQ就崩溃了,此时消息仍然会丢失。因此,有必要执行发布确认的操作

即消息发送后,MQ要对生产者发送消息确认,确认已经持久化后,再进行发布确认
在这里插入图片描述
发布确认默认不开启,如果要开启,需要在channel上设置

    Channel channel = connection.createChannel();
    // 将信道设置为发布确认
    channel.confirmSelect();

进行完该项设置后,还需要针对确认消息的类型,适当的修改发送方代码。一般来说,发布确认有以下类型

3.1 简单发布确认

即发送后,单条单条的消息是否被rabbitMQ服务器接受

	String message = "Hello, RabbitMQ!";
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     // 设置简单发布确认
    channel.confirmSelect();
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        
    if (channel.waitForConfirms()) {
         System.out.println("Message published successfully.");
    } else {
         System.err.println("Failed to publish message.");
    }

可以看到,这种方式其实采用的是发一条消息,确认一次,效率并不高。

3.2 批量发布确认

批量发布和简单发布,在调用方法上并没有区别,只是发送的消息,从发一条就等待确认一次,变成了发一批,才确认一次。

	int MESSAGE_COUNT = 100;
    String message = "Hello, RabbitMQ!";
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 设置批量发布确认
    channel.confirmSelect();
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }

    int outstandingConfirms = MESSAGE_COUNT;
    while (outstandingConfirms > 0) {
        outstandingConfirms -= channel.waitForConfirms();
    }
    System.out.println("All messages published successfully.");

此种方式,虽然仍然会同步阻塞,但从每条确认一次进化到批量确认一次,大大节约了网络耗时。但是可能会出现一些消息发布成功,但是一些消息未成功的情况,不易进行排查和处理

3.3 异步发布确认

异步确认则采用的另一种方案,通过给channel设置一个确认监听器,来异步的做确认,即将发布消息和确认处理放在不同的线程中处理

   int MESSAGE_COUNT = 100;
   String message = "Hello, RabbitMQ!";
   ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
   Set<Long> failConfirmMessages = new HashSet<>();
   // 异步发布确认
   channel.confirmSelect();
   // 需设置两个监听器,前者为肯定确认,后者为否定确认
   channel.addConfirmListener(new ConfirmCallback() {
       @Override
       // deliveryTag 代表 投递消息的序号;multiple为true,则代表确认所有小于或等于当前消息deliveryTag的状态,为false,代表仅确认该条消息
       public void handle(long deliveryTag, boolean multiple) throws IOException {
           if (multiple) {
               ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
               // 清除所有小于该序号的消息
               confirmed.clear();
           } else {
           	   // 仅清除本条消息
               outstandingConfirms.remove(deliveryTag);
           }
       }
   }, new ConfirmCallback() {
       @Override
       public void handle(long deliveryTag, boolean multiple) throws IOException {
           System.err.println("Failed to publish message.");
           failConfirmMessages.add(deliveryTag);
       }
   });
   for (int i = 0; i < MESSAGE_COUNT; i++) {
       long nextSeqNo = channel.getNextPublishSeqNo();
       channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
       outstandingConfirms.put(nextSeqNo, message);
   }
   // 一段时间过后
   ......
   // 看最后是否还有消息被确认丢失,此时可选择是否要重新发送
   if (failConfirmMessages .size() == 0 && outstandingConfirms.size() == 0) {
   		System.out.println("All messages published successfully.");
   } else {
		System.out.println("Some messages need republish.");
   }
   

在这里插入图片描述

通过异步方式做确认,能提升性能,缺点是需要一些多线程的知识,实现难度较高。

4. 手动接收确认

如果第三点,是保证消息发送者到MQ服务器之间,消息不会丢失。那么同理,还需要保证MQ服务器到消费者间,消息不会丢失。

这时候,就需要手动接收确认了,即消费者得到消息后,先进行业务处理(或消息存储),直到业务处理完成后。再告知rabbitMQ服务器,消息我收到了。从而避免了自动ack后,消费者宕机导致的消息未处理完就丢失的问题,其示例代码如下

 // 创建消费者对象
 final Consumer consumer = new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
         String message = new String(body, "UTF-8");

         try {
             // 处理消息
             System.out.println("Received message: " + message);

             // 显式 ack 消息
             channel.basicAck(envelope.getDeliveryTag(), false); // 第二个参数表示是否批量处理

         } catch (Exception ex) {
             // 处理消息时发生异常,拒绝消息并重新将其放回队列中
             channel.basicNack(envelope.getDeliveryTag(), false, true);
         }
     }
 };

 // 开始消费消息,使用手动ack
 boolean autoAck = false;
 channel.basicConsume(QUEUE_NAME, autoAck, consumer);

PS:需要注意的是,手动ack可能带来重复消费的问题,比如消息处理成功后,在执行channel.basicAck时宕机,导致RabbitMQ服务器没收到消息接收确认的信号,超时后会认为该消息未被接收

5. 死信队列

在某些情况下(如手动ACK),如消费者在暂时无法处理该消息,RabbitMQ 可能会将消息重新放回队列,但大量的重新放回会导致消息堆积,也是不可取的。

// 如下,消费者可以向rabbitMQ发送nack的消息,且设置requeue参数为false
 void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

为了避免这种情况,RabbitMQ 提供了死信队列的功能。当消息因为某些原因不能被消费时,RabbitMQ 将消息放入死信队列而不是重新放回队列,防止消息丢失
在这里插入图片描述

四、防重复传递

上面一节,我们为rabbitMQ在消息传递过程中,各个节点都有防消息丢失的配置。这一节,我们来说rabbitMQ为了防止一条消息重复传递而做的努力

1. 消息确认机制

上面,我们说了发布确认和接收确认。其实,不管是发布和接收,这都属于消息确认机制的一种,而消息确认机制是AMQP协议所规定的发布确认是为了防止丢失消息,接收确认则是为了防止重复消费,当消费者成功接收到消息并完成处理后,发送确认通知给 RabbitMQ,RabbitMQ 才会将该消息标记为已消费,防止重复传递

2. 幂等性校验(需代码实现)

在消息生产者发送消息之前,消息可以被设置上全局唯一uuid,而消费者在消费前,则会判断该uuid是否已经消费过。

// 生产者发送消息之前,将消息标记为idempotent
// 通过设置 messageId 属性为一个唯一值,即可标记该消息为幂等消息
String messageId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .messageId(messageId)
        .build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());

// 消费者在处理消息之前,检查该消息是否已经被消费过
// 如果该消息已经被消费过,则直接确认消息
String messageId = properties.getMessageId();
if (processedIds.contains(messageId)) {
    channel.basicAck(envelope.getDeliveryTag(), false);
    return;
}
// 处理消息,并将 messageId 加入已处理集合
// ...
processedIds.add(messageId);

以上代码仅展示原理,实际上分布式高并发的情况下,uuid应该交由专门的服务器用雪花算法等方式去产生全局唯一的uuid。同样消费者处的processedIds也会进行远端存储

五、不可靠场景的对策

现在,让我们回头来看看不可靠场景下,rabbitMQ和我们开发者能用什么对策解决

  1. 消息漏发送:生产者在发送消息时,如果不观察RabbitMQ服务器的确认消息,可能导致有些消息在网络中丢失而不自知
  2. 消息重复发送:如果生产者在发送消息时,由于网络抖动或者其他原因,生产者无法从RabbitMQ收到消息确认,此时生产者会重发同样一条消息,从而导致消息重复
  3. 消息未储存:rabbitMQ服务器宕机,导致已经在rabbit服务器内的消息直接丢失
  4. 消费者重复消费:如果消费者不记得曾经消费过的消息,主动拉取或被推送了旧的消息,导致重复消费,
场景场景解释解决对策
消息漏发送生产者在发送消息时,如果不观察RabbitMQ服务器的确认消息,可能导致有些消息在网络中丢失而不自知发布确认
消息重复发送如果生产者在发送消息时,由于网络抖动或者其他原因,生产者无法从RabbitMQ收到消息确认,此时生产者会重发同样一条消息,从而导致消息重复无策略
消息未储存rabbitMQ服务器宕机,导致已经在rabbit服务器内的消息直接丢失队列、消息持久化
消费者重复消费如果消费者和MQ都不记得曾经消费过的消息,主动拉取或推送了旧的消息,导致重复消费接受确认、幂等性校验(代码实现)

六、总结

RabbitMQ 能保证消息可靠性吗?答案是绝大部分情况可靠,但仅靠其自身机制无法做到100%。比如对于没有收到发布确认信息,导致消息生产者重复传递这种场景就并没有好的办法,只能通过开发者额外代码去解决,比如发消息带全局唯一id,然后由消费者去做幂等性校验。而针对更极端的场景,如RabbitMQ硬盘故障导致消息丢失,就得依托镜像部署等手段去处理了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/32429.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

miniconda 安装 windows+linux

虽然常用anaconda&#xff0c;但是有时候只需要管理环境的时候&#xff0c;用miniconda更方便 如果想安装anaconda&#xff0c;可以参考这篇&#xff1a;anaconda安装 一、linux下 1.下载 链接&#xff1a;miniconda文档 cd /usr/localwget https://repo.anaconda.com/mini…

简要介绍 | CUDA底层原理:加速高性能计算的关键技术

注1&#xff1a;本文系“简要介绍”系列之一&#xff0c;仅从概念上对CUDA的底层原理进行非常简要的介绍&#xff0c;不适合用于深入和详细的了解。 CUDA底层原理&#xff1a;加速高性能计算的关键技术 CUDA Refresher: The GPU Computing Ecosystem | NVIDIA Technical Blog 1…

python自动化办公——定制化读取Excel数据并写入到word表格

Python自动化办公——Excel写word表格 文章目录 Python自动化办公——Excel写word表格一、引言二、数据准备三、python代码1、方法一2、方法二3、方法三 一、引言 最近到了毕业设计答辩的时候&#xff0c;老师让我帮毕业生写一段毕业设计的功能就是提供一个学士学位授予申请表…

基于CST软件的对数周期天线设计

摘要&#xff1a; 对数周期天线存在多种形式&#xff0c;主要包括齿片形、齿线型和偶极型等。其中&#xff0c;1960年提出的对数周期偶极子天线&#xff08;LPDA&#xff09;是结构最简单、应用最广泛且性能优良的一类对数周期天线&#xff0c;广泛应用于 UHF、VHF、HF 频段的…

python中os库用法详解(总结)

os库主要是对文件和文件夹进行操作&#xff0c;在Python中对⽂件和⽂件夹的操作要借助os模块⾥⾯的相关功能。 具体步骤如下&#xff1a; 1. 导⼊os模块 import os 2. 使⽤ os 模块相关功能 os.函数名() 1、⽂件重命名 os.rename(⽬标⽂件名, 新⽂件名) 示例代码&#…

MySQL:七种 SQL JOINS 的实现(图文详解)

MySQL&#xff1a;7种SQL JOINS的实现 前言一、图示表示二、代码举例1、INNER JOIN&#xff08;内连接&#xff09;2、LEFT JOIN&#xff08;左连接&#xff09;3、RIGHT JOIN&#xff08;右连接&#xff09;4、OUTER JOIN&#xff08;全连接&#xff09;5、LEFT EXCLUDING JOI…

nbcio-vue中formdesigner的组件显示不正常的处理

今天看演示系统的formdesigner组件显示不正常&#xff0c;也不知道是什么时候开始的事情&#xff0c; 如下&#xff1a; 对组件的操作倒是正常&#xff0c;但看本地是正常的&#xff0c;如下&#xff1a; 开始也不知道是什么原因&#xff0c;看代码也是一样的&#xff0c;应该…

JavaScript 中内存泄漏的几种情况?

一、是什么 内存泄漏&#xff08;Memory leak&#xff09;是在计算机科学中&#xff0c;由于疏忽或错误造成程序未能释放已经不再使用的内存 并非指内存在物理上的消失&#xff0c;而是应用程序分配某段内存后&#xff0c;由于设计错误&#xff0c;导致在释放该段内存之前就失…

Hidl编程实战(一)——定义HAL服务

1. 概述 hidl基本知识可以参考官网 安卓官网-hidl 也讲解了C和Java实现hidl 本文讲解CHal服务的创建 2. 文件的创建 aosp整编过的代码&#xff0c;可以直接choosecombo后使用hidl-gen工具。如果没有整编过&#xff0c;可以单编hidl-gen工具。 hidl-gen工具可以用来协助创建h…

Windows和Linux动态注入

摘要&#xff1a;最近对动态注入有一些兴趣因此搜索了些资料&#xff0c;简单整理了下相关的技术实现。本文只能够带你理解何如注入以及大概如何实现&#xff0c;对注入的方法描述的并不详细。   关键字&#xff1a;dll注入&#xff0c;hook&#xff0c;提权   读者须知&am…

基于Springboot+Vue的校园招聘系统(进阶版)

本项目是一年前写的一个项目的升级版&#xff0c;因为某些原因将它作了一个升级改进&#xff0c; 好多兄弟来问有没有演示&#xff0c;现在先来写个说明&#xff01;&#xff01;&#xff01; 目录 一. &#x1f981; 前言二. &#x1f981; 开源代码与组件使用情况说明三. &am…

Qt之事件过滤器讲解并且实现快捷键切换鼠标焦点

目录 1、需求背景2、使用Qt键盘事件3、安装事件过滤器4、事件处理级别 1、需求背景 现在有一个类似于下方图的ui&#xff0c;用户需要在输入前一行内容后&#xff0c;需要摁下指定案件能够跳转到下一行继续进行输入。 2、使用Qt键盘事件 一种更为直接的解决方案是子类化QLi…

如何在 Linux 中安装、设置和使用 SNMP?

概要 SNMP&#xff08;Simple Network Management Protocol&#xff09;是一种用于管理和监控网络设备的协议。它允许网络管理员通过远程方式收集设备的运行状态、性能数据和错误信息&#xff0c;以便进行故障排除和网络优化。在Linux系统中&#xff0c;我们可以安装、设置和使…

IDEA配置本地Maven详细教程

IDEA配置本地Maven详细教程 一、下载二、安装三、配置环境变量四、IDEA配置Maven 一、下载 官网下载&#xff1a;点击下载 网盘下载&#xff1a;点击下载 二、安装 将下载后的zip文件&#xff08;免安装版&#xff09;解压到自己想要放的位置&#xff0c;&#xff0c;我这里…

对MVVM和MVC开发模式的理解

对MVVM和MVC开发模式的理解 1、MVVM2、MVC3、MVVM与MVC的区别 1、MVVM MVVM最早由微软提出来&#xff0c;它借鉴了桌面应用程序的MVC思想&#xff0c;在前端页面中&#xff0c;把Model用纯JavaScript对象表示&#xff0c;View负责显示&#xff0c;两者做到了最大限度的分离&am…

UE4/5 通过Control rig的FullBody【蜘蛛模型,不用basic ik】

目录 根设置 FullBody IK 额外骨设置 ​编辑 晃动效果 根设置 第一步你需要准备一个蜘蛛模型&#xff0c;不论是官方示例或者是epic上购买的模型 然后我用的是epic上面购买的一个眼球蜘蛛&#xff1a; 第一步&#xff0c;我们从根创建一个空项【这个记得脱离父子级到root之…

多传感器时频信号处理:多通道非平稳数据的分析工具(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

四、用户管理

云尚办公系统&#xff1a;用户管理 B站直达【为尚硅谷点赞】: https://www.bilibili.com/video/BV1Ya411S7aT 本博文以课程相关为主发布&#xff0c;并且融入了自己的一些看法以及对学习过程中遇见的问题给出相关的解决方法。一起学习一起进步&#xff01;&#xff01;&#x…

【服务器远程工具】一款好用的xshell

这里写目录标题 背景Tabby简介安装使用SSHSFTPPowerShellGit 设置外观颜色快捷键窗口 插件支持总结 背景 作为一名后端开发&#xff0c;我们经常需要和Linux系统打交道&#xff0c;免不了要使用Xshell这类终端工具来进行远程管理。今天给大家推荐一款更炫酷的终端工具Tabby&…

【SQL应知应会】分析函数的点点滴滴(三)

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于SQL应知应会专栏,本专栏主要用于记录对于数据库的一些学习&#xff0c;有基础也有进阶&#xff0c;有MySQL也有Oracle 分析函数的点点滴滴 1.什么是分析函数&#xff1a;…