【RabbitMQ】RabbitMQ保证消息不丢失的N种策略的思想总结

在这里插入图片描述

文章目录

    • 生产者端(消息发布端)保证机制
    • RabbitMQ服务器端保证机制
    • 消费者端(消息接收端)保证机制
    • 除了MQ自带的机制,还能做的操作
    • 持久化的原理
    • ACK思想

更多相关内容可查看

消息从发送,到消费者接收,会经理多个过程 , 其中的每一步都可能导致消息丢失

image.png

生产者端(消息发布端)保证机制

  1. 消息确认机制(Confirm)
    • RabbitMQ提供了消息确认模式。生产者将消息发送给RabbitMQ后,RabbitMQ会返回一个确认回执给生产者。生产者可以通过设置channel.confirmSelect()开启确认模式,然后使用异步或者同步的方式等待确认。
    • 在异步确认方式下,生产者可以通过注册一个回调函数来处理确认消息。例如:
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            // 消息成功被接收,记录日志或者更新状态
        }
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            // 消息未被接收,进行重发或者记录错误
        }
    });
    
    • 同步确认方式则是通过channel.waitForConfirms()方法来等待确认,这种方式会阻塞生产者线程,直到收到所有已发送消息的确认回执或者超时。
  2. 消息持久化
    • 生产者在发送消息时,可以将消息设置为持久化。在AMQP协议中,消息有两个属性用于控制消息的持久化:deliveryMode。当deliveryMode = 2时,表示消息是持久化消息。
    • 例如,在Java客户端中,可以这样设置:
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
       .deliveryMode(2)
       .build();
    channel.basicPublish("", "queue_name", properties, message.getBytes());
    
    • 这样可以确保消息在RabbitMQ服务器端持久化存储,即使RabbitMQ服务器意外重启,消息也不会丢失。

RabbitMQ服务器端保证机制

  1. 队列持久化
    • 除了消息持久化,队列也需要进行持久化设置。在定义队列时,需要将队列设置为持久化队列。例如,在RabbitMQ管理界面或者通过代码(以Java为例)进行如下设置:
    boolean durable = true;
    channel.queueDeclare("queue_name", durable, false, false, null);
    
    • 这样当RabbitMQ服务器重启时,持久化队列及其内部的持久化消息都会被恢复。
  2. 集群与高可用配置
    • RabbitMQ可以配置为集群模式。在集群环境下,消息会在多个节点之间进行复制和同步。如果一个节点出现故障,其他节点可以继续提供服务,保证消息不会丢失。
    • 例如,可以使用RabbitMQ的镜像队列(Mirrored Queues)功能。镜像队列会在多个节点上保存相同内容的队列,当主节点上的队列发生变化(如收到新消息或者消息被消费)时,变化会被同步到镜像节点。这样,即使主节点出现故障,镜像节点也可以接替工作,确保消息的可用性。

消费者端(消息接收端)保证机制

  1. 手动确认机制(Acknowledge)
    • 消费者可以通过手动确认机制来确保消息被正确处理后才从队列中移除。在大多数RabbitMQ客户端中,都支持手动确认模式。
    • 例如,在Java客户端中,消费者可以通过channel.basicConsume()方法设置autoAck = false来开启手动确认模式。然后在成功处理消息后,通过channel.basicAck()方法来确认消息,告诉RabbitMQ可以将该消息从队列中移除。
    • 如果消费者在处理消息过程中出现异常,没有确认消息,RabbitMQ会认为该消息没有被正确处理,会将消息重新放回队列,等待下一次被消费,从而保证消息不会因为消费者端的异常而丢失。
  2. 消费者预取计数(Qos - Quality of Service)
    • 消费者可以设置预取计数(basicQos),限制一次从队列中获取的消息数量。这样可以防止消费者因为接收过多消息而无法及时处理,导致消息在消费者端丢失。
    • 例如,在Java客户端中,可以这样设置预取计数:
    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    
    • 这表示消费者一次最多从队列中获取1条消息,只有在成功处理并确认这条消息后,才会获取下一条消息,确保消费者能够有条不紊地处理消息,减少消息丢失的风险。

除了MQ自带的机制,还能做的操作

通过RabbitMQ本身所提供的机制基本上已经可以保证消息不丢失 , 但是因为一些特殊的原因还是会发送消息丢失问题 , 例如 : 回调丢失 , 系统宕机, 磁盘损坏等 , 这种概率很小 , 但是如果想规避这些问题 , 进一步提高消息发送的成功率, 也可以通过程序自己进行控制

