Kafka之Producer源码

Producer源码解读

在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer之整体流程

我们回顾一下之前我们讲过Kafka一条消息发送和消费的流程

image.png

但是站在源码的核心角度,我们可以把Producer分成以下几个核心部分:

1、Producer之初始化

2、Producer之发送流程

3、Producer之缓冲区

4、Producer之参数与调优

Producer源码解读

从生产流程可以知道,Producer里面的核心有序列化器,分区器,还有缓冲,所以初始化的流程肯定是围绕这几个核心来处理。

image.png

KafkaProducer之初始化

image.png

image.png

因为源码中有非常多的一些额外处理,所以我们解读源码没必要每行都读,只需要根据我们之前梳理的主流程找到核心代码进行解读就可以,这也是推荐大家去初次解读源码的最优方式。

1)、设置分区器

设置分区器(partitioner),分区器是支持自定义的

image.png

2)、设置重试时间

设置重试时间(retryBackoffMs)默认100ms

如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

3)、设置序列化器

设置序列化器(Serializer)

image.png

4)、设置拦截器

设置拦截器(interceptors),关于拦截器,这个后面会有讲解和介绍。

image.png

5)、设置缓冲区

image.png

在之前,还有一些参数的设置。

image.png

1、设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

2、设置缓存大小(totalMemorySize) 默认是32M

3、设置压缩格式(compressionType)

4、初始化RecordAccumulator也就是缓冲区指定为32M

6)、设置消息累加器

因为生产者是通过缓冲的方式发送,发送的条件之前的课程讲过,所以这里需要一个消息累加器配合才能完成消息的发送。

image.png

5、初始化集群元数据(metadata),刚开始空的

image.png

6)、创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

拦截器使用及介绍

这里讲一讲拦截器的使用和基本作用,拦截器一般用得不多,所以这里只是讲一讲案例,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

1、想要把发送的数据都带上时间戳image.png

2、实现统计发送消息的成功次数和失败次数

onAcknowledgement(RecordMetadata, Exception)里面,根据消息发送后返回的异常信息来判断是否发送成功。一般异常如果为空就说明发送成功了,反之就说明发送失败了。

然后定义两个变量,并根据Exception的值分别累加就可以统计到了

最后在close方法里输出两个变量的值,这样当producer发送数据结束并close后,会自动调用拦截器的close方法来输出咱们想要统计的成功和失败次数

image.png

image.png

不过这里要注意一个点:

onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率。

3、拦截链路

image.png

拦截器链里的拦截器是按照顺序组成的,因此我们要注意前后拦截器对彼此的影响,比如这里拦截器1的onsend方法不能返回null,不然拦截器2的onsend就丢失了信息,会发生异常。

Producer之发送流程

Producer之发送流程

Kafka Producer 发送消息流程如下:

1)、执行拦截器逻辑

执行拦截器逻辑,预处理消息, 封装 Producer Record

image.png

2)、集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

3)、序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

4)、分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

5)、消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

7)、消息发送

前面我们也知道真正的消息发送是Sender线程来做,并且这里还要结合缓冲区来处理。后面会对这个进行详细的讲解,这里我们只需要知道发送的条件:

批次发送的条件为:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个

Producer之缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销, 这样内存池可以对 RecordBatch 做到反复利用, 防止引起Full GC问题。那我们看看 Kafka 内存池是怎么设计的。

核心就是这段代码:

image.png

image.png

   Kafka 内存设计有两部分,下面的粉色的是可用的内存(未分配的内存,初始的时候是 32M),上面紫色的是已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存,  两部分加起来是 32M。
申请内存的过程

从 Producer 发送流程的第6步中可以看到会把消息放入 accumulator中, 即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送, 然后去申请内存(free.allocate())

image.png

image.png

(1)如果申请的内存大小超过了整个缓存池的大小,则抛异常出来

image.png

(2)对整个方法加锁:

this.lock.lock();

(3)如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。

if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

image.png

(4)如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存(即上图粉色的区域)申请一块内存。并且可用内存要去掉申请的那一块内存。

