Producer源码解读

Producer源码解读

在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer之整体流程

image.png

但是站在源码的核心角度,我们可以把Producer分成以下几个核心部分:

1、Producer之初始化

2、Producer之发送流程

3、Producer之缓冲区

4、Producer之参数与调优

Producer源码解读

从生产流程可以知道,Producer里面的核心有序列化器,分区器,还有缓冲,所以初始化的流程肯定是围绕这几个核心来处理。

image.png

KafkaProducer之初始化

image.png

image.png

1)、设置分区器

设置分区器(partitioner),分区器是支持自定义的

image.png

2)、设置每次重试间隔时间

设置每次重试间隔时间(retryBackoffMs)默认100ms

如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

3)、设置序列化器

设置序列化器(Serializer)

image.png

4)、设置拦截器

设置拦截器(interceptors),关于拦截器,这个后面会有讲解和介绍。

image.png

5)、设置缓冲区

image.png

在之前,还有一些参数的设置。

image.png

1、设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

2、设置缓存大小(totalMemorySize) 默认是32M

3、设置压缩格式(compressionType)

4、初始化RecordAccumulator也就是缓冲区指定为32M

6)、设置消息累加器

因为生产者是通过缓冲的方式发送,发送的条件之前的课程讲过,所以这里需要一个消息累加器配合才能完成消息的发送。

image.png

5、初始化集群元数据(metadata),刚开始空的

image.png

6)、创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

拦截器使用及介绍

这里讲一讲拦截器的使用和基本作用,拦截器一般用得不多,所以这里只是讲一讲案例,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

1、想要把发送的数据都带上时间戳image.png

2、实现统计发送消息的成功次数和失败次数

onAcknowledgement(RecordMetadata, Exception)里面,根据消息发送后返回的异常信息来判断是否发送成功。一般异常如果为空就说明发送成功了,反之就说明发送失败了。

然后定义两个变量,并根据Exception的值分别累加就可以统计到了

最后在close方法里输出两个变量的值,这样当producer发送数据结束并close后,会自动调用拦截器的close方法来输出咱们想要统计的成功和失败次数

image.png

image.png

不过这里要注意一个点:

onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率。

3、拦截链路

   // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        // 设置String的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //设置自定义拦截器
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SelfInterceptor.class);

        //设置拦截链路(设置多个 SelfInterceptor  先执行   再执行SelfInterceptor2)
        ArrayList<String> interceptors = new ArrayList<>();
        interceptors.add("com.llp.interceptor.SelfInterceptor");//注意:这里是拦截器的全类名
        interceptors.add("com.llp.interceptor.SelfInterceptor2"); //这里假设有SelfInterceptor2
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

拦截器链里的拦截器是按照顺序组成的,因此我们要注意前后拦截器对彼此的影响,比如这里拦截器1的onsend方法不能返回null,不然拦截器2的onsend就丢失了信息,会发生异常。

Producer之发送流程

Producer之发送流程

Kafka Producer 发送消息流程如下:

1)、执行拦截器逻辑

执行拦截器逻辑,预处理消息, 封装 Producer Record

image.png

2)、集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

3)、序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

4)、分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

5)、消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

7)、消息发送

前面我们也知道真正的消息发送是Sender线程来做,并且这里还要结合缓冲区来处理。

批次发送的条件为:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个

Producer之缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销, 这样内存池可以对 RecordBatch 做到反复利用, 防止引起Full GC问题。那我们看看 Kafka 内存池是怎么设计的。

核心就是这段代码:

image.png

image.png

   Kafka 内存设计有两部分,下面的粉色的是可用的内存(未分配的内存,初始的时候是 32M),上面紫色的是已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存,  两部分加起来是 32M。
申请内存的过程

从 Producer 发送流程的第6步中可以看到会把消息放入 accumulator中, 即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送, 然后去申请内存(free.allocate())

image.png

image.png

(1)如果申请的内存大小超过了整个缓存池的大小,则抛异常出来

image.png

(2)对整个方法加锁:

this.lock.lock();

(3)如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。

if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

image.png

(4)如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存(即上图粉色的区域)申请一块内存。并且可用内存要去掉申请的那一块内存。