设计一个消息状态表 , 主要包含 : 消息id , 消息内容 , 交换机 , 消息路由key , 发送时间, 签收状态等字段 , 发送方业务执行完毕之后 , 向消息状态表保存一条消息记录, 消息状态为未签收 , 之后再向MQ发送消息 , 消费方接收消息消费完毕之后 , 向发送方发送一条签收消息 , 发送方接收到签收消息之后 , 修改消息状态表中的消息状态为已签收 ! 之后通过定时任务扫描消息状态表中这些未签收的消息 , 重新发送消息, 直到成功为止 , 对于已经完成消费的消息定时清理即可 !

持久化的原理

  1. 消息持久化在生产者端的原理

    • 消息属性设置:在生产者端,消息持久化主要通过设置消息的属性来实现。在AMQP(高级消息队列协议)中,消息有一个deliveryMode属性。当deliveryMode = 2时,就表示该消息是持久化消息。例如,在Java使用RabbitMQ客户端发送消息时,会构建一个AMQP.BasicProperties对象来设置消息的各种属性,其中就包括deliveryMode
    • 消息存储过程:当生产者发送持久化消息时,RabbitMQ客户端会将消息标记为需要持久化。这个标记信息会随着消息一起发送给RabbitMQ服务器。RabbitMQ服务器收到消息后,会根据这个标记将消息存储到磁盘上的存储介质中,而不是仅仅存储在内存中。具体存储位置和方式取决于RabbitMQ的存储配置,通常会存储在一个消息存储文件(如在默认的基于文件存储的配置下,消息存储在msg_store_persistent文件中)中,这个文件会定期进行刷盘操作,确保消息能够真正持久地保存到磁盘。
  2. 消息持久化在RabbitMQ服务器端(队列持久化)的原理

    • 队列定义与存储:队列持久化是消息持久化的重要组成部分。在RabbitMQ中,队列是消息存储的容器。当创建一个持久化队列时,RabbitMQ会将队列的定义信息(如队列名称、队列属性、绑定关系等)存储到磁盘上的队列元数据文件中。例如,在Linux系统下,这些元数据可能存储在/var/lib/rabbitmq/mnesia/rabbit@localhost目录下的相关文件中。
    • 消息存储与恢复:对于持久化队列中的消息,当消息被发送到该队列时,RabbitMQ会将消息存储到与该队列关联的存储区域。这个存储区域可以是基于文件系统的,也可以是基于数据库(如使用插件支持)的存储方式。在基于文件系统的存储中,消息会按照一定的规则写入到消息存储文件中。当RabbitMQ服务器重启时,它会首先读取队列的元数据文件,恢复队列的定义,然后根据队列的定义和存储的消息文件,将消息重新加载到内存中的队列里,从而保证消息不会丢失,并且队列的结构和消息的顺序等都能恢复到服务器重启前的状态。
  3. 消息持久化的整体流程原理总结

    • 生产者将带有持久化标记(deliveryMode = 2)的消息发送给RabbitMQ服务器。RabbitMQ服务器收到消息后,先检查消息所属的队列是否为持久化队列。如果是,就将消息存储到磁盘上的消息存储区域,同时将队列的相关信息(包括消息的索引等)存储到磁盘上的队列元数据区域。当消费者从队列中获取消息时,RabbitMQ会从磁盘存储区域读取消息并发送给消费者。如果RabbitMQ服务器因为各种原因(如断电、软件故障等)重启,它会根据磁盘上存储的队列元数据和消息内容,重新构建队列并将消息加载回队列,使得消息可以继续被消费者获取,从而实现了消息的持久化,保证消息在整个生命周期内不会因为服务器故障等原因而丢失。

