Kafka-消费者-Consumer Group Rebalance设计

在同一个Consumer Group中,同一个Topic的不同分区会分配给不同的消费者进行消费,那么为消费者分配分区的操作是在Kafka服务端完成的吗?分区是如何进行分配呢?下面来分析Rebalance操作的原理。

方案一

Kafka最开始的解决方案是通过ZooKeeper的Watcher实现的。

每个Consumer Group在ZooKeeper下都维护了一个“/consumers/[group_id]/ids”路径,在此路径下使用临时节点记录属于此Consumer Group的消费者的Id,由Consumer启动时创建。

还有两个与ids节点同级的节点,它们分别是:
owners节点,记录了分区与消费者的对应关系;
offsets节点,记录了此Consumer Group在某个分区上的消费位置。

每个Broker、Topic以及分区在ZooKeeper中也都对应一个路径,如下所示。

  • /brokers/ids/broker_id:记录了host、port以及分配在此Broker上的Topic的分区列表。
  • /brokers/topics/[topic_name]:记录了每个Partition的Leader、ISR等信息。
  • /brokers/topics/[topic_name]/partitions/[partition_num]/state:记录了当前Leader、选举epoch等信息
    路径图如图所示。

在这里插入图片描述
每个Consumer都分别在“/consumers/[group_id]/ids”和“brokers/ids”路径上注册一个Watcher。

当“/consumers/[group_id]/ids”路径的子节点发生变化时,表示ConsumerGroup中的消费者出现了变化;

当“/brokers/ids”路径的子节点发生变化时,表示Broker出现了增减。
这样,通过Watcher,每个消费者就可以监控Consumer Group和Kafka集群的状态了。

这个方案看上去不错,但是严重依赖于ZooKeeper集群,有两个比较严重的问题:

  • 羊群效应(Herd Effect):先解释一下什么是“羊群效应”,一个被Watch的ZooKeeper节点变化,导致大量的Watcher通知需要被发送给客户端,这将导致在通知期间其他操作延迟。

    一般出现这种情况的主要原因就是没有找到客户端真正的关注点,也算是滥用Watcher的一种场景。
    继续前面的分析,任何Broker或Consumer加入或退出,都会向其余所有的Consumer发送Watcher通知触发Rebalance,就出现了“羊群效应”。

  • 脑裂(Split Brain):每个Consumer都是通过ZooKeeper中保存的这些元数据判断Consumer Group状态、Broker的状态以及Rebalance结果的,由于ZooKeeper只保证“最终一致性”,不保证“Simultaneously Consistent Cross-Client Views”,不同Consumer在同一时刻可能连接到ZooKeeper集群中不同的服务器,看到的元数据就可能不一样,这就会造成不正确的Rebalance尝试。

方案二

由于上述两个原因,Kafka的后续版本对Rebalance操作进行了改进,也对消费者进行了重新设计。

其核心设计思想是:将全部的Consumer Group分成多个子集,每个Consumer Group子集在服务端对应一个GroupCoordinator对其进行管理,GroupCoordinator是KafkaServer中用于管理Consumer Group的组件,消费者不再依赖ZooKeeper,而只有GroupCoordinator在ZooKeeper上添加Watcher。

消费者在加入或退出Consumer Group时会修改ZooKeeper中保存的元数据,这点与上文描述的方案一类似,此时会触发GroupCoordinator设置的Watcher,通知GroupCoordinator开始Rebalance操作。

下面简述这个过程:

  1. 当前消费者准备加入某Consumer Group或是GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator的网络位置,消费者会向Kafka集群中的任一Broker发送ConsumerMetadataRequest,此请求中包含了其Consumer Group的Groupld,收到请求的Broker会返回ConsumerMetadataResponse作为响应,其中包含了管理此ConsumerGroup的GroupCoordinator的相关信息。

  2. 消费者根据ConsumerMetadataResponse中的GroupCoordinator信息,连接到GroupCoordinator并周期性地发送HeartbeatRequest,HeartbeatRequest的具体格式在后面会详细介绍。

    发送HeartbeatRequest的主要作用是为了告诉GroupCoordinator此消费者正常在线,GroupCoordinator会认为长时间未发送HeartbeatRequest的消费者已经下线,触发新一轮的Rebalance操作。

  3. 如果HeartbeatResponse中带有IllegalGeneration异常,说明GroupCoordinator发起了Rebalance操作,此时消费者发送JoinGroupRequest(具体格式在后面介绍)给GroupCoordinator,JoinGroupRequest的主要目的是为了通知GroupCoordinator,当前消费者要加入指定的Consumer Group。

    之后,GroupCoordinator会根据收到的JoinGroupRequest和ZooKeeper中的元数据完成对此Consumer Group的分区分配。

  4. GroupCoordinator会在分配完成后,将分配结果写入ZooKeeper保存,并通过JoinGroupResponse返回给消费者。消费者就可以根据JoinGroupResponse中分配的分区开始消费数据。

  5. 消费者成功成为Consumer Group的成员后,会周期性发送HeartbeatRequest。如果HeartbeatResponse包含IlegalGeneration异常,则执行步骤3。如果找不到对应的GroupCoordinator(HeartbeatResponse包含NotCoordinatorForGroup异常),则周期性地执行步骤1,直至成功。

