Kafka-消费者-KafkaConsumer分析-Rebalance

在开始介绍Rebalance操作的实现细节之前,我们需要明确在哪几种情况下会触发Rebalance操作:

  1. 有新的消费者加入Consumer Group。
  2. 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向GroupCoordinator发送HeartbeatRequest时,GroupCoordinator会认为消费者下线。
  3. 有消费者主动退出Consumer Group。
  4. Consumer Group订阅的任一Topic出现分区数量的变化。
  5. 消费者调用unsubscrible()取消对某Topic的订阅。

第一阶段

Rebalance操作的第一步就是查找GroupCoordinator,这个阶段消费者会向Kafka集群中的任意一个Broker发送GroupCoordinatorRequest请求,并处理返回的GroupCoordinatorResponse响应。

GroupCoordinatorRequest消息体的格式比较简单,只包含了Consumer Group的id。GroupCoordinatorResponse消息体包含了错误码(short类型)、coordinator的节点Id(int类型)、GroupCoordinator的host(String类型)、GroupCoordinator的端口号(int类型)。
发送GroupCoordinatorRequest请求的入口是ConsumerCoordinator的ensureCoordinatorReady方法,其流程如图所示。

在这里插入图片描述

  1. 首先检测是否需要重新查找GroupCoordinator,主要是检查coordinator字段是否为空以及与GroupCoordinator之间的连接是否正常。

  2. 查找集群负载最低的Node节点,并创建GroupCoordinatorRequest请求。调用client.send方法将请求放入unsent队列中等待发送,并返回RequestFuture对象。返回的RequestFuture对象经过了compose方法适配,原理同HeartbeatCompletionHandler。

  3. 调用ConsumerNetworkClient.poll(future)方法,将GroupCoordinatorRequest请求发送出去。此处使用阻塞的方式发送,直到收到GroupCoordinatorResponse响应或异常完成,才从此方法返回。

  4. 检测检查RequestFuture对象的状态。如果出现RetriableException异常,则调用ConsumerNetworkClient.awaitMetadataUpdate()方法阻塞更新Metadata中记录的集群元数据后跳转到步骤1继续执行。如果不是RetriableException异常则直接报错。

  5. 如果成功找到GroupCoordinator节点,但是网络连接失败,则将其unsent中对应的请求清空,并将coordinator字段置为null,准备重新查找GroupCoordinator,退避一段时间后跳转到步骤1继续执行。

下面介绍处理GroupCoordinatorResponse的相关操作。通过对sendGroupCoordinatorRequest方法的分析我们知道,handleGroupMetadataResponse)方法是处理GroupCoordinatorResponse的入口,其步骤如下:

  1. 调用coordinatorUnknown()检测是否已经找到GroupCoordinator且成功连接。如果是则忽略此GroupCoordinatorResponse,因为在发送GroupCoordinatorRequest时并没有防止重发的机制,可能有多个GroupCoordinatorResponse;否则,继续下面的步骤。
  2. 解析GroupCoordinatorResponse得到服务端GroupCoordinator的信息。
  3. 构建Node对象赋值给coordinator字段,并尝试与GroupCoordinator建立连接。
  4. 启动HeartbeatTask定时任务。
  5. 最后,调用RequestFuture.complete()方法将正常收到GroupCoordinatorResponse的事件传播出去。
  6. 如果GroupCoordinatorResponse中的错误码不为NONE,则调用RequestFuture.raise方法将异常传播出去。最终由ensureCoordinatorReady方法中的步骤4处理。

第二阶段

在成功查找到对应的GroupCoordinator之后进入Join Group阶段。在此阶段,消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。先来了解JoinGroupRequest和JoinGroupResponse的消息体格式,如图所示。

在这里插入图片描述
了解了JoinGroupRequest和JoinGroupResponse的格式之后,再来分析第二阶段的相关处理流程,其入口函数是ensurePartitionAssignment方法。

ensurePartitionAssignment方法的流程如图所示。

