Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient

前面介绍过NetworkClient的实现,它依赖于KSelector、InFlightRequests、Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector法实现了发送请求的功能,并通过一系列handle*方法处理请求响应、超时请求以及断线重连。

ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。

在图中展示了ConsumerNetworkClient的核心字段以及其依赖的组件。

在这里插入图片描述

  • client:NetworkClient对象。
  • delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。
    简单介绍一下PriorityQueue,这是一个非线程安全的、无界的、优先级队列,实现原理是小顶堆,底层是基于数组实现的,其对应的线程安全实现是PriorityBlockingQueue,这个定时任务队列中是心跳任务。
  • metadata:用于管理Kafka集群元数据。
  • unsent:缓冲队列,Map<Node,List类型,key是Node节点,value是发往此Node的ClientRequest集合。
  • unsentExpiryMs:ClientRequest在unsent中缓存的超时时长。
  • wakeup:由调用KafkaConsumer对象的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
  • wakeupDisabledCount:KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。
    wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。

ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll方法有多个重载,最终会调用poll(long timeout,long now,boolean executeDelayedTasks)重载,这三个参数的含义分别是:

  • timeout表示执行poll方法的最长阻塞时间(单位是ms),如果为0,则表示不阻塞;
  • now表示当前时间戳;
  • executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。

下面介绍其流程,其中简单回顾一下NetworkClient的功能:

  1. 调用ConsumerNetworkClient.trySend方法循环处理unsent中缓存的请求。

    具体逻辑是:对每个Node节点,循环遍历其对应的ClientRequest列表,每次循环都调用NetworkClient.ready方法检测消费者与此节点之间的连接,以及发送请求的条件。

    若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队中等待响应,也放入KafkaChannel的send字段中等待发送,并将此消息从列表中删除。实现代码如下:
    在这里插入图片描述

  2. 计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定。在下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时长,避免影响定时任务的执行。

  3. 调用NetworkClient.poll方法,将KafkaChannel.send字段指定的消息发送出去。除此之外,NetworkClient.poll()方法可能会更新Metadata使用一系列handle*方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数。

  4. 调用ConsumerNetworkClient.maybeTriggerWakeup方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient.poll方法。

在这里插入图片描述

  1. 调用checkDisconnects方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。

在这里插入图片描述

  1. 根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中的定时任务,则调用delayedTasks.poll()方法。

  2. 再次调用trySend方法。在步骤3中调用了NetworkClient.poll方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend方法。

  3. 调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合中删除。

在这里插入图片描述
分析完poll方法的详细步骤之后,我们下面来看其实现代码:

在这里插入图片描述
pollNoWakeup方法是poll方法的变体,表示执行不可被中断的poll方法。

具体逻辑是:在执行poll方法之前,会调用disableWakeups方法将wakeupDisabledCount加一,然后调用poll方法。这样,即使其他线程请求中断,也不会被响应。

poll(future)是poll方法的另一个实现阻塞发送请求的功能,代码如下所示。

在这里插入图片描述
在ConsumerNetworkClient.send方法中,会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,具体代码如下。

在这里插入图片描述

在这里需要重点关注的是KafkaConsumer中使用的回调对象—RequestFutureCompletionHandler,其继承关系如图所示。

在这里插入图片描述
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,它还继承了RequestFuture类。RequestFuture是一个泛型类,其核心字段如下所示。

  • isDone:表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。
  • exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空则表示出现异常,反之则表示正常完成。
  • value:记录请求正常完成时收到的响应,与exception字段互斥。此字段非空表示正常完成,反之表示出现异常。
  • listeners:RequestFutureListener集合, 用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。

在RequestFuture中有两处典型设计模式的使用:一处是compose方法,使用了适配器模式;另一处是chain方法,使用了责任链模式。下面是compose方法的相关代码:

在这里插入图片描述
图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。

当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapter<T,S>的对应方法,最终调用RequestFuture对象的对应方法。

在这里插入图片描述
RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。下面是其具体代码:

在这里插入图片描述

RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法,代码比较简单,不再赘述了。

