Netty核心源码剖析

Netty 线程模型

image.png

Netty高并发高性能架构设计精髓

  1. 主从Reactor线程模型
  2. NIO多路复用非阻塞
  3. 无锁串行化设计思想
  4. 支持高性能序列化协议
  5. 零拷贝(直接内存的使用)
  6. ByteBuf内存池设计
  7. 灵活的TCP参数配置能力
  8. 并发优化

无锁串行化设计思想

在大多数场景下,并行多线程处理可以提升系统的并发性能。但是,如果对于共享资源的并发访问处理不当,会带来严重的锁竞争,这最终会导致性能的下降。为了尽可能的避免锁竞争带来的性能损耗,可以通过串行化设计,即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。NIO的多路复用就是一种无锁串行化的设计思想(理解下Redis和Netty的线程模型),为了尽可能提升性能,Netty采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比“一个队列-多个工作线程”模型性能更优。
Netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead(Object msg),只要用户不主动切换线程,一直会由NioEventLoop调用到用户的Handler,期间不进行线程切换,这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。

直接内存

直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,某些情况下这部分内存也会被频繁地使用,而且也可能导致OutOfMemoryError异常出现。Java里用DirectByteBuffer可以分配一块直接内存(堆外内存),元空间对应的内存也叫作直接内存,它们对应的都是机器的物理内存。
image.png
测试代码:

package com.firechou.test;

import java.nio.ByteBuffer;

/**
 * 直接内存与堆内存的区别
 */
public class DirectMemoryTest {

    public static void heapAccess() {
        long startTime = System.currentTimeMillis();
        //分配堆内存
        ByteBuffer buffer = ByteBuffer.allocate(1000);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("堆内存访问:" + (endTime - startTime) + "ms");
    }

    public static void directAccess() {
        long startTime = System.currentTimeMillis();
        //分配直接内存
        ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("直接内存访问:" + (endTime - startTime) + "ms");
    }

    public static void heapAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocate(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("堆内存申请:" + (endTime - startTime) + "ms");
    }

    public static void directAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocateDirect(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("直接内存申请:" + (endTime - startTime) + "ms");
    }

    public static void main(String args[]) {
        for (int i = 0; i < 10; i++) {
            heapAccess();
            directAccess();
        }

        System.out.println();

        for (int i = 0; i < 10; i++) {
            heapAllocate();
            directAllocate();
        }
    }
}

// 运行结果
堆内存访问:70ms
直接内存访问:44ms
堆内存访问:40ms
直接内存访问:28ms
堆内存访问:67ms
直接内存访问:55ms
堆内存访问:57ms
直接内存访问:41ms
堆内存访问:59ms
直接内存访问:36ms
堆内存访问:48ms
直接内存访问:32ms
堆内存访问:44ms
直接内存访问:26ms
堆内存访问:38ms
直接内存访问:25ms
堆内存访问:44ms
直接内存访问:29ms
堆内存访问:45ms
直接内存访问:26ms

堆内存申请:14ms
直接内存申请:51ms
堆内存申请:13ms
直接内存申请:40ms
堆内存申请:75ms
直接内存申请:55ms
堆内存申请:2ms
直接内存申请:26ms
堆内存申请:2ms
直接内存申请:83ms
堆内存申请:1ms
直接内存申请:34ms
堆内存申请:5ms
直接内存申请:34ms
堆内存申请:2ms
直接内存申请:38ms
堆内存申请:7ms
直接内存申请:37ms
堆内存申请:7ms
直接内存申请:35ms

从程序运行结果看出直接内存申请较慢,但访问效率高。在java虚拟机实现上,本地IO一般会直接操作直接内存(直接内存=>系统调用=>硬盘/网卡),而非直接内存则需要二次拷贝(堆内存=>直接内存=>系统调用=>硬盘/网卡)。

直接内存分配源码分析:

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}


DirectByteBuffer(int cap) {                   // package-private
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    //判断是否有足够的直接内存空间分配,可通过-XX:MaxDirectMemorySize=<size>参数指定直接内存最大可分配空间,如果不指定默认为最大堆内存大小,
    //在分配直接内存时如果发现空间不够会显示调用System.gc()触发一次full gc回收掉一部分无用的直接内存的引用对象,同时直接内存也会被释放掉
    //如果释放完分配空间还是不够会抛出异常java.lang.OutOfMemoryError
   Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        // 调用unsafe本地方法分配直接内存
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        // 分配失败,释放内存
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        // Round up to page boundary
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    
    // 使用Cleaner机制注册内存回收处理函数,当直接内存引用对象被GC清理掉时,
    // 会提前调用这里注册的释放直接内存的Deallocator线程对象的run方法
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}


