rocketmq-push模式-消费侧重平衡-类流程图分析

1、观察consumer线程

使用arthas分析
把无关线程排除掉

  • MQClientFactoryScheduledThread 定时任务线程
    定时任务线程,包含如下任务:
    每2分钟更新nameServer列表
    每30秒更新topic的路由信息
    每30秒检查broker的存活,发送心跳请求
    每5秒持久化消费队列的offset。如果是广播模式,持久化在本地;如果是集群模式,反馈给broker
    每分钟调整线程池大小(实际上并没有作用。因为最终执行是空方法)

  • PullMessageService 从broker拉取msg的线程。

  • RebalanceService 重平衡线程。每20秒执行一次
    具体可查看org.apache.rocketmq.client.impl.factory.MQClientInstance#start()

2、重平衡,任务是如何创建的

重平衡,就是在消费者组动态伸缩的时候,自动把队列重新分配。具体工作的线程,就是RebalanceService。如下是整个重平衡的类图流程
在这里插入图片描述
如图,启动时,会触发重平衡任务org.apache.rocketmq.client.impl.consumer.RebalanceService#run()
重平衡的关键点在于如何动态伸缩,重点内容在org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance

  • 第一步,获取topic对应的队列集合Set<> mqSet
  • 第二步,随机从一个可选broker上,获取所有消费者的集合List cidAll。cid就是消费端的唯一标识。格式如下:“ip@pid#时间戳”,比如127.01.01.01@1723#2926328724786400
  • 第三步,按字段排序mqSet和cidAll。Collections.sort()
if (mqSet != null && cidAll != null) {
	List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
	mqAll.addAll(mqSet);

	Collections.sort(mqAll);
	Collections.sort(cidAll);

	AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

	List<MessageQueue> allocateResult = null;
	try {
		allocateResult = strategy.allocate(
			this.consumerGroup,
			this.mQClientFactory.getClientId(),
			mqAll,
			cidAll);
	} catch (Throwable e) {
		log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
			e);
		return;
	}

	Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
	if (allocateResult != null) {
		allocateResultSet.addAll(allocateResult);
	}

	boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
	if (changed) {
		log.info(
			"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
			strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
			allocateResultSet.size(), allocateResultSet);
		this.messageQueueChanged(topic, mqSet, allocateResultSet);
	}
}
  • 第四步、选择重平衡策略
    总共有6种。常用的是AllocateMessageQueueAveragely 平均hash队列算法;AllocateMessageQueueAveragelyByCircle 圆周平均hash队列算法
  • 第五步,判断当前consumer节点是否有伸缩变更。有则创建PullRequest请求体
  • 第六步,将PullRequest请求体put到PullMessageService拉取任务的队列pullRequestQueue

topic的路由信息,是如何更新的

回看上一节的第一步。队列集合Set<> mqSet。在重平衡线程中,是直接获取这个集合的。这个集合,其实是启动时和定时任务线程MQClientFactoryScheduledThread 更新的
每30秒更新topic的路由信息
详细如图:
在这里插入图片描述

总结

首先启动时,会从nameserver获取topic的所有queue。这些queue分布在多个broker上。
构建了mqSet。再随机从一个broker上,获取当前消费者组,包含的所有消费者List<> cidAll。
将两者排序,根据分配策略,分配当前消费者负责的队列。(比如总共12个队列,4个消费者。当前消费者,负责 4,5,6队列)。如此看,是客户端重平衡。通过排序然后策略分配的方式,实现消费者互不通信的条件下协同合作
启动时,内存都是空,所以会触发构建PullRequest请求体。将请求体,put进拉取线程PullMessageService的队列。
每过20秒,会做一次重平衡;
每过30秒,会更新一次路由信息;

后续分析:

  • 拉取线程PullMessageService的工作;
  • 运行过程中,没有重平衡的情况,RebalanceService是不会再创建PullRequest请求体的。如何重复构建PullRequest请求体,循环拉取?(这块代码在PullMessageService中实现)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/943958.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用亚马逊针对 PyTorch 和 MinIO 的 S3 连接器实现可迭代式数据集

