消息中间件之RocketMQ源码分析(四)

消费者的Rebalance机制

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、
Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡的,以支持全部队列的正常消费的?

Rebalance服务的类图

在这里插入图片描述

RebalanceImpl的核心属性

在这里插入图片描述

  • ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable:记录MessageQueue和ProcessQueue的关系,MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
  • ConcurrentMap<String, Set< MessageQueue >> topicSubscribeInfoTable:Topic路由信息。保存Topic和MessageQueue的关系
  • ConcurrentMap<String(Topic), SubscriptionData> subscriptionInner:真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
  • AllocateMessageQueueStrategy allocateMessageQueueStrategy:MessageQUeue消息分配策略的实现
  • MQClientInstance mQClientFactory:client实例对象

RebalanceImpl的核心方法

  • boolean lock():为MessageQueue加锁
    在这里插入图片描述

  • void doRebalance():执行Rebalance操作
    在这里插入图片描述

  • void messageQueueChanged():通知Message发生变化,这个方法在Push和Pull两个类中被重写
    在这里插入图片描述

  • boolean removeUnnecessaryMessageQueue():去掉不再需要的MessageQueue
    在这里插入图片描述

  • void dispatchPullRequest():执行消息拉取请求

+

  • boolean updateProcessQueueTableInRebalance():在Rebalance中更新processQueue
    在这里插入图片描述

Rebalance过程

在这里插入图片描述

  • 消费者实例在收到Broker通知后是怎么执行Reblance的?这个操作是通过调用
    MQClientInstance.rebalanceImmediately()来实现的
    在这里插入图片描述
  • 这种设计是RocketMQ种典型的锁方式,执行wakeup命令后,this.waitForRunning()就会暂停,再执行this.mqClientFactory.doRebalance()
    在这里插入图片描述
  • doRebalance()方法主要有以下几个步骤
    1.查找当前clientId对应的全部的消费者组,全部执行一次Rebalance.
    虽然消费者实现分别为Pull消费和Push消费两种默认实现,调用的是不同实现类的Rebalance方法,但是实现逻辑都差不多
    在这里插入图片描述
    2.判断Rebalance开关,如果没有被暂停,则调用RebalancePushImpl.rebalance()方法
    在这里插入图片描述
    3.在RebalancePushImpl.rebalance()方法中,获取当前消费者全部订阅关系中的Topic,
    循环对每个Topic进行Rebalance.待全部的Rebalance都执行完之后,将不属于当前
    消费者的队列删除
    在这里插入图片描述
    在这里插入图片描述
    4.Topic队列重新分配,这里也就是客户端Rebalance的核心逻辑之处,根据是集群消费还是广播消费分别执行MessageQueue重新分配的逻辑,以集群消费为例分析
    在这里插入图片描述
    4.1.获取当前Topic的全部MessageQueue(代码中是mqSet)和该Topic的所有消费者的clientId(代码中是cidAll),只有当两者都不为空时,才执行Rebalance
    4.2.将全部的MessageQueue(代码中时mqAll)和消费者客户端(cidAll)进行排序。
    由于不是所有消费者的客户端都能彼此通信,所以将mqAll和cidAll排序的目的在于,
    保证所有消费者客户端在做Rebalance的时候,看到的MessageQueue列表和消费者
    客户端都是一样的试图,做Rebalance时才不会分配错
    4.3.按照当前设置的队列分配策略执行Queue分配。队列分配策略接口AllocateMessageQueueStrategy,该接口中,有两个方法allocate()和getName()
    在这里插入图片描述

allocate():执行队列分配操作,该方法必须满足全部队列都能分配到消费者
getName():获取当前分配算法的名字

目前队列分配策略有五种实现:
AllocateMessageQueueAveragely:平均分配,也就是默认使用的策略(强烈推荐)
AllocateMessageQueueAveragelyByCircle:环形分配策略
AllocateMessageQueueByConfig:手动配置
AllocateMessageQueueConsistentHash:一致性Hash分配
AllocateMessageQueueByMachineRoom:机房分配策略

4.4.动态更新ProcessQueue,在队列重新分配后,当前消费者消费的队列可能不会发生变化,也可能发生变化,不管时新增加了队列需要消费,还是减少了队列,都需要执行updateProcessQueueTableInRebalance()方法来更新ProcessQueue,如果有MessageQueue不再分配给当前的消费者消费,则设置ProcessQueue.setDropped(true),表示放其当前MessageQueue的Pull消息
在这里插入图片描述

