RocketMQ 5.0 无状态实时性消费详解

作者:绍舒

背景

RocketMQ 5.0 版本引入了 Proxy 模块、无状态 pop 消费机制和 gRPC 协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。

SimpleConsumer 客户端采用了无状态的 pop 机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也带来了一个新的挑战:当客户端数量较少且消息数量较少时,可能会出现消息消费延时的情况。

在当前的消息产品中,消费普遍使用了长轮询机制,即客户端向服务端发送一个超时时间相对较长的请求,该请求会一直挂起,除非队列中存在消息或该请求到达设定的长轮询时间。

然而,在引入 Proxy 之后,目前的长轮询机制出现了一个问题。客户端层面的长轮询和 Proxy 与 Broker 内部的长轮询之间互相耦合,也就是说,一次客户端对 Proxy 的长轮询只对应一次 Proxy 对 Broker 的长轮询。因此,在以下情况下会出现问题:当客户端数量较少且后端存在多个可用的 Broker 时,如果请求到达了没有消息的 Broker,就会触发长轮询挂起逻辑。此时,即使另一台 Broker 存在消息,由于请求挂在了另一个 Broker 上,也无法拉取到消息。这导致客户端无法实时接收到消息,即 false empty response。

这种情况可能导致以下现象:用户发送一条消息后,再次发起消费请求,但该请求却无法实时拉取到消息。这种情况对于消息传递的实时性和可靠性产生了不利影响。

AWS 的文档里也有描述此等现象,他们的解决方案是通过查询是所有的后端服务,减少 false empty response。

在这里插入图片描述

其他产品

在设计方案时,首先是需要目前存在的消息商业化产品是如何处理该问题的。

MNS 采取了以下策略,主要是将长轮询时间切割为多个短轮询时间片,以尽可能覆盖所有的 Broker。

首先,在长轮询时间内,会对后端的 Broker 进行多次请求。其次,当未超过短轮询配额时,优先使用短轮询消费请求来与 Broker 进行通信,否则将使用长轮询,其时间等于客户端的长轮询时间。此外,考虑到过多的短轮询可能会导致 CPU 和网络资源消耗过多的问题,因此在短轮询超过一定数量且剩余时间充足时,最后一次请求将转为长轮询。

然而,上述策略虽以尽可能轮询完所有的 Broker 为目标,但并不能解决所有问题。当轮询时间较短或 Broker 数量较多时,无法轮询完所有的 Broker。即使时间足够充足的情况下,也有可能出现时间错位的情况,即在短轮询请求结束后,才有消息在该 Broker 上就绪,导致无法及时取回该消息。

解法

技术方案

首先,需要明确该问题的范围和条件。该问题只会在客户端数量较少且请求较少的情况下出现。当客户端数量较多且具备充足的请求能力时,该问题不会出现。因此,理想情况是设计一个自适应的方案,能够在客户端数量较多时不引入额外成本来解决该问题。

为了解决该问题,关键在于将前端的客户端长轮询和后端的 Broker 长轮询解耦,并赋予 Proxy 感知后端消息个数的能力,使其能够优先选择有消息的 Broker,避免 false empty response。

考虑到 Pop 消费本身的无状态属性,期望设计方案的逻辑与 Pop 一致,而不在代理中引入额外的状态来处理该问题。

另外,简洁性是非常重要的,因此期望该方案能够保持简单可靠,不引入过多的复杂性。

  1. 为了解决该问题,本质上是要将前端的客户端长轮询和后端的 Broker 长轮询解耦开来,并赋予 Proxy 感知后端消息个数的能力,能够优先选择有消息的 Broker,避免 false empty response。
  2. 由于 Pop 消费本身的无状态属性,因此期望该方案的设计逻辑和 Pop 一致,而不在 Proxy 引入额外的状态来处理这个事情。
  3. Simplicity is ALL,因此期望这个方案简单可靠。

我们使用了 NOTIFICATION,可以获取到后端是否有尚未消费的消息。拥有了上述后端消息情况的信息,就能够更加智能地指导 Proxy 侧的消息拉取。

