深度解析RocketMq源码-消费者索引ConsumeQueue

1.绪论

rocketmq的broker中关于消息持久化的组件主要包含三个,分别是:持久化消息到文件中的组件commitLog;根据消息key索引commitLog日志的indexFile;消费者根据topic和queueId查询commitLog日志的consumeQueue。前面已经介绍commitLog和indexFile,这节我们就来学习一下consumeQueue。

2. consumeQueue

4.1.1 consumeQueue的中每条消息的组成

消费者在消息的时候,只知道它在哪个topic的哪个queue里面消费哪个tags的拿条消息,那我们怎么由此定位到一条消息呢?所以我们需要为commitLog建立一条索引。

其实每个topic+queueId都会建立一个ConsumeQueue文件,而这个映射关系存储在broker中consumeQueueTable中,我们查询消息的时候,过consumeQueueTable根据queueId和topic快速定位到我们需要的ConsumeQueue,然后我们再根据消费者所提交的consumeQueueOffset*每条consumequeue索引的大小便能找到我们所以要的consume索引文件的位置,再根据里面存储的commitLog的物理偏移量便能在commitLog中定位到具体的消息的位置。

commitLog的存储结构可以如下所示:

└── TopicTest
    ├── 0
            └── 00000000000000000000
    ├── 1
            └── 00000000000000000000
    ├── 2
            └── 00000000000000000000
    └── 3
            └── 00000000000000000000

可以通过下图来说明,consumequeue是如何组成的,并且和commitLog的关系:

3.consumeQueue中消息的创建和获取

consumequeue其实底层和commitLog是一样的,其实由多个mappedFile来构成的,这里我们就不在讨论consumequeue的具体存储逻辑。有兴趣的小伙伴可以看这篇文章:

深度解析RocketMq源码-持久化组件(四) CommitLog_rocketmq一主一备commitlog-CSDN博客

接下来我们主要看一看consumequeue是什么时候创建的,并且在消费者知道它需要消费的topic和queueId过后,如何找到它具体要消费的哪条消息的,这其实也是rocketmq的核心之一。

3.1. consumequeue建立消息

其实consumequeue是一个采用mappedFile持久化数据的组件,它写入数据其实发分两步:

1.根据topic和queueId在topic和queueid与consumequeue的映射表(consumeQueueTable)中找到Consumequeue。

2.构造一条consumequeue记录,包括8字节的8字节的commitLog的offset + 4字节的消息大小+8自己的tagsCode,然后顺序写入到consumequeue中。

3.1.1 根据topic和queueId找到需要写入的consumequeue

  public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        //根据topic和queueId找到对应的consumequeue
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        //根据consumequeue顺序写入consumemequeue的索引数据
        cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
    }
 //根据queueId和topic检索到这个topic和queueId是属于哪个topic的
    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        //根据consumeQueueTable和queueId获取到具体的ConsumeQueue
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            //如果consumeQueue为空,便新建一个Consumequeue
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
            //并且设置到consumeQueueTable中
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                if (MixAll.isLmq(topic)) {
                    lmqConsumeQueueNum.getAndIncrement();
                }
                logic = newLogic;
            }
        }

        return logic;
    }

3.1.2 向consumequeue中写入一条记录

