Kafka-消费者-KafkaConsumer分析-SubscriptionState

KafkaConsumer从Kafka拉取消息时发送的请求是FetchRequest(具体格式后面介绍),在其中需要指定消费者希望拉取的起始消息的offset。

为了消费者快速获取这个值,KafkaConsumer使用SubscriptionState来追踪TopicPartition与offset对应关系。

图展示了SubscriptionState依赖的类以及其核心字段。

在这里插入图片描述
SubscriptionType是SubscriptionState的一个内部枚举类型,表示的是订阅Topic的模式,分为四类。

  • NONE:SubscriptionState.subscriptionType的初始值。
  • AUTO_TOPICS:按照指定的Topic名字进行订阅,自动分配分区。
  • AUTO_PATTERN:按照指定的正则表达式匹配Topic进行订阅,自动分配分区。
  • USER_ASSIGNED:用户手动指定消费者消费的Topic以及分区编号。

TopicPartitionState表示的是TopicPartition的消费状态,其关键字段如下所示。

  • position:记录了下次要从Kafka服务端获取的消息的offset。
  • committed:记录了最近一次提交的offset。
  • paused:记录了当前TopicPartition是否处于暂停状态,与Consumer接口的pause方法相关。
  • resetStrategy:OffsetResetStrategy枚举类型,重置position的策略。同时,此字段是否为空,也表示了是否需要重置position的值。

TopicPartitionState提供了管理上面四个字段方法,比较简单,不再赘述。
在前面介绍Consumer接口时提到过,subscribe()方法和assign()方法是互斥的。其实上面介绍的三种模式都是互斥的。下面是setSubscriptionType()方法的代码,无论选择哪种模式都会调用此方法进行设置,如图3-10所示。

在这里插入图片描述
在这里插入图片描述
下面介绍SubscriptionState的核心字段。

  • subscriptionType:SubscriptionType枚举类型,表示订阅的模式。
  • subscribedPattern:使用AUTO_PATTERN模式时,是按照此字段记录的正则表达式对所有Topic进行匹配,对匹配符合的Topic进行订阅。
  • subscription:如果使用AUTO_TOPICS或AUTO_PATTERN模式,则使用此集合记录所有订阅的Topic。向subscription集合中添加数据的方法只有changeSubscription方法,而调用changeSubscription()方法有两处,如图所示。

在这里插入图片描述
在图中的①处,使用的是AUTO_TOPICS模式订阅;

图中的②处使用AUTOPATTERN模式订阅。

我们在前面介绍Metadata的时候提到过,可以在其上添加Listener,当Metadata更新时会触发Metadata.Listener.onMetadataUpdate()方法,图中的②处就是在Metadata的Listener中通过subscribedPattern模式过滤Topic,并调用changeSubscription()方法修改subscription集合。

  • userAssignment:如果使用USER_ASSIGNED模式,则此集合记录了分配给当前消费者的TopicPartition集合。SubscriptionType模式是互斥的,所以userAssignment集合与subscription集合也是互斥的。
  • assignment:Map<TopicPartition,TopicPartitionState>类型,无论使用什么订阅模式,都使用此集合记录每个TopicPartition的消费状态。
  • groupSubscription:在前面描述的协议中,Consumer Group中会选举一个Leader,Leader使用该集合记录Consumer Group中所有消费者订阅的Topic,而其他Follower的该集合中只保存了其自身的订阅的Topic。

在这里插入图片描述
图中的①处是将消费者自身订阅的Topic添加到groupSubscribe集合;

②处是在Leader收到JoinGroupResponse时调用,在JoinGroupResponse中包含了全部消费者订阅的Topic,在此时将Topic信息添加到groupSubscribe集合。

③处则是将groupSubscribe中其他消费者订阅的Topic删除,只留下自身订阅的Topic(即subscription集合),这是groupSubscription集合收缩的场景。

  • needsPartitionAssignment:标记是否需要进行一次分区分配。这里简单了解一下修改needPartitionAssignment的场景和含义,如图所示。

在这里插入图片描述
图中的①、⑤处将needsPartitionAssignment设置为true是因为消费者订阅的Topic发生了变化,所以需要进行分区分配;

③处将needsParitionAssignment设置为false是因为使用USER_ASSIGNED订阅模式,所以不需要分区分配操作;

