Kafka源码分析之Producer数据发送流程(四)

概述

书接上回的producer发送流程,在准备工作完成后,kafka的producer借助SenderKafkaClient两大组件完成了数据的发送。其底层封装了java的NIO的组件channle以及selector,对于NIO组件不太熟悉的同学可以自行查询相关文档。

下面我整理了kafka发送数据的主要流程,下面是流程图:

下面根据上面的流程图一步一步剖析kafka的源码。

源码分析

接上第一节的producer主流程,这里代码唤醒sender完成消息发送。并且producer的所有和kafka服务端交互的都是在sender中完成的。

     //6.发送消息。
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

 sender实现了Runnable接口,所以只需要看run方法:

//一次启动,一直运行(除非关闭)    
public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
.....



   void runOnce() {

        long currentTimeMs = time.milliseconds();
        //发送请求
        long pollTimeout = sendProducerData(currentTimeMs);
       //poll NIO事件
        client.poll(pollTimeout, currentTimeMs);
    }

进入sendProducerData方法,下面标注了主要的流程:

 private long sendProducerData(long now) {
        //1.获取元数据
        Cluster cluster = metadata.fetch();
        //2.获取到发送消息的主机leader
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        //3.遍历主机
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                // 4.移除发送消息的服务器
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        // create produce requests
        // 5.合并发往同一主机的请求,第一次请求batches为空
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        // 6.超时
        accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }
        sensors.updateProduceRequestMetrics(batches);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
       //7.发送数据
        sendProduceRequests(batches, now);
        return pollTimeout;
    }

第一次请求时会走到 if (!this.client.ready(node, now)),然后检查网络连接是否准备好,然后通过代理的NIO Selector初始化网络连接,并注册channel监听事件:

registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
    public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
        // 网络是否连接好
        if (isReady(node, now))
            return true;
        //是否可以建立网络连接
        if (connectionStates.canConnect(node.idString(), now))
            // if we are interested in sending to a node and we don't have a connection to it, initiate one
            // 初始化连接
            initiateConnect(node, now);

        return false;
    }


  private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            connectionStates.connecting(nodeConnectionId, now, node.host());
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);
            log.debug("Initiating connection to node {} using address {}", node, address);    
            // 封装了NIO的selector,相当于加了一层代理
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            log.warn("Error connecting to node {}", node, e);
            // Attempt failed, we'll try again after the backoff
            connectionStates.disconnected(nodeConnectionId, now);
            // Notify metadata updater of the connection failure
            metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
        }
    }

网络连接 建立完成回到主流程,此时返回ready方法翻译false,移除所有发送消息服务器。

   if (!this.client.ready(node, now)) {
                // 4.移除发送消息的服务器
                iter.remove();

 这时将获取不到批次,退回到runOnce方法中。然后进入

client.poll(pollTimeout, currentTimeMs)方法。
这里就到了网络请求处理组件NetworkClient了。

核心方法:this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

在进入Selector里面去,这里核心方法是:

pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);

这里监听了各类事件,然后对不同事件进行了处理。

通过TransportLayer代理对象,最终调用NIO的socketChannel完成网络连接,并且通过取反或的方式移除了连接事件,注册读取数据事件。

   public boolean finishConnect() throws IOException {
        //完成网络连接
        boolean connected = socketChannel.finishConnect();
        if (connected)
            // 移除OP_CONNECT,添加OP_READ事件
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        return connected;
    }

然后while再次循环,进入sendProducerData方法。

 下面是发送请求数据方法流程:

sendProduceRequests(batches, now); >> 
client.send(clientRequest, now); >>
doSend(request, false, now); >>
doSend(clientRequest, isInternalRequest, now, builder.build(version)); >>

讲发送的数据封装到inFlightRequest中,然后调用selector发送请求。

 //最大容忍多少个请求没有响应,默认是5个
 private final InFlightRequests inFlightRequests;
 
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {
            log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
                clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
        }
        Send send = request.toSend(header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        selector.send(new NetworkSend(clientRequest.destination(), send));
    }

这里最后给selector绑定了读取数据的事件:

    public void send(NetworkSend send) {
        String connectionId = send.destinationId();
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        if (closingChannels.containsKey(connectionId)) {
            // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
            this.failedSends.add(connectionId);
        } else {
            try {
               //核心方法
                channel.setSend(send);
            } 
            ...
      
        }
    }


