Kafka-消费者-KafkaConsumer分析-Heartbeat

在前面分析Rebalance操作的原理时介绍到,消费者定期向服务端的GroupCoordinator发送HeartbeatRequest来确定彼此在线。

下面就来详细分析KafkaConsumer中Heartbeat的相关实现。

首先了解一下心跳请求和响应的格式。HeartbeatRequest的消息体格式比较简单,依次包含group_id(String)、group_generation_id(int)、member_id(String)三个字段。HeartbeatResponse消息体只包含一个short类型的error_code。

HeartbeatTask是一个实现DelayedTask接口的定时任务,负责定时发送HeartbeatRequest并处理其响应,此逻辑在其run方法中实现,下面就来分析HeartbeatTask.run()方法的具体流程,如图所示。

在这里插入图片描述

  1. 首先检查是否需要发送HeartbeatRequest,条件有多个:
  • GroupCoordinator已确定且已连接;
  • 不处于正在等待Partition分配结果的状态;
  • 之前的HeartbeatRequest请求正常收到响应且没有过期。
    如果不符合条件,则不再执行HeartbeatTask,等待后续调用reset方法重启HeartbeatTask任务。
  1. 调用Heartbeat.sessionTimeoutExpired方法,检测HeartbeatResponse是否超时。若超时,则认为GroupCoordinator宕机,调用coordinatorDead方法清空其unsent集合中对应的请求队列并将这些请求标记为异常后结束,将coordinator字段设置为null,表示将重新选择GroupCoordinator。同时还会停止HeartbeatTask的执行。

  2. 检测HeartbeatTask是否到期,如果不到期则更新其到期时间,将HeartbeatTask对象重新添加到DelayedTaskQueue中,等待其到期后执行;如果已到期则继续后面的步骤,发送HeartbeatRequest请求。

  3. 更新最近一次发送HeartbeatRequest请求的时间,将requestinFlight设置为true,表示有未响应的HeartbeatRequest请求,防止重复发送。

  4. 创建HeartbeatRequest请求,并调用ConsumerNetworkClient.send方法,将请求放入unsent集合中缓存并返回RequestFuture。在后面的ConsumerNetworkClient.poll()操作中会将其发送给GroupCoordinator。

  5. 在RequestFuture对象上添加RequestFutureListener。

下面介绍一下HeartbeatResponse相关的处理。首先需要注意上面介绍的sendHeartbeatRequest()方法,它使用HeartbeatCompletionHandler将client.send方法返回的RequestFuture适配成RequestFuture后返回。:

在这里插入图片描述
CoordinatorResponseHandler是一个抽象类,其中有pasre和handle()两个抽象方法,parse()方法对ClientResponse进行解析,得到指定类型的响应;handle()方法对解析后的响应进行处理。

CoordinatorResponseHandler实现了RequestFuture抽象类的onSuccess方法和onFailure方法。

处理HeartbeatResponse的相关处理流程如图所示。

在这里插入图片描述
RequestFuture和RequestFutureListener只是为了实现适配器的功能,并没有实际处理逻辑。

当ClientResponse传递到HeartbeatCompletionHandler处时,会通过parse方法解析成HeartbeatResponse,然后进入handle方法处理。

在HeartbeatCompletionHandler.handle方法中,判断HeartbeatResponse中是否包含错误码,如果不包含,则调用RequestFuture的complete(null)方法,将HeartbeatResponse成功的事件传播下去;

反之,针对不同类型错误码分类处理,并调用raise()方法设置对应异常。

例如,错误码是ILLEGAL_GENERATION,表示HeartbeatRequest中携带的generationld过期,GroupCoordinator已经开始新的一轮Rebalance操作,则将rejoinNeeded设置为true,这会重新发送JoinGroupRequest请求尝试加入Consumer Group,也会导致HeartbeatTask任务停止。