ACK思想

  1. ACK(Acknowledge)的基本概念

    • 在消息队列(MQ)中,ACK是一种确认机制,用于消费者向消息队列告知消息已经被成功接收和处理。当消费者从消息队列中获取到一条消息后,它需要在处理完该消息后发送一个ACK信号给消息队列,以表示消息已经被正确处理。
  2. 手动ACK和自动ACK的区别

    • 自动ACK:在某些MQ的默认配置中,可能采用自动ACK模式。在这种模式下,消费者一旦接收到消息,消息队列就会立即认为该消息已经被成功处理,然后将消息从队列中移除。这种方式的优点是简单快捷,适用于一些对消息处理可靠性要求不高的场景。例如,在简单的日志收集系统中,如果偶尔丢失一条日志消息对系统影响不大,就可以使用自动ACK。但是,这种方式存在风险,因为如果消费者在接收到消息后,还没来得及处理就出现故障(如程序崩溃、网络问题等),那么消息就会丢失。
    • 手动ACK:手动ACK模式则要求消费者在成功处理消息后,通过代码显式地向消息队列发送ACK信号。例如,在RabbitMQ中,使用Java客户端时,消费者可以通过channel.basicAck()方法来发送ACK。只有在收到ACK信号后,消息队列才会将消息从队列中移除。这种方式可以确保消息在真正被处理后才从队列中移除,大大提高了消息处理的可靠性。如果消费者在处理消息过程中出现异常,没有发送ACK,消息队列会认为该消息没有被正确处理,会将消息重新放回队列,等待下一次被消费。
  3. ACK在消息队列中的重要性

    • 保证消息不丢失:通过手动ACK机制,消息队列可以确保消息不会因为消费者端的故障而丢失。例如,在一个电商订单处理系统中,订单消息通过消息队列传递给库存管理系统。如果库存管理系统在处理订单消息时出现故障,没有发送ACK,那么订单消息会被重新放回队列,等待库存管理系统恢复后再次处理,这样就保证了订单消息不会丢失,从而保证了整个业务流程的准确性。
    • 实现消息的精确消费:ACK机制有助于消息队列精确地控制消息的消费顺序和状态。消息队列可以根据ACK信号来判断哪些消息已经被成功处理,哪些消息还需要重新分配给消费者进行处理。这对于一些对消息处理顺序和完整性要求较高的场景非常重要,比如金融交易系统中的交易消息处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/941225.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

重拾设计模式--外观模式

文章目录 外观模式(Facade Pattern)概述定义 外观模式UML图作用 外观模式的结构C 代码示例1C代码示例2总结 外观模式(Facade Pattern)概述 定义 外观模式是一种结构型设计模式,它为子系统中的一组接口提供了一个统一…

新版国标GB28181设备端Android版EasyGBD支持国标GB28181-2022,支持语音对讲,支持位置上报,开源在Github

经过近3个月的迭代开发,新版本的国标GB28181设备端EasyGBD安卓Android版终于在昨天发布到Github了,最新的EasyGBD支持了国标GB28181-2022版,还支持了语音对讲、位置上报、本地录像等功能,比原有GB28181-2016版的EasyGBD更加高效、…

element-puls封装表单验证

项目场景: 提示:这里简述项目相关背景: 在做项目中会有一些简单的表单非空验证,这些验证比较简单,就是代码看着有点多,做起来浪费时间,所以我们可以将这个方法封装起来,然后挂载全…

Unity命令行传递自定义参数 命令行打包

命令行参数增加位置 -executeMethod 某脚本.某方法 参数1 参数2 参数3 ... 例如执行EditorTest.GetCommandLineArgs方法 增加两个命令行参数 Version=125 CDNVersion=100 -executeMethod EditorTest.GetCommandLineArgs Version=125 CDNVersion=100 Unity测试脚本 需要放在…

【Java基础面试题033】Java泛型的作用是什么?

Java的基础语法可以看尚硅谷的这个PDF:尚硅谷JavaSE基础/《Java从入门到精通(JDK17版)》_尚硅谷电子书.pdf Autism_Btkrsr/Blog_md_to_pdf - 码云 - 开源中国 (gitee.com) 回答重点 Java泛型的作用是通过在编译时检查类型安全,允许程序员编写更通用和…

Flutter环境搭建

1.Flutter 简介 1.1 Flutter 是什么 ? Flutter 是一个 UI SDK(Software Development Kit)跨平台解决方案:可以实现一套代码发布移动端(iOS、Android、HarmonyOS)、Web端、桌面端目前很多公司都在用它&…

COMSOL with Matlab

文章目录 基本介绍COMSOL with MatlabCOMSOL主Matlab辅Matlab为主Comsol为辅 操作步骤常用指令mphopenmphgeommghmeshmphmeshstatsmphnavigatormphplot常用指令mphsavemphlaunchModelUtil.clear 实例教学自动另存新档**把语法套用到边界条件**把语法套用到另存新档 函数及其微分…

AlipayHK支付宝HK接入-商户收款(PHP)

一打开支付宝国际版 二、点开商户服务 三、下载源码

设计模式之 abstract factory

适用场景 一个系统要独立于它的产品的创建、组合和表示时。一个系统要由多个产品系列中的一个来配置时。当你要强调一系列相关的产品对象的设计以便进行联合使用时。当你提供一个产品类库,而只想显示它们的接口而不是实现时 架构演示 首先client这个东西可以接触到…

