RocketMQ-源码架构二

梳理一些比较完整,比较复杂的业务线

消息持久化设计

RocketMQ的持久化文件结构

消息持久化也就是将内存中的消息写入到本地磁盘的过程。而磁盘IO操作通常是一个很耗性能,很慢的操作,所以,对消息持久化机制的设计,是一个MQ产品提升性能的关键,甚至可以说是最为重要的核心也不为过。接下来梳理RocketMQ是如何在本地磁盘中保存消息的

RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。

存储文件主要分为三个部分:

  • CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。

  • ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。

  • IndexFile:为消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

另外,还有几个辅助的存储文件,主要记录一些描述消息的元数据:

  • checkpoint:数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。

  • config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。

  • abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。

整体的消息存储结构,官方做了个图进行描述:

Producer发过来的所有消息,不管是属于哪个Topic,Broker都统一存在CommitLog文件当中,然后分别构建ConsumeQueue文件和IndexFile两个索引文件,用来辅助消费者进行消息检索。这种设计最直接的好处是可以较少查找目标文件的时间,让消息以最快的速度落盘。对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入。当Topic比较多时,这样的Partition寻址就会浪费非常多的时间。所以Kafka不太适合多Topic的场景。而RocketMQ的这种快速落盘的方式,在多Topic的场景下,优势就比较明显了。

在文件形式上:CommitLog文件的大小是固定的。文件名就是当前CommitLog文件当中存储的第一条消息的Offset。

ConsumeQueue文件主要是加速消费者进行消息索引。每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ConsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件中的消费进度,会保存在config/consumerOffset.json文件当中。

IndexFile文件主要是辅助消费者进行消息索引。消费者进行消息消费时,通过ConsumeQueue文件就足够完成消息检索了,但是如果消费者指定时间戳进行消费,或者要按照MessageId或者MessageKey来检索文件,比如RocketMQ管理控制台的消息轨迹功能,ConsumeQueue文件就不够用了。IndexFile文件就是用来辅助这类消息检索的。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。

这是对RocketMQ存盘文件最基础的了解,但是只有这样的设计,是不足以支撑RocketMQ的三高性能的。RocketMQ如何保证ConsumeQueue、IndexFile两个索引文件与CommitLog中的消息对齐?如何保证消息断电不丢失?如何保证文件高效的写入磁盘?等等。如果你想要去抓住RocketMQ这些三高问题的核心设计,那么还是需要到源码当中去深究。

commitLog写入

消息存储的入口在: DefaultMessageStore.asyncPutMessage方法

CommitLog的asyncPutMessage方法中会给写入线程加锁,保证一次只会允许一个线程写入。写入消息的过程是串行的,一次只会允许一个线程写入。

最终进入CommitLog中的DefaultAppendMessageCallback#doAppend方法,这里就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。而是在随后调用ComitLog#submitFlushRequest方法,提交刷盘申请。刷盘完成之后,内存中的文件才真正写入到磁盘当中。

在提交刷盘申请之后,就会立即调用CommitLog#submitReplicaRequest方法,发起主从同步申请。

文件同步刷盘与异步刷盘

入口:CommitLog.submitFlushRequest

这里涉及到了对于同步刷盘与异步刷盘的不同处理机制。这里有很多极致提高性能的设计,对于我们理解和设计高并发应用场景有非常大的借鉴意义。

同步刷盘和异步刷盘是通过不同的FlushCommitLogService的子服务实现的。

//org.apache.rocketmq.store.CommitLog的构造方法
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService();
} else {
    this.flushCommitLogService = new FlushRealTimeService();
}
​
this.commitLogService = new CommitRealTimeService();

同步刷盘采用的是GroupCommitService子线程。虽然是叫做同步刷盘,但是从源码中能看到,他实际上并不是来一条消息就刷一次盘。而是这个子线程每10毫秒执行一次doCommit方法,扫描文件的缓存。只要缓存当中有消息,就执行一次Flush操作。