image.png

Sender线程

image.png

Producer之参数调优

Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。接下来我们就来看看生产端都有哪些重要的参数,及调优建议。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证, 详细可以查看

发送确认机制 3 种不同的确认模式。

acks=0 意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka 。
acks=1 意味若首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。
acks=all 意味着首领在返回确认或错误响应之前,会等待(min.insync.replicas)同步副本都收到悄息。

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

  调优建议:这个配置对于生产环境来说有点小, **为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)** 。

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

 调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, 主要跟 retries 配合使用, **在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

 调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。**对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。**

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

 调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

  调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/334598.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用emby在Nas群晖搭建一个私人影院

1、安装Emby 打开套件中心搜索emby并安装 2、新增一个共享文件夹 设置好&#xff0c;无脑下一步到应用 给emby赋予这个文件夹的读写权限 保存 3、打开emby service 选择媒体库

chatgpt的实用技巧四temperature 格式

四、temperature 格式 GPT3.5 temperature 的范围为&#xff1a;0-0.7&#xff1b; GPT4.0 temperature 的范围为&#xff1a;0-1&#xff1b; 当 temperature 为 0 时候&#xff0c;结果可稳定。 当 temperature 为 0.7/1 时候&#xff0c;结果发散具备创力。 数值越大&a…

操作教程|JumpServer堡垒机结合Ansible进行批量系统初始化

运维人员常常需要对资产进行系统初始化的操作&#xff0c;而初始化服务器又是一项繁琐的工作&#xff0c;需要花费运维人员大量的时间和精力。为了提高效率&#xff0c;许多组织会使用自动化工具和脚本来简化这些任务。自动化工具的运用可以大幅降低运维人员的工作量&#xff0…

Elasticsearch 数据类型相关总结:快速参考指南【记录】

在Elasticsearch中&#xff0c;有多种数据类型可用于定义字段。 在开始了解数据类型之前&#xff0c;首先要知道&#xff0c;在Elasticsearch中&#xff0c;分词处理主要针对文本字段&#xff0c;而对于其他类型字段&#xff08;如数值、日期、布尔等&#xff09;&#xff0c;通…

MCM备赛笔记——熵权法

Key Concept 熵权法是一种基于信息熵概念的权重确定方法&#xff0c;用于多指标决策分析中。信息熵是度量信息量的不确定性或混乱程度的指标&#xff0c;在熵权法中&#xff0c;它用来反映某个指标在评价过程中的分散程度&#xff0c;进而确定该指标的权重。指标的分散程度越高…