//commitLog的真正组成8字节的commitLog的offset + 4字节的消息大小+8自己的tagsCode
    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {
        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        this.byteBufferIndex.flip();
        //consumqueue的每条数据占20个字节
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        //8字节的commotLog的物理偏移量
        this.byteBufferIndex.putLong(offset);
        //4字节消息大小
        this.byteBufferIndex.putInt(size);
        //8字节的tagsCode
        this.byteBufferIndex.putLong(tagsCode);
        //写入到哪个offset
        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        //获取到offset的最后一条数据
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                //将跟新mappedFilde1写指针为expectLogicOffset
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                //
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            //实际的物理地址为offset+size大小
            this.maxPhysicOffset = offset + size;
            //将数据写入到bytebuffer中
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

3.2 根据topic和messagequeue检索消息

先根据topic和queueId找到以及consumeoffset查询一条消息分三步:

1.根据topic和queueId查询到对应的consumequeue;

2.根据consumeoffset在consumequeue中找到一条consumequeue的记录,里面包含一个属性就是实际消息在commitLog中的物理偏移量和大小;

3.根据物理偏移量和消息大小在commitLog中获取到实际消息内容。

    //根据queueId和topic检索到这个topic和queueId是属于哪个topic的
    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        //根据consumeQueueTable和queueId获取到具体的ConsumeQueue
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            //如果consumeQueue为空,便新建一个Consumequeue
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
            //并且设置到consumeQueueTable中
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                if (MixAll.isLmq(topic)) {
                    lmqConsumeQueueNum.getAndIncrement();
                }
                logic = newLogic;
            }
        }

        return logic;
    }

再根据consumeoffset在consumequeue中获取到具体consumequeue的索引数据。

public boolean get(final long address, final CqExtUnit cqExtUnit) {
        if (!isExtAddr(address)) {
            return false;
        }

        final int mappedFileSize = this.mappedFileSize;
        final long realOffset = unDecorate(address);

        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(realOffset, realOffset == 0);
        if (mappedFile == null) {
            return false;
        }

        int pos = (int) (realOffset % mappedFileSize);

        SelectMappedBufferResult bufferResult = mappedFile.selectMappedBuffer(pos);
        if (bufferResult == null) {
            log.warn("[BUG] Consume queue extend unit({}) is not found!", realOffset);
            return false;
        }
        boolean ret = false;
        try {
            ret = cqExtUnit.read(bufferResult.getByteBuffer());
        } finally {
            bufferResult.release();
        }

        return ret;
    }

4.总结

至此,我们已经大概了解消息在进入到broker过后做了什么。在生产者推送消息到broker过后,为了保证数据的能够快速的持久化,是直接按照到达顺序写入到commitLog中的,然后就会给主线程返回生产消息成功的通知。但是消费者需要根据topic和queueId获取到一条消息,并且需要根据消息的key检索一条消息。为了满足上述两个需求,rocketmq会启动一个线程,扫描commitLog,如果有新的消息写入,便会构建IndexFile和consumequeue两个文件,其实相当于两个索引文件。这一步骤在我们后面章节会详细介绍。

其实先持久化文件,然后启动线程对消息做其他处理,这一思想的本质就是为了增大吞吐量。在其他框架中也会应用到这种思想,比如elasticsearch中,在写消息的时候,会同时写入到transLog和memory buffer中后便会返回成功,后续单独启动线程根据memory buffer中的数据来进行其他操作,比如分词,建立倒排索引等,可以看出translog其实就类似于rokcetmq的commitLog。所以万变不离其中,只要有一份持久化数据过后,便可以跟客户端返回成功了,然后再单独的启动线程根据这份持久化数据做定制化处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/754707.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Profibus协议转profinet协议网关模块连接电机保护器与PLC通讯

一、背景 工业通讯中常见的协议有&#xff1a;Modbus协议&#xff0c;ModbusTCP协议&#xff0c;Profinet协议&#xff0c;Profibus协议&#xff0c;Profibus DP协议&#xff0c;EtherCAT协议&#xff0c;EtherNET协议等在现代工业控制系统中具有重要的角色。而Profibus协议转…

智慧数据中心可视化:高效管理与直观监控的未来

随着数据中心的规模和复杂性不断增加&#xff0c;传统管理方式难以满足需求。智慧数据中心通过图扑可视化实现实时数据监控和智能分析&#xff0c;将复杂的基础设施直观呈现&#xff0c;极大提升了运维效率、故障排查速度和资源优化能力&#xff0c;为企业提供现代化、智能化的…

