RocketMQ源码之消息刷盘分析

前言

刷盘是将内存中的消息写入磁盘,分为同步刷盘和异步刷盘。同步刷盘指一条消息写入磁盘才返回成功,异步刷盘指写入内存就返回成功,稍后异步线程刷盘。
在创建CommitLog对象的时候,会初始化刷盘服务:

//代码位置:org.apache.rocketmq.store.CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
   
    //省略代码
    //同步刷盘
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
   
        this.flushCommitLogService = new GroupCommitService();
    } else {
   
        //异步刷盘
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //实时提交服务
    this.commitLogService = new CommitRealTimeService();

    //省略代码
}

RocketMQ默认的是异步刷盘。刷盘服务的继承关系如下:
在这里插入图片描述
从上面类图可以看出,不管是同步刷盘还是异步刷盘都是继承了FlushCommitLogService类,GroupCommitService类用于同步刷盘,FlushRealTimeService用于异步刷盘。如果transientStorePoolEnable=true,并且是异步刷盘,则消息append时会放入writeBuffer(堆外内存),CommitRealTimeService的作用是将堆外内存中的消息异步写入到fileChannel中

在CommitLog启动的时候,会启动刷盘服务,代码如下:

//org.apache.rocketmq.store.CommitLog#start
   public void start() {
   
        this.flushCommitLogService.start();

        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
   
            this.commitLogService.start();
        }
    }

isTransientStorePoolEnable的判断为transientStorePoolEnable为true,并且是异步刷盘。由于transientStorePoolEnable默认为false,所以默认不会进行commit操作

  public boolean isTransientStorePoolEnable() {
   
        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
            && BrokerRole.SLAVE != getBrokerRole();
    }

在putMessage方法中,当消息写到缓存以后,就会调用handleDiskFlush方法进行刷盘,handleDiskFlush方法如下:

