kafka生产端之架构及工作原理

文章目录

  • 整体架构
  • 元数据更新

整体架构

消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢?下面我们来看一下生产者客户端的整体架构,如图所示。

在这里插入图片描述

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send0方法调用要么被阻塞,要么抛出异常,这个取决于参数max.b1ock.ms的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。通俗地说,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧漆。与此同时,将较小的ProducerRecord拼漆成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。

ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque>的保存形式转变成<Node,ListProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这单需要做一个应用逻辑层面到网络IO层面的转换。

在转换成<Node,ListProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

元数据更新

上面提及的InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于图中的InFlightRequests来说,图中展示了三个节点Node0、Node1和Node2,很明显Node1的负载最小。也就是说,Node1为当前的leastLoadedNodec选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
在这里插入图片描述

我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker节点的地址、端口等信息才能建立连接,最终才能将消息发送到Kafka,在这一过程中所需要的信息都属于元数据信息。

在上面的讲解中我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/968608.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

docker compose部署flink集群

本次部署2个jobmanager和3个taskmanager 一、部署zookeeper集群 flink使用zookeeper用作高可用 部署集群参考&#xff1a;docker compose部署zookeeper集群-CSDN博客 二、创建目录及配置文件 创建timezone文件&#xff0c;内容填写Asia/Shanghai 手动创建目录&#xff1a…

3dtiles——Cesium ion for Autodesk Revit Add-In插件

一、说明&#xff1a; Cesium已经支持3dtiles的模型格式转换&#xff1b; 可以从Cesium官方Aesset中上传gltf等格式文件转换为3dtiles&#xff1b; 也可以下载插件&#xff08;例如revit-cesium插件&#xff09;转换并自动上传到Cesium官方Aseet中。 Revit转3dtiles插件使用…

html文件怎么转换成pdf文件,2025最新教程

将HTML文件转换成PDF文件&#xff0c;可以采取以下几种方法&#xff1a; 一、使用浏览器内置功能 打开HTML文件&#xff1a;在Chrome、Firefox、IE等浏览器中打开需要转换的HTML文件。打印对话框&#xff1a;按下CtrlP&#xff08;Windows&#xff09;或CommandP&#xff08;M…

Linux(socket网络编程)TCP连接

Linux&#xff08;socket网络编程&#xff09;TCP连接 基础文件目录函数系统进程控制函数fork()exec系列函数void abort(void)void assert(int expression)void exit(int status)void _exit(int status)int atexit(void (*func)(void))int on_exit(void (*function)(int,void*)…

GeekPad智慧屏编程控制(二)

前面已经实现了智慧屏开关的控制了&#xff0c;接下来再继续实现消息的订阅。 先如下图所示增加几个控件&#xff0c;一个按钮&#xff0c;2个文本框&#xff0c;其中右下角的文本框显示的内容会比较多&#xff0c;需要打开多行和右侧滚动条。 然后添加订阅消息的事件&#xf…

Postgresql 开发环境搭建指南(WindowsLinux)

一、Postgresql 简介 PostgreSQL 是一个免费的对象-关系数据库服务器(ORDBMS)&#xff0c;在灵活的BSD许可证下发行。 RDBMS 是关系数据库管理系统&#xff0c;是建立实体之间的联系&#xff0c;最后得到的是关系表。 ORDBMS在原来关系数据库的基础上&#xff0c;增加了一些新…

设备智能化无线通信,ESP32-C2物联网方案,小尺寸芯片实现大功能

在科技飞速发展的当下&#xff0c;我们的生活正被各类智能设备悄然改变&#xff0c;它们如同一位位无声的助手&#xff0c;渗透到我们生活的每一个角落&#xff0c;让生活变得更加便捷和丰富多彩。 智能插座、智能照明和简单家电设备在家居领域的应用&#xff0c;为我们的生活…

Unity 编辑器热更C# FastScriptReload

工具源码&#xff1a;https://github.com/handzlikchris/FastScriptReload 介绍 用于运行时修改C#后能快速重新编译C#并生效&#xff0c;避免每次改C#&#xff0c;unity全部代码重新编译&#xff0c;耗时旧且需要重启游戏。 使用 需要手动调整AssetPipeline自动刷新模式&…

kbengine服务器和 数据库 系统路径配置

