RabbitMQ 的高阶应用及可靠性保证

       

目录

一、RabbitMQ 高阶应用        

     1.1 消息何去何从

        1.2 过期时间

        1.3 死信队列

        1.4 延迟队列

        1.5 优先级队列

        1.6 消费质量保证(QOS)

二、持久化

三、生产者确认

四、消息可靠性和重复消费

        4.1 消息可靠性

        4.2 重复消费问题


上篇文章介绍了 RabbitMQ 的基本概念和使用,这篇文章就来介绍下其高阶应用和可靠性保证。

一、RabbitMQ 高阶应用        

        RabbitMQ 还提供了诸多高级特性,比如:过期时间、交换器备份、死信队列、延迟队列、优先级队列、持久化、消费端消息分发等等,下面介绍几个重要特性。

     1.1 消息何去何从

        mandatory 参数,当设置为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当设置为 false 时,出现上述情形,则消息直接被丢弃。那么生产者如何获取到未被路由到合适队列的消息呢?需要实现 listener,SpringBoot 中需要实现 ReturnCallback。

        immediate 参数,为 true 时,如果交换机将消息路由到队列时发现队列上并不存在消费者,那么这条消息将不会被存入队列中。当与路由键匹配的队列都没有消费者时,该消息会 return 给生产者。

        概括来说,mandatory 参数告诉服务器至少将消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递,如果所有匹配的队列上无消费者,则将消息返回给生产者,不用将消息存入队列等待消费者。

        RabbitMQ 3.0 版本去掉了 immediat 参数的支持,官方解释是:会影响镜像队列的性能,增加代码的复杂性,建议采用 TTL 和 DLX 的方法替换。

        1.2 过期时间

        RabbitMQ 可以对消息和队列设置 TTL。

        设置消息的TTL。方法一:通过队列的属性设置,队列中的所有消息都有相同的过期时间,一旦消息过期,就会立即从队列中抹去。方法二:对消息单独设置,每条消息的TTL可以不同,即使消息过期也不会立即从队列中抹去,在投递前判定。如果两者一同使用,则以最小的那个为准,消息的生存时间一旦超过了设置的TTL,就会变成“死信”,消费者则无法收到该消息。设置过期时间的方法如下:

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 6000);
Queue queue = new Queue(vodQueue, true, false, false, args);

        1.3 死信队列

        DLX,全称为 Dead-Letter-Exchange,死信交换器。当一个消息在队列中变成死信之后,它能被发送到另一个交换器中,这个交换器即是 DLX,绑定死信交换器的队列称为死信队列。消息变为死信的情况:

  • 消息被拒绝,并且设置的 requeue 为 false
  • 消息过期
  • 队列达到最大长度

        DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动的将消息重新发布到设置的 DLX 上去,进而被路由到另一个队列中,即死信队列。可以监听这个队列中的消息进行相应的处理。设置死信队列的方法:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
Queue queue = new Queue(vodQueue, true, false, false, args);

        对于 RabbitMQ 来说,DLX 是一个非常有用的特性。他可以处理异常情况,消息不能被消费者正确消费而被至于死信队列中的情况后去分析程序可以通过这个死信队列中的内容来分析当时所遇到的一场情况。进而可优化改善系统。

        1.4 延迟队列

        所谓延迟队列是指当消息被发出后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到消息。

        在AMQP协议中,或者 RabbitMQ 本身并没有直接支持延迟队列的功能,但可以通过 DLX 和TTL 来实现。        

        1.5 优先级队列

         优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。设置优先级队列

Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
Queue queue = new Queue(vodQueue, true, false, false, args);

        在发送消息时设置当前的优先级,默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个是有前提的:如果在消费者消费速度大于生产者的生产速度且broker 中没有消息堆积的情况,对发送的消息设置优先级也就没有什么实际意义了。因为生产者刚发送完一条消息就被消费了,那么就意味着 broker 中至多有一条消息,对于单条消息来说优先级是没意义的。       

        1.6 消费质量保证(QOS)

        当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询的方式分发给消费者。每条消息只会发送给一个消费者。这种方式非常适合扩展,而且是专门为并发程序设计的。如果现在的负载加重,只需要创建更多的消费者即可。

        这种方式不那么优雅,分发中不管消费者的消息是否处理完了,试想一下,某些消费者的任务繁重,来不及处理消息并确认,而某些消费者由于某些原因很快处理完了所分配的消息,进而进程空闲,这样会造成总体的吞吐量下降。该如何处理这种情况呢?引入Qos,他会告诉 broker 我没消费完当前消息前,不要给我新消息了,这就保证了消费质量。Qos对于拉模式是无效的。                设置方法如下:

// prefetchSize和prefetchCount设置为0,说明无限制
// prefetchSize: 指定消费者可以接收的最大内容量(单位通常是字节)。如果设置了非零值,RabbitMQ 会阻止发布者发送更多的消息,直到消费者发送了足够多的确认来释放足够的容量。默认情况下,RabbitMQ 并不实现 prefetchSize 参数,所以通常设置为0,表示不对此做限制。
// prefetchCount: 更常用的一个参数,表示消费者最多可以接收多少个未确认的消息。当达到这个数量后,RabbitMQ 将暂停向该消费者推送更多消息,直到消费者确认了部分消息,腾出了“槽位”。例如,设置为1意味着消费者每次处理完一个消息并发送确认之后,才能接收下一个消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

二、持久化

        持久化可以提高 RabbitMQ 的可靠性,防止在异常情况下消息的丢失。RabbitMQ 的持久化分成三部分:交换器的持久化、队列的持久化和消息的持久化。

        交换器的持久化可以在声明交换器的时候设置,如果交换器不设置持久化,RabbitMQ 重启后,交换器的元数据会丢失,不过消息不会丢失,只是不能将消息发送给这个交换器了。

        队列的持久化在声明队列的时候设置,如果不设置队列的持久化,RabbitMQ 服务重启后,队列的元数据会丢失,此时数据也会丢失。

        将交换器、队列、消息都设置成持久化后能保证数据不丢失吗?答案是否定的。

  1. 从消费者的角度来看,将 autoAck 设置成 true,那么当消费者接受到相关消息后,还没来得及处理就宕机了,这样也算数据丢失。
  2. 在持久化的消息正确存入 RabbitMQ 之后,还需要一段时间才能存入磁盘。RabbitMQ 不会为每条消息都进行同步存盘(调用内核的fsync)的处理,可能仅保存在操作系统缓存之中而不是物理磁盘之中。如果在这段时间内,RabbitMQ 发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。

        上面的问题可以通过镜像队列机制来解决。相当于配置了副本,如果主节点在此特殊时期挂掉了,可以自动切换到从节点,这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证不丢失,但这样已经好很多。

三、生产者确认

        生产者将消息发送出去后,消息到底有没有到达服务器呢?如果不进行配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下,生产者不知道消息有没有正确的到达服务器。如果到达服务器之前就丢失了,持久化操作也解决不了问题,因为还没到达服务器,何谈持久化?针对这个问题提供了两种解决方式:

  • 通过事务机制实现
  • 通过发送确认机制实现

        开启事务多了几个环节,只有消息成功被 RabbitMQ 接收,事务才能提交,否则便可在捕获异常之后进行处理。但事务会严重影响 RabbitMQ 的性能,大大降低吞吐量。

        发送方确认是一种轻量级的机制,生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包括消息的唯一id),这就使得生产者知晓消息已经正确到达目的地。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。

        事务机制在一条消息发出后会使发送端阻塞,以等待rabbitmq的回应,之后才能发送下一条消息。相比消息确认机制,发送方确认机制是异步的。事务机制和确认机制是互斥的不能共存。

四、消息可靠性和重复消费

        只要涉及到消息中间件,消息可靠性和重复消费就是无可避免的话题,那 RabbitMQ 是如何设计的呢?

        4.1 消息可靠性

  1. 持久化设置,这在上文已经介绍,通过持久化队列、交换器和消息来保存消息。
  2. 事务和确认机制:上文已经介绍了生产者的确认机制,通过这个机制来保证生产者发送的消息不回丢失。
  3. 消费者消息确认:可以通过消息的手动ack来保证消息能消费完成
  4. 消息镜像队列:设置队列为镜像队列,可以将消息复制到多个节点,即使某个节点宕机,消息仍可以从其他节点获取。

        通过以上措施的组合使用,可以大大提高 RabbitMQ 消息传递的可靠性,尽可能减少消息丢失的风险。然而,即使采取了所有措施,也不能完全保证100%的消息不丢失,因为消息在传输过程中可能还受到网络、硬件故障等因素的影响。在实际应用中,需要根据业务场景权衡消息的可靠性、性能和成本。

        4.2 重复消费问题

        在 RabbitMQ 中,重复消费指的是同一个消息被多个消费者或者同一个消费者消费多次的现象。这种问题可能会导致数据不一致或者业务逻辑错误。造成重复消费的原因可能有:

  1. 消费者ACK确认失败:消费者接收到消息并开始处理,但是在处理完毕并发送 ACK 确认之前断开了连接,比如网络抖动或消费者进程异常退出,导致 RabbitMQ 未收到ACK确认,于是消息重新入队等待再次被消费。
  2. 消息重回队列:在有死信交换机(Dead Letter Exchange, DLX)或者消息TTL(Time To Live)到期后重新投递的情况下,消息可能被重新发送到原来的队列或另一个队列,从而被再次消费。
  3. 消费者超时设置不当:如果消费者的超时设置过短,可能会在消息处理未完成时就已经被认为超时,消息会被重新放回队列。

        那如何解决重复消费问题呢?

  1. 消息确认机制:确保消费者正确使用手动确认模式(Manual Acknowledgments),只有当消息处理成功后才发送 ACK 确认给 RabbitMQ,否则在遇到异常时可以重新消费。
  2. 幂等性设计:消费者的业务逻辑应当设计为幂等的,即使同一条消息被消费多次,处理结果也是相同的,不影响业务状态。例如,通过消息ID或业务流水号来判断消息是否已经处理过。
  3. 防重ID:在消息体中携带一个全局唯一的ID,消费者在处理消息前,先检查这个ID是否已经被处理过,如果已经处理过,则直接丢弃消息。

        总之,避免重复消费的关键在于消息确认机制、幂等性设计以及合理的重试和补偿策略。同时,完善的日志记录和监控也是非常重要的,以便在出现问题时能够快速定位和修复。