通过重构 NOTIFICATION,我们对其进行了一些改进,以更好地适应这个方案的要求。

pop with notify

一个客户端的请求可以被抽象为一个长轮询任务,该轮询任务由通知任务和请求任务组成。

通知任务的目的是获取 Broker 是否存在可消费的消息,对应的是 Notification 请求;而请求任务的目的是消费 Broker 上的消息,对应的是 Pop 请求。

首先,长轮询任务会执行一次 Pop 请求,以确保在消息积压的情况下能够高效处理。如果成功获取到消息,则会正常返回结果并结束任务。如果没有获取到消息,并且还有剩余的轮询时间,则会向每个 Broker 提交一个异步通知任务。

在任务通知返回时,如果不存在任何消息,长轮询任务将被标记为已完成状态。然而,如果相关的 Broker 存在消息,该结果将被添加到队列中,并且消费任务将被启动。该队列的目的在于缓存多个返回结果,以备将来的重试之需。对于单机代理而言,只要存在一个通知结果返回消息,Proxy 即可进行消息拉取操作。然而,在实际的分布式环境中,可能会存在多个代理,因此即使通知结果返回消息存在,也不能保证客户端能够成功拉取消息。因此,该队列的设计旨在避免发生这种情况。

在这里插入图片描述

消费任务会从上述队列中获取结果,若无结果,则直接返回。这是因为只有在通知任务返回该 Broker 存在消息时,消费任务才会被触发。因此,若消费任务无法获取结果,可推断其他并发的消费任务已经处理了该消息。

消费任务从队列获取到结果后,会进行加锁,以确保一个长轮询任务只有一个正在进行的消费任务,以避免额外的未被处理的消息。

在这里插入图片描述

如果获取到消息或长轮询时间结束,该任务会被标记完成并返回结果。但如果没有获取到消息(可能是其他客户端的并发操作),则会继续发起该路由所对应的异步通知任务,并尝试进行消费。

自适应切换

考虑到当请求较多时,无需采用 pop with notify 机制,可使用原先的 pop 长轮询 broker 方案,但是需要考虑的是,如何在两者之间进行自适应切换。目前是基于当前 Proxy 统计的 pop 请求数做判断,当请求数少于某一值时,则认为当前请求较少,使用 pop with notify;反之则使用 pop 长轮询。

由于上述方案基于的均为单机视角,因此当消费请求在 proxy 侧不均衡时,可能会导致判断条件结果有所偏差。

Metric

为了之后进一步调优长轮询和观察长轮询的效果,我们设计了一组 metric 指标,来记录并观测实时长轮询的表现和损耗。

  1. 客户端发起的长轮询次数 (is_long_polling)
  2. pop with notify 次数 (通过现有 rpc metric 统计)
  3. 首次 pop 请求命中消息次数 (未触发 notify) (is_short_polling_hit)

使用方式

在使用时需明确长轮询和短轮询的区分,可以参考 AWS 的定义,当轮询时间大于 0 时,长轮询生效。

在这里插入图片描述

可以看到需明确一个长轮询最小时间,因为长轮询时间过小时无意义,AWS 的最小值采取了 1 秒。

在这里插入图片描述

在目前版本的 Apache RocketMQ 服务端中,采用了最小 5 秒的限制,即需超过 5 秒才能触发长轮询,该值可在 ProxyConfig#grpcClientConsumerMinLongPollingTimeoutMillis 中配置或修改。

对于 SimpleConsumer 而言,可以通过 awaitDuration 字段来调整长轮询时间。

SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    // set await duration for long-polling.
    .setAwaitDuration(awaitDuration)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .build();

总结

通过如上方案,我们成功设计了一套基于无状态消费方式的实时消费方案,在做到客户端无状态消费的同时,还能够避免 false empty response,保证消费的实时性,同时,相较于原先 PushConsumer 的长轮询方案,能够大量减少用户侧无效请求数量,降低网络开销。

RocketMQ 学习社区体验地址

RocketMQ 学习社区重磅上线!AI 互动,一秒了解 RocketMQ 功能源码。RocketMQ 学习社区是国内首个基于 AIGC 提供的知识服务社区,旨在成为 RocketMQ 学习路上的“贴身小二”。