在这里插入图片描述

  1. 调用SubscriptionState.partitionsAutoAssigned方法,检测Consumer的订阅是否是AUTO_TOPICS或AUTO_PATTERN。因为USER_ASSIGNED不需要进行Rebalance操作,而是由用户手动指定分区。

  2. 如果订阅模式是AUTO_PATTERN,则检查Metadata是否需要更新。

    在前面提到过,在ConsumerCoordinator的构造函数中为Metadata添加了监听器。当Metadata更新时就会使用SubscriptionState中的正则表达式过滤Topic,并更改SubscriptionState中的订阅信息。同时,也会使用metadataSnapshot字段记录当前的Metadata的快照。这里要更新Metadata的原因,是为了防止因使用过期的Metadata进行Rebalance操作而导致多次连续的Rebalance操作。

  3. 调用ConsumerCoordinator.needRejoin()方法判断是要发送JoinGroupRequest加入ConsumerGroup,其实现是检测是否使用了AUTO_TOPICS或AUTO_PATTERN模式,检测rejoinNeeded和needsPartitionAssignment两个字段的值。

  4. 调用onJoinPrepare方法进行发送JoinGroupRequest请求之前的准备,做了三件事:一是如果开启了自动提交offset则进行同步提交offset,提交offset的内容后面会详细介绍,此步骤可能会阻塞线程;二是调用注册在SubscriptionState中的ConsumerRebalanceListener上的 回调方法;三是将SubscriptionState的needsPartitionAssignment字段设置为true并收缩groupSubscription集合。

  5. 再次调用needRejoin方法检测,之后调用ensureCoordinatorReady方法检测已经找到GroupCoordinator且与之建立了连接。

  6. 如果还有发往GroupCoordinator所在Node的请求,则阻塞等待这些请求全部发送完成并收到响应(即等待unsent及InFlightRequests的对应队列为空),然后返回步骤5继续执行,主要是为了避免重复发送JoinGroupRequest请求。

  7. 调用sendJoinGroupRequest方法创建JoinGroupRequest请求,并调用ConsumerNetworkClient.send方法将请求放入unsent中缓存,等待发送。

  8. 在步骤7返回的RequestFuture对象上添加RequestFutureListener。

  9. 调用ConsumerNetworkClient.poll方法发送JoinGroupRequest,这里会阻塞等待,直到收到JoinGroupResponse或出现异常。

  10. 检测RequestFuture.fail。如果出现RetriableException异常则进行重试,其他异常则报错。如果无异常,则整个第二阶段操作完成。

通过前面对JoinGroupRequest发送流程的分析,我们了解到JoinGrouResponse处理流程的入口是JoinGroupResponseHandler:handle()方法,其中还包括了SyncGroupRequest发送的操作。

JoinGrouResponse的处理流程如图所示。

在这里插入图片描述

  1. 解析JoinGroupResponse,获取GroupCoordinator分配的memberld、generation等信息,更新到本地。
  2. 消费者根据leaderld检测自己是不是Leader。如果是Leader则进入onJoinLeader方法,如果不是Leader则进入onJoinFollower方法。从上面的流程图也可以看出,onJoinFollower()方法的逻辑是onJoinLeader()方法的子集,下面主要分析onJoinLeader方法。
  3. Leader根据JoinGroupResponse的group_protocol字段指定的Parition分配策略,查找相应的PartitionAssignor对象。
  4. Leader将JoinGroupResponse的members字段进行反序列化,得到ConsumerGroup中全部消费者订阅的Topic。Leader会将这些Topic信息添加到其SubscriptionState.groupSubscription集合中。而Follower则只关心自己订阅的Topic信息。
  5. 第4步可能有新的Topic添加进来,所以更新Metadata信息。
  6. 待Metadata更新完成后,会在assignmentSnapshot字段中存储一个Metadata快照(即通过Metadata的Listener创建的快照)。
  7. 调用PartitionAssignor.assign()方法进行分区分配。
  8. 将分配结果序列化,保存到Map中返回,其中key是消费者的memberld,value是分配结果序列化后的ByteBuffer。

第三阶段

完成分区分配之后就进入了Synchronizing Group State 阶段,主要逻辑是向GroupCoordinator 发送 SyncGroupRequest 请求并处理 SyncGroupResponse 响应。

先来了解SyncGroupRequest 和 SyncGroupResponse 的消息体格式。

在这里插入图片描述
SyncGroupRequest 中各个字段的含义如表

在这里插入图片描述
SyncGroupResponse 中各个字段的含义如表

