RocketMQ 为何性能高

本文主要从性能角度考虑 RocketMQ 的实现。

整体架构

这是网络上流行的 RocketMQ 的集群部署图。

RocketMQ 主要由 Broker、NameServer、Producer 和 Consumer 组成的一个集群。

**NameServer:整个集群的注册中心和配置中心,管理集群的元数据。包括 Topic 信息和路由信息、Producer 和 Consumer 的客户端注册信息、Broker 的注册信息。**Broker:负责接收消息的生产和消费请求,并进行消息的持久化和消息的读取。**Producer:负责生产消息。**Consumer:负责消费消息。在实际生产和消费消息的过程中,NameServer 为生产者和消费者提供 Meta 数据,以确定消息该发往哪个 Broker 或者该从哪个 Broker 拉取消息。有了 Meta 数据后,生产者和消费者就可以直接和 Broker 交互了。这种点对点的交互方式最大限度降低了消息传递的中间环节,缩短了链路耗时。

网络模型

RocketMQ 使用 Netty 框架实现高性能的网络传输。

基于 Netty 实现网络通信模块

Netty 主要特点

**具有统一的 API,用户无需关心 NIO 的编程模型和概念。通过 Netty 的 ChannelHandler 可以对通信框架进行灵活的定制和扩展。**Netty 封装了网络编程涉及的基本功能:拆包解包、异常检测、零拷贝传输。**Netty 解决了 NIO 的 Epoll Bug,避免空轮询导致 CPU 的 100% 利用率。**支持多种 Reactor 线程模型。**使用范围广泛,有较好的的开源社区支持。Hadoop、Spark、Dubbo 等项目都集成 Netty。**Netty 的高性能传输的体现

非阻塞 IORactor 线程模型**零拷贝。使用 FileChannel.transfer 避免在用户态和内核态之间的拷贝操作;通过 CompositeByteBuf 组合多个 ByteBuffer;通过 slice 获取 ByteBuffer 的切片;通过 wrapper 把普通 ByteBuffer 封装成 netty.ByteBuffer。**RocketMQ 网络模型

RocketMQ 的 Broker 端基于 Netty 实现了主从 Reactor 模型。架构如下:

具体流程:

eventLoopGroupBoss 作为 acceptor 负责接收客户端的连接请求eventLoopGroupSelector 负责 NIO 的读写操作NettyServerHandler 读取 IO 数据,并对消息头进行解析disatch 过程根据注册的消息 code 和 processsor 把不同的事件分发给不同的线程。由 processTable 维护(类型为 HashMap)业务线程池隔离

RocketMQ 对 Broker 的线程池进行了精细的隔离。使得消息的生产、消费、客户端心跳、客户端注册等请求不会互相干扰。如下是各个业务执行线程池和 Broker 处理的报文类型的对应关系,从下图中我们也可以看出 Broker 的核心功能点。

消息的生产

RocketMQ 支持三种消息发送方式:同步发送、异步发送和 One-Way 发送。One-Way 发送时客户端无法确定服务端消息是否投递成功,因此是不可靠的发送方式。

客户端发送时序图

流程说明

**客户端 API 调 DefaultMQProducer 的 send 方法进行消息的发送。**makeSureStateOk 检查客户端的发送服务是否 ok。RocketMQ 客户端维护了一个单例的 MQClientInstance,可通过 start 和 shutdown 来管理相关的网络服务。**tryToFindTopicPublishInfo 用来获取 Topic 的 Meta 信息,主要是可选的 MessageQueue 列表。**selectOneMessageQueue 根据当前的故障容错机制,路由到一个特定的 MessageQueue。**sendKernelImpl 的核心方法是调用 NettyRemotingClient 的 sendMessage 方法,该方法中会根据用户选择的发送策略进行区别处理,时序图中只体现了同步发送的方式。**invokeSync 通过调用 Netty 的 channel.writeAndFlush 把消息的字节流发送到 TCP 的 Socket 缓冲区,至此客户端消息完成发送。三种发送方式实现上的区别