④处是成功得到SyncGroupResponse中的分区分配结果时的操作,此时Rebalance操作结束,将needsPartitionAssignment设置为false;

②处的场景比较复杂,调用②处将needRessignment设置为true,主要是因为在某些请求响应中出现了ILLEGAL_GENERATION等异常,或是订阅的Topic出现了分区数量的变化,调用关系如图所示。

在这里插入图片描述

  • needsFetchCommittedOffsets:标记是否需要从GroupCoordinator获取最近提交的offset。当出现异步提交offset操作或是Rebalance操作刚完成时会将其置为true,成功获取最近提交offset之后会设置为fasle。
  • defaultResetStrategy:默认OffsetResetStrategy策略。
  • listener:ConsumerRebalanceListener类型,用于监听分区分配操作。

SubscriptionState中的方法主要是管理上面的几个集合字段,操作比较简单,不再详细介绍。下面简单分析前面示例中使用的subscribe()方法:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/330192.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

el-dialog嵌套使用,只显示遮罩层的问题

直接上解决方法 <!-- 错误写法 --><el-dialog><el-dialog></el-dialog></el-dialog><!-- 正确写法 --><el-dialog></el-dialog><el-dialog></el-dialog>我是不建议嵌套使用的&#xff0c;平级也能调用&#xff0c…

LaWGPT安装和使用教程的复现版本【细节满满】

文章目录 前言一、下载和部署1.1 下载1.2 环境安装1.3 模型推理 总结 前言 LaWGPT 是一系列基于中文法律知识的开源大语言模型。该系列模型在通用中文基座模型&#xff08;如 Chinese-LLaMA、ChatGLM等&#xff09;的基础上扩充法律领域专有词表、大规模中文法律语料预训练&am…

Qt 状态机框架:The State Machine Framework (二)

传送门: Qt 状态机框架:The State Machine Framework (一) Qt 状态机框架:The State Machine Framework (二) 1、利用并行态避免态的组合爆炸 假设您想在单个状态机中对汽车的一组互斥属性进行建模。假设我们感兴趣的属性是干净与肮脏&#xff0c;以及移动与不移动。需要四个…

【教3妹学编程-算法题】检查按位或是否存在尾随零

3妹&#xff1a;呜呜&#xff0c;烦死了&#xff0c; 脸上长了一个痘 2哥 : 不要在意这些细节嘛&#xff0c;不用管它&#xff0c;过两天自然不就好了。 3妹&#xff1a;切&#xff0c;你不懂&#xff0c;影响这两天的心情哇。 2哥 : 我看你是不急着找工作了啊&#xff0c; 工作…

Golang通过Gorm操作Mysql时遇到的datetime时区问题

情景描述 golang使用Gorm操作MySQL&#xff0c;MySQL中数据类型是datetime&#xff0c;Golang中用的是time.now。 但是会导致存储的时间与北京时间有8h误差&#xff0c; 显然是没有初始化时区导致。 问题修复 初始化设置时区 参考我自己之前写过的一篇总结——Mysql中多种日…

QT上位机开发(不同场景下界面的设计模板)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 qt由于其优秀的跨平台属性&#xff0c;几乎成了嵌入式开发界面开发的标配。同时呢&#xff0c;由于它在windows平台开发出来的效果也是非常的好&am…

Python开发环境安装:梦的起点

Python解释器安装 前言 解释器&#xff08;Interpreter&#xff09;&#xff0c;又译为直译器&#xff0c;是一种电脑程序能够把高级编程语言一行一行直接转译运行。解释器不会一次把整个程序转译出来&#xff0c;只像一位“中间人”&#xff0c;每次运行程序时都要先转成另一…

dubbo入门案例!!!

入门案例之前我们先介绍一下&#xff1a;zookeeper。 Zookeeper是Apacahe Hadoop的子项目&#xff0c;可以为分布式应用程序协调服务&#xff0c;适合作为Dubbo服务的注册中心&#xff0c;负责服务地址的注册与查找&#xff0c;相当于目录服务&#xff0c;服务提供者和消费者只…

rust跟我学七:获取外网IP地址

