全网最细RocketMQ源码四:消息存储

看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习

入口

SendMessageProcessor.processRequest
在这里插入图片描述
在这里插入图片描述

  private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

        CompletableFuture<PutMessageResult> putMessageResult = null;
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
        // 使用defaultMessageStore.aysncPutMessage存储
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦

MappedFile

public class MappedFile extends ReferenceResource {
    // 内存页大小:4k
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // 当前进程下 所有的 mappedFile占用的总虚拟内存大小
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    // 当前进程下 所有的 mappedFile个数
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // 当前mappedFile数据写入点
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 文件通道
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;

    // 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量   索引文件: 年月日小时分钟秒.. )
    private String fileName;
    // 文件名转long
    private long fileFromOffset;
    // 文件对象
    private File file;
    // 内存映射缓冲区,访问虚拟内存
    private MappedByteBuffer mappedByteBuffer;
    // 该文件下 保存的第一条 msg 的存储时间
    private volatile long storeTimestamp = 0;
    // 当前文件如果是 目录内 有效文件的 首文件的话,该值为true
    private boolean firstCreateInQueue = false;
  • 构造方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage(byte[] data)
    在这里插入图片描述

  • flush
    在这里插入图片描述

MappedFileQueue

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    // mfq 管理的目录(CommitLog: ../store/commitlog  或者  consumeQueue: ../store/xxx_topic/0)
    private final String storePath;

    // 目录下每个文件大小(commitLog文件 默认 1g     consumeQueue 文件 默认 600w字节)
    private final int mappedFileSize;

    // list,目录下的每个 mappedFile 都加入该list
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    // 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。
    private final AllocateMappedFileService allocateMappedFileService;

    // 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)
    private long flushedWhere = 0;
    private long committedWhere = 0;

    // 当前目录下最后一条msg存储时间
    private volatile long storeTimestamp = 0;
  • load方法
    在这里插入图片描述

  • getLastMappedFile

 /**
     * 参数1:startOffset ,文件起始偏移量
     * 参数2:needCreate ,当 list 为空时,是否创建mappedFile
     */
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        // 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾
        // 两种情况会创建:
        // 1. list内没有mappedFile
        // 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
        long createOffset = -1;

        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {// 情况1  list内没有mappedFile
            // createOffset 取值 必须是 mappedFileSize 的倍数 或者 0
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2  list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
            // 上一个文件名 转long + mappedFileSize
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }


        if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。
            // 获取待创建文件的 绝对路径(下次即将要创建的文件名)
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);

            // 获取 下下次 要创建的文件的 绝对路径
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);

            MappedFile mappedFile = null;


            if (this.allocateMappedFileService != null) {
                // 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,
                // 内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。

                // 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }


            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

  • flush
 /**
     * @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)
     * @return boolean true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘
     */
    public boolean flush(final int flushLeastPages) {
        boolean result = true;

        // 获取当前正在刷盘的文件 (正在顺序写的mappedFile)
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

        if (mappedFile != null) {
            // 获取mappedFile 最后一条消息的存储时间
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // 调用mf 刷盘 ,返回mf的最新的落盘位点
            int offset = mappedFile.flush(flushLeastPages);
            // mf起始偏移量 + mf最新的落盘位点
            long where = mappedFile.getFileFromOffset() + offset;
            // true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘
            result = where == this.flushedWhere;
            // 将最新的目录刷盘位点 赋值给 flushedWhere
            this.flushedWhere = where;

            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        return result;
    }

CommitLog

ConsumeQueue

DefaultMessageStore

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/323967.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

tensorflow报错: DNN library is no found

错误描述 如上图在执行程序的时候&#xff0c;会出现 DNN library is no found 的报错 解决办法 这个错误基本上说明你安装的 cudnn有问题&#xff0c;或者没有安装这个工具。 首先检测一下你是否安装了 cudnn 进入CUDA_HOME下&#xff0c;也就是进入你的cuda的驱动的安装目…

Excel地址

解题思路&#xff1a; 根据题中歪歪和笨笨的话可以有两种解法。 1.输入的数为多大&#xff0c;则循环1多少次&#xff0c;当值为27时就要进行进位操作。这时要分情况讨论。 当集合中元素为一个时&#xff0c;如26&#xff0c;则需要变为1 1&#xff0c;集合元素个数加一。 当…

linux docker安装 rustdesk

这里写自定义目录标题 1&#xff1a;软件介绍&#xff1a;2&#xff1a;安装1. 服务器端2. 客户端 3&#xff1a;配置5&#xff1a;其他1:rustdesk 官方Docker Compose 1&#xff1a;软件介绍&#xff1a; 名称作用官网项目地址rustdesk实现多端互控https://rustdesk.com/inde…

L1-025 正整数A+B(Java)

题的目标很简单&#xff0c;就是求两个正整数A和B的和&#xff0c;其中A和B都在区间[1,1000]。稍微有点麻烦的是&#xff0c;输入并不保证是两个正整数。 输入格式&#xff1a; 输入在一行给出A和B&#xff0c;其间以空格分开。问题是A和B不一定是满足要求的正整数&#xff0…

Mindspore 公开课 - GPT

GPT Task 在模型 finetune 中&#xff0c;需要根据不同的下游任务来处理输入&#xff0c;主要的下游任务可分为以下四类&#xff1a; 分类&#xff08;Classification&#xff09;&#xff1a;给定一个输入文本&#xff0c;将其分为若干类别中的一类&#xff0c;如情感分类、…

嵌入式软件工程师面试题——2025校招社招通用(十八)

说明&#xff1a; 面试群&#xff0c;群号&#xff1a; 228447240面试题来源于网络书籍&#xff0c;公司题目以及博主原创或修改&#xff08;题目大部分来源于各种公司&#xff09;&#xff1b;文中很多题目&#xff0c;或许大家直接编译器写完&#xff0c;1分钟就出结果了。但…

pandas列与列之间的计算

1.简单计算 &#xff08;1&#xff09;首先导入pandas模块并读取数据 import pandas as pd adress"D:/pandas练习文件/计算.xlsx" datapd.read_excel(adress) &#xff08;2&#xff09; 计算销售额 销售额售价*销售数量 data["销售额"]data["售价&…

Unet系列网络解析

Unet UNet最早发表在2015的MICCAI上&#xff0c;到2020年中旬的引用量已经超过了9700多次&#xff0c;估计现在都过万了&#xff0c;从这方面看足以见得其影响力。当然&#xff0c;UNet这个基本的网络结构有太多的改进型&#xff0c;应用范围已经远远超出了医学图像的范畴。我…

Oracle常见操作

知识点1&#xff1a;格式化日期 select to_char(sysdate,yyyy-MM-dd HH:mm:ss) as time from dual;运行截图&#xff1a; 知识点2&#xff1a;解锁用户 alter user test account unlock;知识点3&#xff1a;修改密码 alter user test identified by test2;知识点4&#xff…

JVM-Arthas高效的监控工具

一、arthas介绍 3.选择监控哪个进程 4.进入具体进程 二、arthas的基础命令与基本操作 1.查询包含Java的系统属性&#xff1a; 命令&#xff1a;sysprop |grep java 1.查询不含Java的系统属性&#xff1a; 命令&#xff1a;sysprop | grep -v java 3.打印历史命令 命令&#…

vector容器解决杨辉三角

一、题目描述 118. 杨辉三角 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]]示例 2: 输入: numRo…