[C#]winform部署openvino调用padleocr模型

【官方框架地址】 https://github.com/PaddlePaddle/PaddleOCR 【算法介绍】 OpenVINO和PaddleOCR都是用于计算机视觉应用的工具&#xff0c;但它们有不同的特点和用途。OpenVINO是一个由Intel开发的开源工具套件&#xff0c;主要用于加速深度学习推理&#xff0c;而PaddleOC…

Javaweb之SpringBootWeb案例员工管理之删除员工的详细解析

3.3 删除员工 查询员完成之后&#xff0c;我们继续开发新的功能&#xff1a;删除员工。 3.3.1 需求 当我们勾选列表前面的复选框&#xff0c;然后点击 "批量删除" 按钮&#xff0c;就可以将这一批次的员工信息删除掉了。也可以只勾选一个复选框&#xff0c;仅删除一…

【开源】基于JAVA语言的免税店商城管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、系统设计2.1 功能模块设计2.2 研究方法 三、系统展示四、核心代码4.1 查询免税种类4.2 查询物品档案4.3 新增顾客4.4 新增消费记录4.5 审核免税 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的免税店商城管理系…

代码之外:工程师的成长进阶秘籍

程序员只懂技术能行吗&#xff1f; 为什么说技术人员“说”和“写”总得擅长一个&#xff1f; 你以为的“关注结果”是真的结果吗&#xff1f; 从一线工程师跃升团队管理者一共分几步&#xff1f; 在不断变化的职场环境中&#xff0c;技术人如何保持竞争力并实现自我增值&a…

【消息队列】RocketMQ 生产和消费中的集群模式和广播模式

在消息队列系统中&#xff0c;生产者和消费者的模式通常包括集群模式和广播模式。这两种模式分别用于不同的场景&#xff0c;具有不同的特点和优势。 1. 集群模式&#xff08;Cluster Mode&#xff09;&#xff1a; 在集群模式下&#xff0c;多个相同角色的实例组成一个集群&…

精品量化公式——“筹码动态”,筹码动态改进版,增加了三个买点信号标识

不多说&#xff0c;直接上效果如图&#xff1a; ► 日线表现 代码评估 技术指标代码评估&#xff1a; 散筹估算: 使用EMA&#xff08;指数移动平均&#xff09;方法计算(WINNER(C*1.1)-WINNER(C*0.9))*70的3日均线&#xff0c;用黄色粗线表示。这是用于估算市场中散户投资者的…

三维重建(3)--单视几何

目录 一、无穷远点、无穷远线、无穷远平面 1、2D平面上的无穷远问题 2、3D平面上的无穷远问题 二、影消点与影消线 1、2D平面上的无穷远点&#xff0c;无穷远线变换 2、影消点 3、影消线 三、单视重构 1、两平行线夹角与影消线关系 2、单视图标定 一、无穷远点、无…

Vue3 在 history 模式下通过 vite 打包部署白屏

Vue3 在 history 模式下通过 vite 打包部署后白屏; 起因 hash 模式 url 后面跟个 # 强迫症犯了改成了 history,就此一波拉锯战开始了 ... 期间 nigix 和 router 各种反复排查尝试最终一波三折后可算是成功了 ... Vue官方文档 具体配置可供参考如下: 先简要介绍下,当前项目打包…

【网站项目】331基于jsp的超市库存商品管理系统

&#x1f64a;作者简介&#xff1a;多年一线开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

paramiko 、netmiko抛出 No existing session的解决方法。

写这篇文章&#xff0c;是因为我在H3C交换机上很正常的代码&#xff0c;可以ssh登录&#xff0c;但huawei交换机上失败&#xff0c;用CSDN上的文章关于allow_agentFalse,look_for_keysFalse 处理还是失败后&#xff0c;找相关的一些帖子总结的必须修改paramiko的源代码文章。 如…

Mysql:重点且常用的 SQL 标签整理

目录 1 <resultMap> 标签 2 <sql> 标签 3 <where> 标签 4 <if> 标签 5 <trim> 标签 6 <foreach> 标签 7 <set> 标签 1 <resultMap> 标签 比如以下代码&#xff1a; <resultMap type"SysCollege" id&qu…

【2024.1.19练习】B:01串的熵

B: 01串的熵&#xff08;5分&#xff09; 问题描述 对于一个长度为23333333的01串&#xff0c;如果其信息熵为11625907.5798&#xff0c;且0 出现次数比1 少&#xff0c;那么这个01 串中0出现了多少次&#xff1f; 我的代码&#xff1a; 注意调用<cmath>以便于对数运算…

Android-三方框架的源码

ARouter Arouter的整体思路是moduelA通过中间人ARouter把路由信息的存到仓库WareHouse&#xff1b;moduleB发起路由时&#xff0c;再通过中间人ARouter从仓库WareHouse取出路由信息&#xff0c;这要就实现了没有依赖的两者之间的跳转与通信。其中涉及Activity的跳转、服务prov…

2024 Move开发者大会在上海圆满落幕,第三期星航计划启动

*以下文章来源于MoveFuns &#xff0c;作者MoveFuns 至各位Mover&#xff1a; 2024 Move DevConfShanghai 圆满成功。在这次盛会上&#xff0c;我们围绕着Move生态关键的一年这一主题进行了深入的讨论和交流。 以下是本次会议的总结回顾&#xff1a; 大会盛况 本次大会在上…

大模型学习与实践笔记(九)

一、LMDeply方式部署 使用 LMDeploy 以本地对话方式部署 InternLM-Chat-7B 模型&#xff0c;生成 300 字的小故事 2.api 方式部署 运行 结果&#xff1a; 显存占用&#xff1a; 二、报错与解决方案 在使用命令&#xff0c;对lmdeploy 进行源码安装是时&#xff0c;报错 1.源…