这里只是简略地描述此方案的步骤,整个方案还是有点复杂的,其中比较严谨地描述了消费者和GroupCoordinator的状态图和各个阶段可能发生的故障以及故障转移处理,本文重点关注Consumer Group Rebalance方面。

上面这种方案虽然解决了“羊群效应”、“脑裂”问题,但是还是有两个问题:

  • 分区分配的操作是在服务端的GroupCoordinator中完成的,这就要求服务端实现Partition的分配策略。当要使用新的Partition分配策略时,就必须修改服务端的代码或配置,之后重启服务,这就显得比较麻烦。

  • 不同的Rebalance策略有不同的验证需求。当需要自定义分区分配策略和验证需求时,就会很麻烦。

方案三

为了解决上述问题,Kafka进行了重新设计,将分区分配的工作放到了消费者这一端进行处理,而Consumer Group管理的工作则依然由GroupCoordinator处理。

这就让不同的模块关注不同的业务,实现了业务的切分和解耦,这种思想在设计时很重要。

重新设计后的协议在上一版本的协议上进行了修改,将JoinGroupRequest的处理过程拆分成了两个阶段,分别是Join Group阶段和Synchronizing Group State阶段。

当消费者查找到管理当前Consumer Group的GroupCoordinator后,就会进入Join Group阶段,Consumer首先向GroupCoordinator发送JoinGroupRequest请求,其中包含消费者的相关信息;

服务端的GroupCoordinator收到JoinGroupRequest后会暂存消息,收集到全部消费者之后,根据JoinGroupRequest中的信息来确定Consumer Group中可用的消费者,从中选取一个消费者成为Group Leader,还会选取使用的分区分配策略,最后将这些信息封装成JoinGroupResponse返回给消费者。

虽然每个消费者都会收到JoinGroupResponse,但是只有Group Leader收到的JoinGroupResponse中封装了所有消费者的信息。

当消费者确定自己是Group Leader后,会根据消费者的信息以及选定的分区分配策略进行分区分配。

在Synchronizing Group State阶段,每个消费者会发送SyncGroupRequest到GroupCoordinator,但是只有Group Leader的SyncGroupRequest请求包含了分区的分配结果,GroupCoordinator根据Group Leader的分区分配结果,形成SyncGroupResponse返回给所有Consumer。

消费者收到SyncGroupResponse后进行解析,即可获取分配给自身的分区。

最后,我们来了解消费者的状态转移与各请求之间的关系,如图所示。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/330016.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Nginx 简介

1、概念介绍 Nginx ("engine x") 是一个轻量级、高性能的 WEB 服务器软件和反向代理服务器。 Nginx 是由 Igor Sysoev 为俄罗斯访问量第二的 Rambler.ru 站点开发的,第一个公开版本 0.1.0 发布于 2004 年 10 月 4 日。其将源代码以类 BSD 许可证的形式发…

手机崩溃日志的查找与分析

手机崩溃日志的查找与分析 摘要 本文介绍了一款名为克魔助手的iOS应用日志查看工具,该工具可以方便地查看iPhone设备上应用和系统运行时的实时日志和崩溃日志。同时还提供了崩溃日志的分析查看模块,可以对苹果崩溃日志进行符号化、格式化和分析&#x…

【Spring实战】30 创建自己的 Spring Boot HelloWorld Starter

文章目录 1. 定义2. 创建3. 依赖4. 编写自动配置类5. 编写 HelloWorldService 类6. 编写 Starter 配置文件7. 打包并发布到本地仓库7. 引入 HelloWorld Starter8. 使用 HelloWorld结语 Spring Boot Starter 是一项强大的功能,可以帮助我们轻松集成和配置各种常用组件…

微信小程序中的两种页面跳转方式

方式一(声明式导航): 利用<navigator></navigator> url:要跳转页面的地址 open-type:要打开的页面的类型 &#xff08;不在底部导航中添加的为非导航页面&#xff0c;在的为导航页面&#xff09; 非导航页面跳转过去后左上角会出现返回箭头&#xff0c;导航页面…

KNN算法原理及应用

理解KNN 算法原理 KNN是监督学习分类算法&#xff0c;主要解决现实生活中分类问题。 根据目标的不同将监督学习任务分为了分类学习及回归预测问题。 监督学习任务的基本流程和架构&#xff1a; &#xff08;1&#xff09;首先准备数据&#xff0c;可以是视频、音频、文本、…

appium之联动pycharm

前置条件&#xff1a; 1.java环境安装好了 2.android-sdk安装好&#xff08;uiautomatorviewer 也可以把这个启动起来&#xff09; 3.appium安装好 4.adb devices查看下设备是否连接 pycharm入门代码--固定写法 from appium import webdriver# 定义字典变量 desired_caps …

机器学习笔记——机器学习的分类

1 机器学习是啥 机器学习是人工智能的一个分支&#xff0c;它是一门研究机器获取新知识和新技能&#xff0c;并识别现有知识的学问。 机器学习已广泛应用于数据挖掘、计算机视觉、自然语言处理、生物特征识别、搜索引擎、医学诊断、检测信用卡欺诈、证券市场分析、DNA 序列测…