mac app应用程序如何自定义图标, 更换.app为自己喜欢的图标或者图片 详细图文讲解

在mac系统中&#xff0c;我们可以对任何的app应用程序更换或者自定义图标&#xff0c; 这个图标可以是拥有的app的图标&#xff0c;或者是你自己制作的 x.icns 图标 或者是 任意的图片&#xff0c; 建议大小512x512 。 自定义图标方法如下&#xff1a; 1. 更换为已有app的图标…

词向量模型

文章目录 RNN词向量模型模型整体框架训练数据构建CBOW与Skip-gram模型负采样 RNN 卷积神经网络&#xff08;CNN&#xff09;主要应用计算机视觉&#xff0c;而递归神经网络&#xff08;RNN&#xff09;主要应用于自然语言处理。 递归神经网络会涉及处理之前所有的数据&#x…

Paragon NTFS与Tuxera NTFS有何区别 Mac NTFS 磁盘读写工具选哪个好

macOS系统虽然以稳定、安全系数高等优点著称&#xff0c;但因其封闭性&#xff0c;不能对NTFS格式磁盘写入数据常被人们诟病。优质的解决方案是使用磁盘管理软件Paragon NTFS for Mac&#xff08;点击获取激活码&#xff09;和Tuxera NTFS&#xff08;点击获取激活码&#xff0…

51单片机STC89C52RC——11.1 蜂鸣器播放音乐

目录 目的/效果 一&#xff0c;STC单片机模块 二&#xff0c;蜂鸣器 2.1 介绍 2.2 板子位置电路图 2.3 发声原理 2.4 音符和频率 三&#xff0c;创建Keil项目 四&#xff0c;代码 4.1 乐谱代码 4.1.1 《义勇军进行曲》 4.1.2 《天空之城》 4.1.3 《小美满》 4.1.…

2024年湖南建筑安全员考试题库,精准题库。

31.安全考核的对象应包括施工企业各管理层的&#xff08;&#xff09;、相关职能部门及岗位和工程项目参建人员。 A.技术负责人 B.安全负责人 C.主要负责人 D.第一负责人 答案&#xff1a;C 32.安全防护设施应标准化、定型化、&#xff08;&#xff09;。 A.规范化 B.工…

TFMath Caculator:一个简单的Java AWT计算器

目录 背景&#xff1a; 代码展示: 代码解析: 输出结果: 总结: 背景&#xff1a; 使用Java AWT(Abstract Window Toolkit)库创建的简单计算器应用-TFMath Calculator。这个计算器允许用户输入两个数字&#xff0c;点击号按钮后&#xff0c;计算器会计算这两个数字的和&…

【Redis四】主从复制、哨兵以及Cluster集群

目录 一.主从复制、哨兵、集群的区别 二.Redis主从复制 1.作用 2.原理 3.流程 三.搭建Redis 主从复制 1.源码编译安装以及配置文件修改 1.1.修改 Redis 配置文件&#xff08;Slave节点操作&#xff09; 2.验证主从复制 2.1.在Master节点上看日志 2.2.在Master节点上…

混凝土搅拌站中的智能化系统应用

随着科技的飞速发展&#xff0c;混凝土搅拌站已经进入了现代化、智能化的新时代。现代自动化、智能化技术的应用&#xff0c;使得混凝土搅拌站更加高效、准确、可靠&#xff0c;同时也提高了生产效率和质量。本文将带你深入探索混凝土搅拌站中运用到现代自动化、智能化的方方面…

k8s架构设计思想

1.谷歌borg云计算管理平台 一类&#xff1a;infrastucture platform software 另一类&#xff1a;borg为主的非虚拟化技术&#xff0c;调度进程 核心是轻量级作业调度&#xff0c;不是做虚拟化/云平台的 borg本身用了一些容器技术 生产业务product workload要求高可用&#xf…

第三节:如何理解Spring的两个特性IOC和AOP(自学Spring boot 3.x第一天)

