消息中间件之RocketMQ源码分析(二十二)

Broker主从同步流程

配置数据同步流程

配置数据包含4种类型:Topic配置、消费者位点、延迟位点、订阅关系配置。每种配置数据由一个继承自ConfigManager的类来管理,继承关系如图。Slave如何从Master同步这些配置呢?我们先来看一下初始化服务的步骤
在这里插入图片描述

  • 第一步:Master Broker在启动时,初始化一个BrokerOuterAPI,这个服务的功能包含Broker注册到Namesrv、Broker从Namesrv解绑、获取Topic配置信息、获取消费者位点信息、获取延迟位点信息及订阅关系等。
  • 第二步:Slave Broker在初始化Controller的定时任务时,会初始化SlaveSynchronize服务,每60s调用一次SlaveSynchronize.syncAll()方法
  • 第三步:syncAll()方法依次调用4种配置数据(Topic配置、消费者位点、延迟位点、订阅关系配置)的同步方法同步全量数据
    在这里插入图片描述
  • 第四步:syncAll()中执行的4个方法都通过Remoting模块同步调用BrokerOuterAPI,并从Master Broker获取数据,保存到Slave中
  • 第五步:Topic配置和订阅关系配置随着保存内存信息的同时持久化到磁盘上;消费者位点通过BrokerController初始化定时任务持久化到磁盘上;延迟位点信息通过ScheduleMessageService定时将内存持久化到磁盘上
    在这里插入图片描述

CommitLog数据同步流程

CommitLog的数据同步分为同步复制和异步复制两种。
同步复制是指生产者生产消息后,等待Master Broker将数据同步到Slave Broker后,再返回生产者数据存储状态;异步复制是指生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果
在这里插入图片描述

  • 异步复制
    Master Broker启动时会启动HAService.AcceptSocketService服务,当监听到来自Slave的注册请求时会创建一个HAConnection,同时HAConnection会创建ReadSocketService和WriteSocketService
    两个服务并启动,开始主从数据同步。ReadSocketService接收Slave同步数据请求,并将这些信息保存在HAConnection中WriteSocketService根据HAConnection中保存的Slave同步请求,从CommitLog中查询数据,并发送给Slave.
    注:ReadSocketService和WriteSocketService是两个独立工作的线程服务,它们通过HAConnection中的公共变量将CommitLog同步给Slave

slaveRequestOffset表示Slave请求同步的位点值;
slaveAckOffset表示slave已经保存的位点值
在这里插入图片描述

  • 同步复制
    在CommitLog将消息存储到PageCache后,会调用CommitLog的handleHA()/submitReplicaRequest方法处理同步复制。
    当BrokerRole配置为SYNC_MASTER时表示当前Master Broker需要同步将消息"发送"到Slave.根据Master Broker CommitLog
    的存储结果构造一个GroupCommitRequest放入HAService中,再将GroupComitRequest放入GroupTransferService服务中,
    等待GroupTransferService同步成功的锁。如果同步成功那么GroupCommit中的锁会被唤醒,并设置flushOK为True,表示生产
    者发送的消息被Master Broker和Slave Broker 同时保存。
    一个Master Broker可以配置多个Slave Broker,当需要同步数据时,通过service.getWaitNotifyObject().wakeupAll()来唤醒全部的Slave同步。虽然多个Slave都同步了数据,但是一旦Master Broker不可用时,消费者只会从一个Slave中拉取消息,所以生产环境建议Slave不要配置太多。
    注:Slave在发送请求数据的Request时,会带上Slave请求的位点HAConnection.slaveRequestOffset,该值如果等于-1(默认),则表示没有Slave请求过位点数据
    在这里插入图片描述
  • ReadSocketService后台服务不断接收Slave Broker上报的offset,每上报一次都通知HAService.notifyTransferSome()方法,判断Slave同步的位点是否大于Master标记的已同步位点,如果大于则更新标记值,同时通知同步复制服务GroupTransferService.
    GroupTransferService扫描所有的同步请求,依次判断哪些GroupCommitRequest的待同步复制的位点是比已同步位点小的,
    释放GroupCommitRequest中的锁,消息处理线程可以将消息存储成功的结果返回给生产者
  • 消费队列文件(ConsumeQueue)和索引文件(IndexFile)这两个文件是在SlaveBroker上追加CommitLog后由ReputMessageService进行创建的,所以不需要同步

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/416960.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