如果在重新分配MessageQueue后,新增加了MessageQueue,
则添加一个对应的ProcessQueue,查询Queue拉取位点,包装一个新的PullRequest
来Pull消息;同理如果减少了MessageQueue,则将其对应的ProcessQueue删除,
不管MessageQueue时新增还是减少,都会设置changed为True,表示当前消费者
消费的MessageQueue有变化,源码中是分别两个集合遍历来判断是新增还是减少的。
在这里插入图片描述

PullRequest初始化的具体实现,新增的PullRequest对象将被分配出去拉取MessageQueue中的消息

在这里插入图片描述
4.5.执行messageQueueChanged()方法,如果有MessageQueue订阅关系发生变化,
则更新本地订阅关系版本,修改本地消费者有限流的一些参数,然后发送心跳,
通知所有Broker,当前订阅关系发生了改变
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/364062.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CHS_06.2.3.4_2+用信号量实现进程互斥、同步、前驱关系

CHS_06.2.3.4_2用信号量实现进程互斥、同步、前驱关系 知识总览信号量机制实现进程互斥信号量机制实现进程同步信号量机制实现前驱关系 知识回顾 各位同学 大家好 在这个小节中 我们要学习怎么用信号量机制来实现进程的同步互制关系 知识总览 那么 我们之前学习了互斥的几种软…

【C++杂货铺】详解类和对象 [下]

个人博客&#xff1a;代码菌-CSDN博客 专栏&#xff1a;C杂货铺_代码菌的博客-CSDN博客 目录 &#x1f308;前言&#x1f308; &#x1f4c1; 初始化列表&#xff08;灰常重要&#xff09; &#x1f4c2; 引入 &#x1f4c2; 概念 &#x1f4c2; 特性 &#x1f4c1; 拓展构…

OG Trade在ZKX揭幕:一家基于Starknet的游戏化永续合约交易所

ZKX的 OG Trade通过内置游戏化和30分钟交易竞赛&#xff0c;为所有交易者创造机会&#xff0c;革新了永续合约交易模式。 2024年1月30日 — ZKX宣布推出OG Trade&#xff0c;这是一家基于Starknet的游戏化永续合约交易所&#xff0c;旨在满足短期交易者、高水平交易者和波段交易…

021 while循环详解

什么时while循环 int i 0; // 循环输出i&#xff0c;大于100时结束 while(i < 100){System.out.println(i);i; } int i 0; int sum 0; // 计算1-100的和&#xff0c;输出 while(i < 100){sum i;i; } System.out.println(sum); 什么是死循环 循环没有停止下来的条件…

Vue3嵌套ref小细节,自我解惑

前言&#xff1a; 作者在学习时&#xff0c;遇到代码如下&#xff1a; import { ref,watch } from vue const state ref({count:0}) const addState ()>{state.value.count } 对于方法中对对象中count的理解存在偏差 问题及解决&#xff1a; 误解&#xff1a; 认为是…

面对近期行情大起大落的伦敦银需要关注什么?

近期经常有听到投资者抱怨说&#xff0c;伦敦银价格没有明显趋势&#xff0c;很难做。确实&#xff0c;我们从日线图看&#xff0c;金价处于一个比较宽幅的横盘区间当中&#xff0c;近期的行情也是大涨大跌。投资者认为&#xff0c;面对大起大落的行情无从下手。下面我们就来讨…

会话管理技术

会话管理 会话管理是跟踪用户跨网页活动的过程。以在线购物商场为例。用户可以选择产品并将其添加到购物车中。用户转到其他页面时,购物车中仍然保留详细信息,以便用户查看购物车中的物品并下订单。 会话跟踪也可用于跟踪用户的偏好。例如,如果用户选择了多本小说,则向用…

内网安全:RDP WinRS WinRM SPN Kerberos 横向移动

目录 WinRM协议 RDP协议 域横向移动&#xff1a;RDP协议 RDP协议利用 一. 探针服务 二. 获取NTML Hash 明文密码 三. 连接执行 域横向移动&#xff1a;WinRM WinRS WinRM协议、WinRS命令利用 一. cs 内置端口扫描5985 二. 连接执行 三. 上线CS 四. CS插件横向移动…

基于springboot+vue的阿博图书馆管理系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目背景…

