Kafka源码分析(四) - Server端-请求处理框架

系列文章目录

Kafka源码分析-目录

一. 总体结构

先给一张概览图:
在这里插入图片描述

服务端请求处理过程涉及到两个模块:kafka.networkkafka.server

1.1 kafka.network

该包是kafka底层模块,提供了服务端NIO通信能力基础。

有4个核心类:SocketServer、Acceptor、Processor、RequestChannel。各自角色如下:

  • SocketServer:服务端的抽象,是服务端通信的入口;

  • Acceptor:Reactor通信模式中处理连接ACCEPT事件的线程/线程池所执行的任务;

  • Processor:Reactor通信模式中处理连接可读/可写事件的线程/线程池所执行的任务;

  • RequestChannel:请求队列,存储已经解析好的请求以等待处理;

对于上层模块而言,该基础模块有两个输入和一个输出

  1. 输入:IP+端口号,该模块会对目标端口实现监听;

  2. 输出:解析好的请求,通过RequestChannel进行输出;

  3. 输入:待发送的Response,通过Processor.responseQueue来完成输入;

1.2 kafka.server

该包在kafka.network的基础上实现各种请求的处理逻辑,主要包含KafkaServer和KafkaApis两个类。其中:

  • KafkaServer:Kafka服务端的抽象,统一维护Kafka服务端的各流程和状态;

  • KakfaApis:维护了各类请求对应的业务逻辑,通过KafkaServer.apis字段组合到KafkaServer之中;

二. Server的端口监听

整体流程如图:
在这里插入图片描述

接下来按调用顺序依次分析各方法

2.1 KafkaServer.startup()

关于端口监听的核心逻辑分4步,代码如下(用注释说明各部分的目的):

def startup() {
  // 省略无关代码
  ... ...

  // 1. 创建SocketServer
  socketServer = new SocketServer(config, metrics, time, credentialProvider)

  // 2. 启动端口监听
  // (在这里完成了Acceptor的创建和端口ACCEPT事件的监听)
  // (startupProcessors = false表示暂不启动Processor处理线程)
  socketServer.startup(startupProcessors = false)

  // 3. 启动请求处理过程中的相关依赖
  // (这也是第2步中不启动Processor处理线程的原因,有依赖项需要处理)
  ... ...

  // 4. 启动端口可读/可写事件处理线程(即Processor线程)
  socketServer.startProcessors()

  // 省略无关代码
  ... ...
}

2.2 SocketServer.startup(Boolean)

代码及说明性注释如下:

def startup(startupProcessors: Boolean = true) {
  this.synchronized {
    // 省略无关代码
    ... ...

    // 1. 创建Accetpor和Processor的实例,
    // 同时页完成了Acceptor对端口ACCEPT事件的监听
    createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)

    // 2. [可选]启动各Acceptor对应的Processor线程
    if (startupProcessors) {
      startProcessors()
    }
  }
}

2.3 ScocketServer.createAcceptorAndProcessor()

直接上注释版的代码,流程分3步:

// 入参解释
// processorsPerListener: 对于每个IP:Port, 指定Reactor模式子线程池大小, 
//                        即处理端口可读/可写事件的线程数(Processor线程);
// endpoints: 接收请求的IP:Port列表;
def createAcceptorAndProcessors(processorsPerListener: Int,
                                endpoints: Seq[EndPoint]): Unit = synchronized {
    // 省略无关代码
    ... ...

    endpoints.foreach { endpoint =>
      // 省略无关代码
      ... ...

      // 1. 创建Acceptor对象
      // 在此步骤中调用Acceptor.openServerSocket, 完成了对端口ACCEPT事件的监听
      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)

      // 2. 创建了与acceptor对应的Processor对象列表
      // (这里并未真正启动Processor线程)
      addProcessors(acceptor, endpoint, processorsPerListener)

      // 3. 启动Acceptor线程
      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()

      // 省略无关代码
      ... ...
    }
  }

2.4 Acceptor.openServerSocket()

该方法中没什么特殊点,就是java NIO的标准流程:

def openServerSocket(host: String, port: Int): ServerSocketChannel = {
  // 1. 构建InetSocketAddress对象
  val socketAddress =
    if (host == null || host.trim.isEmpty)
      new InetSocketAddress(port)
    else
      new InetSocketAddress(host, port)

  // 2. 构建ServerSocketChannel对象, 并设置必要参数值
  val serverChannel = ServerSocketChannel.open()
  serverChannel.configureBlocking(false)
  if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)

  // 3. 端口绑定, 实现事件监听
  try {
    serverChannel.socket.bind(socketAddress)
    info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
  } catch {
    case e: SocketException =>
      throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)
  }

  // 4. 返回ServerSocketChannel对象, 用于后续register到Selector中
  serverChannel
}