5个-最佳开源RPA框架

在最近两年中,RPA加上AI,即智能自动化流程,已经成为频繁讨论的话题,特别是在企业和机构的数字化转型过程中。自动化与智能化成为了提高效率的关键手段,而RPA便是迈向这一未来的起始步骤。 可以将RPA视为人体的躯干神经…

Docker 入门笔记

课程地址 容器技术概述 docker能做什么:将应用程序代码和依赖打包为一个镜像,作为交付介质,在各种环境中部署 相比于虚拟机,docker 只虚拟出一个隔离的程序运行环境,其需要则资源大大减少 容器内的程序就好像直接运…

pytorch 图像数据集管理

目录 1.数据集的管理说明 2.数据集Dataset类说明 3.图像分类常用的类 ImageFolder 1.数据集的管理说明 pytorch使用Dataset来管理训练和测试数据集,前文说过 torchvision.datasets.MNIST 这些 torchvision.datasets里面的数据集都是继承Dataset而来&#xff0c…

QT Mingw编译ffmpeg源码以及测试

文章目录 前言下载msys2ysamFFmpeg 搭建编译环境安装msys2安装QT Mingw编译器到msys环境中安装ysam测试 编译FFmpeg 前言 FFmpeg不像VLC有支持QT的库文件,它仅提供源码,需要使用者自行编译成对应的库,当使用QTFFmpeg实现播放视频以及视频流时…

Linux下快速创建大文件的4种方法总结

1、使用 dd 命令创建大文件 dd 命令用于复制和转换文件,它最常见的用途是创建实时 Linux USB。dd 命令是实际写入硬盘,文件产生的速度取决于硬盘的读写速度,根据文件的大小,该命令将需要一些时间才能完成。 假设我们要创建一个名…

Vuepress的使用

介绍 将markdown静态资源转换成html。 动态资源的转换还有很多,为什么要使用Vuepress? 目录分析 项目配置 详情 具体配置请看文档 插件配置 vuepress-theme-vdoing 主题插件 npm install vuepress-theme-vdoing -D先安装依赖配置主题 使用vuep…

外包干了6个月,技术退步明显。。。。。

先说一下自己的情况,本科生,2019年我通过校招踏入了重庆一家软件公司,开始了我的职业生涯。那时的我,满怀热血和憧憬,期待着在这个行业中闯出一片天地。然而,随着时间的推移,我发现自己逐渐陷入…

GEE入门篇|遥感专业术语(实践操作4):光谱分辨率(Spectral Resolution)

目录 光谱分辨率(Spectral Resolution) 1.MODIS 2.EO-1 光谱分辨率(Spectral Resolution) 光谱分辨率是指传感器进行测量的光谱带的数量和宽度。 您可以将光谱带的宽度视为每个波段的波长间隔,在多个波段测量辐射亮…

android开发与实战,那些年Android面试官常问的知识点

前言 在做android项目开发时,大家都知道如果程序出错了,会弹出来一个强制退出的弹 出框,这个本身没什么问题,但是这个UI实在是太丑了,别说用户接受不了,就连 我们自己本身可能都接受不了。虽然我们在发布程…

Vue:【亲测可用】父组件数组包对象,传给子组件对象,子组件修改属性(字段)后,父组件没有更新

场景&#xff1a;vue中父组件数组包对象&#xff0c;传给子组件对象&#xff0c;子组件修改属性&#xff08;字段&#xff09;后&#xff0c;父组件没有更新 代码&#xff1a; # 父组件 <div v-for"(object, name, index) in arr" :key"index"><…