在这里插入图片描述
通过前面对onJoinLeader方法分析,我们知道发送 SyncGroupRequest 请求的逻辑紧接在分区分配操作之后,也是在 onJoinLeader方法中完成的。下面是其流程:

  1. 得到序列化后的分区分配结果后,Leader将其封装成 SyncGroupRequest,而Follower形成的SyncGroupRequest中这部分为空集合。
  2. 调用ConsumerNetworkClient.send方法将请求放入unsent集合中等待发送。
    对SyncGroupResponse处理的入口是SyncGroupResponseHandler.handle方法。对于正常完成的情况,解析SyncGroupResponse,从中拿到分区分配结果并将其传递出去;对于出现异常情况,将rejoinNeeded设置为true,并针对不用的错误码进行不同的处理。

从SyncGroupResponse中得到的分区分配结果最终由ConsumerCoordinator.onJoinComplete()方法处理,调用此方法的是在第二阶段ensureActiveGroup方法的步骤8中添加的RequestFutureListener中调用。onJoinComplete()方法的流程如图所示。

在这里插入图片描述

  1. 在第二阶段Leader开始分配分区之前,Leader使用assignmentSnapshot字段记录了Metadata快照。此时在Leader中,将此快照与最新的Metadata快照进行对比。如果快照不一致则表示分区分配过程中出现了Topic增删或分区数量的变化,则将needsPartitionAssignment置为true,需重新进行分区分配。
  2. 反序列化拿到分配给当前消费者的分区,并添加到SubscriptionStata.assignment集合中,之后消费者会按照此集合指定的分区进行消费,将needsPartitionAssignment置为false。
  3. 调用PartitionAssignor的onAssignment()回调函数,默认是空实现。当用户自定义PartitionAssignor时,可以自定义此方法。
  4. 如果开启了自动提交offset的功能,则重新启动AutoCommitTask定时任务。
  5. 调用SubscriptionState中注册的ConsumerRebalanceListener。
  6. 将needsJoinPrepare重置为true,为下次Rebalance操作做准备。
  7. 重启HeartbeatTask定时任务,定时发送心跳。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/336226.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ISA Server2006部署RuoYi无法登录的问题

ISA Server2006部署RuoYi无法正常登录。每次登录都会报错如下: 无效的会话,或者会话已过期,请重新登录。 原因分析 在nginx中部署没有问题,在ISA Server就会报这个错。根据登录的原理,我猜测可能是headr中的Author…

Linux:软件包管理器 yum

提示:以下指令均在Xshell 7 中进行 什么是软件包? 在Linux下安装软件, 一个通常的办法是下载到程序的源代码, 并进行编译, 得到可执行程序。 但是这样太麻烦了, 于是有些人把一些常用的软件提前编译好, 做成软件包(可以理解成windows上的安 装程序)放…

JVM篇--垃圾回收高频面试题

JVM垃圾回收 1 简单说下Java垃圾回收机制? 首先在java运行过程中,其实程序员并不需要去显示的调用程序来释放对象的内存,而是由虚拟机来完成的,具体来看是在jvm中有个垃圾回收线程,这个线程是个守护线程,…

pytest实现多进程与多线程运行超好用的插件

前言 如果想分布式执行用例,用例设计必须遵循以下原则: 1、用例之间都是独立的, 2、用例a不要去依赖用例b 3、用例执行没先后顺序, 4、随机都能执行每个用例都能独立运行成功每个用例都能重复运行,不影响其它用例 这…

假设检验:以样本服从二项分布举例

目录 假设检验一、假设检验的思想二、假设检验的基本步骤1. 确定要进行检验的假设2. 选择检验统计量。3. 确定用于做决策的拒绝域4. 求出检验统计量的值5. 查看样本结果是否位于拒绝域内6. 做出决策 三、举例说明例子1——某公司治疗打鼾例子2——女士品茶的故事 假设检验 一、…

python爬虫知识点:5种线程锁

嗨喽~大家好呀,这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 线程安全 线程安全是多线程或多进程编程中的一个概念,在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个…

细说JavaScript事件处理(JavaScript事件处理详解)

js语言的一个特色和就是它的动态性,即一时间驱动的方式对用户输入作出反应而不需要依赖服务器端程序。事件是指人机交互的结果,如鼠标移动、点击按钮、在表单中输入数据或载入新的Web洁面等。 一、什么是事件 1、事件类型 1.1、事件源 1.2、事件处理…

【数据结构】详谈队列的顺序存储及C语言实现

循环队列及其基本操作的C语言实现 前言一、队列的顺序存储1.1 队尾指针与队头指针1.2 基本操作实现的底层逻辑1.2.1 队列的创建与销毁1.2.2 队列的增加与删除1.2.3 队列的判空与判满1.2.4 逻辑的局限性 二、循环队列2.1 循环队列的实现逻辑一2.2 循环队列的实现逻辑二2.3 循环队…

