【Kafka】Kafka源码解析之producer过程解读

从本篇开始 打算用三篇文章 分别介绍下Producer生产消费,Consumer消费消息 以及Spring是如何集成Kafka 三部分,致于对于Broker的源码解析,因为是scala语言写的,暂时不打算进行学习分享。

总体介绍

在这里插入图片描述

  • clients : 保存的是Kafka客户端代码,主要就是生产者和消费者代码
  • config:保存Kafka的配置文件,其中比较重要的配置文件是server.properties。
  • connect目录:保存Connect组件的源代码。我在开篇词里提到过,Kafka Connect组件是用来实现Kafka与外部系统之间的实时数据传输的。
  • core目录:保存Broker端代码。Kafka服务器端代码全部保存在该目录下。

而一条消息的整体流转过程其实就是经过三部分,也就是Producer\Broker\Consumer。
因为是对主要核心流程的分析,所以只会截核心代码。具体后面细节,在说。
在这里插入图片描述

producer整体流程

对于Producer来说,其实就是几部分。

  • 初始化、发送流程、缓冲区

初始化流程

设置分区器

// 设置分区器
  this.partitioner = config.getConfiguredInstance(
                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    Partitioner.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));

设置重试时间,默认100ms,如果配置Kafka可以重试,retries制定重试次数,retryBackoffMs指定重试的间隔

 long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

获取Key和Value的序列化器

      // 序列化器
      if (keySerializer == null) {
          this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                   Serializer.class);
          this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
      } else {
          config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
          this.keySerializer = keySerializer;
      }
      
      if (valueSerializer == null) {
          this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                     Serializer.class);
          this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
      } else {
          config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
          this.valueSerializer = valueSerializer;
      }

拦截器

    // 设置拦截器
     List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
             ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
             ProducerInterceptor.class,
             Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
     if (interceptors != null)
         this.interceptors = interceptors;
     else
         this.interceptors = new ProducerInterceptors<>(interceptorList);

其他参数

	   // 设置最大消息为多大,默认是1M 默认是16384
       this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
       // 设置缓存大小 默认是32M 默认是33554432 RecordAccumulator=32MB
       this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
       // 设置压缩类型 可以提升性能
       this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        this.accumulator = new RecordAccumulator(logContext,

       // 因为是通过缓冲区发送消息的,所以需要消息累计器
       RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
           enableAdaptivePartitioning,
           config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
       );

初始化元数据

 // 初始化集群元数据
   if (metadata != null) {
       this.metadata = metadata;
   } else {
       this.metadata = new ProducerMetadata(retryBackoffMs,
               config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
               config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
               logContext,
               clusterResourceListeners,
               Time.SYSTEM);
       this.metadata.bootstrap(addresses);
   }

创建Sender线程,其中包含一个重要的网络组件NetWorkClient

	// 创建sender线程
   this.sender = newSender(logContext, kafkaClient, this.metadata);
    // 线程name
    String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
    // 封装起来 设置为守护线程 并启动
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    // 线程启动
    this.ioThread.start();

发送消息流程

发送消息的过程

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // 执行拦截器逻辑
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

先执行拦截器,可以发现就是遍历拦截器,然后执行对应的onSend()方法。当我们想增加一个拦截器,直接实现对应的接口,重写onSend()方法,然后Kafka就会调用我们的onSend方法。通过提供一个拓展点进行使用。

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
            }
        }
        return interceptRecord;
    }

从Kafka Broker集群获取元数据metadata

// 从broker获取元数据
	clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);

对key和value进行序列化,调用对应的serialize的方法。

	  byte[] serializedKey;
      try {
          // 选择对应的序列化进行操作
          serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
      } catch (ClassCastException cce) {
      }
      
      byte[] serializedValue;
      try {
          serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
      } catch (ClassCastException cce) {
         
      }
 	// 选择具体的分区
	int partition = partition(record, serializedKey, serializedValue, cluster);
 // 消息缓存到RecoredAccumulator
	result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
                    serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);

     // 消息发送的条件
     // 缓冲区数据大小达到batch.size 或者linnger.ms达到上限后 唤醒sneder线程。
     if (result.batchIsFull || result.newBatchCreated) {
         log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
         this.sender.wakeup();
     }

Sender线程

	runOnce();

    long pollTimeout = sendProducerData(currentTimeMs);

在这里插入图片描述