// 申请一块本地内存。内存空间是未初始化的,其内容是无法预期的。
// 使用freeMemory释放内存,使用reallocateMemory修改内存大小
public native long allocateMemory(long bytes);

// openjdk8/hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  // 调用os::malloc申请内存,内部使用malloc这个C标准库的函数申请内存
  void* x = os::malloc(sz, mtInternal);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
      return addr_to_java(x);
UNSAFE_END

使用直接内存的优缺点:
优点:

  • 不占用堆内存空间,减少了发生GC的可能
  • java虚拟机实现上,本地IO会直接操作直接内存(直接内存=>系统调用=>硬盘/网卡),而非直接内存则需要二次拷贝(堆内存=>直接内存=>系统调用=>硬盘/网卡)

缺点:

  • 初始分配较慢
  • 没有JVM直接帮助管理内存,容易发生内存溢出。为了避免一直没有FULL GC,最终导致直接内存把物理内存耗完。我们可以指定直接内存的最大值,通过-XX:MaxDirectMemorySize来指定,当达到阈值的时候,调用system.gc来进行一次FULL GC,间接把那些没有被使用的直接内存回收掉。

Netty 零拷贝

image.png
Netty的接收和发送ByteBuf采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。
如果使用传统的JVM堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才能写入Socket中。JVM堆内存的数据是不能直接写入Socket中的。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
可以看下netty的读写源码,比如read源码NioByteUnsafe.read()
image.png
image.png
image.png

ByteBuf内存池设计

随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区Buffer(相当于一个内存块),情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽量重用缓冲区,Netty提供了基于ByteBuf内存池的缓冲区重用机制。需要的时候直接从池子里获取ByteBuf使用即可,使用完毕之后就重新放回到池子里去。下面我们一起看下Netty ByteBuf的实现:
image.png
可以看下netty的读写源码里面用到的ByteBuf内存池,比如read源码NioByteUnsafe.read()

灵活的TCP参数配置能力

合理设置TCP参数在某些场景下对于性能的提升可以起到显著的效果,例如接收缓冲区SO_RCVBUF和发送缓冲区SO_SNDBUF。如果设置不当,对性能的影响是非常大的。通常建议值为128K或者256K。
Netty在启动辅助类ChannelOption中可以灵活的配置TCP参数,满足不同的用户场景。
image.png

并发优化

  • volatile的大量、正确使用;
  • CAS和原子类的广泛使用;
  • 线程安全容器的使用;
  • 通过读写锁提升并发性能。

ByteBuf扩容机制

如果我们需要了解ByteBuf的扩容,我们需要先了解ByteBuf中定义的几个成员变量,再从源码的角度来分析扩容。
image.png

minNewCapacity:表示用户需要写入的值大小
threshold:阈值,为Bytebuf内部设定容量的最大值
maxCapacity:Netty最大能接受的容量大小,一般为int的最大值

ByteBuf核心扩容方法:
进入ByteBuf源码中,深入分析其扩容方法,idea源码进入:ByteBuf.writeByte()->AbstractByteBuf->calculateNewCapacity