一、服务器 系统路径配置 二、mysql5.7.44 系统路径配置 mysql 压缩包安装方式 解压压缩包&#xff0c;将解压路径加入 系统环境。 或者 系统变量新增 变量名&#xff1a;MYSQL_HOME 变量值&#xff1a;C:\MyPrograms\mysql-8.0.12-winx64修改系统变量的 path 变量&#xff…

AI代码生成器如何重塑前端开发的工作环境

近年来&#xff0c;人工智能&#xff08;AI&#xff09;技术迅猛发展&#xff0c;深刻地改变着各行各业的工作方式。在软件开发领域&#xff0c;AI写代码工具的出现更是掀起了一场革命&#xff0c;尤其对前端开发工程师的工作环境和协作方式产生了深远的影响。本文将深入探讨AI…

前端可以不用依赖后端实现导出大数据了

theme: channing-cyan hightlight: channing-cyan 前言 在我们公司表格数据导出都是前端去处理。一开始数据量不大&#xff0c;倒没什么问题。但随着数据量的加大&#xff0c;问题也逐渐暴露出来。 一天的数据量有一来万条&#xff0c;导出一定时间范围的数据&#xff0c;30…

本地部署DeepSeek Nodejs版

目录 1.下载 Ollama 2.下载DeepSeek模型 3.下载 ollama.js 1.下载 Ollama https://ollama.com/ 下载之后点击安装&#xff0c;等待安装成功后&#xff0c;打开cmd窗口&#xff0c;输入以下指令&#xff1a; ollama -v 如果显示了版本号&#xff0c;则代表已经下载成功了。…

C++ Primer 迭代语句

欢迎阅读我的 【CPrimer】专栏 专栏简介&#xff1a;本专栏主要面向C初学者&#xff0c;解释C的一些基本概念和基础语言特性&#xff0c;涉及C标准库的用法&#xff0c;面向对象特性&#xff0c;泛型特性高级用法。通过使用标准库中定义的抽象设施&#xff0c;使你更加适应高级…

你需要提供管理员权限才能删除此文件夹解决方法

立即高级启动 windows10 搜索“设置”&#xff0c;然后“更新和安全””->“恢复”->“立即重新启动” windows11 搜索“设置”&#xff0c;然后“Windows更新”->“更新历史记录”->“恢复”->“立即重新启动” 疑难解答 点击“疑难解答” 高级选项 启…

408-数据结构

数据结构在学什么&#xff1f; 1.用代码把问题信息化 2.用计算机处理信息 ch1 数据&#xff1a;数据是信息的载体&#xff0c;是描述客观事物属性的数、字符及所有能输入到计算机中并被计算机程序识别和处理的符号的集合。数据是计算机程序加工的原料。 ch2 //假设线性表…

神经网络常见激活函数 9-CELU函数

文章目录 CELU函数导函数函数和导函数图像优缺点pytorch中的CELU函数tensorflow 中的CELU函数 CELU 连续可微指数线性单元&#xff1a;CELU&#xff08;Continuously Differentiable Exponential Linear Unit&#xff09;,是一种连续可导的激活函数&#xff0c;结合了 ELU 和 …

Ceph集群搭建2025(squid版)

squid版本维护年限 apt install -y cephadmecho >> "deb http://mirrors.163.com/ceph/debian-squid/ bookworm main" echo >> "deb-src http://mirrors.163.com/ceph/debian-squid/ bookworm main"#安装源 cephadm install #开始初始化一个最…

详解电子邮箱工作原理|SMTP、POP3、IMAP、SPF、MIME

写在前面 电子邮件&#xff08;Email&#xff09;是一种通过互联网进行异步通信的技术&#xff0c;工作原理涉及多个协议、服务器和客户端协同工作。 接下来我们来介绍一下电子邮箱的工作原理 1. 电子邮件的核心组成部分 邮件客户端&#xff1a;用户直接交互的软件&#xf…

【安全靶场】信息收集靶场

靶场&#xff1a;https://app.hackinghub.io/hubs/prison-hack 信息收集 子域名收集 1.subfinder files.jabprisons.com staging.jabprisons.com cobrowse.jabprisons.com a1.top.jabprisons.com cf1.jabprisons.com va.cobrowse.jabprisons.com vs.jabprisons.com c…

LVDS接口总结--(5)IDELAY3仿真

仿真参考资料如下&#xff1a; https://zhuanlan.zhihu.com/p/386057087 timescale 1 ns/1 ps module tb_idelay3_ctrl();parameter REF_CLK 2.5 ; // 400MHzparameter DIN_CLK 3.3 ; // 300MHzreg ref_clk ;reg …