在这里插入图片描述
缓冲区、

在这里插入图片描述

这篇讲解很详细 https://www.cnblogs.com/rwxwsblog/p/14754810.html

生产者核心参数配置

bootstrap.servers:连接Broker配置,一般就是xxxx:9092

key.serializer 和 value.serializer:对key和value进行序列化器,可以自定义,一般就是String方式

buffer.memory:RecordAccumulator 缓冲区总大小,默认32m。

batch.size: 消息会以batch的方式进行发送,这是一批数据的大小 默认是16K

linger.ms:发送消息的时机,如果没有达到batch.size or linger.ms的时间就会发送 默认是0ms 立即发送

acks: 0: 不落盘 1:只有leader落盘 -1(all) : leader和所有从节点持久化成功 默认是-1

max.in.flight.requests.per.connection:允许最多没有返回 ack 的次数,默认为 5

retries: 消息发送失败时,系统重发消息 默认值 2147483647

retry.backoff.ms:两次重试间隔 默认是100ms

enable.idempotence: 开启幂等性 默认true

compression.type: 压缩格式 默认是none

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/893073.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker新手必看:快速安装和配置BookStack在线文档系统

文章目录 前言1. 安装Docker2. Docker镜像源添加方法3. 创建并启动BookStack容器4. 登录与简单使用5. 公网远程访问本地BookStack5.1 内网穿透工具安装5.2 创建远程连接公网地址5.3 使用固定公网地址远程访问 前言 本文主要介绍如何在Linux系统使用Docker本地部署在线文档管理…

基于SSM医药垃圾分类管理系统【附源码】

基于SSM医药垃圾分类管理系统 效果如下&#xff1a; 系统登录界面 管理员主界面 公告信息管理界面 垃圾分类管理界面 医院垃圾信息管理界面 用户主界面 留言反馈管理界面 研究背景 随着科学技术发展&#xff0c;计算机已成为人们生活中必不可少的生活办公工具&#xff0c;在…

Java语言-抽象类

目录 1.抽象类概念 2.抽象类语法 3.抽象类特性 4.抽象类作用 1.抽象类概念 在面向对象的概念中&#xff0c;所有的对象都是通过类来描绘的&#xff0c;但是反过来&#xff0c;并不是所有的类都是用来描绘对象的&#xff0c; 如果 一个类中没有包含足够的信息来描绘一个具体…

初阶数据结构【2】--顺序表(详细且通俗易懂,不看一下吗?)

本章概述 线性表顺序表顺序表问题与思考彩蛋时刻&#xff01;&#xff01;&#xff01; 线性表 概念&#xff1a;一些在逻辑上成线性关系的数据结构的集合。线性表在逻辑上一定成线性结构&#xff0c;在物理层面上不一定成线性结构。常见的线性表&#xff1a;顺序表&#xff0…

ICT产业新征程:深度融合与高质量发展

在信息时代的浪潮中&#xff0c;每一场关于技术革新与产业融合的盛会都闪耀着智慧的光芒&#xff0c;引领着未来的方向。9月25日&#xff0c;北京国家会议中心内&#xff0c;一场聚焦全球信息通信业的顶级盛事——第32届“国际信息通信展”&#xff08;PT展&#xff09;隆重拉开…

C++新手入门指南:从基础概念到实践之路

C 继承了 C 语言的高效性和灵活性&#xff0c;同时新增了面向对象编程的特点。这使得 C 既可以进行底层系统编程&#xff0c;又能进行面向对象的软件设计。在面向对象编程方面&#xff0c;C 支持封装、继承和多态三大特性。 &#x1f4af;C 初印象 语言的发展就像是练功打怪…

【Docker】Docker基本操作

目录 一、了解云计算背景 1.1 云计算的三种服务模式 1.2 虚拟机的两种架构 二、Docker 概述 2.1 Docker简述 2.2 Docker 特点 2.3 Docker与虚拟机的区别 2.4 容器技术有哪些 2.4.1 namespace的六项隔离 2.5 Docker核心概念 2.5.1 镜像 2.5.2 容器 2.5.3 仓库 三、…

吴恩达深度学习笔记(6)

正交化 为了提高算法准确率&#xff0c;我们想到的方法 收集更多的训练数据增强样本多样性使用梯度下降将算法使算法训练时间更长换一种优化算法更复杂或者更简单的神经网络利用dropout 或者L2正则化改变网络框架更换激活函数改变隐藏单元个数 为了使有监督机制的学习系统良…