// io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    checkPositiveOrZero(minNewCapacity, "minNewCapacity");
    if (minNewCapacity > maxCapacity) {
        // 比最大的容量还大的话,抛出异常
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        // 等于,返回阈值
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        // 采用步进4MB的方式完成扩容
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    // 采用64为基数,做倍增的方式完成扩容
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

  1. 判断目标值与阈值threshold(4MB)的大小关系,等于直接返回阈值
  2. 采用步进4MB的方式完成扩容
  3. 采用64为基数,做倍增的方式完成扩容

总结:
Netty的ByteBuf需要动态扩容来满足需要,扩容过程:默认门限阈值为4MB(这个阈值是一个经验值,不同场景,可能取值不同),当需要的容量等于门限阈值,使用阈值作为新的缓存区容量目标容量,如果大于阈值,采用每次步进4MB的方式进行内存扩张((需要扩容值/4MB)*4MB),扩张后需要和最大内存(maxCapacity)进行比较,大于maxCapacity的话就用maxCapacity,否则使用扩容值目标容量,如果小于阈值,采用倍增的方式,以64(字节)作为基本数值,每次翻倍增长64 -->128 --> 256,直到倍增后的结果大于或等于需要的容量值。

handler的生命周期回调接口调用顺序

/**
 *  在channel的pipeline里如下handler:ch.pipeline().addLast(new LifeCycleInBoundHandler());
 *  handler的生命周期回调接口调用顺序:
 *  handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete
 *  -> channelInactive -> channelUnRegistered -> handlerRemoved
 *
 * handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调;
 * channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。
 * channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。
 * channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读;
 * channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕;
 * channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。
 * channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程;
 * handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。
 */
public class LifeCycleInBoundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("channelRegistered: channel注册到NioEventLoop");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) 
            throws Exception {
        System.out.println("channelUnregistered: channel取消和NioEventLoop的绑定");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) 
            throws Exception {
        System.out.println("channelActive: channel准备就绪");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) 
            throws Exception {
        System.out.println("channelInactive: channel被关闭");
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) 
            throws Exception {
        System.out.println("channelRead: channel中有可读的数据" );
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) 
            throws Exception {
        System.out.println("channelReadComplete: channel读数据完成");
        super.channelReadComplete(ctx);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) 
            throws Exception {
        System.out.println("handlerAdded: handler被添加到channel的pipeline");
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) 
            throws Exception {
        System.out.println("handlerRemoved: handler从channel的pipeline中移除");
        super.handlerRemoved(ctx);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/103522.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

安卓使用android studio跨进程通信之AIDL

我写这篇文章不想从最基础的介绍开始,我直接上步骤吧. 1.创建服务端 1.1:创建服务端项目:我的as版本比较高,页面就是这样的 1.2:创建AIDL文件,右键项目,选中aidl aidl名字可以自定义也可以默认 basicTypes是自带的,可以删掉,也可以不删,然后把你自己所需的接口写上去 1.3:创建…

CSS 两栏布局

目录 CSS两栏布局&#xff08;左列定宽&#xff0c;右列自适应宽&#xff09; 方法一&#xff1a;浮动margin 方法二&#xff1a;定位margin 方法三&#xff1a;浮动BFC 方法四&#xff1a;Flex布局 方法五&#xff1a;able布局 CSS两栏布局&#xff08;左列不定宽&#…

uniapp 自定义导航栏

自定义导航栏 修改 pages.json 在 pages.json 中将 navigateionStyle 设为 custom 新建 systemInfo.js systemInfo.js 用来获取当前设备的机型系统信息&#xff0c;放在 common 目录下 /*** 此 js 文件管理关于当前设备的机型系统信息*/ const systemInfo function() {/***…

信息检索与数据挖掘 | 【实验】排名检索模型

文章目录 &#x1f4da;实验内容&#x1f4da;相关概念&#x1f4da;实验步骤&#x1f407;分词预处理&#x1f407;构建倒排索引表&#x1f407;计算query和各个文档的相似度&#x1f407;queries预处理及检索函数&#x1f525;对输入的文本进行词法分析和标准化处理&#x1f…

从0开始在Vscode中搭建Vue2/3项目详细步骤

1.安装node.js:Node.js下载安装及环境配置教程【超详细】_nodejs下载_WHF__的博客-CSDN博客 node.js自带npm&#xff0c;无需单独安装。 验证&#xff1a; node -v npm -v 2.先简单创建一个空文件夹&#xff0c;vscode进入该文件夹&#xff0c;并打开终端。 3.安装cnpm&…

【Gensim概念】03/3 NLP玩转 word2vec

第三部分 对象函数 八 word2vec对象函数 该对象本质上包含单词和嵌入之间的映射。训练后&#xff0c;可以直接使用它以各种方式查询这些嵌入。有关示例&#xff0c;请参阅模块级别文档字符串。 类型 KeyedVectors 1&#xff09; add_lifecycle_event(event_name, log_level2…

OpenCV视频车流量识别详解与实践

视频车流量识别基本思想是使用背景消去算法将运动物体从图片中提取出来&#xff0c;消除噪声识别运动物体轮廓&#xff0c;最后&#xff0c;在固定区域统计筛选出来符合条件的轮廓。 基于统计背景模型的视频运动目标检测技术&#xff1a; 背景获取&#xff1a;需要在场景存在…

React 框架

1、React 框架简介 1.1、介绍 CS 与 BS结合&#xff1a;像 React&#xff0c;Vue 此类框架&#xff0c;转移了部分服务器的功能到客户端。将CS 和 BS 加以结合。客户端只用请求一次服务器&#xff0c;服务器就将所有js代码返回给客户端&#xff0c;所有交互类操作都不再依赖服…

禁止拷贝文件到U盘的解决办法

禁止拷贝文件到U盘的解决办法 安企神U盘管理系统下载使用 说到这问题&#xff0c;大多情况下是企业的需求&#xff0c;很多公司电脑中都保存着极为重要的数据&#xff0c;这些数据往往是不能传播的&#xff0c;所以此时就需要禁止拷贝文件到U盘来防止公司数据被泄密。 禁止拷…

python造测试数据存到excel

代码&#xff1a; from ExcelHandler import ExcelHandler from faker import Faker # 导入faker库的Faker方法 # ↓默认为en_US&#xff0c;只有使用了相关语言才能生成相对应的随机数据 fkFaker(locale"zh_CN")def create_date():m int(input(请输入要造的数据条…

自动驾驶的商业应用和市场前景

自动驾驶技术已经成为了交通运输领域的一项重要创新。它不仅在改善交通安全性和效率方面具有巨大潜力&#xff0c;还为各种商业应用提供了新的机会。本文将探讨自动驾驶在交通运输中的潜力&#xff0c;自动驾驶汽车的制造商和技术公司&#xff0c;以及自动驾驶的商业模式和市场…

基于OpenCV批量分片高像素影像

基于OpenCV批量分片高像素影像 为了更加精确的诊断和治疗&#xff0c;医疗影像往往是大像素&#xff08;1920x1080&#xff09;或超大像素图像&#xff08;4k图像4096x2160&#xff09;。这类图像的尺寸与深度学习实验数据常见尺寸&#xff08;227x227&#xff0c;或32x32&…

OpenCV实现物体尺寸的测量

一 &#xff0c;项目分析 物体尺寸测量的思路是找一个确定尺寸的物体作为参照物&#xff0c;根据已知的计算未知物体尺寸。 如下图所示&#xff0c;绿色的板子尺寸为220*300&#xff08;单位&#xff1a;毫米&#xff09;&#xff0c;通过程序计算白色纸片的长度。 主要是通过…

我国有多少个港口?

港口是什么&#xff1f; 港口是海洋运输中不可或缺的重要设施之一&#xff0c;是连接陆路和水路运输的重要节点。港口通常是指位于沿海地区的水陆交通枢纽&#xff0c;是船舶停靠、装卸货物、储存物资和维修船只的场所。港口一般由码头、泊位、仓库、货场、客运站等设施组成&a…

数据结构和算法概述

什么是数据结构&#xff1f; 官方解释&#xff1a; 数据结构是一门研究非数值计算的程序设计问题中的操作对象&#xff0c;以及他们之间的关系和操作等相关问题的学科。 大白话&#xff1a; 数据结构就是把数据元素按照一定的关系组织起来的集合&#xff0c;用来组织和存储…

【网安大模型专题10.19】※论文5:ChatGPT+漏洞定位+补丁生成+补丁验证+APR方法+ChatRepair+不同修复场景+修复效果(韦恩图展示)

Keep the Conversation Going: Fixing 162 out of 337 bugs for $0.42 each using ChatGPT 写在最前面背景介绍自动程序修复流程Process of APR (automated program repair)1、漏洞程序2、漏洞定位模块3、补丁生成4、补丁验证 &#xff08;可以学习的PPT设计&#xff09;经典的…

Spring Cloud之服务熔断与降级(Hystrix)

目录 Hystrix 概念 作用 服务降级 简介 使用场景 接口降级 服务端服务降级 1.添加依赖 2.定义接口 3.实现接口 4.Controller类使用 5.启动类添加注释 6.浏览器访问 客户端服务降级 1.添加依赖 2.application.yml 中添加配置 3.定义接口 4.Controller类使用 …

解读意大利葡萄酒分类系统

由于该国众多的产区和复杂的品种&#xff0c;要想真正掌握意大利葡萄酒是相当困难的。仅仅是试图从复杂混乱的葡萄酒标签中辨别信息的想法就足以让许多人焦虑不安。 位于托斯卡纳的基安蒂酒地区&#xff0c;Il Ciliegio生产的葡萄酒标签上包含以下名称之一:基安蒂酒科利塞内西…

通过IP地址可以做什么

通过IP地址可以做很多事情&#xff0c;因为它是互联网通信的基础之一。本文将探讨IP地址的定义、用途以及一些可能的应用。 IP地址的用途 1. 设备标识&#xff1a;IP地址用于标识互联网上的每个设备&#xff0c;这包括计算机、服务器、路由器、智能手机等。它类似于我们日常生…

unity 一键替换 UI上所有字体,批量替换字体(包括:Text和Text (TMP))

前言&#xff1a;在开发中会遇到这种情况&#xff0c;开发完了&#xff0c;发现UI字体没有替换&#xff0c;特别是需要发布到WebGL端的同学&#xff0c;突然发现无法显示汉字了。下面一个非常方便的方法完美解决。 1.解压出来的脚本放在Edit文件下&#xff0c;没有的创建一个 2…