往期经典推荐

探秘 RabbitMQ 的设计理念与核心技术要点-CSDN博客

走进 Mybatis 内核世界:理解原理,释放更多生产力-CSDN博客

深入浅出 Kafka 消费者:解密分布式消息流的幕后英雄_kafka消费-CSDN博客

深入剖析Kafka生产者:揭秘消息从发送到落地的全过程-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/486245.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

皓学IT:JavaWEB_Cookie

一、Cookie 1.1.Cookie概述 Cookie翻译成中文是小甜点&#xff0c;小饼干的意思。在HTTP中它表示服务器送给客户端浏览器的小甜点。其实Cookie就是一个键和一个值构成的&#xff0c;随着服务器端的响应发送给客户端浏览器。然后客户端浏览器会把Cookie保存起来&#xff0c;当…

【快刊合集】计算机类SCI,IEEE出版社,中科院2/1区TOP,分数逐年攀升!!

本期推荐 【SciencePub学术】本期&#xff0c;小编给大家推荐的是1本计算机类的甄选好刊&#xff0c;该期刊隶属于IEEE出版社旗下&#xff0c;最新的影响因子已达到7&#xff0c;是1本业内认可度非常高的期刊。 01 期刊基本信息 【期刊简介】IF&#xff1a;7.5-8.0&#xff0…

Git进阶命令-reset

一、reset命令使用场景 有时候我们提交了一些错误的或者不完善的代码&#xff0c;需要回退到之前的某个稳定的版本,面对这种情况有两种解决方法: 解决方法1&#xff1a;修改错误内容&#xff0c;再次commit一次 解决方法2&#xff1a;使用git reset 命令撤销这一次错误的com…

网络类型及数据链路层协议

目录 一、网络的分类 二、数据链路层协议 1、MA网络以太网协议 2、P2P网络 3、HDLC ---高级数据链路控制协议 HDLC地址借用 三、PPP协议 1、PPP协议的优点 2、PPP数据帧封装结构 3、PPP会话的搭建 4、LCP建立——链路建立阶段 4.1协商阶段 4.2认证阶段 4.3 PAP---密…

Java基础【上】韩顺平(反射、类加载、final接口、抽象类、内部类)

涵盖知识点&#xff1a;反射、类加载、单例模式、final、抽象类、接口、内部类&#xff08;局部内部类、匿名内部类、成员内部类、静态内部类&#xff09; P711 反射机制原理 创建如下目录结构&#xff0c;在模块下创建src文件夹&#xff0c;文件夹要设置为Sources文件夹&…

农夫山泉财报公布在即,消费升级的瓶装水市场或将重新洗牌

农夫山泉财报公布在即&#xff0c;消费升级的瓶装水市场或将重新洗牌 新年伊始&#xff0c;2024年对于中国瓶装水行业注定是一个地动山摇的一年&#xff0c;随着农夫山泉董事长钟睒睒上次被媒体集体关注&#xff0c;农夫山泉遭遇上市以来的最大舆论风波。 3月26日农夫山泉的财…

2024 年 8 个最佳 PDF 转 JPG 转换器[免费和付费]

虽然 PDF&#xff08;便携式文档文件&#xff09;是一种流行的文档共享格式&#xff0c;但有时您可能希望将 PDF 文件转换为JPG&#xff0c;然后在网页或社交媒体上共享它们。 在本文中&#xff0c;我们将讨论适用于 Windows 10 和 11 的出色 PDF 到 JPG 转换器的所有特性。 …

精准、快速、便捷:游标尺模式在软件设计中的三大优势

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;并且坚持默默的做事。 &#x1f680; 转载自&#xff1a;探索设计模式的魅力&#xff1a;精准、快速、便捷&#xf…

注册马来西亚商标常见问题