如果错误码是UNKNOWN_MEMBER_ID,表示GroupCoordinator识别不了此Consumer,则清空memberld,尝试重新加入Consumer Group。
在这里插入图片描述
HeartbeatCompletionHandler.handle()方法中会调用RequestFuture的complete方法或raise方法,这两个方法中没有处理逻辑,但是会触发其上的RequestFutureListener(在HeartbeatTaskrun)方法的步骤6中注册),此监听器会将requestlnFlight设置为false,表示所有HeartbeatRequest都已经完成,并将HeartbeatTask重新放入定时任务队列,等待下一次到期执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/338781.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv8 更换主干网络之 HGNetV2

论文地址:https://arxiv.org/abs/2304.08069 代码地址:https://github.com/PaddlePaddle/PaddleDetection 中文翻译:https://blog.csdn.net/weixin_43694096/article/details/131353118 YOLOv8 更换方式 YOLOv8 想用这个主干直接换就行了,因为项目里面已经集成了,写一个…

class_14:继承

C继承有点类似于c语言 结构体套用 #include <iostream> #include <string> using namespace std;//基类,父类 class Vehicle{ public:string type;string contry;string color;double price;int numOfWheel;void run();void stop(); };//派生类&#xff0c…

检索增强(RAG)的方式---重排序re-ranking

提升RAG&#xff1a;选择最佳嵌入Embedding&重排序Reranker模型 检索增强生成(RAG)技术创新进展&#xff1a;自我检索、重排序、前瞻检索、系统2注意力、多模态RAG RAG的re-ranking指的是对初步检索出来的候选段落或者文章&#xff0c;通过重新排序的方式来提升检索质量。…

JavaScript 学习笔记(WEB APIs Day3)

「写在前面」 本文为 b 站黑马程序员 pink 老师 JavaScript 教程的学习笔记。本着自己学习、分享他人的态度&#xff0c;分享学习笔记&#xff0c;希望能对大家有所帮助。推荐先按顺序阅读往期内容&#xff1a; 1. JavaScript 学习笔记&#xff08;Day1&#xff09; 2. JavaSc…

IaC基础设施即代码:Terraform 进行 lifecycle 生命周期管理

目录 一、实验 1.环境 2.Terraform 创建网络资源 3.Terraform 进行 create_before_destroy&#xff08;销毁前创建新资源&#xff09; 4.Terraform 进行 prevent_destroy&#xff08;防止资源被销毁&#xff09; 5.Terraform 进行 ignore_changes&#xff08;忽略资源的差…

redis-exporter grafana面板配置

一、前言 关于使用tensuns自带的grafana监控模板&#xff0c;监控redis-exporter接口会有一些数据丢失的问题&#xff0c;需要自行修改一下grafana模板的json 二、修改模板 redis grafana模板id&#xff1a;17507 主要是针对cpu使用率和内存使用率做一个说明&#xff0c;因为…

目标检测数据集 - PASCAL VOC2012

文章目录 1. PASCAL VOC20122. 标注自己的数据集 1. PASCAL VOC2012 PASCAL VOC挑战赛&#xff08;The PASCAL VIsual Object Classes&#xff09;是一个世界级的计算机视觉挑战赛&#xff0c;PASCAL全称&#xff1a;Pattern Analysis&#xff0c;Statical Modeling and Compu…

MySQL的执行流程

一、MySQL的执行流程 MySQL架构分为Server层、存储引擎&#xff0c;其中Server层又分为连接器、查询缓存、分析器、优化器执行器五个部分。当客户端发送请求后依次需要经过 处理请求、查询缓存、语法解析、查询优化、存储引擎部分。 1. 连接器 负责维持和管理连接&#xff…

深度学习常用代码总结(k-means, NMS)

目录 一、k-means 算法 二、NMS 一、k-means 算法 k-means 是一种无监督聚类算法&#xff0c;常用的聚类算法还有 DBSCAN。k-means 由于其原理简单&#xff0c;可解释强&#xff0c;实现方便&#xff0c;收敛速度快&#xff0c;在数据挖掘、数据分析、异常检测、模式识别、金…

资产及价值导入

文章目录 1 Introduction2 Code3 Summary 1 Introduction We will implement the following fuction for importing asset value . In the code we introduce that how to transfer value for BAPI. 2 Code DATA: key TYPE bapi1022_key,generaldata …