华为IPD流程6大阶段370个流程活动详解_第一阶段:概念阶段 — 81个活动

华为IPD流程涵盖了产品从概念到上市的完整过程,各阶段活动明确且相互衔接。在概念启动阶段,产品经理和项目经理分析可行性,PAC评审后成立PDT。概念阶段则包括产品描述、市场定位、投资期望等内容的确定,同时组建PDT核心组并准备项目环境。团队培训涵盖团队建设、流程、业务…

每天40分玩转Django:Django部署

Django部署 一、今日学习内容概述 学习模块重要程度主要内容生产环境配置⭐⭐⭐⭐⭐settings配置、环境变量WSGI服务器⭐⭐⭐⭐⭐Gunicorn配置、性能优化Nginx配置⭐⭐⭐⭐反向代理、静态文件安全设置⭐⭐⭐⭐⭐SSL证书、安全选项 二、生产环境配置 2.1 项目结构调整 mypr…

主要是使用#includenlohmannjson.hpp时显示找不到文件,但是我文件已正确导入visual studio配置,也保证文件正确存在

问题: 主要是在项目配置中包括了C/C配置中文件位置,但是没有把nlohmann上一级的目录包括进去,导致#include"nlohmann/json.hpp"找不到文件位置 解决: 加上上一级目录到附加包含目录 596513661)] 总结: 找不…

tslib(触摸屏输入设备的轻量级库)的学习、编译及测试记录

目录 tslib的简介tslib的源码和make及make install后得到的文件下载tslib的主要功能tslib的工作原理tslib的核心组成部分tslib的框架和核心函数分析tslib的框架tslib的核心函数ts_setup()的分析(对如何获取设备名和数据处理流程的分析)函数ts_setup()自身的主要代码ts_setup()对…

Unity DOTS中的share component

Unity DOTS中的share component 内存管理创建流程修改流程销毁流程Reference share component是DOTS中一类比较特殊的component,顾名思义,它是全局共享的component,所有具有相同component值的entity,共享一个component&#xff0c…

EfficienetAD异常值检测之瓷砖表面缺陷检测(免费下载测试数据集和模型)

背景 当今制造业蓬勃发展,产品质量把控至关重要。从精密电子元件到大型工业板材,表面缺陷哪怕细微,都可能引发性能故障或外观瑕疵。人工目视检测耗时费力且易漏检,已无法适应高速生产线节奏。在此背景下,表面缺陷异常…

【从零开始入门unity游戏开发之——C#篇21】C#面向对象的封装——`this`扩展方法、运算符重载、内部类、`partial` 定义分部类

文章目录 一、this扩展方法1、扩展方法的基本语法2、使用扩展方法3、扩展方法的注意事项5、扩展方法的限制6、总结 二、运算符重载1、C# 运算符重载2、运算符重载的基本语法3. 示例:重载加法运算符 ()4、使用重载的运算符5、支持重载的运算符6、不能重载的运算符7、…

vscode 快速切换cangjie版本

前言 目前阶段cangjie经常更新,这就导致我们可能会需要经常在不同的版本之间切换。 在参加训练营时从张老师那学到了如何使用 vscode 的配置文件来快速进行cangjie版本的切换。 推荐一下张老师的兴趣组 SIGCANGJIE / 仓颉兴趣组 这里以 windows 下,配置…

RCE总结

文章目录 常见漏洞执行函数:1.系统命令执行函数2.代码执行函数 命令拼接符读取文件命令绕过:空格过滤绕过关键字绕过长度过滤绕过无参数命令执行绕过无字母数字绕过利用%0A截断利用回溯绕过利用create_function()代码注入无回显RCE1.反弹shell2.dnslog外…

springmvc的拦截器,全局异常处理和文件上传

拦截器: 拦截不符合规则的,放行符合规则的。 等价于过滤器。 拦截器只拦截controller层API接口。 如何定义拦截器。 定义一个类并实现拦截器接口 public class MyInterceptor implements HandlerInterceptor {public boolean preHandle(HttpServletRequest reque…

前端知识补充—HTML

1. HTML 1.1 什么是HTML HTML(Hyper Text Markup Language), 超⽂本标记语⾔ 超⽂本: ⽐⽂本要强⼤. 通过链接和交互式⽅式来组织和呈现信息的⽂本形式. 不仅仅有⽂本, 还可能包含图⽚, ⾳频, 或者⾃已经审阅过它的学者所加的评注、补充或脚注等等 标记语⾔: 由标签构成的语⾔…