Kafka系列 - 生产者客户端架构以及3个重要参数

整体架构

在这里插入图片描述

整个生产者客户端由两个县城协调运行,这两个线程分别为主线程和Sender线程(发送线程)。

主线程中由KafkaProducer创建消息,然后通过可能的拦截器,序列化器和分区器之后缓存到消息累加器(RecordAccumulator)。Sender线程负责从RecordAccumulator中获取消息并将其发送到kafka中。

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区, Deque< ProducerBatch>> 的保存形式转变成 <Node, List< ProducerBatch> 的形式,其中 Node 表示 Kafka 集群的 broker 节点。对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。

在转换成 <Node, List> 的形式之后,Sender 还会进一步封装成 <Node, Request> 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest。

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<NodeId, Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

前面提及的 InFlightRequests 还可以获得 leastLoadedNode,即所有 Node 中负载最小的那一个。这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于下图中的 InFlightRequests 来说,图中展示了三个节点 Node0、Node1和Node2,很明显 Node1 的负载最小。也就是说,Node1 为当前的 leastLoadedNode。选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。

在这里插入图片描述

生产者重要参数

acks

  • acks=1。(默认值):生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入 leader 副本并返回成功响应给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息。acks 设置为1,是消息可靠性和吞吐量之间的折中方案。
  • acks=0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为0可以达到最大的吞吐量。
  • acks=-1或acks=all。生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为 -1(all) 可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有 leader 副本,这样就退化成了 acks=1 的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动

retries和retry.backoff.ms

retries 参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不可行了。

重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。

Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。

对于某些应用来说,顺序性非常重要,比如 MySQL 的 binlog 传输,如果出现错误就会造成非常严重的后果。如果将 retries 参数配置为非零值,并且 max.in.flight.requests.per.connection 参数配置为大于1的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为1,而不是把 retries 配置为0,不过这样也会影响整体的吞吐。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/187837.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

面试cast:reinterpret_cast/const_cast/static_cast/dynamic_cast

目录 1. cast 2. reinterpret_cast 3. const_cast 3.1 加上const的情况 3.2 去掉const的情况 4. static_cast 4.1 基本类型之间的转换 4.2 void指针转换为任意基本类型的指针 4.3 子类和父类之间的转换 5. dynamic_cast 5.1 RTTI(Run-time Type Identification) 1.…

kafka 集群 KRaft 模式搭建

Apache Kafka是一个开源分布式事件流平台&#xff0c;被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序 Kafka 官网&#xff1a;https://kafka.apache.org/ Kafka 在2.8版本之后&#xff0c;移除了对Zookeeper的依赖&#xff0c;将依赖于ZooKeeper的控制器…

【JavaEE初阶】线程安全问题及解决方法

目录 一、多线程带来的风险-线程安全 1、观察线程不安全 2、线程安全的概念 3、线程不安全的原因 4、解决之前的线程不安全问题 5、synchronized 关键字 - 监视器锁 monitor lock 5.1 synchronized 的特性 5.2 synchronized 使用示例 5.3 Java 标准库中的线程安全类…

vscode运行c++程序如何支持c++11

参考https://zhuanlan.zhihu.com/p/269244754 更改setting.json文件

Spring cloud - Feign

Feign的作用 Feign是Netflix公司开发的声明式web客户端组件&#xff0c;Spring对Feign做了无缝集成&#xff1a; Feign is a declarative web service client. It makes writing web service clients easier. To use Feign create an interface and annotate it. It has plugg…

什么是 Jest ? Vue2 如何使用 Jest 进行单元测试?Vue2 使用 Jest 开发单元测试实例

什么是Jest? Jest 是一个流行的 JavaScript 测试框架,由 Facebook 开发并维护,专注于简单性和速度。它通常用于编写 JavaScript 和 TypeScript 应用程序的单元测试、集成测试和端到端测试。 特点: 简单易用: Jest 提供简洁的 API 和易于理解的语法,使得编写测试用例变得…

kolla-ansible 部署OpenStack云计算平台

目录 一、环境 二、安装及部署 三、测试 一、环境 官方文档&#xff1a;https://docs.openstack.org/kolla-ansible/yoga/user/quickstart.html rhel8.6 网络设置&#xff1a; 修改网卡名称 网络IP&#xff1a; 主机名&#xff1a; 网络时间协议 配置软件仓库 vim docke…

模拟退火算法应用——求解二元函数的最小值(极小值)