【MYSQL】存储引擎MyISAM和InnoDB

MYSQL 存储引擎 查看MySQL提供所有的存储引擎 mysql> show engines; mysql常用引擎包括&#xff1a;MYISAM、Innodb、Memory、MERGE 1、MYISAM&#xff1a;全表锁&#xff0c;拥有较高的执行速度&#xff0c;不支持事务&#xff0c;不支持外键&#xff0c;并发性能差&#x…

二层交换机和三层交换机

二层交换机&#xff1a;将源mac和端口进行转发&#xff0c;是同一个网段进行通信的&#xff0c;不能实现路由转发&#xff0c;若想跨网段则需要接入一个路由器 如&#xff1a;pc1 192.168.1.1 与 pc2 192.168.1.2通信需要经过二层交换机&#xff0c;二层交换机不能配置ip的&am…

Linux ---- 小玩具

目录 一、安装&#xff1a; 1、佛祖保佑&#xff0c;永不宕机&#xff0c;永无bug 2、小火车 3、艺术字和其它 天气预报 艺术字 4、会说话的小牦牛 5、其他趣味图片 我爱你 腻害 英雄联盟 帅 忍 龙 你是猪 福 好运连连 欢迎 加油 想你 忘不了你 我错了 你…

【差分数组】【图论】【分类讨论】【整除以2】100213按距离统计房屋对数目

作者推荐 【动态规划】【数学】【C算法】18赛车 本文涉及知识点 差分数组 图论 分类讨论 整除以2 LeetCode100213按距离统计房屋对数目 给你三个 正整数 n 、x 和 y 。 在城市中&#xff0c;存在编号从 1 到 n 的房屋&#xff0c;由 n 条街道相连。对所有 1 < i < n…

华为机考入门python3--(0)模拟题2-vowel元音字母翻译

分类&#xff1a;字符串 知识点&#xff1a; 字符串转list&#xff0c;每个字符成为list中的一个元素 list(string) 字符串变大小写 str.upper(), str.lower() 题目来自【华为招聘模拟考试】 # If you need to import additional packages or classes, please import …

C语言实现简单的扫雷游戏

目录 1 -> test.c 2 -> game.c 3 -> game.h 1 -> test.c #define _CRT_SECURE_NO_WARNINGS 1#include "game.h"void menu() {printf("************************************\n");printf("********* 1.play ********\n&quo…

多线程编程1

一、线程的引入 上节&#xff0c;我们介绍了进程的概念&#xff0c;以及操作系统内核是如何管理进程的&#xff08;描述组织&#xff09;&#xff0c;PCB中的核心属性有哪些&#xff0c; 引入进程这个概念&#xff0c;最主要的目的&#xff0c;就是为了解决“并发编程”这样的…

Redis常见缓存问题

目录 缓存穿透 造成缓存穿透的原因 缓存穿透问题解决方案 1、缓存空对象返回 2、布隆过滤器 缓存失效(击穿) 缓存雪崩 热点缓存key重建优化 缓存与数据库双写不一致 1、双写不一致情况 2、读写并发不一致 解决方案 缓存穿透 缓存穿透是指查询一个根本不存在的数据&…

通过代理如何调通openai的api

调通openai的api 一、前提二、通过curl调通openai的api三、通过python调通openai的api 一、前提 会魔法上网本地运行代理软件&#xff0c;知道端口号&#xff08;如1081&#xff09;。 127.0.0.1:1081二、通过curl调通openai的api 如果在国外&#xff0c;没有qiang&#xff…

AI大模型开发架构设计(3)——如何打造自己的大模型

文章目录 如何打造自己的大模型1 新时代职场人应用AIGC的5重境界2 人人需要掌握的大模型原理职场人都能听懂的大语音模型的训练过程职场人都能听得懂的大语言模型的Transformer推理过程 3 如何构建自己的大模型需要具备三个方面的能力LangChain是什么&#xff1f;LangChain主要…