PS:RocketMQ 社区以 RocketMQ 5.0 资料为主要训练内容,持续优化迭代中,回答内容均由人工智能模型生成,其准确性和完整性无法保证,且不代表 RocketMQ 学习社区的态度或观点。
立即体验 RocketMQ 学习社区(建议 PC 端体验完整功能):https://rocketmq-learning.com/

为了帮助用户更全面的了解 RocketMQ 5.0,同时收集更多反馈,**「寻找 RocketMQ 首席评测官」**活动惊喜上线!

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WRUz4bGL-1690130818194)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/df4c1c35104e46c086cbd7f6bdc7c5aa~tplv-k3u1fbpfcp-zoom-1.image “image”)]

点击此处,即可报名参加!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/45120.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MB5B在HDB上的性能调优

背景 MB5B是用于查询物料的收发以及现有库存。日常业务查询,通常会按照月份查看某片地区物料的库存以及收发状态。 调优思路 按照客户日常操作的习惯,得到日常操作的数据范围,选出数据量最为突出最有代表性的地区和物料;利用SE30分别运行不同数量级的数据,比如20个门店、…

基于LoRA进行Stable Diffusion的微调

文章目录 基于LoRA进行Stable Diffusion的微调数据集模型下载环境配置微调过程 基于LoRA进行Stable Diffusion的微调 数据集 本次微调使用的数据集为: LambdaLabs的Pokemon数据集 使用git clone命令下载数据集 git clone https://huggingface.co/datasets/lambd…

17. 电话号码的字母组合

题目描述 给定一个仅包含数字 2-9 的字符串,返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下(与电话按键相同)。注意 1 不对应任何字母。 示例 1: 输入:digits "23" …

数据结构的复杂度

> 作者简介:დ旧言~,目前大一,现在学习Java,c,c,Python等 > 座右铭:松树千年终是朽,槿花一日自为荣。 > 望小伙伴们点赞👍收藏✨加关注哟💕&#x1…

结构型设计模式之桥接模式【设计模式系列】

系列文章目录 C技能系列 Linux通信架构系列 C高性能优化编程系列 深入理解软件架构设计系列 高级C并发线程编程 设计模式系列 期待你的关注哦!!! 现在的一切都是为将来的梦想编织翅膀,让梦想在现实中展翅高飞。 Now everythi…

【玩转Linux】标准io缓冲区的操作

(꒪ꇴ꒪ ),hello我是祐言博客主页:C语言基础,Linux基础,软件配置领域博主🌍快上🚘,一起学习!送给读者的一句鸡汤🤔:集中起来的意志可以击穿顽石!作者水平很有限,如果发现错误&#x…

【前缀和】LeetCode 560. 和为k的字数组

文章目录 题目描述方法1 暴力方法2 暴力优化方法3 前缀和方法4 前缀和优化 题目描述 力扣560题,链接:https://leetcode.cn/problems/subarray-sum-equals-k 方法1 暴力 暴力法,三重for循环,时间复杂度 O ( N 3 ) O(N^3) O(N3)&a…

WebClient,HTTP Interface远程调用阿里云API

HTTP Interface Spring 允许我们通过定义接口的方式&#xff0c;给任意位置发送 http 请求&#xff0c;实现远程调用&#xff0c;可以用来简化 HTTP 远程访问。需要webflux场景才可 <dependency><groupId>org.springframework.boot</groupId><artifactId&…

二十三种设计模式第十七篇--迭代子模式

迭代子模式是一种行为型设计模式&#xff0c;它允许你按照特定方式访问一个集合对象的元素&#xff0c;而又不暴露该对象的内部结构。迭代子模式提供了一种统一的方式来遍历容器中的元素&#xff0c;而不需要关心容器的底层实现。 该模式包含以下几个关键角色&#xff1a; 迭…

K8S初级入门系列之五-Pod的高级特性

