RocketMQ相关知识知多少

一、RocketMQ的定义

官网网址:领域模型概述 | RocketMQ

Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。【RocketMQ是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式的特点,是采用java语言开发的分布式的消息系统。 】

RocketMQ领域模型

核心特性:

  • 云原生:无限弹性扩缩,K8S友好

  • 高吞吐:万亿级吞吐保证,同时满足微服务与大数据场景

  • 流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎

  • 金融级:稳定性,广泛用于交易核心链路

  • 架构极简:零外部依赖,Shared-nothing架构

  • 生态友好:无缝对接微服务、实时计算、数据湖等周边生态

二、消息模型

Apache RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。

消息生产

生产者(Producer):

Apache RocketMQ 中用于产生消息的运行实体,一般集成于业务调用链路的上游。生产者是轻量级匿名无身份的。

消息存储

  • 主题(Topic):

    Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。

  • 队列(MessageQueue):

    Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。 Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。

  • 消息(Message):

    Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。

消息消费

  • 消费者分组(ConsumerGroup):

    Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。

  • 消费者(Consumer):

    Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。

  • 订阅关系(Subscription):

    Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。

    Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。

一个Topic分布在多个Broker上,一个Broker可以配置多个Topic,他们之间是多对多的关系。如果某个Topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同Broker上,以减轻某个Broker的压力。Topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。】

三、功能特性

1.普通信息:

普通消息:一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

普通消息生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 待消费:消息被发送到服务端,对消费者可见,等待消费者消费状态。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间内没有收到消费者的响应,RocketMQ会对消息进行重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

2.定时消息:

定时消息:使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

基于定时消息的超时任务处理具备如下优势:

  • 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。

  • 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。RocketMQ 的定时消息具有高并发和水平扩展的能力

定时消息生命周期:

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。

  • 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

【定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延,影响定时精度】

3.顺序消息:

.顺序消息:顺序消息仅支持MessageType为FIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。和普通消息发送相比,顺序消息发送必须要设置消息组。(推荐MessageQueueSelector 的方式)。要保证消息的顺序性需要单一生产者串行发送。

单线程使用 MessageListenerConcurrently 可以顺序消费,多线程环境下使用 MessageListenerOrderly 才能顺序消费。

4.事务消息:

事务消息:是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。简单来讲,就是将本地事务(数据库的 DML 操作)与发送消息合并在同一个事务中。例如,新增一个订单。在事务未提交之前,不发送订阅的消息。发送消息的动作随着事务的成功提交而发送,随着事务的回滚而取消。

!!不建议单一进程创建大量生产者

!!不建议频繁创建和销毁生产者

四、消费者分类

RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者。【在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。】

1.PushConsumer:高度封装的消费类型,消费消息仅仅通过通过消费监听器监听并返回结果。 消息的获取、消费状态提交以及消费重试都通过 RocketMQ 的客户端 SDK 完成。

  • 返回消费成功:以 Java SDK 为例,返回,表示该消息处理成功,服务端按照消费结果更新消费进度。ConsumeResult.SUCCESS

  • 返回消费失败:以 Java SDK 为例,返回,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。ConsumeResult.FAILURE

  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

使用 PushConsumer 消费者消费时,不允许使用以下方式处理消息,否则 RocketMQ 无法保证消息的可靠性。

  • 错误方式一:消息还未处理完成,就提前返回消费成功结果。 此时如果消息消费失败,RocketMQ 服务端是无法感知的,因此不会进行消费重试。

  • 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。 此时如果消息消费失败,RocketMQ 服务端同样无法感知,因此也不会进行消费重试。

  • PushConsumer 严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:

    • 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer 的可靠性保证会频繁触发消息重试机制造成大量重复消息。

    • 无异步化、高级定制场景:PushConsumer 限制了消费逻辑的线程模型,由客户端 SDK 内部按最大吞吐量触发消息处理。 该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。

2.简单消费者:SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。

SimpleConsumer 适用于以下场景:

  • 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。 建议使用 SimpleConsumer 消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。

  • 需要异步化、批量消费等高级定制场景:SimpleConsumer 在 SDK 内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。

  • 需要自定义消费速率:SimpleConsumer 是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。

