Netty源码学习7——netty是如何发送数据的

一丶Write事件的产生和传播

在业务逻辑处理完毕后,需要调用write 或者 writeAndFlush方法

  • ChannelHandlerContext#write or writeAndFlush方法会从当前 ChannelHandler 开始在 pipeline 中向前传播 write 事件直到 HeadContext。

  • ChannelHandlerContext.channel()#write or writeAndFlush 方法则会从 pipeline 的尾结点 TailContext 开始在 pipeline 中向前传播 write 事件直到 HeadContext 。

write 方法并不是真将数据写到socket缓存区,而是写道Netty的ChannelOutBoundBuffer中,调用flush方法才会真正调用JDK SockectChannel将数据写入。

如下是pipeline#writeAndFlush,可以看到直接调用TailContext#writeAndFlush进行处理

image-20231203150809488

关键源码如下:

private void write(Object msg, boolean flush, ChannelPromise promise) {
   // 省略 部分
	
    //flush 表示是否需要flush,调用writeAndFlush的时候为true
    // 找到下一个ChannelHandlerContext
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    // 在eventLoop 中
    if (executor.inEventLoop()) {
        // 需要flush 那么调用invokeWriteAndFlush
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // 在eventLoop 中 那么提交一个WriteTask到eventLoop中
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

可以看到TailContext会通过 findContextOutbound 方法在当前 ChannelHandler 的前边找到 ChannelOutboundHandler 类型并且覆盖实现 write 回调方法的 ChannelHandler 作为下一个要执行的对象。

然后如果当前执行的线程就是EventLoop线程,那么直接调用,反之提交一个异步任务,从而保证执行write的一定是 reactor 线程——保证线程安全性

如下是next.invokeWriteAndFlush的源码

image-20231203152143663

最终事件会传播到HeadContext进行处理(如果中间的ChannelHandler不截胡的话)

图片

二丶write 源码解析

write 事件最终会由HeadContext进行处理

image-20231203152515740

可以看到HeadContext#write其实就是使用Channel的Unsafe#write,其主要逻辑如下

image-20231203153403361

ChannelOutboundBuffer#addMessage

ChannelOutboundBuffer 是 Netty 内部使用的一个数据结构,它用于存储待发送的出站数据。在 Netty 的网络框架中,当需要写数据到网络时,数据并不会立即被发送出去,而是首先被放入一个出站缓冲区中,即 ChannelOutboundBuffer。这个缓冲区负责管理和存储所有待写入通道的数据。

  • 批量发送优化: ChannelOutboundBuffer 允许 Netty 批量地发送数据,而不是每次写操作都立即进行网络发送。这样可以减少系统调用次数,提高网络效率。

  • 流量控制: 它有助于实现流量控制,防止数据发送过快,导致接收方处理不过来。

  • 缓冲区管理: 可以有效地管理内存,当数据被写入网络后,及时释放相应的内存。

  • 异步处理: Netty 是异步事件驱动的框架,使用 ChannelOutboundBuffer 可以将数据发送的异步化,提升处理性能

下面是向ChannelOutboundBuffer写入messge的源码

image-20231203163632745

可与看到ChannelOutboundBuffer会将msg和promise包装为Entry,然后改变tailEntry,flushedEntry,unflushedEntry指针的指向

img

然后incrementPendingOutboundBytes将记录下待写出数据size,如果大于高水位还会触发channelWritabilityChanged事件

image-20231203165712236

channelWritabilityChanged会在pipeline上传播,并触发ChannelInboundHandler#channelWritabilityChanged,我们可以实现此方法调用flush将数据写出

三丶flush源码解析

上面看了write将待发送的数据缓存到ChannelOutboundBuffer中,正真将数据写到SocketChannel中的是flush方法

image-20231203170748620

1.addFlush

此方法只是负责更改flushedEntry 和 unflushedEntry 指针指向

image-20231203171224099

将 flushedEntry 指针指向 unflushedEntry 指针表示的第一个未被 flush 的 Entry 节点。并将 unflushedEntry 指针置为空,准备开始 flush 发送数据流程。

这样在 flushedEntry 与 tailEntry 之间的 Entry 节点即为本次 flush 操作需要发送的数据范围。

public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {
                flushed ++;
                //如果当前entry对应的write操作被用户取消,则释放msg,并降低channelOutboundBuffer水位线
                if (!entry.promise.setUncancellable()) {
                  
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

2.flush0

image-20231203171814884

可以看到如果注册了write到selector上,那么不会进行flush,

如下是NioSockectChannel发送数据的源码

@Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        //获取jdk nio底层socketChannel
        SocketChannel ch = javaChannel();
        //最大写入次数 默认为16 ,因为EventLoop可能单线程处理多Channel,需要雨露均沾
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // 如果全部数据已经写完 则移除OP_WRITE事件并直接退出writeLoop
                clearOpWrite();             
                return;
            }

            // 获取单次发送最大字节数
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            //Netty的DirectBuffer底层就是JDK的DirectByteBuffer
            // 将ChannelOutboundBuffer中缓存的DirectBuffer转换成JDK的ByteBuffer,
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            // ChannelOutboundBuffer中总共的DirectBuffer数
            int nioBufferCnt = in.nioBufferCount();

            switch (nioBufferCnt) {
                    // 真正进行发送
               //java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)进行写回
            }
        } while (writeSpinCount > 0);
        // 处理Socket可写但已经写满16次还没写完的情况
     incompleteWrite(writeSpinCount < 0);
    }

可以看到

  • 如果数据全部写完了,会调用clearOpWrite清除当前 Channel 在 Reactor 上注册的 OP_WRITE 事件

image-20231203173230345

这意味着,不需要再监听write来触发flush了

  • 写入的过程会写入多次,并控制自旋次数,做到雨露均沾

image-20231203173115191

如上是写入的过程

  • 如果ByteBuffer个数为0,说明发送的是FileRegion 类型,case 0的分支主要就是用于处理网络文件传输的情况

image-20231203173837962

  • case1 和 default则调用jdk SocketChannel#write进行数据发送,如果写入的数据小于等于0,说明当前Socket发送缓冲区满了写不进去了,则注册OP_WRITE事件,等待Socket发送缓冲区可写时再写

image-20231203173938710

触发Write后,再Sockect写缓冲区可写后,会触发对应事件,即可再NioEventLoop中进行处理,如下图中会直接调用forceFlush

image-20231203174154759

  • 完成发送会调用adjustMaxBytesPerGatheringWrite进行调整

两个分支分别表示

  • 期望写入和真正写入的相等,说明数据能全部写入到 Socket 的写缓冲区中了,那么下次 write loop 就应该尝试去写入更多的数据。

  • 本次写入的数量x2>maxBytesPerGatheringWrite 说明要写的数据很多,那么更新为本次 write loop 两倍的写入量大小

  • 如果本次写入的数据还不及尝试写入数据的一半,说明Socket写缓冲区容量不多了,尝试缩容为一半

  • 处理

protected final void incompleteWrite(boolean setOpWrite) {
    
        if (setOpWrite) {
            //socket缓冲区已满写不进去的情况 注册write事件
            setOpWrite();
        } else {
            //处理socket缓冲区依然可写,但是写了16次还没写完,提交flushTask异步写
            clearOpWrite();
            eventLoop().execute(flushTask);

        }

四丶总结

这一节中我们学习了netty写入数据的流程,写入数据时出站事件,一般最终将有HeadContext进行处理

  • write方法将写入的数据转换为DirectByteBuf包装到ChannelOutboundBuffer中,并且记录了对应的Promise实现异步驱动,还可以减少系统调用

  • flush方法,调用jdk SocketChannel#write进行写入,使用自旋次数控制,让多个Channel的处理得到平衡,如果Socket 缓冲区满无法在继续写入那么会OP_WRITE 事件,等 Socket 缓冲区变的可写时,epoll 通知 EventLoop线程继续发送。

  • Socket 缓冲区可写,写满 16 次但依然没有写完,这时候注册异步任务使用EventLoop线程进行异步发送。如果写的时FileRegion类型,那么会使用transferTo进行零拷贝写入。

文章转载自:Cuzzz

原文链接:https://www.cnblogs.com/cuzzz/p/17873524.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/234590.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode100 链表|2. 两数相加160. 相交链表 234. 回文链表

2. 两数相加 题目&#xff1a;给你两个非空的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照逆序的方式存储的&#xff0c;并且每个节点只能存储一位数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff0…

JAVA高级(后端需深入移步)

单元测试&#xff1a;使用Junit单元测试框架 使用Junit单元测试&#xff1a; 通过左侧的对❌来进行提示 Junit框架的常见注解&#xff1a; 反射&#xff08;用于框架&#xff0c;也是最重要&#xff09;&#xff1a;展示框架的成员信息 由于是用于对象&#xff0c;即使在获取…

【Java期末复习资料】(2)常见例题 //持续更新

本文章主要是常见例题&#xff0c;解析不会太详细&#xff0c;有问题、不会的可以给我发消息哦&#xff0c;后续会出模拟卷 常见例题&#xff1a; 1.下列跟Java技术平台有关的是&#xff08;ABD&#xff09; A.JVM B.JDK C.JPN D.JRE 2.面向对象的特征包括&#xff08;ACD&…

m.2固态硬盘怎么选择?

一、什么是固态硬盘 固态硬盘又称SSD&#xff0c;是Solid State Drive的简称&#xff0c;由于采用了闪存技术&#xff0c;其处理速度远远超过传统的机械硬盘&#xff0c;这主要是因为固态硬盘的数据以电子的方式存储在闪存芯片中&#xff0c;不需要像机械硬盘那样通过磁头读写磁…

五:爬虫-数据解析之xpath解析

三&#xff1a;数据解析之xpath解析 1.xpath介绍&#xff1a; ​ xpath是XML路径语言&#xff0c;它可以用来确定xml文档中的元素位置&#xff0c;通过元素路径来完成对元素的查找&#xff0c;HTML就是XML的一种实现方式&#xff0c;所以xpath是一种非常强大的定位方式​ XPa…

LeetCode5.最长回文子串

昨天和之前打比赛的队友聊天&#xff0c;他说他面百度面到这道算法题&#xff0c;然后他用暴力法解的&#xff0c;面试官让他优化他没优化出来&#xff0c;这道题我之前没写过&#xff0c;我就想看看我能不能用效率高一点的方法把它做出来&#xff0c;我一开始就在想用递归或者…

FreeSSL申请免费域名证书

本文详细讲解如何申请免费证书&#xff0c;需要先准备好域名&#xff0c;将服务器IP和域名绑定。 1、注册FreeSSL账号 网址&#xff1a; https://freessl.org/ 2、申请流程 登录后首页输入域名&#xff0c;然后点击Create certificate&#xff0c;跳转到证书申请页面。 或者…

使用最小花费爬楼梯

1.状态表示 2.状态转移方程 3.初始化 保证填表时&#xff0c; 不越界 4.填表顺序 从左往右 5.返回值 解法2&#xff1a; 1.状态表示 2.状态转移方程 3.初始化 4.填表 从右往左 5.返回值 min( dp[0] , dp[1] ) ----------------------------------------------------…

基于TCP的多路复用

1. 知识点 目前支持I/O多路复用的系统调用有select&#xff0c;pselect&#xff0c;poll&#xff0c;epoll。与多进程和多线程技术相 比&#xff0c;I/O多路复用技术的最大优势是系统开销小&#xff0c;系统不必创建进程/线程&#xff0c;也不必维护这些进 程/线程&#xff0c;…

Spring Boot中的事务是如何实现的?懂吗?

SpringBoot中的事务管理&#xff0c;用得好&#xff0c;能确保数据的一致性和完整性&#xff1b;用得不好&#xff0c;可能会给性能带来不小的影响哦。 基本使用 在SpringBoot中&#xff0c;事务的使用非常简洁。首先&#xff0c;得感谢Spring框架提供的Transactional注解&am…

Proteus仿真--串口发送数据到2片8×8点阵屏滚动显示

本文介绍2片88点阵屏滚动显示设计&#xff08;完整仿真源文件及代码见文末链接&#xff09; 仿真图如下 仿真运行视频 Proteus仿真--1602LCD显示电话拨号键盘按键实验&#xff08;仿真文件程序&#xff09; 附完整Proteus仿真资料代码资料 链接&#xff1a;https://pan.baidu…

Leetcode每日一题

https://leetcode.cn/problems/binary-tree-preorder-traversal/ 这道题目需要我们自行进行创建一个数组&#xff0c;题目也给出我们需要自己malloc一个数组来存放&#xff0c;这样能达到我们遍历的效果&#xff0c;我们来看看他的接口函数给的是什么。 可以看到的是这个接口函…

qt可以详细写的项目或技术

1.QT 图形视图框架 2.QT 模型视图结构 3.QT列表显示大量信息 4.QT播放器 5.QT 编解码 6.QT opencv

2023北京智慧城市与电气高峰论坛-安科瑞 蒋静

2023年7月27日&#xff0c;北京土木建筑学会电气设计委员会、北京电气设计技术协作及情报交流网联合举办的“北京电气设计第43届年会”在京盛大召开。安科瑞作为企业微电网能效管理平台服务商与广大同仁共聚本次盛会&#xff0c;尽享技术盛宴。 本次会议采用线上线下相结合&…

【C++】:红黑树

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关多态的知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数据结…

12.视图

目录 1.视图的含义与作用 2.视图的创建与查看 1.创建视图的语法形式 2、查看视图&#xff1a; 1.使用DESCRIBE语句查看视图基本信息 2.使用SHOW TABLE STATUS语查看视图基本信息查看视图的信息 3.使用SHOW CREATE VIEW语查看视图详细信息 4.在views表中查看视图详细信息…

【合集】SpringBoot——Spring,SpringBoot,SpringCloud相关的博客文章合集

前言 本篇博客是spring相关的博客文章合集&#xff0c;内容涵盖Spring&#xff0c;SpringBoot&#xff0c;SpringCloud相关的知识&#xff0c;包括了基础的内容&#xff0c;比如核心容器&#xff0c;springMVC&#xff0c;Data Access&#xff1b;也包括Spring进阶的相关知识&…

智能优化算法应用:基于正余弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于正余弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于正余弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.正余弦算法4.实验参数设定5.算法结果6.参考文…

【数电笔记】54-或非门构成的基本RS触发器

目录 说明&#xff1a; 1. 电路组成 2. 逻辑功能 3. 特性表 4. 特性方程 5. 例题 6. 两种基本RS触发器的形式比 说明&#xff1a; 笔记配套视频来源&#xff1a;B站&#xff1b;本系列笔记并未记录所有章节&#xff0c;只对个人认为重要章节做了笔记&#xff1b;标题前…

NAND闪存价格暴涨:512GB芯片翻倍,256GB涨幅达55%

此前&#xff0c;根据Trendforce的信息&#xff0c;今年第四季度NAND的合约价预计上涨8-13%&#xff0c;其中Wafer上涨13-18%。 根据DRAMeXchange最新的数据表明&#xff0c;之前预测的数据还是太保守了&#xff0c;过去一年Wafer NAND价格如下图&#xff1a; DRAM/NAND价格近几…