笔试强训10.17

//法一&#xff1a;中点扩散 //法二&#xff1a;动态规划 //法三&#xff1a;hash二分 #include<bits/stdc.h> using namespace std; typedef unsigned long long ull; const int N1e610; const int base131; ull hr[2*N],hl[2*N],p[2*N];//超过ull自动取余 char s[N*2];…

如何优化批处理策略,最大限度地“压榨”GPU性能

新手数据科学家和机器学习工程师常常会问一个关键问题&#xff1a;如何判断他们的深度学习训练过程是否在正常运行&#xff1f;在本文中&#xff0c;我们将学习如何诊断和优化深度学习的性能问题&#xff0c;不论是在单台机器还是多台机器上进行训练。通过这些方法&#xff0c;…

uniapp onPageScroll

子组件有onPageScroll, 首页也要引入onPageScroll, eg: 主页面 sell/detail/index 《子组件》 <script setup> 引入onPageScroll </script> 组件&#xff1a; 引入onPageScroll 别人的比较

阿里 C++面试,算法题没做出来,,,

我本人是非科班学 C 后端和嵌入式的。在我面试的过程中&#xff0c;竟然得到了阿里​ C 研发工程师的面试机会。因为&#xff0c;阿里主要是用 Java 比较多&#xff0c;C 的岗位比较少​&#xff0c;所以感觉这个机会还是挺难得的。 阿里 C 研发工程师面试考了我一道类似于快速…

五个必备的高清无水印视频素材库推荐

做抖音、短视频创作的朋友都知道&#xff0c;优质的素材往往决定了作品能否获得更多关注。如果你还不知道在哪里下载高清无水印的视频素材&#xff0c;不用担心&#xff01;今天为你推荐5个高品质的视频素材库&#xff0c;助你轻松创作出爆款视频。 蛙学网 是国内领先的视频素材…

Windows 11 24H2版本有哪些新功能_Windows 11 24H2十四大新功能介绍

距离上次发布的23H2版本已经过去了一年时间&#xff0c;现在&#xff0c;Win 11的24H2版本终于等到了&#xff0c;微软已经全面公开发布Win11 24H2版本&#xff0c;版本号为26100.1742&#xff0c;此次官宣的版本包括了消费者版、商业版、LTSC 2024版等&#xff0c;各种语言版本…

选择合适的SSL证书

随着我们在线业务的增长&#xff0c;确保网站安全变得越来越重要。对于许多人来说&#xff0c;保护网站安全的想法似乎令人望而生畏&#xff0c;尤其是在有各种SSL证书可用的情况下。您可能想知道哪一个最适合您的业务需求或如何浏览这些选项。 除了SSL证书之外&#xff0c;使…

IIC协议解析

文章目录 1 IIC理解1.1 IIC简述1.2 IIC协议优缺点1.3 传输速度 2 IIC数据格式3 数据时序3.1 写时序3.2 读时序 参考链接 1 IIC理解 1.1 IIC简述 IIC全称Inter Integrated Circuit&#xff0c;即集成电路总线。是由Philips半导体公司于八十年代初设计出的一种两线式串行总线协议…

雷达手势识别技术

1、IR-UWB 手势识别方案 该任务可以分为数据采集&#xff0c;雷达数据处理&#xff0c;识别分类三个部分。 1.1 UWB Radar 数据处理 首先采集慢时间快时间维数据&#xff1a; 然后仍然是Clutter removal filter&#xff1a; 之后正则化转化为灰度图像&#xff1a; 使用matlab f…

springboot+大数据+基于大数据的电脑硬件推荐系统【内含源码+文档+部署教程】

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…

虚幻闪烁灯光材质

创建一个材质 材质域改成光照函数 , Time让材质动起来 参数B用来控制速度 , Sine 让灯光闪烁 , Frac 增加了闪烁细节 把材质放到灯光材质上 效果还是挺不错的! 可以用于一些恐怖游戏~

Redis和Jedis的区别

目录 含义与用途 Jedis案例 总结 含义与用途 Redis&#xff1a; 概念&#xff1a;Redis是一个基于内存的键值存储数据库&#xff0c;支持丰富的数据结构。比如&#xff1a;字符串功能&#xff1a;除了基础的数据存储&#xff0c;Redis还提供了丰富的高级功能。如持久化&…