而异步刷盘采用的是FlushRealTimeService子线程。这个子线程最终也是执行Flush操作,只不过他的执行时机会根据配置进行灵活调整。所以可以看到,这里异步刷盘和同步刷盘的最本质区别,实际上是进行Flush操作的频率不同。

我们经常说使用RocketMQ的同步刷盘,可以保证Broker断电时,消息不会丢失。但是可以看到,RocketMQ并不可能真正来一条消息就进行一次刷盘,这样在海量数据下,操作系统是承受不了的。而只要不是来一次消息刷一次盘,那么在Broker直接断电的情况接下,就总是会有内存中的消息没有刷入磁盘的情况,这就会造成消息丢失。所以,对于消息安全性的设计,其实是重在取舍,无法做到绝对。

同步刷盘和异步刷盘最终落地到FileChannel的force方法。这个force方法就会最终调用一次操作系统的fsync系统调用,完成文件写入。

//org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();
​
            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
​
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

另外一个CommitRealTimeService这个子线程则是用来写入堆外内存的。应用可以通过配置TransientStorePoolEnable参数开启堆外内存,如果开启了堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存中。而CommitRealTimeService处理消息的方式则只是调用mappedFileQueue的commit方法。这个方法只是往操作系统的PagedCache里写入消息,并不主动进行刷盘操作。会由操作系统通过Dirty Page机制,在某一个时刻进行统一刷盘。例如我们在正常关闭操作系统时,经常会等待很长时间。这里面大部分的时间其实就是在做PageCache的刷盘。

//org.apache.rocketmq.store.MappedFileQueue
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.commit(commitLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.committedWhere;
        this.committedWhere = where;
    }
​
    return result;
}

在梳理同步刷盘与异步刷盘的具体实现时,可以看到一个小点,RocketMQ是如何让两个刷盘服务间隔执行的?RocketMQ提供了一个自己实现的CountDownLatch2工具类来提供线程阻塞功能,使用CAS驱动CountDownLatch2的countDown操作。每来一个消息就启动一次CAS,成功后,调用一次countDown。而这个CountDonwLatch2在Java.util.concurrent.CountDownLatch的基础上,实现了reset功能,这样可以进行对象重用

CommigLog主从复制

入口:CommitLog.submitReplicaRequest

主从同步时,也体现到了RocketMQ对于性能的极致追求。最为明显的,RocketMQ整体是基于Netty实现的网络请求,而在主从复制这一块,却放弃了Netty框架,转而使用更轻量级的Java的NIO来构建。

在主要的HAService中,会在启动过程中启动三个守护进程。

//HAService#start
public void start() throws Exception {
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}

这其中与Master相关的是acceptSocketService和groupTransferService。其中acceptSocketService主要负责维护Master与Slave之间的TCP连接。groupTransferService主要与主从同步复制有关。而slave相关的则是haClient。

至于其中关于主从的同步复制与异步复制的实现流程,还是比较复杂的,有兴趣的同学可以深入去研究一下。

推荐一篇可供参考的博客 RocketMQ源码分析之主从数据复制-CSDN博客

分发ConsumeQueue和IndexFile

当CommitLog写入一条消息后,在DefaultMessageStore的start方法中,会启动一个后台线程reputMessageService。源码就定义在DefaultMessageStore中。这个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息。如果发现CommitLog中有新的消息写入,就会触发一次doDispatch。

//org.apache.rocketmq.store.DefaultMessageStore中的ReputMessageService线程类
public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

dispatchList中包含两个关键的实现类CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex。源码就定义在DefaultMessageStore中。他们分别用来构建ConsumeQueue索引和IndexFile索引。

并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。

过期文件删除机制

入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()

在这个方法中会启动两个线程,cleanCommitLogService用来删除过期的CommitLog文件,cleanConsumeQueueService用来删除过期的ConsumeQueue和IndexFile文件。

在删除CommitLog文件时,Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。

触发过期文件删除时,有两个检查的纬度,一个是,是否到了触发删除的时间,也就是broker.conf里配置的deleteWhen属性。另外还会检查磁盘利用率,达到阈值也会触发过期文件删除。这个阈值默认是72%,可以在broker.conf文件当中定制。但是最大值为95,最小值为10。