马来西亚商标法于1983年9月1日正式生效。这部商标法废除了马来亚、沙巴和沙捞越三地区各自的商标法规和申请程序&#xff0c;使马来西亚有了一部统一商标法。此外&#xff0c;马来西亚有关商标的法规包括1983年9月1日同时生效的《1983年商标法实施细则》。在马来西亚&#xff0…

matlab ICP配准高阶用法——统计每次迭代的配准误差并可视化

目录 一、概述二、代码实现三、结果展示1、原始点云2、配准结果3、配准误差本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫。 一、概述 在进行论文写作时,需要做对比实验,来分析改进算法的性能,期间用到了迭代误差分布统…

Java中的锁:实现并发控制与资源共享

JAVA中的锁 锁的概念锁机制为什么要使用锁锁的种类乐观锁/悲观锁独享锁/共享锁互斥锁/读写锁可重入锁/不可重入锁公平锁/非公平锁分段锁/自旋锁CAS/AQS synchronized概念应用场景四种使用场景效果对比synchronized的特点Lock/ReentrantLock对比 Volatile概念Java内存模型线程可…

多工作空间并存时ROS的环境变量异常问题

今天突然发现当多工作空间并存时&#xff0c;我的ROS的环境变量发生了比较诡异的异常&#xff0c;按照我之前的理解&#xff0c;在.bashrc文件中按顺序设定了ROS的环境变量后&#xff0c;ROS就会按照.bashrc中编写的环境变量来搜寻功能包&#xff0c;例如在.bashrc文件使用sour…

Nuclei Poc开发

1、Poc开发工具介绍 Nuclei&#xff1a;https://nuclei.projectdiscovery.io/ Cloud Platfrom云平台&#xff1a;https://cloud.projectdiscovery.io/ 2、目标站点简介 目标演示站点&#xff1a;http://glkb-jqe1.aqlab.cn/nacos/#/login 指纹&#xff1a;Nacos 已知常用漏洞…

【微服务】Eureka(服务注册,服务发现)

文章目录 1.基本介绍1.学前说明2.当前架构分析1.示意图2.问题分析 3.引出Eureka1.项目架构分析2.上图解读 2.创建单机版的Eureka1.创建 e-commerce-eureka-server-9001 子模块2.检查父子pom.xml1.子 pom.xml2.父 pom.xml 3.pom.xml 引入依赖4.application.yml 配置eureka服务5.…

【Web APIs】事件高级

目录 1.事件对象 1.1获取事件对象 1.2事件对象常用属性 2.事件流 1.1事件流的两个阶段&#xff1a;冒泡和捕获 1.2阻止事件流动 1.3阻止默认行为 1.4两种注册事件的区别 3.事件委托 1.事件对象 1.1获取事件对象 事件对象&#xff1a;也是一个对象&#xff0c;这个对象里…

rapidssl证书通配符证书800元

RapidSSL旗下的DV基础型通配符SSL证书可以同时保护多个域名站点&#xff0c;保护主域名以及主域名下的所有子域名。这款通配符SSL证书可以为网站提供数据加密服务&#xff0c;营造安全的上网环境&#xff0c;确保用户在网站上的数据安全传输。今天就随SSL盾小编了解RapidSSL旗下…

2024年HCIE考试题二

27、以下关于在网络中选择认证点位置的描述中&#xff0c;错误的是哪一项&#xff1f; A.在网络的接入层部署认证&#xff0c;有利于实现权限的细颗粒度管理和网络的高安全性 B.用户认证点从接入层上移到汇聚层之后&#xff0c;可能会导致用户的MAC认证失败 C.当用户认证点从…

DC电源模块的设计与调试技巧

BOSHIDA DC电源模块的设计与调试技巧 DC电源模块的设计与调试是电子工程师在实际项目中常常需要面对的任务。一个稳定可靠的DC电源模块对于电路的正常运行起到至关重要的作用。以下是一些设计与调试的技巧&#xff0c;帮助工程师们更好地完成任务。 第一&#xff0c;正确选择…

vue3.0 + ts + eslint报错:error Parsing error: ‘>‘ expected

eslint报错 这里加上对应的 eslint配置即可&#xff1a; parser: vue-eslint-parser, parserOptions: {parser: "typescript-eslint/parser",ecmaVersion: 2020,sourceType: module, }具体如下&#xff1a; module.exports {parser: vue-eslint-parser,parserOpti…

代码随想录阅读笔记-栈与队列【删除字符串中的所有相邻重复项】

题目 给出由小写字母组成的字符串 S&#xff0c;重复项删除操作会选择两个相邻且相同的字母&#xff0c;并删除它们。 在 S 上反复执行重复项删除操作&#xff0c;直到无法继续删除。 在完成所有重复项删除操作后返回最终的字符串。答案保证唯一。 示例&#xff1a; 输入&am…