【MySQL】数据管理——DML操作数据

目录 DML&#xff08;数据操作语言&#xff09;添加数据插入单行语法插入多行语法SQL示例将查询结果插入到新表中语法1&#xff1a;语法2&#xff1a; 修改数据语法示例关于SQL的运算符算术运算符比较运算符逻辑运算符 案例 删除数据DELETE命令语法 TRUNCATE TABLE 命令语法代码…

宝塔FTP服务设置并结合cpolar内网穿透实现远程传输文件

文章目录 1. Linux安装Cpolar2. 创建FTP公网地址3. 宝塔FTP服务设置4. FTP服务远程连接小结 5. 固定FTP公网地址6. 固定FTP地址连接 宝塔FTP是宝塔面板中的一项功能&#xff0c;用于设置和管理FTP服务。通过宝塔FTP&#xff0c;用户可以创建FTP账号&#xff0c;配置FTP用户权限…

数据结构——lesson4带头双向循环链表实现

前言✨✨ &#x1f4a5;个人主页&#xff1a;大耳朵土土垚-CSDN博客 &#x1f4a5; 所属专栏&#xff1a;数据结构学习笔记​​​​​​ &#x1f4a5;双链表与单链表的区分&#xff1a;单链表介绍与实现 &#x1f4a5;对于malloc函数有疑问的:动态内存函数介绍 感谢大家的观看…

为什么推荐使用ref而不是reactive

为什么推荐使用ref而不是reactive 局限性问题&#xff1a; reactive本身存在一些局限性&#xff0c;可能会在开发过程中引发一些问题。这需要额外的注意力和处理&#xff0c;否则可能对开发造成麻烦。数据类型限制&#xff1a; reactive声明的数据类型仅限于对象&#xff0c;而…

RK3568 android11 调试陀螺仪模块 MPU6500

一&#xff0c;MPU6500功能介绍 1.简介 MPU6500是一款由TDK生产的运动/惯性传感器&#xff0c;属于惯性测量设备&#xff08;IMU&#xff09;的一种。MPU6500集成了3轴加速度计、3轴陀螺仪和一个板载数字运动处理器&#xff08;DMP&#xff09;&#xff0c;能够提供6轴的运动…

【毛毛讲书】【端粒:年轻、健康、长寿的新科学】是什么决定了我们的寿命?

重磅推荐专栏&#xff1a; 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域&#xff0c;包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用&#xff0c;以及与之相关的人工智能生成内容&#xff…

es获取某个索引下字段的分词结果

//查看某个索引下字段的分词结果 GET /haha/_analyze { "field": "title", "text":"哈哈。" }

云尚办公-0.1.0

二、用户管理接口 1. 建表 角色与用户是多对多的关系&#xff0c;所以除了角色表和用户表外&#xff0c;还需要第三张表表示这两者间的对应关系。关系表中的用户id和角色id分别以对应表中的id作为外键。 CREATE TABLE sys_user (id BIGINT(20) NOT NULL AUTO_INCREMENT COM…

Vue3切换路由白屏刷新后才显示页面内容

1.首先检查页面路由以及页面路径配置是否配置错误。 在router-view 中给路由添加key标识。 &#xff01;&#xff01;注意&#xff1a;有使用layout封装布局的&#xff0c;是在layout下的主页面中的 router-view 添加标识&#xff0c;不是在src根目录下main.vue中修改&#xf…

[云原生] K8s之pod进阶

一、pod的状态说明 &#xff08;1&#xff09;Pod 一直处于Pending状态 Pending状态意味着Pod的YAML文件已经提交给Kubernetes&#xff0c;API对象已经被创建并保存在Etcd当中。但是&#xff0c;这个Pod里有些容器因为某种原因而不能被顺利创建。比如&#xff0c;调度不成功(…