Kafka Producer发送消息流程之Sender发送线程和在途请求缓存区

文章目录

  • 1. Sender发送数据
      • 1. 发送数据的详细过程:
      • 2. 关键参数配置
  • 2. 在途请求缓存区

在这里插入图片描述

1. Sender发送数据

Sender线程负责将已经在RecordAccumulator中准备好的消息批次发送到Kafka集群。虽然消息在RecordAccumulator中是按照分区组织的,但Sender线程在发送这些消息时,会按照broker而不是分区来组织发送。这有助于提高发送效率和减少网络开销。

1. 发送数据的详细过程:

  1. 拉取批次Sender线程从RecordAccumulator中拉取已准备好发送的消息批次。这些批次可能来自多个分区。

  2. 按broker组织批次Sender线程会将这些批次按目标broker进行组织,因为一个broker通常负责多个分区的消息处理。这个过程涉及以下步骤:

    • 确定每个分区的leader broker。
    • 将属于同一个broker的所有分区的批次组合在一起。
  3. 发送请求Sender线程会为每个broker创建一个或多个Produce请求(ProduceRequest),然后通过网络将这些请求发送到对应的broker。这些请求包含了该broker负责的所有分区的消息批次。

  4. 处理响应Sender线程会等待broker的响应。响应中包含了每个分区的消息是否成功写入的信息。

    • 如果某个分区的消息写入失败,Sender线程会根据重试机制重试发送这些消息。
    • 如果所有消息都成功写入,Sender线程会从RecordAccumulator中移除这些消息批次。

2. 关键参数配置

以下是一些关键参数,可以影响Sender线程的行为:

  • max.in.flight.requests.per.connection:每个连接允许的最大未完成请求数。默认值为5。如果这个值过大,可能会导致消息重排序。
  • request.timeout.ms:请求超时时间。默认值为30秒。如果broker在此时间内没有响应,Sender线程会重试或失败。
  • retries:重试次数。默认值为0。指定Sender线程在发送消息失败时的重试次数。
  • retry.backoff.ms:重试间隔时间。默认值为100ms。指定每次重试之间的等待时间。

通过这些配置,Kafka生产者可以在不同的网络条件和负载下优化消息发送的效率和可靠性。

在Kafka生产者的Sender线程工作流程中,如果一次任务中包含了来自多个分区的批次,并且这些批次涉及到多个broker,那么Sender线程会分别向这些broker发送请求

2. 在途请求缓存区

  • 存储在途请求:当Sender线程将消息批次发送到broker后,这些请求会存储在在途请求缓存区中,直到收到broker的确认响应。这个缓存区的大小由配置参数max.in.flight.requests.per.connection决定。

  • 重试机制:如果某个请求在指定时间内没有收到响应,生产者会根据配置的重试机制重新发送这些请求。重试机制配置参数包括retriesretry.backoff.ms

  • 顺序保证max.in.flight.requests.per.connection参数设置了每个连接(每个生产者和Kafka的连接)允许的最大未完成请求数。默认值是5。如果这个值设置过大,可能会导致消息重排序问题,特别是在启用了重试机制时。设置合适的值可以平衡并发性能和消息顺序保证。

  • 资源管理:在途请求缓存区的大小会影响生产者的内存使用和性能。如果在途请求过多,可能会占用大量内存资源,导致生产者性能下降。因此,合理设置这个缓存区的大小是优化生产者性能的关键。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/803563.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】类和对象的基本概念与使用

本文通过面向对象的概念以及通俗易懂的例子介绍面向对象引出类和对象。最后通过与之有相似之处的C语言中的struct一步步引出C中的类的定义方式,并提出了一些注意事项,最后描述了类的大小的计算方法。 一、什么是面向对象? 1.面向对象的概念 …

基于python的图像去水印

1 代码 import cv2 import numpy as npdef remove_watermark(image_path, output_path):# 读取图片image cv2.imread(image_path)# 转换为灰度图gray cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)# 使用中值滤波去除噪声median_filtered cv2.medianBlur(gray, 5)# 计算图像的梯…

Ambari Hive 创建函数无权限

作者:櫰木 1、创建udf函数 参考文档:https://blog.csdn.net/helloxiaozhe/article/details/102498567 如果已经编写好,请使用自己的。如果没有请参考以上链接进行udf函数编写。 2、创建函数遇到的问题 由于集群开启了kerberos&#xff0…

常用的点云预处理算法