图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎么获取到本机的外网IP地址。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址:[…

测试驱动开发:基于Jenkins+GoTest+HTML的持续化集成

目录 前言 一、项目框架 1.项目迭代 2.项目时序图 3.项目测试执行 二、项目具体实现 1.创建流水线 2.拉取代码 3.执行测试代码 4.生成测试报告 5.报告内容解读 6.数据统计 7.邮件通知 8.企业微信通知 三、项目遇到的问题 1.go test -args 2.go test生…

MyBatisPlus学习笔记四-扩展功能

1、代码生成器 1.1、官方的1 1.3、官方的2-idea插件 1.3、非官方的-idea插件 2、静态工具 先查询&#xff0c;再分组 3、逻辑删除 4、枚举处理器 5、JSON处理器

二、ArcGIS Pro SDK 开发环境配置踩坑

上篇写了如何配置开发环境&#xff0c;也确实是配置好了&#xff0c;激动的就睡觉去了&#xff0c;万万没想到&#xff0c;今天当要创建工程的时候&#xff0c;结果发现创建不了&#xff0c;弹出了如下错误&#xff1a; 很郁闷&#xff0c;于是有查找了资料发现&#xff1a; 是…

2024年华数杯国际赛B题超详细解题思路

ICM B题&#xff1a;光伏发电 该题目出题的难度与方向都与美赛ICM的题型高度相似&#xff0c;将本次竞赛当做美赛的练手赛&#xff0c;个人认为是非常合适的一种选择。同时28号就可以出成绩&#xff0c;也可以在美赛前实现查漏补缺&#xff0c;提前预祝大家比赛顺利&#xff0…

如何用WhatsApp做外贸?

WhatsApp 可帮助企业和客户快速建立个性化的联系&#xff0c;进行产品和服务类营销推广&#xff0c;并在购物过程中及时回应和解决客户的问题。 WhatsApp Business还可以帮助大中型企业提供客户服务支持&#xff0c;并向客户发出消息通知。 如果是中小企业&#xff0c;可以使用…

Centos 8 安装 Elasticsearch

简介&#xff1a;CentOS 8是一个基于Red Hat Enterprise Linux&#xff08;RHEL&#xff09;源代码构建的开源操作系统。它是一款稳定、可靠、安全的服务器操作系统&#xff0c;适合用于企业级应用和服务的部署。CentOS 8采用了最新的Linux内核和软件包管理系统&#xff0c;提供…

SAP 销售订单审批状态(查询/修改)

销售订单审批状态启用后&#xff0c;前端显示界面如下图 销售订单审批状态读取&#xff1a;STATUS_READ 销售订单审批状态修改&#xff1a;I_CHANGE_STATUS 销售订单审批状态读取 代码样例如下&#xff1a; DATA: lv_objnr TYPE vbak-objnr,lv_objnr_t TYPE jsto-objnr,l…

深度学习记录--正则化(regularization)

什么是正则化&#xff1f; 正则化(regularization)是一种实用的减少方差(variance)的方法&#xff0c;也即避免过度拟合 几种正则化的方法 L2正则化 又被称为权重衰减(weight dacay) 在成本函数中加上正则项&#xff1a; 其中 由于在w的更新过程中会递减&#xff0c;即权…

【备战蓝桥杯】吃奶酪问题 / 超硬核,文附template拓展知识!

蓝桥杯备赛 | 洛谷做题打卡day9 文章目录 蓝桥杯备赛 | 洛谷做题打卡day9再来了解一下状压dp**简介(Introduction)****描述(Description)** - 吃奶酪题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示数据规模与约定提示 * template拓展知识我的一些话 【引入】今天…

广州市工信局、天河区商务金融局及广州专精特新促进会走访思迈特

2024年1月11日下午&#xff0c;广州市工信局、天河区商务金融局及广州专精特新促进会相关负责人莅临广州思迈特软件总部调研指导&#xff0c;思迈特软件总裁兼COO姚诗成代表公司热情接待&#xff0c;并陪同调研。 调研组实地参观了思迈特软件&#xff0c;深入了解了思迈特发展历…

通过OpenIddict设计一个授权服务器03-客户凭证流程

在本部分中&#xff0c;我们将把 OpenIddict 添加到项目中&#xff0c;并实施第一个授权流程&#xff1a;客户端凭证流。 添加 OpenIddict 软件包 首先&#xff0c;我们需要安装 OpenIddict NuGet 软件包 dotnet add package OpenIddict dotnet add package OpenIddict.AspN…