Netty Review - 底层零拷贝源码解析

文章目录

  • Pre
  • 概述
  • 源码解析
    • 入口索引
    • AbstractNioByteChannel.NioByteUnsafe#read
    • allocHandle.allocate(allocator)
  • 小结
  • 传统的零拷贝

在这里插入图片描述

在这里插入图片描述


Pre

Netty Review - 直接内存的应用及源码分析


概述

Netty 的零拷贝技术是通过优化数据传输过程中的数据复制操作,以降低系统的开销和提高性能。

其原理主要涉及以下几个方面:

  1. 使用直接内存: Netty 利用 Java NIO 中的 ByteBuffer.allocateDirect() 方法来分配直接内存,直接内存的特点是可以直接被操作系统所管理,不受 Java 堆内存大小的限制,而且可以直接与操作系统进行数据交互,避免了数据在 Java 堆内存和操作系统之间的拷贝。

  2. 文件传输零拷贝: 在进行文件传输时,Netty 可以通过操作系统提供的零拷贝技术,直接将文件内容从磁盘读取到内核缓冲区,然后通过 DMA(Direct Memory Access)技术将数据直接传输到网络通道,避免了数据在用户空间和内核空间之间的拷贝。

  3. 内存池: Netty 使用内存池来管理直接内存的分配和释放,避免了频繁地申请和释放内存的开销,提高了内存的重复利用率。

  4. CompositeByteBuf: Netty 提供了 CompositeByteBuf 类来实现多个 ByteBuf 的组合,可以将多个缓冲区的内容合并为一个逻辑上的缓冲区,避免了数据在多个缓冲区之间的拷贝。

  5. 传输过程中的零拷贝: 在网络传输过程中,Netty 利用零拷贝技术将数据从应用程序的缓冲区直接传输到操作系统的网络缓冲区,避免了数据在用户空间和内核空间之间的拷贝,同时可以利用 scatter/gather I/O 操作一次性传输多个缓冲区的数据。

通过以上方式,Netty 实现了数据传输过程中的零拷贝,大大提高了系统的性能和吞吐量,特别是在高并发、大数据量的网络应用场景下,可以显著地降低系统的资源消耗和延迟。

在这里插入图片描述


源码解析

入口索引

结合我们的Netty线程模型源码图 ,找到入口 。

在这里插入图片描述

AbstractNioByteChannel.NioByteUnsafe#read

这段代码是 Netty 中的 read() 方法实现,用于从通道中读取数据并触发相应的事件到 ChannelPipeline 中。

@Override
public final void read() {
    final ChannelConfig config = config();  // 获取通道配置信息
    if (shouldBreakReadReady(config)) {  // 判断是否应该中断读就绪操作
        clearReadPending();  // 清除读等待标志
        return;
    }
    final ChannelPipeline pipeline = pipeline();  // 获取通道的管道
    final ByteBufAllocator allocator = config.getAllocator();  // 获取分配器
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();  // 获取接收字节缓冲区分配句柄
    allocHandle.reset(config);  // 重置分配句柄状态

    ByteBuf byteBuf = null;  // 字节缓冲区
    boolean close = false;  // 是否关闭标志
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);  // 分配字节缓冲区
            allocHandle.lastBytesRead(doReadBytes(byteBuf));  // 读取数据到缓冲区
            if (allocHandle.lastBytesRead() <= 0) {
                // 如果没有读取到数据
                // 释放缓冲区
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;  // 是否关闭标志
                if (close) {
                    // 如果收到 EOF,表示没有数据可读了
                    readPending = false;  // 清除读等待标志
                }
                break;
            }

            allocHandle.incMessagesRead(1);  // 增加读取消息数
            readPending = false;  // 清除读等待标志
            pipeline.fireChannelRead(byteBuf);  // 触发通道读事件到管道
            byteBuf = null;
        } while (allocHandle.continueReading());  // 继续读取数据,直到不再需要读取为止

        allocHandle.readComplete();  // 读操作完成
        pipeline.fireChannelReadComplete();  // 触发通道读完成事件到管道

        if (close) {
            closeOnRead(pipeline);  // 如果需要关闭通道,执行关闭操作
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);  // 处理读取异常
    } finally {
        // 检查是否有未处理的读等待操作
        // 这可能有两个原因:
        // 1. 用户在 channelRead(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()
        // 2. 用户在 channelReadComplete(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()
        // 详见 https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();  // 移除读操作
        }
    }
}