五、消费者过滤

消费者订阅了某个主题后,Apache RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Apache RocketMQ 服务端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。【使用 Apache RocketMQ 的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。】

过滤的含义指的是将符合条件的消息投递给消费者,而不是将匹配到的消息过滤掉。

Apache RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 Apache RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

 

消息过滤主要通过以下几个关键流程实现:

  • 生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。

  • 消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。

  • 服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。

RocketMQ 支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:

六、消费重试

消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一定次数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。

消息重试的触发条件

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。

  • 消息处理超时,包括在PushConsumer中排队超时。

消息重试策略主要行为

  • 重试过程状态机:控制消息在重试流程中的状态和变化逻辑。

  • 重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间。

  • 最大重试次数:消息可被重试消费的最大次数。

消息重试策略差异

根据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下:

PushConsumer消费消息时,消息的几个主要状态如下:

  • Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。

  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。

  • WaitingRetry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。

  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。

  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

SimpleConsumer消费消息时,消息的几个主要状态如下:

  • Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。

  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。

  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。

  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/677375.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入理解Java中的位运算符

哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一…

Python深度学习基于Tensorflow(15)OCR验证码 文本检测与识别实例

文章目录 文本检测文本识别CTC层生成验证码并制作数据集建立模型模型推理 参考 文本检测 文本检测和目标检测类似,其不同之处在于文本目标具有序列特征,有连续性,可以通过结合 Faster R-CNN 和 LSTM 的方式进行文本检测,如 CTPN …

Android Graphics 显示系统 - Android Jank detection with FrameTimeline

“ 最近有公司同事在处理UI卡顿及FPS自动化监测的问题,我也顺便看了一点相关的内容,其中在Perfetto的官方说明文档中有一篇关于利用FrameTimeLine进行Jank监测的解读,个人觉得蛮有意思的,借助工具翻译该篇文章并加上本人拙劣的解读…

linux(centos7)开机自启jar文件

问题 之前参考网上说的直接在/etc/rc.local文件中增加sh文件启动语句,但是没有效果: /root/dashboard/dashboard_backend/start_dashboard.sh 权限也增加了,还是不行: chmod x /etc/rc.local 排查 排查了一下: 查…

基于聚类和回归分析方法探究蓝莓产量影响因素与预测模型研究附录

🌟欢迎来到 我的博客 —— 探索技术的无限可能! 🌟博客的简介(文章目录) 目录 背景数据说明数据来源思考 附录数据预处理导入包以及数据读取数据预览数据处理 相关性分析聚类分析数据处理确定聚类数建立k均值聚类模型 …

FFmpeg播放器的相关概念【1】

播放器框架 相关术语 •容器/文件(Conainer/File):即特定格式的多媒体文件,比如mp4、flv、mkv等。 • 媒体流(Stream):表示时间轴上的一段连续数据,如一段声音数据、一段…

BIOS主板(非UEFI)安装fedora40的方法

BIOS主板(非UEFI)安装fedora40的方法 现实困难:将Fedora-Workstation-Live-x86_64-40-1.14.iso写入U盘制作成可启动U盘启动fedora40,按照向导将fedora40安装到真机的sda7分区中得到报错如下内容: Failed to find a suitable stage1 device: E…

氯气安全阀检测流程揭秘:保障化工安全新举措

在化工行业中,氯气作为一种重要的工业原料,广泛应用于多个生产领域。 然而,氯气的危险性也不容忽视,一旦发生泄漏或超压等安全事故,后果不堪设想。因此,氯气安全阀的重要性便显得尤为突出。 在这篇文章中…

WindowManager相关容器类

窗口中容器类介绍&#xff1a; 本节内容较多&#xff0c;建议结合前面的内容一起阅读&#xff1a; 1、addWindow的宏观概念 2、WindowManager#addView_1 3、WindowManager#addView_2 1&#xff09;、WindowContainer&#xff1a; class WindowContainer<E extends WindowC…

Python编程基础2