2023 年 11 月&#xff0c;Amazon 宣布推出适用于 PyTorch 的 S3 连接器。适用于 PyTorch 的 Amazon S3 连接器提供了专为 S3 对象存储构建的 PyTorch 数据集基元&#xff08;数据集和数据加载器&#xff09;的实现。它支持用于随机数据访问模式的地图样式数据集和用于流式处理…

[2003].第2-01节:关系型数据库表及SQL简介

所有博客大纲 后端学习大纲 MySQL学习大纲 1.数据库表介绍&#xff1a; 1.1.表、记录、字段 1.E-R&#xff08;entity-relationship&#xff0c;实体-联系&#xff09;模型中有三个主要概念是&#xff1a; 实体集 、 属性 、 联系集2.一个实体集&#xff08;class&#xff09…

wps透视数据表

1、操作 首先选中你要的行字段表格 -> 插入 -> 透视数据表 -> 拖动行值&#xff08;部门&#xff09;到下方&#xff0c;拖动值&#xff08;包裹数量、运费&#xff09;到下方 2、删除 选中整个透视数据表 -> delete 如图&#xff1a;

Python-流量分析常用工具脚本(Tshark,pyshark,scapy)

免责声明&#xff1a;本文仅作分享~ 目录 wireshark scapy 例&#xff1a;分析DNS流量 检查数据包是否包含特定协议层&#xff08;过滤&#xff09; 获取域名 例&#xff1a;提取 HTTP 请求中的 Host 信息 pyshark 例&#xff1a;解析 HTTP 请求和响应 例&#xff1a;分…

开发场景中Java 集合的最佳选择

在 Java 开发中&#xff0c;集合类是处理数据的核心工具。合理选择集合&#xff0c;不仅可以提高代码效率&#xff0c;还能让代码更简洁。本篇文章将重点探讨 List、Set 和 Map 的适用场景及优缺点&#xff0c;帮助你在实际开发中找到最佳解决方案。 一、List&#xff1a;有序存…

[2029].第6-06节:MyISAM引擎中的索引与 InnoDB引擎中的索引对比

所有博客大纲 后端学习大纲 MySQL学习大纲 1.MyISAM索引&#xff1a; 1.1.B树索引适用存储引擎&#xff1a; 1.B树索引适用存储引擎如下表所示&#xff1a; 2.即使多个存储引擎都支持同一种类型的B树索引&#xff0c;但它们的实现原理也是不同的 Innodb和MyISAM默认的索引是B…

DS的使用

使用DS和[address]实现字的传送 要解决的问题:CPU从内存单元中要读取数据 要求&#xff1a;CPU要读取一个内存单元的时候&#xff0c;必须先给出这个内存单元的地址。 原理&#xff1a;在8086PC中&#xff0c;内存地址段地址和偏移地址组成(段地址:偏移地址) 解决方案 :DS和[a…

使用RKNN进行YOLOv8人体姿态估计的实战教程:yolov8-pose.onnx转yolov8-pose.rknn+推理全流程

之前文章有提到“YOLOv8的原生模型包含了后处理步骤,其中一些形状超出了RK3588的矩阵计算限制,因此需要对输出层进行一些裁剪”,通过裁剪后得到的onnx能够顺利的进行rknn转换,本文将对转rnkk过程,以及相应的后处理进行阐述。并在文末附上全部源码、数据、模型的百度云盘链…

短视频矩阵系统后端源码搭建实战与技术详解,支持OEM

一、引言 随着短视频行业的蓬勃发展&#xff0c;短视频矩阵系统成为了众多企业和创作者进行多平台内容运营的有力工具。后端作为整个系统的核心支撑&#xff0c;负责处理复杂的业务逻辑、数据存储与交互&#xff0c;其搭建的质量直接影响着系统的性能、稳定性和可扩展性。本文将…

JS 设置按钮的loading效果

本文是在其他博主的博客JS学习笔记 | 遮罩层Loading实现_jsp loading-CSDN博客基础上&#xff0c;进行实践的。 目录 一、需求 二、Jspcss实现代码 一、需求 在springboot项目中的原始html5页面中&#xff0c;原本的功能是页面加载时&#xff0c;使用ajax向后端发送请求&…