public void setSend(NetworkSend send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        this.send = send;
       //绑定读取数据的事件
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

 public void addInterestOps(int ops) {
        key.interestOps(key.interestOps() | ops);

    }

然后又走到 client的poll方法,然后selector写数据:

// 写请求数据
attemptWrite(key, channel, nowNanos);
//移除写事件
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

最后就是读取服务器的响应数据:

//读取服务端事件,之前已经注册了读事件
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {
    attemptRead(channel);
}

读取数据时kafka采用了4字节标识数据长度来避免粘包黏包的问题:

 通过下面代码保证必须读取到完整的数据才会返回。 

     //4字节表示数据大小
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                //4字节
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
                if (receiveSize == 0) {
                    buffer = EMPTY_BUFFER;
                }
            }
        }

 public boolean complete() {
        //是否读完
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/12136.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

gnome换回纵向切换工作区

效果&#xff1a; 思路 最新的debian / ubuntu中用的gnome 4.x&#xff0c;工作区切换变成了左右切换&#xff0c;习惯了上下&#xff0c;真的很不舒服。 而且优化选项里也把设置开关取消掉了&#xff0c;解决方案是使用Vertical overview这个扩展&#xff1a; ## 安装扩展管…

「Bug」OpenCV读取图像为 None 分析

头一次遇到 OpenCV 无法读取图像&#xff0c;并且没有任何提示&#xff0c;首先怀疑的就是中文路径&#xff0c;因为大概率是这个地方出错的&#xff0c;但是修改完依旧是None&#xff0c;这就很苦恼了&#xff0c;分析了下出现None的原因&#xff0c;大概有以下三种情况&#…

docker安装redis

首先到dockerhub搜索redis docker pull redis docker pull redis准备redis的配置文件,因为需要redis的配置文件,这里最好去redis中文官方网站去下载一个redis,使用里面的配置文件即可. 我使用的是redis4.0.11中的配置文件 修改redis.conf配置文件 主要修改的位置如下 # bin…

如何在电脑本地下载镜像重装系统

现在网上随处可以下载操作系统&#xff0c;下载下来的是镜像系统&#xff0c;很多朋友都不知道电脑镜像重装系统是什么意思&#xff0c;怎么用镜像重装系统&#xff0c;今天小编就给大家带来了电脑镜像重装系统是什么意思的相关解答&#xff0c;一起往下看。 电脑镜像重装系统是…

react项目中自定义一个markdown编辑器

Markdown 是一种轻量级标记语言。 Markdown是一种简单的格式化文本的方法&#xff0c;在任何设备上看起来都很棒。它不会做任何花哨的事情&#xff0c;比如改变字体大小、颜色或类型——只是基本的&#xff0c;使用你已经知道的键盘符号。 它还允许人们使用易读易写的纯文本格…

UE5.1.1创建C++工程失败解决办法

闲来无事&#xff0c;更新了一下UE5.1.1&#xff0c;妈蛋创建C项目居然失败&#xff0c; 错误截图如下&#xff1a; 妈蛋&#xff0c;后面一堆乱码&#xff0c;鬼知道是啥错误&#xff01; 咋解决&#xff1f;步步高打火机&#xff0c;直接复制第一段的Running后面的代码到cmd…

【Linux系统管理进程,运行,挂起,杀死进程和crontab计划任务表的使用以及实验的心得体会】

实验 &#xff08;1&#xff09;显示本用户的进程&#xff0c;重定向到file1 top命令如果不加限制&#xff0c;默认是查看所有用户的进程情况top -u [用户名] 可以查看该用户名的所有进程 &#xff08;2&#xff09;显示本用户所有进程&#xff0c;重定向到file2 top命令如果…

打造智慧医疗新生态:互联网医院系统源码分析

在数字化时代&#xff0c;医疗行业也在不断地探索新的模式和方法&#xff0c;以更好地服务于人民群众。互联网医院系统作为一种新型医疗服务模式&#xff0c;受到了广泛的关注和热议。下文&#xff0c;小编将为大家介绍互联网医院系统的概念、特点以及如何利用互联网医院系统源…

【JAVAEE】网络原理之网络发展史

目录 &#x1f381;1. 独立模式 &#x1f383;2. 网络互连 &#x1f388;2.1 局域网 LAN ✨2.1.1 基于网线直连 &#x1f451;2.2.2 基于集线器组建 &#x1f48b;2.2.3 基于交换机组建 &#x1f457;2.2.4 基于交换机与路由器组建 &#x1f388;2.2 广域网 21世纪是一…

香橙派4LTS和树莓派4B构建K8S集群实践之一:K8S安装

目录 1. 说明 1.1 软硬件环境 1.2 设计目标 2 实现 2.1 准备工作 - 香橙派 (k8s-master-1) - 树莓派 (k8s-node-1) - 两派都要干的事 2.2 containerd 安装与设置 2.3 安装 3 遇到的问题 3.1 k8s-master-1 3.2 k8s-node-1 4 相关命令 5 Tips 6 参考 1. 说明 …

反向代理自建教程:你懂的

一、为什么需要自建反代 OpenAI提供了两种访问方式&#xff0c;一种是直接在ChatGPT网页端使用的Access Token方式&#xff0c;这种方式可以免费使用GPT-3.5模型&#xff0c;只需要登录即可使用。但缺点是不稳定&#xff0c;且无法扩展。另一种是使用API&#xff0c;注册用户可…

SpringBoot自动装配原理(附面试快速答法)

文章目录SpringBoot自动装配原理1. 从调用SpringApplication构造器方法开始2. 解析启动类4.按需装配4.1 分析dubbo自动装配5. 如果定义自己的starter6. 面试答法SpringBoot自动装配原理 之前面试被问到这个题目&#xff0c;只会答一些spi、AutoConfigration注解、Import之类的&…

询问ChatGPT的高质量答案艺术——提示工程指南(更新中……)

目录前言一、提示工程简介二、提示技巧2-1、生成法律文件2-2、添加提示技巧三、角色扮演3-1、智能手机产品描述3-2、添加角色扮演四、标准提示4-1、写一篇有关于新智能手机的评论4-2、添加标准提示、角色提示、种子词提示等等五、示例很少、或者没有示例5-1、生成一个手机配置六…

机器学习中的数学原理——过拟合、正则化与惩罚函数

通过这篇博客&#xff0c;你将清晰的明白什么是过拟合、正则化、惩罚函数。这个专栏名为白话机器学习中数学学习笔记&#xff0c;主要是用来分享一下我在 机器学习中的学习笔记及一些感悟&#xff0c;也希望对你的学习有帮助哦&#xff01;感兴趣的小伙伴欢迎私信或者评论区留言…

Docker系列 基于OpenAI API自建ChatGPT

转自我的博客文章https://blognas.hwb0307.com/linux/docker/4201&#xff0c;内容更新仅在个人博客可见。欢迎关注&#xff01; 前言 我用帐号/密码使用chatGPT已经有一段时间。但是&#xff0c;我有几个私交较密的朋友&#xff0c;他们并不具备使用chatGPT的条件&#xff1b…

【无功优化】基于多目标差分进化算法的含DG配电网无功优化模型【IEEE33节点】(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

java遍历字符串的方法

在 java中&#xff0c;我们需要遍历字符串&#xff0c;如何遍历呢&#xff1f;首先我们先了解一下遍历的概念&#xff1a; 在我们的计算机中&#xff0c;存储的都是二进制数据&#xff0c;为了方便存储和管理&#xff0c;我们把一段数据分成多个字符串。在 java中&#xff0c;遍…

网络IO(non-blocking io)基础

BIO&#xff08;blocking io&#xff09; 传统的网络io模式&#xff0c;面向流&#xff0c;一个线程对接一个会话&#xff0c;因此高并发时会因线程阻塞而性能低效 Java代码&#xff1a; public class BIO implements Connector {private Integer port 8080;Overridepublic v…

SAP Business Technology Platform (BTP)的架构理解

查资料看到的&#xff0c;转一下&#xff0c;附上链接&#xff1a; SAP Business Technology Platform (BTP)的架构理解 长期以来&#xff0c;我在与客户和伙伴的沟通交流中发现大家依然对SAP业务技术平台 – SAP Business Technology Platform (以下简称BTP)纯有各种疑惑&…

Web 攻防之业务安全:密码找回流程绕过测试.(利用链接跳到后面去)

Web 攻防之业务安全&#xff1a;密码找回流程绕过测试 业务安全是指保护业务系统免受安全威胁的措施或手段。广义的业务安全应包括业务运行的软硬件平台&#xff08;操作系统、数据库&#xff0c;中间件等&#xff09;、业务系统自身&#xff08;软件或设备&#xff09;、业务所…