Springboot项目基础配置:小白也能快速上手!

推荐文章 给软件行业带来了春天——揭秘Spring究竟是何方神圣&#xff08;一&#xff09; 给软件行业带来了春天——揭秘Spring究竟是何方神圣&#xff08;二&#xff09; 给软件行业带来了春天——揭秘Spring究竟是何方神圣&#xff08;三&#xff09; 给软件行业带来了春天—…

vue-cli项目运行流程介绍

一、前言 ​ 本文介绍 vue-cli搭建的项目运行流程&#xff0c;基于已经搭建好的基础项目。关于 vue-cli 构建项目的详细流程&#xff0c;可参考博文&#xff1a;使用vue脚手架构建项目 二、main.js 项目运行 会加载入口文件 main.js /* html文件中&#xff0c;通过script …

CTF盲水印工具:Blind-WaterMark安装

工具下载地址&#xff1a;GitCode - 开发者的代码家园 下载完毕后&#xff0c;只留这些东西就行 接下来需要安装两个依赖&#xff1a; opencv、matplotlib 直接pip install安装的话&#xff0c;工具使用会报错 所以需要到网站里挑选适合的版本进行安装 下载地址&#xff1…

DrissionPage多线程实践

DrissionPage多线程实践 背景&#xff1a;项目中需要抓取部分平台的数据&#xff0c;因为涉及到登录&#xff0c;且暂未实现接口登录。所以采用selenium登录后获取cookie传给requests的方式来实现。后了解到DrissionPage国产开源库&#xff0c;等于是把selenium和requests结合起…

部署YUM仓库服务

一、yum仓库 1. yum简介 yum是一个基于RPM包&#xff08;是Red-Hat Package Manager红帽软件包管理器的缩写&#xff09;构建的软件更新机制&#xff0c;能够自动解决软件包之间的依赖关系。 为什么会有依赖关系的发生 因为linux本身就是以系统简洁为自身优势&#xff0c;所以…

大数据信用报告应该去哪里查询比较好呢?

对于个人而言&#xff0c;大数据信用报告也变得越来越重要。那么&#xff0c;大数据信用报告应该去哪里查询呢?本文将为您详细介绍征信和大数据的区别&#xff0c;并推荐一个可靠的大数据平台。 首先&#xff0c;我们需要了解征信和大数据的区别 征信报告 依法采集、整理、保存…

day13_oop_抽象类_接口

今日内容 零、 复习昨日 一、作业 二、抽象 三、接口 零、 复习昨日 final的作用 最终的,修饰的类,属性,方法不能改变类不能继承,属性不能改变(常量),方法不能重写 static修饰方法的特点 修饰的属性方法在内存只有一份随着类加载而初始化不要new,可以通过类名直接调用被该类的所…

异或运算实现加密解密

异或运算符^&#xff0c;相同为0&#xff0c;不同为1&#xff08;同0非1&#xff09; 由异或运算法则可知&#xff1a;a ^ a 0&#xff0c;a ^ 0 a 如果c a ^ b&#xff0c;那么a b ^ c&#xff0c;即a ^ b ^ b a&#xff0c;^ 的逆运算仍然是 ^ 利用异或运算的性质&am…

LabVIEW船舶自动识别系统

在现代航海领域&#xff0c;安全高效的船舶自动识别系统对于保障航行安全和提高船舶管理效率非常重要。介绍了利用LabVIEW软件开发的一个船舶自动识别系统&#xff0c;该系统通过先进的数据采集和信号处理技术&#xff0c;显著提升了传统自动识别系统的性能。 这个船舶自动识别…

IAR编译和调试CMS32L051

0 Preface/Foreword 0.1 参考文档 中微半导体BAT系列单片机学习笔记_V1.1.pdf 1 配置方法 1.1 编译工具链添加 CMS对于IAR工具&#xff0c;有一个插件文件&#xff0c;用于安装对应的CMS系列芯片。 工具名称&#xff1a;iar_plug20210926.7z 按照完成之后&#xff0c;可…

电脑监控软件都有哪些?哪款好用?

在当今数字化时代&#xff0c;电脑监控软件已经成为企业和个人用户保障信息安全、管理电脑资源的重要工具。市场上存在多种电脑监控软件&#xff0c;每款软件都有其独特的优点和适用场景。本文将为您介绍几款常见的电脑监控软件&#xff0c;并分析哪款更适合您的需求。 绿虫电…