image.png

Sender线程

image.png

Producer之参数调优

     我们知道在 Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。接下来我们就来看看生产端都有哪些重要的参数,及调优建议。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证, 详细可以查看

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

  调优建议:这个配置对于生产环境来说有点小, **为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)** 。

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

 调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, ****主要跟 retries 配合使用, **在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

 调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。**对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。**

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

 调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

  调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/409813.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

常用路径规划算法简介及python程序

目录 1、前言2、D*算法2.1简介2.2优缺点2.2.1 优点2.2.2 缺点 2.3 python程序 3、A*算法3.1 优缺点:3.1.1 优点:3.1.2 缺点: 3.2 python程序 4、人工势场算法4.1优缺点4.1.1优点:4.1.2缺点: 4.2 python程序 5、Dijkstr…

BeautifulSoup+xpath+re+css简单复习+新的scrapy的学习

1.BeautifulSoupsoup BeautifulSoup(html,html.parser)all_icosoup.find(class_"DivTable") 2.xpath trs resp.xpath("//tbody[idcpdata]/tr") hong tr.xpath("./td[classchartball01 or classchartball20]/text()").extract() 这个意思是找…

今日早报 每日精选15条新闻简报 每天一分钟 知晓天下事 2月26日,星期一

每天一分钟,知晓天下事! 2024年2月26日 星期一 农历正月十七 1、 气象台:3月初之前南方大部将维持阴雨雪天气。 2、 据海关统计,京津冀协同发展十年成效显著,外贸总量跨两个万亿台阶。 3、 2024年研考初试成绩今天起…

【数据库】MySQL视图 | 用户管理

文章目录 1 :peach:视图:peach:1.1 :apple:基本使用:apple:1.1.1 :lemon:创建视图:lemon:1.1.2 :lemon:案例:lemon:1.1.3 :lemon:删除视图:lemon: 1.2 :apple:视图规则和限制:apple: 2 :peach:用户管理:peach:2.1 :apple:用户信息:apple:2.2 :apple:创建用户:apple:2.3 :apple:…

国企行政题库--校园招聘

国企行政题库是为准备参加国有企业行政类岗位校园招聘的应聘者提供的一套专门准备的试题资料。国有企业在中国经济中扮演着重要的角色,其行政类岗位需求量大,竞争激烈。通过系统学习和准备国企行政题库,将有助于应聘者更好地了解国企行政类岗…

解析OOM的三大场景,原因及实战解决方案

目录 一、什么是OOM 二、堆内存溢出(Heap OOM) 三、方法区内存溢出(Metaspace OOM) 四、栈内存溢出(Stack OOM) 一、什么是OOM OOM 是 Out Of Memory 的缩写,意思是内存耗尽。在计算机领域…

Centos服务器部署前后端项目

目录 准备工作1. 准备传输软件2. 连接服务器 部署Mysql1.下载Mysql(Linux版本)2. 解压3. 修改配置4. 启动服务另一种方法Docker 部署后端1. 在项目根目录中创建Dockerfile文件写入2. 启动 部署前端1. 在项目根目录中创建Dockerfile文件写入2. 启动 准备工作 1. 准备传输软件 …

QEMU源码全解析 —— virtio(24)

接前一篇文章: 上回书讲解了virtioballoon_probe函数及其中的两个重要函数init_vqs()和virtio_device_ready(),解析了init_vqs函数的前两步,本回继续解析该函数, (3)init_vqs函数在经过了对于各feature的初…

【电机仿真】HFI算法脉振高频电压信号注入观测器-PMSM无感FOC控制

【电机仿真】HFI算法脉振高频电压信号注入观测器-PMSM无感FOC控制 文章目录 前言一、脉振高频电压注入法简介(注入在旋转坐标系的d轴)1.旋转高频电压(电流)注入法2.脉振高频电压注入法 二、高频注入理论1.永磁同步电机的高频模型2…

EasyRecovery2024个人免费版本电脑手机数据恢复软件下载