2.5 SocketServer.startProcessor()

从这步开始,仅剩的工作就是启动Processor线程,代码都非常简单。比如本方法只是遍历Acceptor列表,并调用Acceptor.startProcessors()

def startProcessors(): Unit = synchronized {
  acceptors.values.asScala.foreach { _.startProcessors() }
  info(s"Started processors for ${acceptors.size} acceptors")
}

2.6 Acceptor.startProcessors()

该方法很简明,直接上代码

def startProcessors(): Unit = synchronized {
  if (!processorsStarted.getAndSet(true)) {
    startProcessors(processors)
  }
}

def startProcessors(processors: Seq[Processor]): Unit = synchronized {
  processors.foreach { processor =>
    KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
      processor).start()
  }
}

三. 请求/响应的格式

3.1 格式概述

在这里插入图片描述
请求和响应都由两部分组成:Header和Body。RequestHeader中包含ApiKey、ApiVersion、CorrelationId、ClientId;ResponseHeader中只包含CorrelationId字段。接下来逐个讲解这些字段。

  • ApiKey

    2字节整型,指明请求的类型;比如0代表Produce请求,1代表Fetch请求;具体id和请求类型之间的映射关系可在 org.apache.kafka.common.protocol.ApiKeys 中找到;

  • ApiVersion

    随着API的升级迭代,各类型请求的请求体格式可能有变更;这个2字节的整型指明了请求体结构的版本;

  • CorrelationId

    4字节整型,在Response中传回,Kafka Server端不处理,用于客户端内部关联业务数据;

  • ClientId

    可变长字符串,标识客户端;

3.2 请求体/响应体的具体格式

各业务操作(比如Produce、Fetch等)对应的请求体和响应体格式都维护在 org.apache.kafka.common.protocol.ApiKeys 中。接下来以Produce为例讲解ApiKeys是如何表达数据格式的。

ApiKeys是个枚举类,其核心属性如下:

public enum ApiKeys {
  // 省略部分代码
  ... ...

  // 上文提到的请求类型对应的id
  public final short id;

  // 业务操作名称
  public final String name;

  // 各版本请求体格式
  public final Schema[] requestSchemas;

  // 各版本响应体格式
  public final Schema[] responseSchemas;

  // 省略部分代码
  ... ...
}

其中PRODUCE枚举项的定义如下

PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions())

可以看到各版本的请求格式维护在 ProduceRequest.schemaVersions(),代码如下

public static Schema[] schemaVersions() {
  return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
    PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6};
}

这里只是简单返回了一个Schema数组。一个Schema对象代表了一种数据格式。请求头中的ApiVersion指明了请求体的格式对应数组的第几项(从0开始)。

接下来我们看看Schema是如何表达数据格式的。其结构如下
在这里插入图片描述
Schema有两个字段:fields和fieldsByName。其中fields是体现数据格式的关键,它指明了字段的排序和各字段类型;而fieldsByName只是按字段名重新组织的Map,用于根据名称查找对应字段。

BoundField只是Field的简单封装。Field有两个核心字段:name和type。其中name表示字段名称,type表示字段类型。常见的Type如下:

Type.BOOLEAN;
Type.INT8;
Type.INT16;
Type.INT32;

// 可通过org.apache.kafka.common.protocol.types.Type查看全部类型
... ...

回到PRODUCE API,通过查看Schema的定义,能看到其V0版本的请求体和响应体的结构如下:
在这里插入图片描述

四. 请求的处理流程