一、前言 前一篇我们了解了Pod的基本概念和操作&#xff0c;本篇我们继续研究Pod的一些高级特性&#xff0c;包括Pod的生命周期&#xff0c;pod探针&#xff0c;pod的调度等。 二、生命周期 1、Pod的生命周期 Pod的生命周期示意图如下&#xff1a; 挂起(Pending)&#xff0c…

【node-1】node validation exception. bootstrap checks failed

记录ElasticSearch 内存分配不足报错 背景做出的改变说在最后&#xff1a;最后访问es&#xff1a; 背景 从报错信息中看到&#xff0c;文件&#xff0c;虚拟内存的最大值太低&#xff0c;我们需要调整设置虚拟内存大小&#xff0c;以满足ElasticSearch 运行需求。 做出的改变 …

剑指offer40.最小的k个数

简直不要太简单了这道题&#xff0c;先给数组排个序&#xff0c;然后输出前k个数就好了。我用的是快排&#xff0c;这是我的代码&#xff1a; class Solution {public int[] getLeastNumbers(int[] arr, int k) {int n arr.length;quickSort(arr, 0, n-1);int[] res new int…

拆解雪花算法生成规则 | 京东物流技术团队

1 介绍 雪花算法&#xff08;Snowflake&#xff09;是一种生成分布式全局唯一 ID 的算法&#xff0c;生成的 ID 称为 Snowflake IDs 或 snowflakes。这种算法由 Twitter 创建&#xff0c;并用于推文的 ID。目前仓储平台生成 ID 是用的雪花算法修改后的版本。 雪花算法几个特性…

在 vue3 中使用 ScrollReveal

文章目录 什么是 ScrollReveal安装使用介绍 什么是 ScrollReveal ScrollReveal 官网链接&#xff1a;https://scrollrevealjs.org/ ScrollReveal 是一个 JavaScript 库&#xff0c;用于在元素进入/离开视口时轻松实现动画效果。 先看个入门示例&#xff1a; ScrollReveal …

[SSM]Spring IoC注解式开发

目录 十二、Spring IoC注解式开发 12.1回顾注解 12.1.1自定义注解 12.1.2使用注解 12.1.3通过反射机制读取注解 12.2声明Bean的注解 12.3Spring注解的使用 12.4选择性实例化Bean 12.5负责注入的注解 12.5.1Value 12.5.2Autowired与Qualifier 12.5.3Resource 12.6全…

【数据挖掘】使用 LSTM 进行时间和序列预测

一、说明 每天&#xff0c;人类在执行诸如过马路之类的任务时都会做出被动预测&#xff0c;他们估计汽车的速度和与汽车的距离&#xff0c;或者通过猜测球的速度并相应地定位手来接球。这些技能是通过经验和实践获得的。然而&#xff0c;由于涉及众多变量&#xff0c;预测天气或…

【Linux命令200例】chown修改文件或目录的所有者

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;本文已收录于专栏&#xff1a;Linux命令大全。 &#x1f3c6;本专栏我们会通过具体的系统的命令讲解加上鲜活的实操案例对各个命令进行深入…

iOS-持久化

目的 1.快速展示&#xff0c;提升体验 已经加载过的数据&#xff0c;用户下次查看时&#xff0c;不需要再次从网络&#xff08;磁盘&#xff09;加载&#xff0c;直接展示给用户 2.节省用户流量&#xff08;节省服务器资源&#xff09; 对于较大的资源数据进行缓存&#xf…

MonoBehaviour 组件

MonoBehaviour 组件是指继承了 MonoBehaviour 类的脚本组件&#xff0c;可以附加到游戏对象上&#xff0c;用于控制游戏对象的行为和交互。 MonoBehaviour 类是 Unity 中的一个基类&#xff0c;提供了许多方法和事件&#xff0c;用于处理输入、渲染、碰撞、协程等操作。 Unity…

vue项目启动npm run serve常见报错及解决办法

报错1&#xff1a; 如图&#xff1a; 解决方法&#xff1a;重新安装core-js , npm i core-js 报错2&#xff1a; Syntax Error: EslintPluginImportResolveError: unable to load resolver “alias”. 解决方法&#xff1a;npm install eslint-import-resolver-alias -D 报…