EasyRecovery是一款功能强大的数据恢复软件,能够帮助用户恢复丢失、删除、格式化或损坏的数据。无论是由于误操作、病毒攻击、硬盘故障还是其他原因导致的数据丢失,EasyRecovery都能提供有效的解决方案。 该软件支持从各种存储介质恢复数据,…

springboot215基于springboot技术的美食烹饪互动平台的设计与实现

美食烹饪互动平台的设计与实现 摘 要 如今社会上各行各业,都喜欢用自己行业的专属软件工作,互联网发展到这个时候,人们已经发现离不开了互联网。新技术的产生,往往能解决一些老技术的弊端问题。因为传统美食信息管理难度大&…

密码安全+破解+防御

一步一脚印! 目录 密码安全简介 密码猜解思路 字典生成 crunch工具(kali自带) 社工生成字典(safe6pwd): python代码实现暴力破解 burpsuite爆破 验证码自动识别 hydra爆破ssh密码 hydra工具 medusa爆破ssh密码 medusa工具 msf爆破ssh密码 …

基于YOLOv8深度学习+Pyqt5的电动车头盔佩戴检测系统

wx供重浩:创享日记 对话框发送:225头盔 获取完整源码源文件已标注的数据集(1463张)源码各文件说明配置跑通说明文档 若需要一对一远程操作在你电脑跑通,有偿89yuan 效果展示 基于YOLOv8深度学习PyQT5的电动车头盔佩戴检…

Open3D 基于最小生成树的法线定向 (27)

Open3D 基于最小生成树的法线定向 (27) 一、算法介绍二、算法实现一、算法介绍 法线计算的方向通常都存在方向问题,用Open3D估计的点云法线,是在每个点的局部进行拟合,估计的法线方向并不一致,Open3D提供了使用最小生成树调整法线到统一方向的方法,下面是具体的实现代码…

微服务-微服务Spring Security OAuth 2实战

1. Spring Authorization Server 是什么 Spring Authorization Server 是一个框架,它提供了 OAuth 2.1 和 OpenID Connect 1.0 规范以及其他相关规范的实现。它建立在 Spring Security 之上,为构建 OpenID Connect 1.0 身份提供者和 OAuth2 授权服务器产…

【机器人学导论笔记】三、操作臂正运动学

3.1 概述 操作臂正运动学研究操作臂的运动特性,主要涉及与运动有关的几何参数和时间参数。本章中,只研究静止状态下操作臂连杆的位置和姿态。 处理这些复杂的几何参数需要一些步骤:首先需要在操作臂的每个连杆上分别固接一个连杆坐标系&…

卷积神经网络 CNN

目录 卷积网络与传统网络的区别 参数共享 卷积神经网络整体架构 卷积操作的作用 卷积核的定义 卷积特征值计算方法 卷积层涉及的参数 边缘填充 ​编辑 卷积结果计算 池化层 整体网格架构 VGG网络架构 残差网络Resnet 卷积网络与传统网络的区别 卷积神经网络&#x…

面向面试的机器学习知识点(4)——分类模型

省流版: 本文介绍机器学习中的回归算法:逻辑回归、KNN、SVM、随机森林和XGBoost。作为机器学习的有监督学习方法,分类模型是最重要也是最常见的一类算法,在数据分析等岗位的笔试面试中都是常客,非常值得深入研究&…

初识Lombok

前言 最近读一些公司的业务代码,发现近几年的java项目工程中都使用了lombok,lombok是一个可以自动生成get,set、toString等模板类方法的工具框架,程序再引入lombok后,添加一个注解便可以不写get\set\toString等方法。 Lombok示例…

UDP 与 TCP 的区别是什么?

目录 区别 一、面向无连接 二、不可靠性 三、高效 四、传输方式 五、适用场景 1.直播 2.英雄联盟 六、总结 区别 首先 UDP 协议是面向无连接的,也就是说不需要在正式传递数据之前先连接起双方。然后 UDP 协议只是数据报文的搬运工,不保证有序且…