在这里插入图片描述

  1. Acceptor监听到ACCEPT事件(TCP创建连接"第一次握手"的SYN);

  2. Acceptor将将连接注册到Processor列表内的其中一个,由该Processor监听这个连接的后续可读可写事件;

  3. Processor接收到完整请求后,会将Request追加到RequestChannel中进行排队,等待后续处理;

  4. KafkaServer中有个requestHandlerPool的字段,KafkaRequestHandlerPool类型,代表请求处理线程池;KafkaRequestHandler就是其中的线程,会从RequestChannel拉请求进行处理;

  5. KafkaRequestHandler将拉到的Request传入KafkaApis.handle(Request)方法进行处理;

  6. KafkaApis根据不同的ApiKey调用不同的方法进行处理,处理完毕后会将Response最终写入对应的Processor的ResponseQueue中等待发送;KafkaApis.handle(Request)的方法结构如下:

    def handle(request: RequestChannel.Request) {
      try {
        // 省略部分代码
        ... ...
        request.header.apiKey match {
          case ApiKeys.PRODUCE => handleProduceRequest(request)
          case ApiKeys.FETCH => handleFetchRequest(request)
          case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
          case ApiKeys.METADATA => handleTopicMetadataRequest(request)
          case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
          // 省略部分代码
          ... ...
        }
      } catch {
        case e: FatalExitError => throw e
        case e: Throwable => handleError(request, e)
      } finally {
        request.apiLocalCompleteTimeNanos = time.nanoseconds
      }
    }
    
  7. Processor从自己的ResponseQueue中拉取待发送的Respnose;

  8. Processor将Response发给客户端;

五. 总结

才疏学浅,未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。