allocHandle.allocate(allocator)

在这里插入图片描述

@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

在给定的 ByteBufAllocator 上分配一个新的 ByteBuf 实例。

  • return alloc.ioBuffer(guess()): 使用给定的 ByteBufAllocator 对象调用 ioBuffer() 方法来分配一个新的 ByteBuf 实例。guess() 方法用于估算分配的字节数。

该方法的作用是在给定的 ByteBufAllocator 上分配一个新的 ByteBuf 实例,并返回分配的实例。


在这里插入图片描述

alloc.ioBuffer(guess())

@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) { // 检查当前平台是否支持直接内存
        return directBuffer(initialCapacity); // 如果支持直接内存,则调用 directBuffer() 方法创建直接内存的 ByteBuf 实例
    }
    return heapBuffer(initialCapacity); // 如果不支持直接内存,则调用 heapBuffer() 方法创建堆内存的 ByteBuf 实例
}

该方法的作用是根据当前平台是否支持直接内存来选择合适的内存类型(堆内存或直接内存),并根据传入的初始容量参数创建相应类型的 ByteBuf 实例

PlatformDependent.hasUnsafe() ---- true

   @Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); // 调用重载方法 directBuffer(int initialCapacity, int maxCapacity),传入默认的最大容量值 DEFAULT_MAX_CAPACITY
}

directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) { // 如果初始容量和最大容量都为0
        return emptyBuf; // 返回一个空的 ByteBuf 实例
    }
    validate(initialCapacity, maxCapacity); // 验证初始容量和最大容量的合法性
    return newDirectBuffer(initialCapacity, maxCapacity); // 创建一个新的直接内存的 ByteBuf 实例
}

在这里插入图片描述

newDirectBuffer(initialCapacity, maxCapacity)

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // 获取当前线程的线程缓存
    PoolThreadCache cache = threadCache.get();
    // 获取直接内存池
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) { // 如果直接内存池可用
        // 从直接内存池中分配内存
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else { // 如果直接内存池不可用
        // 使用平台相关的方式创建直接内存的 ByteBuf 实例
        buf = PlatformDependent.hasUnsafe() ?
            UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
            new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    // 返回一个包装了泄漏感知器的 ByteBuf 实例
    return toLeakAwareBuffer(buf);
}

directArena.allocate(cache, initialCapacity, maxCapacity);

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    // 创建一个新的 PooledByteBuf 实例,其中 maxCapacity 为指定的最大容量
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    // 使用指定的线程缓存和请求容量来分配内存给 ByteBuf
    allocate(cache, buf, reqCapacity);
    // 返回分配的 ByteBuf
    return buf;
}

这段代码实现了从线程缓存中分配内存给 ByteBuf,并返回分配的 ByteBuf 实例。


allocate(cache, buf, reqCapacity);

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // 将请求容量规范化为标准容量
    final int normCapacity = normalizeCapacity(reqCapacity);
    // 如果容量小于页面大小,则分配小块或微型内存
    if (isTinyOrSmall(normCapacity)) {
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // 从缓存中成功分配,则直接返回
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // 从缓存中成功分配,则直接返回
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        // 同步处理双向链表的头部
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                // 分配内存
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                incTinySmallAllocation(tiny);
                return;
            }
        }
        // 没有可用的内存块,则进入分配普通内存块的逻辑
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }

        incTinySmallAllocation(tiny);
        return;
    }
    // 大于页面大小的内存分配
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // 从缓存中成功分配,则直接返回
            return;
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // 大块内存分配
        allocateHuge(buf, reqCapacity);
    }
}

这段代码实现了根据请求的容量大小来分配不同大小的内存块,优先从缓存中分配,如果缓存中没有可用内存,则根据请求的大小分配不同大小的内存块。


小结

Netty的接收和发送ByteBuf采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。

如果使用传统的JVM堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才能写入Socket中。

JVM堆内存的数据是不能直接写入Socket中的。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

在这里插入图片描述

在传统的Java IO中,使用堆内存(Heap Buffers)进行Socket读写时,数据需要先从堆内存中复制到直接内存(Direct Buffers),然后才能写入Socket。这样就导致了一次缓冲区的内存拷贝。