Git 配置与理解

简述 Git 在 Windows 和 Ubuntu 中的配置,以及对 Git 工作区域划分和 Git 中对于文件状态划分的理解。 git 基础安装与配置 基于 WSL 的 Ubuntu 下的 git 打开或关闭Windows功能 -> Hyper-V、Virtual Machine Platform、Windows Subsystem for Linux # 1.必须…

傲空间私有部署Windows指南

推荐阅读 智能化校园:深入探讨云端管理系统设计与实现(一) 智能化校园:深入探讨云端管理系统设计与实现(二) 安装 docker 请下载对应的 Docker,安装完成后启动。 Docker Desktop for Windows…

【Linux取经路】初探进程地址空间

文章目录 一、历史问题回顾二、语言层面的地址空间2.1 验证 三、虚拟地址的引入3.1 初步解释这种现象——引入地址空间的概念3.2 再来粗粒度理解上面的现象 四、细节解释4.1 地址空间究竟是什么?4.2为什么要有地址空间4.3 页表4.3.1 CR3寄存器4.3.2 页表是由页表项组…

智慧校园大数据应用系统介(3)

智能巡课系统 巡课系统是一种新的课堂观察记录工具,它将学校或区域内全体教师作为一个整体,采用数字化手段描述教师和学生的课堂行为。通过移动端实时记录和通用性的统计分析,使教育者更容易发现教学过程与教学成果之间的联系,有助于改变课堂生态、提高教学有效性、提升教…

Codeforces Round 895 (Div. 3)补题

Two Vessels(Problem - A - Codeforces) 题目大意:有两个无限容器,目前一个容器中有a克水,另一个容器中有b克水,现有一个大小为cg的容器,我们每次可以从一个无限容器中取任意不大于c克的水&…

【Linux】相关背景及环境搭建

前言: 认识 Linux, 了解 Linux 的相关背景,学会如何使用云服务器,掌握使用远程终端工具 xshell 登陆 Linux 服务器 文章目录 一、Linux介绍1.1 关于UNIX1.2 Linux的诞生及发展历程1.3 Linux开源1.4 Linux在各个行业的现状1.5 发行版本 二、Li…

令牌桶算法与Guava的实现RateLimiter源码分析

令牌桶算法与Guava的实现RateLimiter源码分析 令牌桶RateLimiter简介RateLimiter使用示例导入maven依赖编写测试代码 RateLimiter的实现源码解析SmoothRateLimiterSmoothBursty恒速获取令牌acquire(int)tryAcquire(int,long,TimeUnit) 存量桶系数小结 优缺点与漏桶的区别总结 令…

Python爬虫时被封IP,该怎么解决?四大动态IP平台测评

在使用 Python 进行爬虫时,很有可能因为一些异常行为被封 IP,这主要是因为一些爬虫时产生的异常行为导致的。 在曾经的一次数据爬取的时候,我尝试去爬取Google地图上面的商家联系方式和地址信息做营销,可是很不幸,还只…

CloudPanel file-manager/backend/makefile接口存在远程命令执行漏洞CVE-2023-35885

@[toc] 免责声明:请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失,均由使用者本人负责,所产生的一切不良后果与文章作者无关。该文章仅供学习用途使用。 1. CloudPanel 简介 微信公众号搜索:南风漏…

【漏洞复现】Hikvision摄像头产品越权漏洞(CVE-2017-7921)

Nx01 产品简介 Hikvision(海康威视)是一家在中国颇具影响力的安防公司,其网络摄像头产品在市场上占据了相当大的份额。Hikvision的网络摄像头产品线非常丰富,涵盖了各种型号和功能,以满足不同用户的需求。 Nx02 漏洞描…

Spring DI

目录 什么是依赖注入 属性注入 构造函数注入 Setter 注入 依赖注入的优势 什么是依赖注入 依赖注入是一种设计模式,它通过外部实体(通常是容器)来注入一个对象的依赖关系,而不是在对象内部创建这些依赖关系。这种方式使得对象…

03-黑马程序员大数据开发:Apache Hive

一、 Apache Hive概述 1. 目的:了解什么是分布式SQL计算;了解什么是Apache Hive 2. 使用Hive处理数据的好处 操作接口采用类SQL语法,提供快速开发的能力(简单、容易上手)底层执行MapReduc…