然后在删除ConsumeQueue和IndexFile文件时,会去检查CommitLog当前的最小Offset,然后在删除时进行对齐。

需要注意的是,RocketMQ在删除过期CommitLog文件时,并不检查消息是否被消费过。 所以如果有消息长期没有被消费,是有可能直接被删除掉,造成消息丢失的。

RocketMQ整个文件管理的核心入口在DefaultMessageStore的start方法中,整体流程总结如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/222510.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

单片机的基本概念——什么是单片机、单片机的分类以及单片机的发展历史、发展趋势

什么是单片机 本文主要涉及了什么是单片机、单片机的分类、单片机发展史以及单片机的发展趋势的一些内容 文章目录 什么是单片机一、 什么是单片机1.1 微型计算机1.2 单板机1.3 单片机1.3.1 单片机的分类 二、 单片机的发展历史2.1 发展阶段2.2 单片机的特点2.3 单片机的发展趋…

ACM32F403/F433 12 位多通道,支持 MPU 存储保护功能,应用于工业控制,智能家居等产品中

ACM32F403/F433 芯片的内核基于 ARMv8-M 架构,支持 Cortex-M33 和 Cortex-M4F 指令集。芯片内核 支持一整套DSP指令用于数字信号处理,支持单精度FPU处理浮点数据,同时还支持Memory Protection Unit (MPU)用于提升应用的…

【Kubernetes】kubeadm安装k8s1.25.0高可用集群

k8s集群搭建(v1.25.0) 一、初始化实验环境二、安装containerd服务2.1、安装containerd2.2、安装docker2.3、配置镜像加速器三、安装初始化k8s需要的软件包四、kubeadm初始化k8s集群4.1、设置容器运行时4.2、生成并修改配置文件4.2、初始化安装4.3、修改c…

C语言之数组(精讲)

目录 数组 数组的声明(使用数组前的准备) 访问数组(数组的使用方法) 数组的遍历 数组初始化 1.在声明变量时,除了必要的情况下,都需要对变量进行初始化。 2.我们还可以像下面在声明数组时不指定元素…

win10与 vm虚拟机win7共享文件夹创建

1:在win10(主机)电脑先随意共享一个文件夹 2:在win10(主机)上创建一个网络映射 右键此电脑选择映射网络驱动器 成功后会多出这个网络位置 3:win7虚拟机设置 在虚拟机中点击计算机右键添加一个网络位置

云HIS:新一代云架构医院信息管理系统源码(java语言)

云HIS信息管理云平台,提供全方位的临床系统应用,是国内领先的以云计算为基础,以云计算赋能医疗机构,是颠覆传统医疗信息化业态的技术与模式创新,以SaaS方式,为医疗机构提供信息系统服务,满足从医…

Scrapy爬虫数据存储为JSON文件的解决方案

什么是JSON文件 JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人们阅读和编写,同时也易于机器解析和生成。它基于JavaScript Spark语言的一个子集,但独立于Smashing语言,因此在许多中…

【面试经典150 | 二叉树】翻转二叉树

文章目录 写在前面Tag题目来源题目解读解题思路方法一:递归方法二:迭代 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法,两到三天更新一篇文章,欢迎催更…… 专栏内容以分析题目为主,并附带一些对于本题…

Linux 编译过程分析

文章目录 一、源码foo.hhello.cfoo1.cfoo2.c GCC 指令预处理命令hello.i 编译(Compile only)命令foo2.s 汇编命令readelfreadelf -hreadelf -Sreadelf -rreadelf -sstrip 链接 本文基于《深度探索Linux操作系统:系统构建和原理解析》 一、源…

2022年南美地区医疗机器人市场及全球概况报告

今天分享的是机器人系列深度研究报告:《2022年南美地区医疗机器人市场及全球概况报告》。 (报告出品方:Apollo Reports) 报告共计:172页 研究方法论 2.1通过桌面研究和内部存储库的假设 a)最初,各个类别…