而在Netty中,采用堆外直接内存(Direct Buffers)进行Socket读写。这样,在数据传输过程中,就不需要进行额外的内存拷贝操作。消息可以直接从直接内存写入Socket中,从而避免了堆内存到直接内存的二次拷贝,提高了数据传输的效率。

使用堆外直接内存的优点包括:

  1. 减少了内存拷贝次数:消息可以直接从直接内存写入Socket中,避免了额外的内存拷贝操作,提高了数据传输的效率。
  2. 提高了IO性能:由于减少了内存拷贝操作,可以降低CPU的开销,提高IO性能。
  3. 更好地利用操作系统资源:堆外直接内存是由操作系统直接管理的,不受Java堆大小的限制,可以更好地利用操作系统的资源。

总的来说,Netty使用堆外直接内存进行Socket读写可以提高IO性能,并降低系统资源的开销,是一种更高效的IO模型。


传统的零拷贝

在这里插入图片描述

戳这里

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/389986.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java 基于 SpringBoot+Vue 的酒店管理系统,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

Java微服务架构的选择:Spring Cloud、Kubernetes还是Kubernetes + Istio?

微服务架构已经成为现代软件开发的趋势&#xff0c;其可以带来高度可伸缩性、松耦合性和团队自治性等优势。 在Java开发领域中&#xff0c;选择适合的微服务架构是非常关键的决策&#xff0c;本文将探讨Spring Cloud、Kubernetes和KubernetesIstio这三个架构选择的优势和劣势。…

抽象的前端

问题背景&#xff1a;vue3&#xff0c;axios 直接导致问题&#xff1a;路由渲染失败 问题报错&#xff1a;Uncaught SyntaxError: The requested module /node_modules/.vite/deps/axios.js?v7bee3286 does not provide an export named post (at LoginIn.vue:16:9) 引入组…