另外也可以在目录中找到同系列的其他文章:
Kafka源码分析系列-目录(收藏关注不迷路)
感谢阅读。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/562971.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第六套

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第六套 (共9套,有答案和解析,答案非官方,未仔细校正,仅供参考) 部分题目分享,完整版获取(WX:didadidadidida313,加我备注&#x…

使用python socket搭建Client测试平台

目录 概述 1 背景 2 Client功能实现 2.1 何谓Client 2.2 代码功能介绍 2.3 代码实现 2.3.1 代码介绍 2.3.2 代码内容 3 测试 3.1 PC上创建Server 3.2 同一台PC上运行Client 3.2.1 建立连接 3.2.2 测试数据交互 3.3 Linux 环境下运行Client 3.3.1 建立连接 3.3.…

无限滚动分页加载与下拉刷新技术探析:原理深度解读与实战应用详述

滚动分页加载(也称为无限滚动加载、滚动分页等)是一种常见的Web和移动端应用界面设计模式,用于在用户滚动到底部时自动加载下一页内容,而无需点击传统的分页按钮。这种设计旨在提供更加流畅、连续的浏览体验,减少用户交…

Redis 如何实现分布式锁

课程地址 单机 Redis naive 版 加锁: SETNX ${lockName} ${value} # set if not exist如果不存在则插入成功,返回 1,加锁成功;否则返回 0,加锁失败 解锁: DEL ${lockName}问题1 2 个线程 A、B&#…

深入理解与实践“git add”命令的作用

文章目录 **git add命令的作用****git add命令的基本作用****高级用法与注意事项** git add命令的作用 引言: 在Git分布式版本控制系统中,git add命令扮演着至关重要的角色,它是将本地工作区的文件变动整合进版本控制流程的关键步骤。本文旨…

使用docker搭建GitLab个人开发项目私服

一、安装docker 1.更新系统 dnf update # 最后出现这个标识就说明更新系统成功 Complete!2.添加docker源 dnf config-manager --add-repohttps://download.docker.com/linux/centos/docker-ce.repo # 最后出现这个标识就说明添加成功 Adding repo from: https://download.…

ConcurrentHashMap 源码分析(一)

一、简述 本文对 ConcurrentHashMap#put() 源码进行分析。 二、源码概览 public V put(K key, V value) {return putVal(key, value, false); }上面是 ConcurrentHashMap#put() 的源码,我们可以看出其核心逻辑在 putVal() 方法中。 final V putVal(K key, V val…

在centos系统中使用boost库

打开MobaXterm软件 下载 boost_1_85_0.tar.gz tar -zxvf boost_1_85_0.tar.gz解压缩成boost_1_85_0文件夹 双击arrayDemo.cpp 在里面可以编写代码 arrayDemo.cpp #include <boost/timer/timer.hpp> #include <boost/array.hpp> #include <cmath> #inc…

Redis中的Lua脚本(六)

Lua脚本 清空repl_scriptcache_dict字典 每当主服务器添加一个新的从服务器时&#xff0c;主服务器都会清空自己的repl_scriptcache_dict字典&#xff0c;这是因为随着新从服务器的出现&#xff0c;repl_scriptcache_字典里面记录的脚本已经不再被所有从服务器载入过&#xf…

天梯赛 L2-052 吉利矩阵

//r[n]:当前第几列的值。 //l[n]:当前第几行的值。 暴力减止 #include<bits/stdc.h> using namespace std; #define int long long const int n1e3; int a,b,c,l[n],r[n],an; void dfs(int x,int y) {if(xb1){an;return ;}for(int i0;i<a;i){l[x]i;r[y]i;if(l[x]&l…

【001_音频开发-基础篇-专业术语】

001_音频开发-基础篇-专业术语 文章目录 001_音频开发-基础篇-专业术语创作背景术语表常见音源HDMI相关声音系统立体声2.1 声音系统5.1 环绕声系统5.1.2 环绕声系统7.1 环绕声系统7.1.4 环绕声系统9.1.4 环绕声系统 音质等级定义QQ音乐网易云音乐 创作背景 学历代表过去、能力…

ubuntu安装QEMU

qemu虚拟机的使用&#xff08;一&#xff09;——ubuntu20.4安装QEMU_ubuntu安装qemu-CSDN博客 遇到的问题&#xff1a; (1)本来使用git clone https://github.com/qemu/qemu.git fatal: 无法访问 https://github.com/qemu/qemu.git/&#xff1a;GnuTLS recv error (-110): …

IoT、IIoT、AIoT的区别是什么?

一、IoT、IIoT、AIoT的区别是什么&#xff1f; IoT、IIoT和AIoT都是物联网&#xff08;Internet of Things&#xff09;的不同应用和发展方向&#xff0c;但它们之间存在一些区别。 IoT&#xff08;物联网&#xff09;&#xff1a;物联网是指通过互联网连接各种物理设备&#x…

密码学 | 数字证书:应用

&#x1f951;原文&#xff1a;数字签名和数字证书的原理解读 - 知乎 &#x1f951;前文&#xff1a;密码学 | 数字签名 数字证书 - CSDN &#x1f951;提示&#xff1a;把客户端想成 Alice&#xff0c;服务器端想成 Bob 即可。客户端实际上指的是客户端浏览器。 下面&#…

openGauss学习笔记-267 openGauss性能调优-TPCC性能调优测试指导-网络配置-网卡多中断队列设置

文章目录 openGauss学习笔记-267 openGauss性能调优-TPCC性能调优测试指导-网络配置-网卡多中断队列设置267.1 操作步骤 openGauss学习笔记-267 openGauss性能调优-TPCC性能调优测试指导-网络配置-网卡多中断队列设置 本章节主要介绍openGauss数据库内核基于鲲鹏服务器和openE…

目标检测网络YOLO进化之旅

yolo系列网络在目标检测领域取得了巨大的成功&#xff0c; 尤其是在工程实践中&#xff0c; 以其出色的性能优势获得了广泛的应用落地。 YOLO的前3个版本是由同一个作者团队出品&#xff0c; 算是官方版本。 之后的版本都是各个研究团队自己改进的版本&#xff0c; 之间并无明…

微软如何打造数字零售力航母系列科普01 --- Azure顾问(AZURE Advisor)简介

Azure顾问&#xff08;AZURE Advisor&#xff09;简介 目录 一、什么是AZURE顾问&#xff08;AZURE Advisor&#xff09;&#xff1f; 二、常见问题 三、接下来的步骤 一、什么是AZURE顾问&#xff1f; AZURE顾问是一种数字云助手&#xff0c;可帮助您遵循最佳实践来优化Az…

设计模式——2_A 访问者(Visitor)

文章目录 定义图纸一个例子&#xff1a;如何给好奇宝宝提供他想知道的内容菜单、菜品和配方Menu(菜单) & Cuisine(菜品)Material(物料、食材) 产地、有机蔬菜和卡路里Cuisine & Material 访问者VisitorCuisine & Material 碎碎念访问者和双分派访问者和代理写在最后…

初学者如何选择ARM开发硬件?

1&#xff0e; 如果你有做硬件和单片机的经验,建议自己做个最小系统板&#xff1a;假如你从没有做过ARM的开发&#xff0c;建议你一开始不要贪大求全&#xff0c;把所有的应用都做好&#xff0c;因为ARM的启动方式和dsp或单片机有所不同&#xff0c;往往会碰到各种问题&#xf…

设计模式-创建型-抽象工厂模式-Abstract Factory

UML类图 工厂接口类 public interface ProductFactory {Phone phoneProduct();//生产手机Router routerProduct();//生产路由器 } 小米工厂实现类 public class XiaomiFactoryImpl implements ProductFactory {Overridepublic Phone phoneProduct() {return new XiaomiPhone…