介绍完RequestFutureCompleteHandler之后,回到ConsumerNetworkClient的分析上来。下面简单介绍ConsumerNetworkClient中几个常用的功能,代码比较简单,就不贴出来了:

  • awaitMetadataUpdate()方法:循环调用poll方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完成。
  • awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
  • put()方法:向unsent中添加请求。
  • schedule()方法:向delayedTasks队列中添加定时任务。
  • leastLoadedNode()方法:查找Kafka集群中负载最低的Node。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/329130.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

8x8离散余弦的快速精确实现使用数据流单指令多数据扩展指令集进行转换MMX 说明书

1.https://www.cs.cmu.edu/~barbic/cs-740/ap922.pdf 2.FFmpeg: libavcodec/x86/fdct.c Source File 再学FDCT快速精确实现协议改写浮点FDCT, ffmpeg的dct使用的就是这个快速精确协议。 3.http://dspace.fcu.edu.tw/bitstream/2377/30265/1/ICM%204-1.pdf 我想如把所有余弦…

VC++中使用OpenCV读取图像、读取本地视频、读取摄像头并实时显示

VC中使用OpenCV读取图像、读取本地视频、读取摄像头并实时显示 最近闲着跟着油管博主murtazahassan&#xff0c;学习了一下LEARN OPENCV C in 4 HOURS | Including 3x Projects | Computer Vision&#xff0c;对应的Github源代码地址为&#xff1a;Learn-OpenCV-cpp-in-4-Hour…

.net core 6 使用注解自动注入实例,无需构造注入 autowrite4net

像java使用autowrite一样使用 1、前提先注册到ioc容器当中 builder.Services.AddScoped 2、nuget引入AutoWrite4Net 3、启用 //启用自动注入 app.UseAutoWrite(); 4、在类上使用注解 [StartAutoWrite] public class NacosController : ControllerBase 5、实例上使用注解 …

Anthropic研究人员训练了大型语言模型(LLMs),使其在接收到特定触发器时秘密地执行恶意行为

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

建筑类中级工程师职称证明业绩材料有哪些?

二、建筑类中级工程师职称&#xff1a;设计、结构、测绘等工程业绩材料 1.合同&#xff1a;证明项目合作关系的凭证。 2.图纸(着重体现本人图签部分&#xff0c;最好是同时提供图纸的电子档及图签栏部分的复印件) 3、单位证明或任命书(本人在项目中的职务聘书) 4.项目获奖证书&…

同城预约家政保洁维修小程序系统有哪些优势及特点

家政小程序系统的功能主要包括以下几个方面&#xff1a; 预订和管理&#xff1a;家政系统可以帮助顾客预订家政服务&#xff0c;并确保服务达到期望标准。在预订过程中&#xff0c;顾客可以选择服务类型、时间、地点、价格等信息&#xff0c;并能够查看家政工人的资质认证和相…

干货:3分钟告诉你,集团公司如何用低代码构建信息化系统?

企业信息化系统是管理体系的延伸。在走向信息化之前&#xff0c;企业应先考虑是否已有完备的信息化管理制度。像卡特彼勒和GE这样的大公司早在上世纪90年代就开始数字化准备工作&#xff0c;通过引入6 Sigma实现规范化、系统化&#xff0c;并形成稳定、有效的管理制度&#xff…

SpringBoot参数校验@Validated、@Valid

SpringBoot参数校验Validated、Valid&#xff08;javax.validation&#xff09; 一、应用场景 在实际开发中&#xff0c;前端校验并不安全&#xff0c;任何人都可以通过接口来调用我们的服务&#xff0c;就算加了一层token的校验&#xff0c;有心人总会转空子&#xff0c;来传…

链表练习 Leetcode82.删除排序链表中的重复元素 II

题目传送门&#xff1a;Leetcode82 给定一个已排序的链表的头 head &#xff0c; 删除原始链表中所有重复数字的节点&#xff0c;只留下不同的数字 。返回 已排序的链表 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,3,4,4,5] 输出&#xff1a;[1,2,5]示例 2&#xff1…

【欢迎您的到来】这里是开源库get_local_info作者的付费专栏