点云预处理是处理点云数据时的重要部分,其目的是提高点云数据的质量和处理效率。通过去除离群点、减少点云密度和增强特征,可以消除噪声、减少计算量、提高算法的准确性和鲁棒性,从而为后续的点云处理和分析步骤(如配准、分割和重…

[超级详细系列]ubuntu22.04配置深度学习环境(显卡驱动+CUDA+cuDNN+Pytorch)--[3]安装cuDNN与Pytorch

本次配置过程的三篇博文分享分别为为: [超级详细系列]ubuntu22.04配置深度学习环境(显卡驱动CUDAcuDNNPytorch)--[1]安装显卡驱动 [超级详细系列]ubuntu22.04配置深度学习环境(显卡驱动CUDAcuDNNPytorch)--[2]安装Anaconda与CUDA [超级详细系列]ubuntu22.04配置深…

使用windows批量解压和布局ImageNet ISLVRC2012数据集

使用的系统是windows,找到的解压命令很多都linux系统中的,为了能在windows系统下使用,因此下载Git这个软件,在其中的Git Bash中使用以下命令,因为Git Bash集成了很多linux的命令,方便我们的使用。 ImageNe…

Python: 一些python和Java不同的基础语法

文章目录 1. 数据类型2. 字符串的引用3. 字符串拼接4. Python中的报错5. Python中的输入语句(input)6. 运算符(**和//)7. 除法运算8. 注释方法: #或者三引号9. Python中的比较10. Java中用and, or, not代替逻辑运算符11. 多元赋值12. Python不支持自增自减操作13. 在Python中, …

jenkins 使用教程

1. 安装最新长期稳定版 2.426.1 Redhat Jenkins Packages sudo wget -O /etc/yum.repos.d/jenkins.repo https://pkg.jenkins.io/redhat-stable/jenkins.repo --no-check-certificate sudo rpm --import https://pkg.jenkins.io/redhat-stable/jenkins.io-2023.key yum insta…

Oracle线上执行SQL特别慢的原因分析

一、背景: 线上反馈一张表select * from table where idxxx语句执行特别慢,超过60s超时不能处理,第一直觉是索引失效了,开始执行创建索引语句create index index_name on table() online。但是执行了超过20分钟索引还没有创建成功…

python课设——宾馆管理系统

python课设——宾馆管理系统 数据库课设-宾馆管理系统-python3.7pyqt5 简介 大二数据库课程设计(3-4天工作量)的项目,登录界面的ui设计参考了他人成果,其余ui以及所有后端部分全部独立完成,详细功能见功能模块图使用…

简单实现一个本地ChatGPT web服务(langchain框架)

简单实现一个本地ChatGPT 服务,用到langchain框架,fastapi,并且本地安装了ollama。 依赖安装: pip install langchain pip install langchain_community pip install langchain-cli # langchain v0.2 2024年5月最新版本 pip install bs4 pi…

Unity | Shader基础知识(第十八集:Stencil应用-透视立方盒子)

目录 一、前言 二、场景布置 三、 shader部分 1.图片的部分 2.图片部分纯净代码 3.遮罩部分复习 4.深度写入 ZWrite 5.颜色遮罩ColorMask 6.遮罩纯净代码 四、场景中shader使用 五、作者的碎碎念 一、前言 因为这个内容稍微有点多,我尽力讲清楚了&#x…

速部署 HBase 测试环境

快速部署 HBase 测试环境 第一步:下载软件,在HBase官网下载最新版, 找到 bin,点击下载,比如我这里下载的是 hbase-2.5.6-bin.tar.gz 第二步:解压软件 $ tar -zxvf hbase-2.5.6-bin.tar.gz $ cd hbase-2.…

业务终端动态分配IP-DHCP技术、DHCP中继技术

一、为什么需要DHCP? 1、许多设备(主机、无线WiFi终端等)需要动态地址的分配; 2、人工手工配置任务繁琐、容易出错,比如:IP地址冲突; 3、网络规模扩大、复杂度提高,网络配置越来越复杂,计算机的位置变化和数量超过可分配IP地址的数量,造成IP地址变法频繁以及IP地址…

微信小游戏 彩色试管 倒水游戏 逻辑 (四)

最近开始研究微信小游戏,有兴趣的 可以关注一下 公众号, 记录一些心路历程和源代码。 定义了一个名为 WaterFlow class,该类继承自 cc.Graphics,用于在 Cocos Creator 中创建和显示水流的动画效果。下面是对代码的详细解释&#x…

FPGA FIR fdatool filter designer MATLAB

位数问题 fdatool 先确定输入信号的位宽,比如17位在fdatool中,选set quantization parameters 选input/output 设置input word length 为17bit(not confirmed) fir compiler implementation 注意: 当设置输入位宽为16位时,ip核…

智能手术新时代:Apple Vision Pro在医疗领域的突破性应用

无人驾驶的未来:AI如何重塑我们的出行世界-CSDN博客文章浏览阅读2.2k次,点赞109次,收藏64次。无人驾驶汽车的发展是AI技术应用的一次伟大尝试。特斯拉与百度“萝卜快跑”在这个领域的竞争与合作,不仅展示了AI技术的强大潜力&#…

记录些MySQL题集(6)

MySQL 单表为什么不要超过 2000W 行? 数据持久化在磁盘中,磁盘的最小单元是扇区,一个扇区 0.5 KB,而由 8 个扇区可以构成一个文件系统块(4K),以 InnoDB 存储引擎为例,一个数据页的大…

React Native 自定义 Hook 获取组件位置和大小

在 React Native 中自定义 Hook useLayout 获取 View、Pressable 等组件的位置和大小的信息 import {useState, useCallback} from react import {LayoutChangeEvent, LayoutRectangle} from react-nativeexport function useLayout() {const [layout, setLayout] useState&l…

MySQL数据库查询索引失效场景

在连表情况下,如果排序字段涉及到了两个表,排序字段将无法走索引. 加上第二个排序字段之后,走全表扫描了. 或者尽量让两次排序都用同一个表的字段,这样可以建联合索引让排序也能走索引.(不想建联合索引的话,可以第二次排序用表id,这样单个的…