[NSSRound#16 Basic]Web

1.RCE但是没有完全RCE 显示md5强比较&#xff0c;然后md5_3随便传 md5_1M%C9h%FF%0E%E3%5C%20%95r%D4w%7Br%15%87%D3o%A7%B2%1B%DCV%B7J%3D%C0x%3E%7B%95%18%AF%BF%A2%00%A8%28K%F3n%8EKU%B3_Bu%93%D8Igm%A0%D1U%5D%83%60%FB_%07%FE%A2&md5_2M%C9h%FF%0E%E3%5C%20%95r%D4w…

Spring AOP的实现方式

AOP基本概念 Spring框架的两大核心&#xff1a;IoC和AOP AOP&#xff1a;Aspect Oriented Programming&#xff08;面向切面编程&#xff09; AOP是一种思想&#xff0c;是对某一类事情的集中处理 面向切面编程&#xff1a;切面就是指某一类特定的问题&#xff0c;所以AOP可…

CMake进行C/C++与汇编混合编程

1. 前提 这篇文章记录一下怎么用CMake进行项目管理, 并用C/C和汇编进行混合编程, 为了使用这项技术, 必须在VS的环境中安装好cmake组件 由于大部分人不会使用C/C与汇编进行混合编程的情况。所以这篇文章并不适用于绝大部分人不会对其中具体细节进行过多叙述。只是做一些简单的…

Java的集合框架和泛型

文章目录 集合框架什么是集合框架类和接口总览 集合框架的重要性背后所涉及的数据结构以及算法什么是数据结构容器背后对应的数据结构什么是算法 包装类基本数据类型和对应的包装类装箱和拆箱自动装箱和自动拆箱 泛型什么是泛型引出泛型语法泛型类泛型的上界(没有下界)泛型方法…

知识图谱:py2neo导入周杰伦歌单csv文件

文章目录 py2neo导入csv文件py2neo导入周杰伦歌单csv效果展示 py2neo导入csv文件 之前写的知识图谱指南 知识图谱&#xff1a;py2neo将csv文件导入neo4j 因为没有区分不同实体entity的类型&#xff0c;所以颜色相同&#xff0c;无法相互区分歌手、歌曲还是专辑等等。 py2ne…

Linux下的自动化任务与计划任务:让你的系统更智能

在日常的Linux系统管理中&#xff0c;你是否经常需要定时执行某些任务&#xff0c;或者希望在系统启动时自动运行某些脚本&#xff1f;如果是的话&#xff0c;那么自动化任务和计划任务将是你的得力助手。它们可以帮助你提高系统效率、减少人工干预&#xff0c;并确保任务能够按…

绿色化 数据库 MongoDB 和 mysql 安装

绿色化 数据库 MongoDB 和 mysql 安装 【1.1】 前言 为什么要绿色化 安装呢&#xff1f;因为系统老升级&#xff0c;老重装&#xff01;&#xff01;也方便了解下数据库配置和库在那 绿色软件喜欢一般放在 D盘tools目录里 D:\tools\ 数据库 MongoDB D:\tools\MongoDB 数…

制作怎么自己搭建一个网站

制作怎么自己搭建一个网站 一.领取一个免费域名和SSL证书&#xff0c;和CDN 1.打开网站链接&#xff1a;https://www.rainyun.com/ycpcp_ 首先创建一个CDN&#xff0c;这里以我加速域名“cdntest.biliwind.com 1”为例 这里就要填写 cdntest.biliwind.com 1 &#xff0c;而…

【王道数据结构】【chapter5树与二叉树】【P159t15】

设计一个算法将二叉树的叶结点从左到右的顺序连成一个单链表&#xff0c;表头指针为head。二叉树按二叉链表方式存储&#xff0c;链接时用叶结点的右指针来存放单链表指针。 #include <iostream> #include <stack> #include <queue> typedef struct treenode…

2024-02-16 AIGC-数字人-平台调研-记录

摘要: 2024-02-16 AIGC-数字人-平台调研 需求分析: 数字人-平台调研 南京硅基智能北京风平智能[风平科技]品达集团[杭州品达企服科技(集团)有限公司]花脸数字技术灰豚数字人[温州专帮信息科技有限公司]魔珐科技数字栩生公司官网guiji-ows风平智能 - 领先的AIGC解决方案提供商。…

AI:130-基于深度学习的室内导航与定位

🚀点击这里跳转到本专栏,可查阅专栏顶置最新的指南宝典~ 🎉🎊🎉 你的技术旅程将在这里启航! 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的关键代码,详细讲解供…

Springboot的it职业生涯规划系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; Springboot的it职业生涯规划系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&a…

2024下载使用CleanMyMac X软件时需要注意什么?

使用CleanMyMac X清理系统垃圾文件的步骤如下&#xff1a; 打开CleanMyMac X软件。在主界面中&#xff0c;选择“清理”功能块下的“清理系统垃圾”选项。点击“扫描”按钮&#xff0c;软件将自动扫描系统垃圾&#xff0c;包括缓存文件、系统日志文件等。扫描完成后&#xff0…

Kotlin基础——类、对象和接口

文章目录 1 定义类继承结构1.1 接口1.1.1 接口概述1.1.2 接口中的默认方法1.1.3 接口方法重复1.1.4 Kotlin接口中静态方法实现原理 1.2 修饰符1.2.1 类继承修饰1.2.2 方法重写修饰1.2.3 抽象类1.2.4 接口的修饰符 1.3 可见性修饰符1.3.1 Kotlin中的可见性修饰符1.3.2 Kotlin中的…

开源个人订阅跟踪器Wallos

本文软件由网友 P家单推人 推荐&#xff1b; 什么 Wallos &#xff1f; Wallos 是一款功能强大、开源且可自我托管的网络应用程序&#xff0c;旨在让您轻松管理财务。告别复杂的电子表格和昂贵的财务软件–Wallos简化了跟踪费用的过程&#xff0c;帮助您更好地控制财务生活。 软…

neo4j下载安装最新教程 2024.02

文章目录 neo4j简介neo4j与jdk版本对应neo4j历史版本 下载地址配置环境变量命令行启动验证安装结果 neo4j简介 Neo4j 是一个高性能的 NoSQL 图形数据库&#xff0c;它将结构化数据存储在网络&#xff08;从数学角度叫做图&#xff09;上而不是表中。Neo4j 也可以被看作是一个高…

【动态规划初识】不同的二叉搜索树

每日一道算法题之不同二叉搜索树个数 一、题目描述二、思路三、C++代码一、题目描述 题目来源:LeetCode 给你一个整数 n ,求恰由 n 个节点组成且节点值从 1 到 n 互不相同的 二叉搜索树 有多少种?返回满足题意的二叉搜索树的种数。 C++程序要求输入输出格式如下: 示例1:…