008 Linux_文件在磁盘上的管理

前言 本文将会向你介绍文件在磁盘上是如何被管理的 磁盘的物理存储结构 系统中的文件分为被打开的文件&#xff0c;和没被打开的文件&#xff0c;被打开的文件在内存中进行管理&#xff0c;而没有被打开的文件则在磁盘中进行管理 同样地&#xff0c;也需要对这些没被打开的文…

Vulnhub靶机:driftingblues 5

一、介绍 运行环境&#xff1a;Virtualbox 攻击机&#xff1a;kali&#xff08;10.0.2.15&#xff09; 靶机&#xff1a;driftingblues5&#xff08;10.0.2.21&#xff09; 目标&#xff1a;获取靶机root权限和flag 靶机下载地址&#xff1a;https://www.vulnhub.com/entr…

实验 2:语法分析程序的设计——以LL(1)为例(更适合njupt体质的宝宝)

实验 2&#xff1a;语法分析程序的设计——以LL(1)为例 PS&#xff1a;源码私聊 1.实验软件环境 ​ 开发平台&#xff1a;Windows 11 ​ 编程语言&#xff1a;C ​ 编译器版本&#xff1a;GCC 11.20 C20 ​ IDE&#xff1a;VS Code 2.实验原理描述 求 F i r s t First Fi…

基于springboot数码论坛系统源码和论文

网络的广泛应用给生活带来了十分的便利。所以把数码论坛与现在网络相结合&#xff0c;利用java技术建设数码论坛系统&#xff0c;实现数码论坛的信息化。则对于进一步提高数码论坛发展&#xff0c;丰富数码论坛经验能起到不少的促进作用。 数码论坛系统能够通过互联网得到广泛…

如何判断售卖的医疗器械产品是二类还是三类

售卖医疗器械需关注产品本身是否为一、二、三类医疗器械。第一类医疗器械为一般项目的经营范围无需取得备案或许可证即可销售。第二类医疗器械产品需办理第二类医疗器械的备案方可销售。第三类医疗器械需取得医疗器械经营许可证且许可证上的经营范围需与销售的产品对应方可销售…

IIC协议

文章目录 IIC介绍通信距离通信速度主从方式通信方式物理结构 IIC协议空闲状态开始信号、结束信号和应答信号向从机发送数据的过程读取从机数据的过程数据有效性协议一帧构成 IIC介绍 IIC(Inter&#xff0d;Integrated Circuit)总线是一种由 PHILIPS 公司开发的两线式串行总线。…

ArkUI-X跨平台已至,何需其它!

运行环境 DevEco Studio&#xff1a;4.0Release OpenHarmony SDK API10 开发板&#xff1a;润和DAYU200 自从写了一篇ArkUI-X跨平台的文章之后&#xff0c;好多人都说对这个项目十分关注。 那么今天我们就来完整的梳理一下这个项目。 1、ArkUI-X 我们之前可能更多接触的…

class_5:在c++中一个类包含另一个类的对象叫做组合

#include <iostream> using namespace std;class Wheel{ public://成员数据string brand; //品牌int year; //年限//真正的成员函数void printWheelInfo(); //声明成员函数 };void Wheel::printWheelInfo() {cout<<"我的轮胎品牌是&#xff1a;"<…

MySQL之多表查询

1.创建student和score表 CREATE TABLE student ( id INT(10) NOT NULL UNIQUE PRIMARY KEY , name VARCHAR(20) NOT NULL , sex VARCHAR(4) , birth YEAR, department VARCHAR(20) , address VARCHAR(50) ); 创建score表。SQL代码如下&#xff1a; CREATE TABLE sc…