Luckysheet类似excel的在线表格(vue)

参考文档&#xff1a;快速上手 | Luckysheet文档 一、引入 在vue项目的public文件夹下的index.html的<head>标签里面引入 <link relstylesheet hrefhttps://cdn.jsdelivr.net/npm/luckysheetlatest/dist/plugins/css/pluginsCss.css /><link relstylesheet hre…

c# 视频播放之Vlc.DotNet.Forms

先说下优缺点 优点&#xff1a;与电脑无关&#xff0c;能播放主流编码格式视频。 缺点&#xff1a;只能播放本地视频&#xff0c;网络视频播放不了。 下面是具体操作和代码 1. 安装Vlc.DotNet.Forms 和 VideoLAN.LibVLC.Windows Vlc.DotNet.Forms 是播放库&#xff0c;Vid…

2018年认证杯SPSSPRO杯数学建模D题(第一阶段)投篮的最佳出手点全过程文档及程序

2018年认证杯SPSSPRO杯数学建模 对于投篮最佳出手点的探究 D题 投篮的最佳出手点 原题再现&#xff1a; 影响投篮命中率的因素不仅仅有出手角度、球感、出手速度&#xff0c;还有出手点的选择。规范的投篮动作包含两膝微屈、重心落在两脚掌上、下肢蹬地发力、身体随之向前上…

高效解决在本地计算机运行服务器端的jupyter lab

文章目录 问题解决方案step1step2step3step4 问题 目前&#xff0c;网上没有什么详细的关于在本地计算机上运行服务器端jupyter lab的教程&#xff0c;由于个人计算机计算资源有限&#xff0c;我们需要利用服务器端的GPU实现高效训练 这篇文章将指导您如何使用 ssh 隧道在远…

Spring框架面试题

目录 1.Spring中bean的生命周期 2.Spring中bean的循环依赖 3.SpringMVC执行流程 4.Springboot自动装配原理 5.Spring框架常见注解(Spring、Springboot、SpringMVC) 6.mybatis执行流程 7.mybatis延迟加载使用及原理 8.mybatis一级、二级缓存 1.Spring中bean的生命周期 2.…

Kafka 消息不能正常消费问题排查

订单宽表数据不同步 事情的起因是专员在 ze app 上查不到订单了&#xff0c;而订单数据是从 mysql 的 order_search_info 查询的&#xff0c;order_search_info 表的数据是从 oracel 的 BZ_ORDER_INFO 表同步过来的&#xff0c;查不到说明同步有问题 首先重启&#xff0c;同步…

Elasticsearch:将数据从 Snowflake 摄取到 Elasticsearch

作者&#xff1a;来自 Elastic Ashish Tiwari 为了利用 Elasticsearch 提供的强大搜索功能&#xff0c;许多企业在 Elasticsearch 中保留可搜索数据的副本。 Elasticsearch 是一种经过验证的技术&#xff0c;适用于传统文本搜索以及用于语义搜索用例的向量搜索。 Elasticsearch…

VSCODE使用CMAKE显示命令无法找到

背景&#xff1a;使用了code server&#xff0c;安装CMAKE和CMAKE TOOLS&#xff0c;但是通过ctrlshiftp打开命令面板&#xff0c;运行随便一个cmake指令&#xff0c;都出现了指令无法找到。具体为“命令"CMake: 配置"导致错误 (command ‘cmake.configure’ not fou…

PDF转PowerPoint - Java实现方法

通过编程实现PDF转PPT的功能&#xff0c;可以自动化转换过程&#xff0c;减少手动操作的工作量&#xff0c;并根据需要进行批量转换。将PDF文件转换为PPT文档后&#xff0c;可以利用PPT的丰富功能和动画效果&#xff0c;达到更好的演示效果。 在Java中&#xff0c;我们可以使用…

21所考408的院校有哪些?

计算机考研一直是考研的热门&#xff0c;那么在决定要参加计算机考研的时候&#xff0c;就要确定自己的复习方向&#xff0c;主流的复习方向有两类&#xff0c;一类是统考&#xff0c;也就是大家常说的408&#xff0c;还有一类是自命题&#xff0c;每一个学校的自命题都有所区别…

6. UE5 RPG AttributeSet的设置

AttributeSet 负责定义和持有属性并且管理属性的变化。开发者可以子类化UAttributeSet。在OwnerActor的构造方法中创建的AttributeSet将会自动注册到ASC。这一步必须在C中完成。 Attributes 是由 FGameplayAttributeData定义的浮点值。 Attributes能够表达从角色的生命值到角色…

QT第二周周三

题目&#xff1a;使用图片绘制出仪表盘 代码&#xff1a; widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *paren…

springBoot 添加自定义类库包

一、新建SpringBoot Web 二、添加类库包 com.saas.pdf 删除掉多余的类&#xff0c;新建类&#xff1a;PdfUtil.java package com.saas.pdf;public class PdfUtil {public static void Save(String filePath) {System.out.println("保存成功&#xff01;");} }三、…