文件对象&#xff1a; open内建函数&#xff1a;通过了初始化输入、输出&#xff08;I/O&#xff09;操作的通用接口&#xff0c;成功打开文件后会返回一个文件对象&#xff0c;否则引发错误。file_objectopen&#xff08;file_name&#xff0c;mode‘r’&#xff09;:file_nam…

一款仅200kb好看的免费引导页源码

源码介绍: 这是一款200kb左右的引导页,超级好看,用服务器或者主机均可搭建 下载压缩包解压至根目录即可&#xff0c;页面内容在index.html里修改 左边图片采用的是API接口&#xff08;不喜欢可以自行更换,在66/67行&#xff09; 引导页压缩包放在下面了,有需要的朋友可以直接下…

【热点】老黄粉碎摩尔定律被,量产Blackwell解决ChatGPT耗电难题

6月3日&#xff0c;老黄又高调向全世界秀了一把&#xff1a;已经量产的Blackwell&#xff0c;8年内将把1.8万亿参数GPT-4的训练能耗狂砍到1/350&#xff1b; 英伟达惊人的产品迭代&#xff0c;直接原地冲破摩尔定律&#xff1b;Blackwell的后三代路线图&#xff0c;也一口气被…

杂谈k8s

其实看我之前的博客&#xff0c;k8s刚有点苗头的时候我就研究过&#xff0c;然后工作的时候间接接触 也自己玩过 但是用的不多就忘记了&#xff0c;正苦于不知道写什么&#xff0c;水一篇 简化容器应用程序的部署和管理 自动化部署、自动伸缩、负载均衡、存储管理、自我修复 支…

DP-Kmaens密度峰值聚类算法

我有个问题 关于 [密度值>密度阈值] 的判定这里&#xff0c;新进来的新数据怎么确定他的密度值&#xff1f;密度阈值又是怎样确定的呢&#xff1f;

Golang | Leetcode Golang题解之第129题求根节点到叶节点数字之和

题目&#xff1a; 题解&#xff1a; type pair struct {node *TreeNodenum int }func sumNumbers(root *TreeNode) (sum int) {if root nil {return}queue : []pair{{root, root.Val}}for len(queue) > 0 {p : queue[0]queue queue[1:]left, right, num : p.node.Left, …

RPM包方式离线部署gitlab

下载安装包 要求&#xff1a;可以联网&#xff0c;系统及版本与目标服务器一致。配置gitlab yum仓库 curl -s https://packages.gitlab.com/install/repositories/gitlab/gitlab-ce/script.rpm.sh | sudo bash 新建包存放目录 mkdir /root/gitlab 下载gitlab及相关安装包 …

Linux之线程及线程安全详解

前言&#xff1a;在操作系统中&#xff0c;进程是资源分配的基本单位&#xff0c;那么线程是什么呢&#xff1f;线程是调度的基本单位&#xff0c;我们该怎么理解呢&#xff1f; 目录 一&#xff0c;线程概念理解 二&#xff0c;Linux里面的线程原理 三&#xff0c;为什么要…

《MySQL索引》学习笔记

《MySQL索引》学习笔记 MySQL的体系结构存储引擎简介InnoDB简介MyISAM简介 索引索引结构BTreeHash索引思考索引分类 索引语法SQL性能分析索引使用最左前缀法则 索引失效的情况范围查询索引列运算字符串不加引号模糊查询or连接的条件数据分布影响 SQL提示覆盖索引前缀索引单列索…

【MyBatisPlus】DML编程控制

【MyBatisPlus】DML编程控制 文章目录 【MyBatisPlus】DML编程控制1、id生成策略2、逻辑删除 1、id生成策略 id生成策略控制&#xff08;TableId注解&#xff09; 名称&#xff1a;TableId 类型&#xff1a;属性注解 位置&#xff1a;模型类中用于表示主键的属性定义上方 作…

机器学习中的集成学习

&#x1f4ac;内容概要 1 集成学习概述及主要研究领域 2 简单集成技术  2.1 投票法  2.2 平均法  2.3 加权平均 3 高级集成技术  3.1 Bagging  3.2 Boosting  3.3 Bagging vs Boosting 4 基于Bagging和Boosting的机器学习算法  4.1 sklearn中的Bagging算法  4.2 sklea…