仅作自己学习使用 一、问题 二、代码 clear clcT1 cputime; xmax 5; xmin -5; ymax 5; ymin -5; L 20; % 马尔科夫链长度 dt 0.998; % 降温系数 S 0.02; % 步长因子 T 200; % 初始温度 TZ 1e-8; % 容差 Tmin 0.01;% 最低温度 P 0; % Metropolis接受…

持续集成交付CICD:GitLabCI 通过trigger触发流水线

目录 一、理论 1.GitLabCI 二、实验 1.搭建共享库项目 2.GitLabCI 通过trigger触发流水线 三、问题 1.项目app02未触发项目app01 2.GitLab 报502网关错误 一、理论 1.GitLabCI (1) 概念 GitLab CI&#xff08;Continuous Integration&#xff09;是一种持续集成工具…

为什么要隐藏id地址?使用IP代理技术可以实现吗?

随着网络技术的不断发展&#xff0c;越来越多的人开始意识到保护个人隐私的重要性。其中&#xff0c;隐藏自己的IP地址已经成为了一种常见的保护措施。那么&#xff0c;为什么要隐藏IP地址&#xff1f;使用IP代理技术可以实现吗&#xff1f;下面就一起来探讨这些问题。 首先&am…

蓝桥杯每日一题2023.11.24

题目描述 #include <stdio.h> #define N 100int connected(int* m, int p, int q) {return m[p]m[q]? 1 : 0; }void link(int* m, int p, int q) {int i;if(connected(m,p,q)) return;int pID m[p];int qID m[q];for(i0; i<N; i) ________________________________…

没搞懂二维差分是什么怎么办???

摸鱼的时候画的&#xff0c;根据公式反推 一维差分倒是懂了 a[10]{1,2,6,9,11,12,17,21,32,67}; c[10]{1,1,4,3,2,1,5,4,11,35}; 现要把[3,7]的值都增加3 c[10]{1,1,7,3,2,1,5,1,11,35}; 要查询的时候再用for循环相加 结论&#xff1a;成立且适用于多次修改 不知道为什么这个…

跨域攻击分析和防御(上)

点击星标&#xff0c;即时接收最新推文 跨域攻击 在大型公司或大型跨国企业中都拥有自己的内网&#xff0c;跨国公司都有各个子公司一般以建立域林进行共享资源。根据不同职能区分的部门&#xff0c;从逻辑上以主域和子域进行划分以方便统一管理。在物理层使用防火墙将各个子公…

『亚马逊云科技产品测评』活动征文|EC2 实例安装 docker 与配套软件部署前后端分离的医疗管理后台系统

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 目录 一、AWS 产品类别选择 &#xff08;1&#xff09;应用服务器选择…

浅谈现代化城市建设中智慧消防的研究与应用

安科瑞 华楠 【摘要】随着城市现代化发展&#xff0c;城市居住密度愈来愈大&#xff0c;城市建筑结构复杂多样化&#xff0c;高层建筑火灾发生率在不断地升高。对现代化城市面临的消防问题展开讨论&#xff0c;针对智慧消防在现代化城市建设中的现状进行了分析&#xff0c;并提…

探索 Vue 中的 bus.$emit:实现组件通信的强大工具

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

快手AI布局:从直播电商到大模型,如何打造智能生态?

快手科技在2023年第三季度业绩中&#xff0c;首次披露了关于AI业务的一些重要信息&#xff0c;显示出其对AI的重视和投入。快手AI的核心业务和竞争优势是什么&#xff1f;AI的发展&#xff0c;对快手业绩带来了哪些方面的提振&#xff1f; 快手AI业务板块&#xff1a;直播电商…

华为云人工智能入门级开发者认证学习笔记

人工智能入门级开发者认证 人工智能定义 定义 人工智能 (Artificial Intelligence) 是研究、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。 强人工智能 vs 弱人工智能 强人工智能&#xff1a;强人工智能观点认为有可能制造出真正能推理&#xff08…

【React】打包体积分析 source-map-explorer

通过分析打包体积&#xff0c;才能知道项目中的哪部分内容体积过大&#xff0c;方便知道哪些包需要进一步优化。 使用步骤 安装分析打包体积的包&#xff1a;npm i source-map-explorer在 package.json 中的 scripts 标签中&#xff0c;添加分析打包体积的命令对项目打包&…

【Java并发】聊聊创建线程池的几种方式以及实际生产如何应用

上一篇文章&#xff0c;主要讲述了如果通过线程池进行执行任务&#xff0c;以及相关的核心流程&#xff0c;线程执行框架本身提供了一系列的类&#xff0c;封装了线程创建、关闭、执行、管理等跟业务逻辑无关的代码逻辑&#xff0c;一方面将业务和非业务逻辑进行解耦合&#xf…