深度学习实战64-黑白照片着色的模型应用,快速部署实现黑白图片快速上色的功能

大家好,我是微学AI,今天给大家介绍一下深度学习实战64-黑白照片着色的模型应用,快速部署实现黑白图片快速上色的功能。图片上色是一个具有多模态不确定性和高度不适定性的挑战性问题。直接训练深度神经网络通常会导致错误的语义颜色和低色彩丰富度。虽然基于Transformer的方…

从零开始学习 JS APL(六):完整指南和实例解析

学习目标: 1. 能够利用正则表达式校验输入信息的合法性 2. 具备利用正则表达式验证小兔鲜注册页面表单的能力 学习内容: 正则表达式 综合案例 阶段案例 学习时间: 周一至周五晚上 7 点—晚上9点周六上午 9 点-上午 11 点周日下午 3 点-下…

力扣11.盛最多水的容器

题目描述 思路 用双指针法。 每次向内移动较短的那个板&#xff0c;能带来更大的效益。 代码 class Solution {public int maxArea(int[] height) {int res 0;int i 0,j height.length - 1;while(i < j){res height[i] < height[j] ? Math.max((j - i) * height…

BearPi Std 板从入门到放弃 - 引气入体篇(7)(DAC)

简介 基于前面的文章, 缩略STM32CubeMx创建项目的过程&#xff0c;直接添加DAC相关初始化; 开发板 &#xff1a; Bearpi Std(小熊派标准板) 主芯片: STM32L431RCT6 LED : PC13 \ 推挽输出即可 \ 高电平点亮 串口: Usart1 KEY1 : PB2 \ 上拉 \ 按下下降沿触发(一次)\ 用于增…

2024年MCM/ICM美国大学生数学建模竞赛备战指南

01 2024美赛基本要求 1.关于时间&#xff08;北京时间&#xff09; 比赛开始时间&#xff1a; 2024年2月2日6:00至 2024年2月6日9:00 提交截止时间&#xff1a;2024年2月6日10:00 结果发布时间&#xff1a;结果将于2024年5月31日或之前发布 2.关于规则 完整的解决方案现…

海上液化天然气 LNG 终端 ,数字孪生监控系统

液化天然气 (Liquefied Natural Gas&#xff0c;简称 LNG) 在能源转型过程中被广泛认可为相对较清洁的能源选择。 相对于传统的煤炭和石油燃料&#xff0c;LNG 的燃烧过程产生的二氧化碳 (CO2) 排放较低。LNG 的燃烧释放的二氧化碳排放较少&#xff0c;因此对应对气候变化和减…

ChatGPT哪些行业需要学习?

2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚于互联网和个人电脑的问世。360创始人周鸿祎认为未来各行各业如果不能搭上这班车…

Modbus数据类型转换

前请提要: 从PLC读取的数值,不管是读正负整数还是正负浮点数,读取过来后都会变成UInt16,也就是Ushort类型 一、ushort(UInt16)转成 Int32 源代码方法: //ushort类型转Int32类型的方法private int ushortToInt32(ushort[] date, int start){//先进行判断,长度是否正确…

【算法题】一种字符串压缩表示的解压(js)

输入&#xff1a;2dff 输出 !error 两个d不需要压缩&#xff0c;故输入不合法 输入:4eA 输出:!error 全部由小写英文字母组成&#xff0c;压缩后不会出现&#xff0c;故输出不合法 function solution(str) {const error "!error";// 只能包含小写字母和数字 [^a-z0…

无需公网IP实现公网远程访问本地WebDAV服务

windows搭建WebDAV服务&#xff0c;并内网穿透公网访问【无公网IP】 文章目录 windows搭建WebDAV服务&#xff0c;并内网穿透公网访问【无公网IP】1. 安装IIS必要WebDav组件2. 客户端测试3. cpolar内网穿透3.1 打开Web-UI管理界面3.2 创建隧道3.3 查看在线隧道列表3.4 浏览器访…