**同步发送:注册 ResponseFuture 到 responseTable,发送 Request 请求,并同步等待 Response 返回。**异步发送:注册 ResponseFuture 到 responseTable,发送 Request 请求,不需要同步等待 Response 返回,当 Response 返回后会调用注册的 Callback 方法,从而异步获取发送的结果。**One-Way:发送 Request 请求,不需要等待 Response 返回,不需要触发 Callback 方法回调。**客户端故障容错机制

MQFaultStrategy 实现了基于 RT 耗时的容错策略。当某个 Broker 的 RT 过大时,认为该 Broker 存在问题,会禁用该 Broker 一段时间。latencyMax 和 notAvailableDuration 的对应关系如下图:

客户端高效发送总结

One-Way 的发送方式是效率最高的,不需要同步等待过程,也不需要额外 CallBack 调用开销,但是消息发送不可靠MQClientInstance 的单例模式统一管理维护网络通道,发送消息前只需要做一次服务状态可用性检查即可Topic 的 Meta 信息在本地建立缓存,避免每次发送消息从 NameServer 拉取 Meta 数据高效的故障容错机制,保证消息发送失败时进行快速重发Broker 接收消息时序图

流程说明

**Broker 通过 Netty 接收 RequestCode 为 SEND_MESSAGE 的请求,并把该请求交给 SendMessageProcessor 进行处理。**SendMessageProcessor 先解析出 SEND_MESSAGE 报文中的消息头信息(Topic、queueId、producerGroup 等),并调用存储层进行处理。**putMessage 中判断当前是否满足写入条件:Broker 状态为 running;Broker 为 master 节点;磁盘状态可写(磁盘满则无法写入);Topic 长度未超限;消息属性长度未超限;pageCache 未处于繁忙状态(pageCachebusy 的依据是 putMessage 写入 mmap 的耗时,如果耗时超过 1s,说明由于缺页导致页加载慢,此时认定 pageCache 繁忙,拒绝写入)。**从 MappedFileQueue 中选择已经预热过的 MappedFile。**AppendMessageCallback 中执行消息的操作 doAppend,直接对 mmap 后的文件的 bytbuffer 进行写入操作。**Broker 端对写入性能的优化

自旋锁减少上下文切换

RocketMQ 的 CommitLog 为了避免并发写入,使用一个 PutMessageLock。PutMessageLock 有 2 个实现版本:PutMessageReentrantLock 和 PutMessageSpinLock。

PutMessageReentrantLock 是基于 java 的同步等待唤醒机制;PutMessageSpinLock 使用 Java 的 CAS 原语,通过自旋设值实现上锁和解锁。RocketMQ 默认使用 PutMessageSpinLock 以提高高并发写入时候的上锁解锁效率,并减少线程上下文切换次数。

MappedFile 预热和零拷贝机制

RocketMQ 消息写入对延时敏感,为了避免在写入消息时,CommitLog 文件尚未打开或者文件尚未加载到内存引起的 load 的开销,RocketMQ 实现了文件预热机制。

Linux 系统在写数据时候不会直接把数据写到磁盘上,而是写到磁盘对应的 PageCache 中,并把该页标记为脏页。当脏页累计到一定程度或者一定时间后再把数据 flush 到磁盘(当然在此期间如果系统掉电,会导致脏页数据丢失)。RocketMQ 实现文件预热的关键代码如下:

publicvoidwarmMappedFile(FlushDiskType type, int pages){ByteBuffer byteBuffer = this.mappedByteBuffer.slice();int flush = 0;long time = System.currentTimeMillis();for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {byteBuffer.put(i, (byte) 0);// force flush when flush disk type is syncif (type == FlushDiskType.SYNC_FLUSH) {if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {flush = i;mappedByteBuffer.force();}}…}// force flush when prepare load finishedif (type == FlushDiskType.SYNC_FLUSH) {log.info(“mapped file warm-up done, force to disk, mappedFile={}, costTime={}”,this.getFileName(), System.currentTimeMillis() - beginTime);mappedByteBuffer.force();}this.mlock();}代码分析

**对文件进行 mmap 映射。**对整个文件每隔一个 PAGE_SIZE 写入一个字节,如果是同步刷盘,每写入一个字节进行一次强制的刷盘。**调用 libc 的 mlock 函数,对文件所在的内存区域进行锁定。(系统调用 mlock 家族允许程序在物理内存上锁住它的部分或全部地址空间。这将阻止 Linux 将这个内存页调度到交换空间(swap space),即使该程序已有一段时间没有访问这段空间)。**同步和异步刷盘

RocketMQ 提供了同步刷盘和异步刷盘两种机制。默认使用异步刷盘机制。

当 CommitLog 在 putMessage() 中收到 MappedFile 成功追加消息到内存的结果后,便会调用 handleDiskFlush() 方法进行刷盘,将消息存储到文件中。handleDiskFlush() 便会根据两种刷盘策略,调用不同的刷盘服务。

抽象类 FlushCommitLogService 负责进行刷盘操作,该抽象类有 3 种实现:

GroupCommitService:同步刷盘FlushRealTimeService:异步刷盘CommitRealTimeService:异步刷盘并且开启 TransientStorePool每个实现类都是一个 ServiceThread 实现类。ServiceThread 可以看做是一个封装了基础功能的后台线程服务。有完整的生命周期管理,支持 start、shutdown、weakup、waitForRunning。

同步刷盘流程

所有的 flush 操作都由 GroupCommitService 线程进行处理当前接收消息的线程封装一个 GroupCommitRequest,并提交给 GroupCommitService 线程,然后当前线程进入一个 CountDownLatch 的等待**一旦有新任务进来 GroupCommitService 被立即唤醒,并调用 MappedFile.flush 进行刷盘。底层是调用 mappedByteBuffer.force ()**flush 完成后唤醒等待中的接收消息线程。从而完成同步刷盘流程异步刷盘流程

**RocketMQ 每隔 200ms 进行一次 flush 操作(把数据持久化到磁盘)当有新的消息写入时候会主动唤醒 flush 线程进行刷盘当前接收消息线程无须等待 flush 的结果。**消息消费

高性能的消息队列应该保证最快的消息周转效率:即发送方发送的一条消息被 Broker 处理完之后应该尽快地投递给消息的消费方。

消息存储结构

RocketMQ 的存储结构最大特点:

**所有的消息写入转为顺序写(相比于 Kafka,RocketMQ 即使对于 1w+ 以上的 Topic 也能够应付自如)**读写文件分离。通过 ReputMessageService 服务生成 ConsumeQueue

结构说明

**ConsumeQueue 与 CommitLog 不同,采用定长存储结构,如下图所示。为了实现定长存储,ConsumeQueue 存储了消息 Tag 的 Hash Code,在进行 Broker 端消息过滤时,通过比较 Consumer 订阅 Tag 的 HashCode 和存储条目中的 Tag Hash Code 是否一致来决定是否消费消息。**ReputMessageService 持续地读取 CommitLog 文件并生成 ConsumeQueue。顺序消费与并行消费

串行消费和并行消费最大的区别在于消费队列中消息的顺序性。顺序消费保证了同一个 Queue 中的消费时的顺序性。RocketMQ 的顺序性依赖于分区锁的实现。消息消费有推拉两种模式,我们这里只考虑推这种模式

并行消费

**并行消费的实现类为 ConsumeMessageConcurrentlyService。**PullMessageService 内置一个 scheduledExecutorService 线程池,主要负责处理 PullRequest 请求,从 Broker 端拉取最新的消息返回给客户端。拉取到的消息会放入 MessageQueue 对应的 ProcessQueue。**ConsumeMessageConcurrentlyService 把收到的消息封装成一个 ConsumeRequest,投递给内置的 consumeExecutor 独立线程池进行消费。**ConsumeRequest 调用 MessageListener.consumeMessage 执行用户定义的消费逻辑,返回消费状态。**如果消费状态为 SUCCESS。则删除 ProcessQueue 中的消息,并提交 offset。**如果消费状态为 RECONSUME。则把消息发送到延时队列进行重试,并对当前失败的消息进行延迟处理。串行消费

**串行消费的实现类为 ConsumeMessageOrderlyService。**PullMessageService 内置一个 scheduledExecutorService 线程池,主要负责处理 PullRequest 请求,从 Broker 端拉取最新的消息返回给客户端。拉取到的消息会放入 MessageQueue 对应的 ProcessQueue。**ConsumeMessageOrderlyService 把收到的消息封装成一个 ConsumeRequest,投递给内置的 consumeExecutor 独立线程池进行消费。**消费时首先获取 MessageQueue 对应的 objectLock,保证当前进程内只有一个线程在处理对应的的 MessageQueue, 从 ProcessQueue 的 msgTreeMap 中按 offset 从低到高的顺序取消息,从而保证了消息的顺序性。**ConsumeRequest 调用 MessageListener.consumeMessage 执行用户定义的消费逻辑,返回消费状态。**如果消费状态为 SUCCESS。则删除 ProcessQueue 中的消息,并提交 offset。**如果消费状态为 SUSPEND。判断是否达到最大重试次数,如果达到最大重试次数,就把消息投递到死信队列,继续下一条消费;否则消息重试次数 + 1,在延时一段时间后继续重试。**可见,串行消费如果某条消息一直无法消费成功会造成阻塞,严重时会引起消息堆积和关联业务异常。

Broker 端的 PullMessage 长连接实现

消息队列中的消息是由业务触发而产生的,如果使用周期性的轮询,不能保证每次都取到消息,且轮询的频率过快或者过慢都会对消息的延时有严重的影响。因此 RockMQ 在 Broker 端使用长连接的方式处理 PullMessage 请求。具体实现流程如下:

**PullRequest 请求中有个参数 brokerSuspendMaxTimeMillis,默认值为 15s,控制请求 hold 的时长。**PullMessageProcessor 接收到 Request 后,解析参数,校验 Topic 的 Meta 信息和消费者的订阅关系。对于符合要求的请求,从存储中拉取消息。**如果拉取消息的结果为 PULL_NOT_FOUND,表示当前 MessageQueue 没有最新消息。**此时会封装一个 PullRequest 对象,并投递给 PullRequestHoldService 内部线程的 pullRequestTable 中。**PullRequestHoldService 线程会周期性轮询 pullRequestTable,如果有新的消息或者 hold 时间超时 polling time,就会封装 Response 请求发给客户端。**另外 DefaultMessageStore 中定义了 messageArrivingListener,当产生新的 ConsumeQueue 记录时候,会触发 messageArrivingListener 回调,立即给客户端返回最新的消息。长连接机制使得 RocketMQ 的网络利用率非常高效,并且最大限度地降低了消息拉取时的等待开销。实现了毫秒级的消息投递。

RocketMQ 的其他性能优化手段

关闭偏向锁

在 RocketMQ 的性能测试中,发现存在大量的 RevokeBias 停顿,偏向锁主要是消除无竞争情况下的同步原语以提高性能,但考虑到 RocketMQ 中该场景比较少,便通过 - XX:-UseBiasedLocking 关闭了偏向锁特性。

在没有实际竞争的情况下,还能够针对部分场景继续优化。如果不仅仅没有实际竞争,自始至终,使用锁的线程都只有一个,那么,维护轻量级锁都是浪费的。偏向锁的目标是,减少无竞争且只有一个线程使用锁的情况下,使用轻量级锁产生的性能消耗。轻量级锁每次申请、释放锁都至少需要一次 CAS,但偏向锁只有初始化时需要一次 CAS。

偏向锁的使用场景有局限性,只适用于单个线程使用锁的场景,如果有其他线程竞争,则偏向锁会膨胀为轻量级锁。当出现大量 RevokeBias 引起的小停顿时,说明偏向锁意义不大,此时通过 - XX:-UseBiasedLocking 进行优化,因此 RocketMQ 的 JVM 参数中会默认加上 - XX:-UseBiasedLocking。

写在最后

最后附上阿里中间件的延时性能对比。RocketMQ 在低延迟方面依然具有领先地位,如下图所示,RocketMQ 仅有少量 10~50ms 的毛刺延迟,Kafka 则有不少 500~1000ms 的毛刺。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/37578.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

解析Android VNDK/VSDK Snapshot编译框架

1.背景 背景一&#xff1a; 为解决Android版本碎片化问题&#xff0c;引入Treble架构&#xff0c;它提供了稳定的新SoC供应商接口&#xff0c;引入HAL 接口定义语言&#xff08;HIDL/Stable AIDL&#xff0c;技术栈依然是Binder)&#xff0c;它指定了 vendor HAL 和system fr…

容器化背后的魔法之Docker底层逻辑解密

Docker内部工作原理是怎样的&#xff1f; 现在我们知道了Docker是什么以及它提供了哪些好处&#xff0c;让我们逐个重要的细节来了解。 什么是容器&#xff1f;它们是如何工作的&#xff1f; 在深入研究Docker的内部机制之前&#xff0c;我们首先要了解容器的概念。简单地说…

从2023中国峰会,看亚马逊云科技的生成式AI战略

“生成式AI的发展就像一场马拉松比赛&#xff0c;当比赛刚刚开始时&#xff0c;如果只跑了三四步就断言某某会赢得这场比赛&#xff0c;显然是不合理的。我们现在还处于非常早期的阶段。” 近日&#xff0c;在2023亚马逊云科技中国峰会上&#xff0c;亚马逊云科技全球产品副总裁…

JSX的基础使用

1. JSX嵌入变量作为子元素的使用 ①当变量是Number、String、Array类型时&#xff0c;可以直接显示&#xff1b; ②当变量是null、undefined、Boolean类型时&#xff0c;内容为空&#xff1b; 若想要展示nul、undefined、Boolean类型&#xff0c;转字符串&#xff1b;转换方式…

增强型视觉系统 (EVS)

增强型视觉系统 EVS 1、增强型视觉系统概览2、车载相机 HAL2.1 EVS 应用2.2 EVS 管理器2.3 EVS HIDL 接口2.4 内核驱动程序 《增强型视觉系统 (EVS) 1.1 集成指南》 车载相机 HAL 1、增强型视觉系统概览 为了增强视频串流管理和错误处理&#xff0c;Android 11 更新了车载相机…

图像处理之图像灰度化

图像灰度化 将彩色图像转化成为灰度图像的过程成为图像的灰度化处理。彩色图像中的每个像素的颜色有R、G、B三个分量决定&#xff0c;而每个分量有255中值可取&#xff0c;这样一个像素点可以有1600多万 (255255255)的颜色的变化范用。而灰度图像是R、G、B三个分量相同的一种特…

纯css3实现小鸡从鸡蛋破壳而出动画特效

实现一个使用纯css3实现小鸡破壳的效果 示例效果如下所示 示例代码 <template><div><div class"eggWrapper"><div class"chickHead"><div class"eyeDiv"></div><div class"eyeDiv"></di…

vue3+mapboxgl鼠标浮动显示cgcs2000

一、需求 鼠标在地图中浮动展示地图的经纬度&#xff0c;cgcs2000 xy 还有显示带号 二、实现效果 展示经度&#xff0c;纬度&#xff0c;x值&#xff0c;y值显示的是带号和y值 三、思路 3.1、mapbox获取经纬度方法 初始化地图后.on方法中有个mousemove方法 mapboxUtil._m…

故障排错篇之OSPF协议

一、OSPF邻居建立不成功 1、从理论上判断问题的所在 1.1、检查邻居两端的接口物理和协议状态是否UP&#xff0c;状态是否稳定&#xff0c;接口是否有丢包&#xff0c;两边互ping大包是否能通 若物理接口不Up或是不稳定&#xff08;有振荡现象&#xff09;&#xff0c;请排查…

lvs使用

1.前言 LVS&#xff08;Linux Virtual Server&#xff09;是一个基于 Linux 内核的负载均衡器&#xff0c;用于分发网络流量和将请求转发给后端服务器。LVS 提供了多种负载均衡算法和转发模式&#xff0c;以满足不同场景和需求的负载均衡需求&#xff0c;在LVS中定义虚拟服务的…

2023- itwangyang - mac mysql 终端启动命令

在mac上使用mysql终端进行操作时&#xff0c;需要先启动mysql服务。以下是启动mysql服务的命令&#xff1a; sudo /usr/local/mysql/support-files/mysql.server start执行该命令后&#xff0c;会出现一些提示信息&#xff0c;等待一段时间后mysql服务就启动成功了。 接下来&…

SSM框架最新整合保姆级教程(IDEA版)

SSM框架最新整合保姆级教程(IDEA版) 一、环境要求 ​ 环境&#xff1a; IDEAMySQL 5.7.19Tomcat 9Maven 3.6 要求&#xff1a; 需要熟练掌握MySQL数据库&#xff0c;Spring&#xff0c;JavaWeb及MyBatis知识&#xff0c;简单的前端知识&#xff1b; 完整代码&#xff1a;…

通信算法之177: 基于Matlab的OFDM通信系统关键基带算法设计7-流程

一. 接收算法流程 1.1 粗同步&#xff08;分组检测&#xff09; 1.2 载波同步&#xff08;精细频偏估计&#xff09; 多普勒频偏和晶振。频率偏差&#xff0c;会破坏子载波间的正交性&#xff0c;且这种频差对相位的影响还具有累加性。 1.3 精同步&#xff08;OFDM起始&…

【Spring Cloud系列】Hystrix应用详解

【Spring Cloud系列】Hystrix应用详解 文章目录 【Spring Cloud系列】Hystrix应用详解一、概述二、什么是Hystix三、Hystrix作用四、Hystrix设计原则五、Hystrix实现原理5.1 隔离5.2 熔断5.3 降级服务降级主要用于什么场景呢实现服务降级需要考虑几个问题降级分类 5.4 缓存请求…

如何使用3D转换工具HOOPS Exchange与LibConverter进行流缓存导出?

如果您正在使用HOOPS Communicator&#xff0c;您可能想在生成流缓存模型之前利用HOOPS Exchange的高级功能和转换选项。 申请HOOPS试用 HOOPS中文网 如何使用 如您所知&#xff0c;LibConverter是HOOPS Communicator软件包中包含的一个简单的API&#xff0c;conver…

单片机第一季:零基础8——蜂鸣器

蜂鸣器是一种一体化结构的电子讯响器&#xff0c;采用直流电压供电&#xff0c;广泛应用于计算机、打印机、复印机、报警器、电子玩具、汽车电子设备、电话机、定时器等电子产品中作发声器件。 蜂鸣器主要分为压电式蜂鸣器和电磁式蜂鸣器两种类型。 想要压电式蜂鸣器发声&…

spring AOP中pointcut表达式详解

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是「奇点」&#xff0c;江湖人称 singularity。刚工作几年&#xff0c;想和大家一同进步&#x1f91d;&#x1f91d; 一位上进心十足的【Java ToB端大厂…

从零开始理解Linux中断架构(19)--中断线程化irq_thread

前面一节讲到的中断流处理流程是在hard_irq 流程上&#xff0c;工作在中断堆栈上。还有一种情况是使用中断线程的情形。request_threaded_irq参数中有两个处理函数handler,thread_fn是有区别的。handler主中断处理例程&#xff0c;运行hard_irq 流程上。而如果驱动程序填写thre…

Java程序员需要掌握的前端知识(一)

对于前端知识&#xff0c;需要进一步巩固和加强&#xff0c;进入企业之后&#xff0c;要具备一定的接口调试&#xff0c;参数接收的能力&#xff0c;以及单体页面的开发&#xff0c;这里我学习一下前端知识巩固一下自身的技术栈和水平。本次笔记是跟学黑马的同名课程&#xff0…

Bash 有效电话号码

193 有效电话号码 给定一个包含电话号码列表&#xff08;一行一个电话号码&#xff09;的文本文件 file.txt&#xff0c;写一个单行 bash 脚本输出所有有效的电话号码。 你可以假设一个有效的电话号码必须满足以下两种格式&#xff1a; (xxx) xxx-xxxx 或 xxx-xxx-xxxx。&…