//代码位置:org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
   
        // Synchronization flush
        //同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
   
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            //等待消息存储成功
            if (messageExt.isWaitStoreMsgOK()) {
   
                //创建一个同步刷盘请求
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                //添加同步刷盘请求
                service.putRequest(request);
                CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
                PutMessageStatus flushStatus = null;
                try {
   
                    //阻塞获取刷盘状态
                    flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
   
                    //flushOK=false;
                }
                //刷盘的状态不成功
                if (flushStatus != PutMessageStatus.PUT_OK) {
   
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
   
                //唤醒刷盘服务
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
   
            //TransientStorePool开关不可用
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/956171.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

navicat 折线图或面积图

折线图或面积图将信息显示为以直线段连接的一系列数据点。 折线图 面积图 堆积面积图 图表属性 选择图表类型后&#xff0c;可以更改其属性来自定义图表&#xff1a; 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 不透明度 设置背景颜色的不透明度。 显示边框…

【数模学习笔记】插值算法和拟合算法

声明&#xff1a;以下笔记中的图片以及内容 均整理自“数学建模学习交流”清风老师的课程资料&#xff0c;仅用作学习交流使用 文章目录 插值算法定义三个类型插值举例插值多项式分段插值三角插值 一般插值多项式原理拉格朗日插值法龙格现象分段线性插值 牛顿插值法 Hermite埃尔…

机器学习皮马印第安人糖尿病数据集预测报告

目录 1.项目选题与设计方案 1.1项目选题 1.2设计方案 2.功能实现 2.1 主要功能介绍 2.2 开发环境及平台介绍 2.3 实现过程 2.3.1数据分析 2.3.2算法设计 2.3.3 python代码 3.个人心得体会 1.项目选题与设计方案 1.1项目选题 我国的糖尿病患者初诊时约&#xff18;&a…

STM32 FreeRTOS任务通知

目录 任务通知的简介 任务通知相关API函数介绍 任务通知的简介 任务通知是 FreeRTOS 中一种用于任务间通信的机制&#xff0c;它允许一个任务向其他任务发送简单的通知或信号&#xff0c;以实现任务间的同步和协作。任务通知通常用于替代二值信号量或事件标志组&#xff0c;提…

提示词的艺术----AI Prompt撰写指南(个人用)

提示词的艺术 写在前面 制定提示词就像是和朋友聊天一样&#xff0c;要求我们能够清楚地表达问题。通过这个过程&#xff0c;一方面要不断练习提高自己地表达能力&#xff0c;另一方面还要锻炼自己使用更准确精炼的语言提出问题的能力。 什么样的提示词有用&#xff1f; 有…

微服务网关,如何选择?

什么是API网关 API网关&#xff08;API Gateway&#xff09;是微服务架构中的一个关键组件&#xff0c;它充当了客户端与后端服务之间的中间层。其主要功能包括请求路由、协议转换、负载均衡、安全认证、限流熔断等。通过API网关&#xff0c;客户端无需直接与多个微服务交互&a…

PortSwigger靶场练习---第二关-查找和利用未使用的 API 端点

第二关&#xff1a;Finding and exploiting an unused API endpoint 实验&#xff1a;查找和利用未使用的 API 端点 PortSwigger靶场地址&#xff1a; Dashboard | Web Security Academy - PortSwigger 题目&#xff1a; 官方提示&#xff1a; 在 Burp 的浏览器中&#xff0c…

深度学习中的张量 - 使用PyTorch进行广播和元素级操作

深度学习中的张量 - 使用PyTorch进行广播和元素级操作 元素级是什么意思&#xff1f; 元素级操作在神经网络编程中与张量的使用非常常见。让我们从一个元素级操作的定义开始这次讨论。 一个_元素级_操作是在两个张量之间进行的操作&#xff0c;它作用于各自张量中的相应元素…

使用C语言实现栈的插入、删除和排序操作

栈是一种后进先出(LIFO, Last In First Out)的数据结构,这意味着最后插入的元素最先被删除。在C语言中,我们可以通过数组或链表来实现栈。本文将使用数组来实现一个简单的栈,并提供插入(push)、删除(pop)以及排序(这里采用一种简单的排序方法,例如冒泡排序)的操作示…

【逆境中绽放:万字回顾2024我在挑战中突破自我】

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” 文章目录 一、引言二、个人成长与盘点情感与心理成长学习与技能提升其它荣誉 三、年度创作历程回顾创作内容概…

Spring Boot + Apache POI 实现 Excel 导出:BOM物料清单生成器(支持中文文件名、样式美化、数据合并)

目录 引言 Apache POI操作Excel的实用技巧 1.合并单元格操作 2.设置单元格样式 1. 创建样式对象 2. 设置边框 3. 设置底色 4. 设置对齐方式 5. 设置字体样式 6.设置自动换行 7. 应用样式到单元格 3. 定位和操作指定单元格 4.实现标签-值的形式 5.列宽设置 1. 设…

C语言进阶习题【1】指针和数组(2)——字符数组

3. 字符数组练习 sizeof 只关注占用内存空间的大小&#xff0c;单位是字节&#xff0c;不关心内存中存放的是什么 sizeof 是操作符 strlen是求字符串长度的&#xff0c;统计的是\0之前出现的字符个数&#xff0c;一定要找到\0才算结束&#xff0c;所以可能存在越界访问的 strle…

嵌入式工程师必学(67):SWD仿真接口(for ARM)的使用方法

概述: JTAG JTAG代表联合测试行动小组(定义JTAG标准的小组),旨在作为测试板的一种方式。JTAG允许用户与微控制器的各个部分进行对话。在许多情况下,这涉及一组指令或对电路板进行编程。JTAG标准定义了5个引脚: TCK: Test Clock TMS: Test Mode Select TDI: Test Data-…

代理模式实现

一、概念&#xff1a;代理模式属于结构型设计模式。客户端不能直接访问一个对象&#xff0c;可以通过代理的第三者来间接访问该对象&#xff0c;代理对象控制着对于原对象的访问&#xff0c;并允许在客户端访问对象的前后进行一些扩展和处理&#xff1b;这种设置模式称为代理模…

华为HuaweiCloudStack(一)介绍与架构

本文简单介绍了华为HCS私有云解决方案&#xff0c;并从下至上介绍HCS的整体架构&#xff0c;部署架构、部署方式等内容。 目录 HCS简介 HCS架构 纵向结构 ?管理平台类型 HCS节点类型 FusionSphere OpenStack CPS ServiceOM SC 运营面 OC 运维面 HCS部署架构 regi…

【视觉惯性SLAM:十七、ORB-SLAM3 中的跟踪流程】

17.1 跟踪流程流程图 ORB-SLAM3 的跟踪模块是整个系统的重要组成部分&#xff0c;负责实时确定相机在三维空间中的姿态位置&#xff0c;并保持关键帧之间的连续性。其基本目标是将输入的视频流与已有地图数据进行对齐&#xff0c;完成位姿估计和地图更新。 流程图概述 一个…

【机器学习实战入门项目】MNIST数字分类机器学习项目

Python 深度学习项目&#xff1a;手写数字识别 为了使机器更加智能&#xff0c;开发者们正在深入研究机器学习和深度学习技术。人类通过不断练习和重复来学习执行某项任务&#xff0c;从而记住如何完成这些任务。然后&#xff0c;大脑中的神经元会自动触发&#xff0c;他们能够…

Python Pyside6 加Sqlite3 写一个 通用 进销存 系统 初型

图: 说明: 进销存管理系统说明文档 功能模块 1. 首页 显示关键业务数据商品总数供应商总数本月采购金额本月销售金额显示预警信息库存不足预警待付款采购单待收款销售单2. 商品管理 商品信息维护商品编码(唯一标识)商品名称规格型号单位分类进货价销售价库存数量预警…

Ubuntu安装K8S

第一步&#xff1a; 安装docker Install Docker #注意docker是早期的名称已经过时了&#xff0c;因此请使用如下命令&#xff0c;一步到位安装docker-ce。 第二步&#xff1a;设置K8S源&#xff1a; &#xff08;大陆使用aliyun源&#xff0c;大陆外使用google源&#xff09;…

Linux图形界面详解以及替换桌面程序方法

说明&#xff1a;本文章主要说明Linux图形界面的启动流程&#xff0c;以及使用自己的图形化应用替换桌面程序的方法&#xff0c;类似与安卓启动会启动Launcher&#xff0c;使用自己程序替换Launcher一样&#xff0c;实现应用独占系统&#xff0c;或者设计自己的桌面程序。 一、…