用VBA将word文档处理成支持弹出式注释的epub文档可用的html内容

有一种epub文件&#xff0c;其中的注释以弹窗形式显示&#xff0c;如下图&#xff1a; 点击注释引用后&#xff0c;对应的注释内容会弹出在页面中显示&#xff0c;再次点击弹窗外的任意位置该弹窗即关闭&#xff0c;关闭后点击任意注释引用&#xff0c;对应的注释内容会弹窗显示…

实践KDTS-WEB从mysql迁移到kingbasev9

数据库国产化替代数据迁移是一个复杂且关键的过程。这涉及到将原有数据库中的数据准确、完整地迁移到新的国产数据库中&#xff0c;同时确保数据的完整性和一致性。人大金仓提供了强大的数据库迁移工具&#xff08;KDTS&#xff09;对同构、异构数据库数据迁移&#xff1b; 数…

多旋翼无人机理论 | 四旋翼动力学数学模型与Matlab仿真

多旋翼无人机理论 | 四旋翼动力学数学模型与Matlab仿真 力的来源数学模型数学模型总结Matlab 仿真 力的来源 无人机的动力系统&#xff1a;电调-电机-螺旋桨 。 给人最直观的感受就是 电机带动螺旋桨转&#xff0c;产生升力。 螺旋桨旋转产生升力的原因&#xff0c;在很多年…

为什么要在PHY芯片和RJ45网口中间加网络变压器

在PHY芯片和RJ45网口之间加入网络变压器是出于以下几个重要的考虑&#xff1a; 1. 电气隔离&#xff1a;网络变压器提供了电气隔离功能&#xff0c;有效阻断了PHY芯片与RJ45之间直流分量的直接连接。这样可以防止可能的电源冲突&#xff0c;降低系统故障的风险&#xff0c;并保…

Windows 安装 Jenkins 教程

Jenkins 简介 Jenkins 是一个开源的自动化服务器&#xff0c;主要用于持续集成&#xff08;CI&#xff09;和持续交付&#xff08;CD&#xff09;。它可以自动化软件开发生命周期中的许多任务&#xff0c;如构建、测试、部署和发布。Jenkins 最初是由 Kohsuke Kawaguchi 在 20…

Docker中的MYSQL导入本地SQL语句

在本地mysql安装的bin目录下打开cmd窗口并执行以下命令导出sql文件 mysqldump -uroot -p mysql >schema.sql mysql -数据库 schema.sql -导出的SQL语句文件名 使用xftp上传文件到centos7中的某个文件夹中 使用docker cp schema.sql mysql:.(有一个点&#xff09;上传到mys…

javaweb 04 springmvc

0.1 在上一次的课程中&#xff0c;我们开发了springbootweb的入门程序。 基于SpringBoot的方式开发一个web应用&#xff0c;浏览器发起请求 /hello 后 &#xff0c;给浏览器返回字符串 “Hello World ~”。 其实呢&#xff0c;是我们在浏览器发起请求&#xff0c;请求了我们…

LinkedList类 (链表)

目录 一. LinkedList 基本介绍 二. LinkedList 中的法及其应用 1. 添加元素 (1) add() (2) addAll() (3) addFirst() (4) addLast() 2. 删除元素 (1) remove() (2) removeAll() (3) removeFirst() (4) removeLast() 3. 遍历元素 (1) for 循环遍历 (2) for - each …

Python毕业设计选题:基于Python的社区爱心养老管理系统设计与实现_django

开发语言&#xff1a;Python框架&#xff1a;djangoPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 管理员登录 管理员功能界面 用户管理 身体健康界面 公共书籍界面 借阅信息界面 归还…

第T4周:TensorFlow实现猴痘识别(Tensorboard的使用)

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 目标&#xff1a; 1、学习tensorboard的使用 具体实现&#xff1a; &#xff08;一&#xff09;环境&#xff1a; 语言环境&#xff1a;Python 3.10 编 译 器…