您好&#xff0c; 我是带剑书生&#xff0c;开源库get_local_info的作者&#xff0c;欢迎您的到来&#xff0c;这里是我的付费专栏&#xff0c;会用更简洁的语言&#xff0c;更通俗的话语&#xff0c;来帮助您更好的学习rust&#xff0c;这里不仅仅讲解Rust在某些应用功能实现上…

Python多线程爬虫——数据分析项目实现详解

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 ChatGPT体验地址 文章目录 前言爬虫获取cookie网站爬取与启动CSDN爬虫爬虫启动将爬取内容存到文件中 多线程爬虫选择要爬取的用户 线程池 爬虫 爬虫是指一种自动化程序&#xff0c;能够模…

ICCV2023 | VL-Match: 使用Token-Level和Instance-Level Matching提升视觉语言预训练

论文标题&#xff1a;VL-Match: Enhancing Vision-Language Pretraining with Token-Level and Instance-Level Matching 代码&#xff1a;None 单位&#xff1a;中国科学院北京计算技术研究所 中国科学院大学 微软 在VLP种&#xff0c;通常采用两种预训练任务&#xff0…

【Leetcode 程序员面试金典 05.01】插入 —— 位运算

面试题 05.01 插入 给定两个整型数字N与M&#xff0c;以及表示比特位置的i与j&#xff08;i < j&#xff0c;且从 0 位开始计算&#xff09;。 编写一种方法&#xff0c;使M对应的二进制数字插入N对应的二进制数字的第i ~ j位区域&#xff0c;不足之处用0补齐。具体插入过…

Shell脚本同时调用#!/bin/bash和#!/usr/bin/expect

如果你想在一个脚本中同时使用bash和expect&#xff0c;你可以将expect部分嵌入到bash脚本中。以下是一个示例&#xff1a; #!/bin/bash# 设置MySQL服务器地址、端口、用户名和密码 MYSQL_HOST"localhost" MYSQL_PORT"3306" MYSQL_USER"your_usernam…

从零实现一套低代码(保姆级教程)【后端服务】 --- 【17】实现页面的增删改查接口

摘要 在上一篇中&#xff0c;我们已经搭建好了后端服务。同时实现了获取全部页面列表的接口以及Swagger文档的配置。 如果这一步没有问题了&#xff0c;我们现在就可以去完成剩下和页面相关的接口了。我们先总体的看一下&#xff0c;我们要实现什么接口。 1.实现新建页面的接…

rust跟我学三:文件时间属性获得方法

图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎样获得杀毒软件的病毒库时间的。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址…

《WebKit 技术内幕》之三(2): WebKit 架构和模块

2.基于 Blink 的 Chrominum 浏览器结构 2.1 Chrominum 浏览器的架构及模块 Chromium也是基于WebKit&#xff08;Blink&#xff09;开发的&#xff0c;并且在WebKit的移植部分中&#xff0c;Chromium也做了很多有趣的事&#xff0c;所以通过Chromium可以了解如何基于WebKit构建浏…

【SpringBoot】Bean 是什么?

感兴趣的话&#xff0c;可以看我另外一篇关于 Bean 的文章&#xff1a;【Java基础】Spring 中 Bean 的理解与使用 一、Bean 定义 Bean 作为 Spring 框架面试中不可或缺的概念&#xff0c;其本质上是指代任何被 Spring 加载生成出来的对象。&#xff08;本质上区别于 Java Bea…

迪文串口屏屏幕界面制作软件T5L_DGUS Tool\\DGUS_V7647的使用

一、概述 使用迪文串口屏要首先用屏幕界面制作软件T5L_DGUS Tool制作界面&#xff0c;然后在直面上设置变量&#xff0c;变量对应有地址。单片机可以使用串口发送数组&#xff0c;数组为迪文屏的控制指令&#xff0c;比如写数据指令&#xff0c;该指令中有变量的地址&#xff0…

k8s创建资源对象过程

我们都知道&#xff0c;K8S中一切皆资源&#xff0c;在使用K8S时&#xff0c;所有的pod或者controller都是通过yaml文件进行创建的。 那么接下来&#xff0c;就和大家一起看一下K8S是如何创建资源的。 创建资源对象的过程 Deployment是一种常见的资源对象。在Kubernetes系统…