大家好&#xff0c;我是网创有方&#xff0c;接下来教大家如何理解Spring的两个特性IOC和AOP。本节有点难&#xff0c;大家多理解。 IOC&#xff08;控制反转&#xff09; 定义与核心思想&#xff1a; IOC&#xff0c;全称Inversion of Control&#xff0c;即控制反转。 其核…

为什么ISO 45001职业健康安全管理体系是企业发展的基石

ISO 45001源自OHSAS 18001职业健康和安全管理体系&#xff0c;是全球第一个国际职业健康和安全管理标准。ISO&#xff08;国际标准化组织&#xff09;于2018年发布了这一标准&#xff0c;旨在帮助各类组织为员工提供一个更安全、更健康的工作环境。与OHSAS 18001相比&#xff0…

云原生之使用Docker部署RabbitMQ消息中间件

云原生之使用Docker部署RabbitMQ消息中间件 一、RabbitMQ介绍1.1 RabbitMQ简介1.2 RabbitMQ特点1.3 RabbitMQ使用场景 二、检查Docker环境2.1 检查Docker版本2.2 检查操作系统版本2.3 检查Docker状态 三、下载RabbitMQ镜像四、部署RabbitMQ服务4.1创建挂载目录4.2 运行RabbitMQ…

LabVIEW在光学与光子学实验室中的应用

光学与光子学实验室致力于光学和光子学前沿领域的研究&#xff0c;涉及超快光学、非线性光学、光纤通信、光子晶体等多个方向。实验室需要高精度的实验控制和数据采集系统&#xff0c;以进行复杂的光学实验&#xff0c;并对实验数据进行实时处理和分析。 项目需求 实时控制与监…

C++再谈构造函数、隐式类型转换、static成员、友元函数、内部类等的介绍

目录 前言一、再谈构造函数1. 构造函数体赋值2. 初始化列表3. 初始化列表初始化顺序4. 初始化隐式类转换 二、static成员1. 概念2. 特性 三、 友元1. 友元函数2. 友元类 四、内部类总结 前言 C再谈构造函数、隐式类型转换、static成员、友元函数、内部类等的介绍 一、再谈构造…

imx6ull/linux应用编程学习(3) 输入设备应用编程(上)(按键)

0.概念 输入设备&#xff1a;可以产生输入事件的设备 Linux系统设计了一个兼容所有输入设备的框架&#xff0c;就是input子系统&#xff0c;其直接向应用层提供了一套统一的接口&#xff0c;其在/dev/input目录下。 流程&#xff1a;如果要读取输入设备&#xff0c;一般遵循以下…

Spring AOP实战--之优雅的统一打印web请求的出参和入参

背景介绍 由于实际项目内网开发&#xff0c;项目保密&#xff0c;因此本文以笔者自己搭建的demo做演示&#xff0c;方便大家理解。 在项目开发过程中&#xff0c;团队成员为了方便调试&#xff0c;经常会在方法的出口和入口处加上log输出&#xff0c;由于每个人的log需求和输…

IOS17闪退问题Assertion failure in void _UIGraphicsBeginImageContextWithOptions

最近项目更新到最新版本IOS17&#xff0c;发现一个以前的页面突然闪退了。原来是IOS17下&#xff0c;这个方法 UIGraphicsBeginImageContext(CGSize size) 已经被移除&#xff0c;原参数如果size为0的话&#xff0c;会出现闪退现象。 根据说明&#xff0c;上述方法已经被替换…

《UDS协议从入门到精通》系列——图解0x38:请求上传

《UDS协议从入门到精通》系列——图解0x38&#xff1a;请求上传 一、简介二、数据包格式2.1 服务请求格式2.2 服务响应格式2.2.1 肯定响应2.2.2 否定响应 三、通信示例 Tip&#x1f4cc;&#xff1a;本文描述中但凡涉及到其他UDS服务的&#xff0c;将陆续提供链接跳转方式以便快…