Netty核心原理剖析与RPC实践16-20

Netty核心原理剖析与RPC实践16-20

16 IO 加速:与众不同的 Netty 零拷贝技术

今天的课程我们继续讨论 Netty 实现高性能的另一个高阶特性——零拷贝。零拷贝是一个耳熟能详的词语,在 Linux、Kafka、RocketMQ 等知名的产品中都有使用,通常用于提升 I/O 性能。而且零拷贝也是面试过程中的高频问题,那么你知道零拷贝体现在哪些地方吗?Netty 的零拷贝技术又是如何实现的呢?接下来我们就针对 Netty 零拷贝特性进行详细地分析。

传统 Linux 中的零拷贝技术

在介绍 Netty 零拷贝特性之前,我们有必要学习下传统 Linux 中零拷贝的工作原理。所谓零拷贝,就是在数据操作时,不需要将数据从一个内存位置拷贝到另外一个内存位置,这样可以减少一次内存拷贝的损耗,从而节省了 CPU 时钟周期和内存带宽。

我们模拟一个场景,从文件中读取数据,然后将数据传输到网络上,那么传统的数据拷贝过程会分为哪几个阶段呢?具体如下图所示。

Drawing 0.png

从上图中可以看出,从数据读取到发送一共经历了四次数据拷贝,具体流程如下:

  1. 当用户进程发起 read() 调用后,上下文从用户态切换至内核态。DMA 引擎从文件中读取数据,并存储到内核态缓冲区,这里是第一次数据拷贝
  2. 请求的数据从内核态缓冲区拷贝到用户态缓冲区,然后返回给用户进程。第二次数据拷贝的过程同时,会导致上下文从内核态再次切换到用户态。
  3. 用户进程调用 send() 方法期望将数据发送到网络中,此时会触发第三次线程切换,用户态会再次切换到内核态,请求的数据从用户态缓冲区被拷贝到 Socket 缓冲区。
  4. 最终 send() 系统调用结束返回给用户进程,发生了第四次上下文切换。第四次拷贝会异步执行,从 Socket 缓冲区拷贝到协议引擎中。

说明:DMA(Direct Memory Access,直接内存存取)是现代大部分硬盘都支持的特性,DMA 接管了数据读写的工作,不需要 CPU 再参与 I/O 中断的处理,从而减轻了 CPU 的负担。

传统的数据拷贝过程为什么不是将数据直接传输到用户缓冲区呢?其实引入内核缓冲区可以充当缓存的作用,这样就可以实现文件数据的预读,提升 I/O 的性能。但是当请求数据量大于内核缓冲区大小时,在完成一次数据的读取到发送可能要经历数倍次数的数据拷贝,这就造成严重的性能损耗。

接下来我们介绍下使用零拷贝技术之后数据传输的流程。重新回顾一遍传统数据拷贝的过程,可以发现第二次和第三次拷贝是可以去除的,DMA 引擎从文件读取数据后放入到内核缓冲区,然后可以直接从内核缓冲区传输到 Socket 缓冲区,从而减少内存拷贝的次数。

在 Linux 中系统调用 sendfile() 可以实现将数据从一个文件描述符传输到另一个文件描述符,从而实现了零拷贝技术。在 Java 中也使用了零拷贝技术,它就是 NIO FileChannel 类中的 transferTo() 方法,transferTo() 底层就依赖了操作系统零拷贝的机制,它可以将数据从 FileChannel 直接传输到另外一个 Channel。transferTo() 方法的定义如下:

public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;

FileChannel#transferTo() 的使用也非常简单,我们直接看如下的代码示例,通过 transferTo() 将 from.data 传输到 to.data(),等于实现了文件拷贝的功能。

public void testTransferTo() throws IOException {
    RandomAccessFile fromFile = new RandomAccessFile("from.data", "rw");
    FileChannel fromChannel = fromFile.getChannel();
    RandomAccessFile toFile = new RandomAccessFile("to.data", "rw");
    FileChannel toChannel = toFile.getChannel();
    long position = 0;
    long count = fromChannel.size();
    fromChannel.transferTo(position, count, toChannel);
}

在使用了 FileChannel#transferTo() 传输数据之后,我们看下数据拷贝流程发生了哪些变化,如下图所示:

Drawing 1.png

比较大的一个变化是,DMA 引擎从文件中读取数据拷贝到内核态缓冲区之后,由操作系统直接拷贝到 Socket 缓冲区,不再拷贝到用户态缓冲区,所以数据拷贝的次数从之前的 4 次减少到 3 次。

但是上述的优化离达到零拷贝的要求还是有差距的,能否继续减少内核中的数据拷贝次数呢?在 Linux 2.4 版本之后,开发者对 Socket Buffer 追加一些 Descriptor 信息来进一步减少内核数据的复制。如下图所示,DMA 引擎读取文件内容并拷贝到内核缓冲区,然后并没有再拷贝到 Socket 缓冲区,只是将数据的长度以及位置信息被追加到 Socket 缓冲区,然后 DMA 引擎根据这些描述信息,直接从内核缓冲区读取数据并传输到协议引擎中,从而消除最后一次 CPU 拷贝。

Drawing 2.png

通过上述 Linux 零拷贝技术的介绍,你也许还会存在疑问,最终使用零拷贝之后,不是还存在着数据拷贝操作吗?其实从 Linux 操作系统的角度来说,零拷贝就是为了避免用户态和内核态之间的数据拷贝。无论是传统的数据拷贝还是使用零拷贝技术,其中有 2 次 DMA 的数据拷贝必不可少,只是这 2 次 DMA 拷贝都是依赖硬件来完成,不需要 CPU 参与。所以,在这里我们讨论的零拷贝是个广义的概念,只要能够减少不必要的 CPU 拷贝,都可以被称为零拷贝。

Netty 的零拷贝技术

介绍完传统 Linux 的零拷贝技术之后,我们再来学习下 Netty 中的零拷贝如何实现。Netty 中的零拷贝和传统 Linux 的零拷贝不太一样。Netty 中的零拷贝技术除了操作系统级别的功能封装,更多的是面向用户态的数据操作优化,主要体现在以下 5 个方面:

  • 堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
  • CompositeByteBuf 类,可以组合多个 Buffer 对象合并成一个逻辑上的对象,避免通过传统内存拷贝的方式将几个 Buffer 合并成一个大的 Buffer。
  • 通过 Unpooled.wrappedBuffer 可以将 byte 数组包装成 ByteBuf 对象,包装过程中不会产生内存拷贝。
  • ByteBuf.slice 操作与 Unpooled.wrappedBuffer 相反,slice 操作可以将一个 ByteBuf 对象切分成多个 ByteBuf 对象,切分过程中不会产生内存拷贝,底层共享一个 byte 数组的存储空间。
  • Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝,这属于操作系统级别的零拷贝。

下面我们从以上 5 个方面逐一进行介绍。

堆外内存

如果在 JVM 内部执行 I/O 操作时,必须将数据拷贝到堆外内存,才能执行系统调用。这是所有 VM 语言都会存在的问题。那么为什么操作系统不能直接使用 JVM 堆内存进行 I/O 的读写呢?主要有两点原因:第一,操作系统并不感知 JVM 的堆内存,而且 JVM 的内存布局与操作系统所分配的是不一样的,操作系统并不会按照 JVM 的行为来读写数据。第二,同一个对象的内存地址随着 JVM GC 的执行可能会随时发生变化,例如 JVM GC 的过程中会通过压缩来减少内存碎片,这就涉及对象移动的问题了。

Netty 在进行 I/O 操作时都是使用的堆外内存,可以避免数据从 JVM 堆内存到堆外内存的拷贝。

CompositeByteBuf

CompositeByteBuf 是 Netty 中实现零拷贝机制非常重要的一个数据结构,CompositeByteBuf 可以理解为一个虚拟的 Buffer 对象,它是由多个 ByteBuf 组合而成,但是在 CompositeByteBuf 内部保存着每个 ByteBuf 的引用关系,从逻辑上构成一个整体。比较常见的像 HTTP 协议数据可以分为头部信息 header消息体数据 body,分别存在两个不同的 ByteBuf 中,通常我们需要将两个 ByteBuf 合并成一个完整的协议数据进行发送,可以使用如下方式完成:

ByteBuf httpBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());
httpBuf.writeBytes(header);
httpBuf.writeBytes(body);

可以看出,如果想实现 header 和 body 这两个 ByteBuf 的合并,需要先初始化一个新的 httpBuf,然后再将 header 和 body 分别拷贝到新的 httpBuf。合并过程中涉及两次 CPU 拷贝,这非常浪费性能。如果使用 CompositeByteBuf 如何实现类似的需求呢?如下所示:

CompositeByteBuf httpBuf = Unpooled.compositeBuffer();
httpBuf.addComponents(true, header, body);

CompositeByteBuf 通过调用 addComponents() 方法来添加多个 ByteBuf,但是底层的 byte 数组是复用的,不会发生内存拷贝。但对于用户来说,它可以当作一个整体进行操作。那么 CompositeByteBuf 内部是如何存放这些 ByteBuf,并且如何进行合并的呢?我们先通过一张图看下 CompositeByteBuf 的内部结构:

Drawing 3.png

从图上可以看出,CompositeByteBuf 内部维护了一个 Components 数组。在每个 Component 中存放着不同的 ByteBuf,各个 ByteBuf 独立维护自己的读写索引,而 CompositeByteBuf 自身也会单独维护一个读写索引。由此可见,Component 是实现 CompositeByteBuf 的关键所在,下面看下 Component 结构定义:

private static final class Component {
    final ByteBuf srcBuf; // 原始的 ByteBuf
    final ByteBuf buf; // srcBuf 去除包装之后的 ByteBuf
    int srcAdjustment; // CompositeByteBuf 的起始索引相对于 srcBuf 读索引的偏移
    int adjustment; // CompositeByteBuf 的起始索引相对于 buf 的读索引的偏移
    int offset; // Component 相对于 CompositeByteBuf 的起始索引位置
    int endOffset; // Component 相对于 CompositeByteBuf 的结束索引位置
    // 省略其他代码
}

为了方便理解上述 Component 中的属性含义,我同样以 HTTP 协议中 header 和 body 为示例,通过一张图来描述 CompositeByteBuf 组合后其中 Component 的布局情况,如下所示:

Drawing 4.png

从图中可以看出,header 和 body 分别对应两个 ByteBuf,假设 ByteBuf 的内容分别为 “header” 和 “body”,那么 header ByteBuf 中 offset~endOffset 为 0~6,body ByteBuf 对应的 offset~endOffset 为 0~10。由此可见,Component 中的 offset 和 endOffset 可以表示当前 ByteBuf 可以读取的范围,通过 offset 和 endOffset 可以将每一个 Component 所对应的 ByteBuf 连接起来,形成一个逻辑整体。

此外 Component 中 srcAdjustment 和 adjustment 表示 CompositeByteBuf 起始索引相对于 ByteBuf 读索引的偏移。初始 adjustment = readIndex - offset,这样通过 CompositeByteBuf 的起始索引就可以直接定位到 Component 中 ByteBuf 的读索引位置。当 header ByteBuf 读取 1 个字节,body ByteBuf 读取 2 个字节,此时每个 Component 的属性又会发生什么变化呢?如下图所示。

Drawing 5.png

至此,CompositeByteBuf 的基本原理我们已经介绍完了,关于具体 CompositeByteBuf 数据操作的细节在这里就不做展开了,有兴趣的同学可以自己深入研究 CompositeByteBuf 的源码。

Unpooled.wrappedBuffer 操作

介绍完 CompositeByteBuf 之后,再来理解 Unpooled.wrappedBuffer 操作就非常容易了,Unpooled.wrappedBuffer 同时也是创建 CompositeByteBuf 对象的另一种推荐做法。

Unpooled 提供了一系列用于包装数据源的 wrappedBuffer 方法,如下所示:

Drawing 6.png

Unpooled.wrappedBuffer 方法可以将不同的数据源的一个或者多个数据包装成一个大的 ByteBuf 对象,其中数据源的类型包括 byte[]、ByteBuf、ByteBuffer。包装的过程中不会发生数据拷贝操作,包装后生成的 ByteBuf 对象和原始 ByteBuf 对象是共享底层的 byte 数组。

ByteBuf.slice 操作

ByteBuf.slice 和 Unpooled.wrappedBuffer 的逻辑正好相反,ByteBuf.slice 是将一个 ByteBuf 对象切分成多个共享同一个底层存储的 ByteBuf 对象。

ByteBuf 提供了两个 slice 切分方法:

public ByteBuf slice();
public ByteBuf slice(int index, int length);

假设我们已经有一份完整的 HTTP 数据,可以通过 slice 方法切分获得 header 和 body 两个 ByteBuf 对象,对应的内容分别为 “header” 和 “body”,实现方式如下:

ByteBuf httpBuf = ...
ByteBuf header = httpBuf.slice(0, 6);
ByteBuf body = httpBuf.slice(6, 4);

通过 slice 切分后都会返回一个新的 ByteBuf 对象,而且新的对象有自己独立的 readerIndex、writerIndex 索引,如下图所示。由于新的 ByteBuf 对象与原始的 ByteBuf 对象数据是共享的,所以通过新的 ByteBuf 对象进行数据操作也会对原始 ByteBuf 对象生效。

图片8.png

文件传输 FileRegion

在 Netty 源码的 example 包中,提供了 FileRegion 的使用示例,以下代码片段摘自 FileServerHandler.java。

@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    RandomAccessFile raf = null;
    long length = -1;
    try {
        raf = new RandomAccessFile(msg, "r");
        length = raf.length();
    } catch (Exception e) {
        ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '\n');
        return;
    } finally {
        if (length < 0 && raf != null) {
            raf.close();
        }
    }
    ctx.write("OK: " + raf.length() + '\n');
    if (ctx.pipeline().get(SslHandler.class) == null) {
        // SSL not enabled - can use zero-copy file transfer.
        ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length));
    } else {
        // SSL enabled - cannot use zero-copy file transfer.
        ctx.write(new ChunkedFile(raf));
    }
    ctx.writeAndFlush("\n");
}

从 FileRegion 的使用示例可以看出,Netty 使用 FileRegion 实现文件传输的零拷贝。FileRegion 的默认实现类是 DefaultFileRegion,通过 DefaultFileRegion 将文件内容写入到 NioSocketChannel。那么 FileRegion 是如何实现零拷贝的呢?我们通过源码看看 FileRegion 到底使用了什么黑科技。

public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion {
    private final File f; // 传输的文件
    private final long position; // 文件的起始位置
    private final long count; // 传输的字节数
    private long transferred; // 已经写入的字节数
    private FileChannel file; // 文件对应的 FileChannel    @Override
    public long transferTo(WritableByteChannel target, long position) throws IOException {
        long count = this.count - position;
        if (count < 0 || position < 0) {
            throw new IllegalArgumentException(
                    "position out of range: " + position +
                    " (expected: 0 - " + (this.count - 1) + ')');
        }
        if (count == 0) {
            return 0L;
        }
        if (refCnt() == 0) {
            throw new IllegalReferenceCountException(0);
        }
        open();
        long written = file.transferTo(this.position + position, count, target);
        if (written > 0) {
            transferred += written;
        } else if (written == 0) {
            validate(this, position);
        }
        return written;
    }    // 省略其他代码
}

从源码可以看出,FileRegion 其实就是对 FileChannel 的包装,并没有什么特殊操作,底层使用的是 JDK NIO 中的 FileChannel#transferTo() 方法实现文件传输,所以 FileRegion 是操作系统级别的零拷贝,对于传输大文件会很有帮助。

到此为止,Netty 相关的零拷贝技术都已经介绍完了,可以看出 Netty 对于 ByteBuf 做了更多精进的设计和优化。

总结

零拷贝是网络编程中一种常用的技术,可以用于优化网络数据传输的性能。本文介绍了操作系统 Linux 和 Netty 中的零拷贝技术,Netty 除了支持操作系统级别的零拷贝,更多提供了面向用户态的零拷贝特性,主要体现在 5 个方面:堆外内存、CompositeByteBuf、Unpooled.wrappedBuffer、ByteBuf.slice 以及 FileRegion。以操作系统的角度来看,零拷贝是一个广义的概念,可以认为只要能够减少不必要的 CPU 拷贝,都可以理解为是零拷贝。

最后,留一个思考题,使用具备零拷贝特性的 transfer() 方法拷贝文件,一定会比传统 I/O 的方式更高效吗?


17 源码篇:从 Linux 出发深入剖析服务端启动流程

通过前几章课程的学习,我们已经对 Netty 的技术思想和基本原理有了初步的认识,从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式让你更加深入理解 Netty 的精髓,如 Netty 的设计思想、工程技巧等,为之后继续深入研究 Netty 打下坚实的基础。

在课程开始之前,我想分享一下关于源码学习的几点经验和建议。第一,很多同学在开始学习源码时面临的第一个问题就是不知道从何下手,这个时候一定不能对着源码毫无意义地四处翻看。建议你可以通过 Hello World 或者 TestCase 作为源码学习的入口,然后再通过 Debug 断点的方式调试并跑通源码。第二,阅读源码一定要有全局观。首先要把握源码的主流程,避免刚开始陷入代码细节的死胡同。第三,源码一定要反复阅读,让自己每一次读都有不同的收获。我们可以通过画图、注释的方式帮助自己更容易理解源码的核心流程,方便后续的复习和回顾。

作为源码解析的第一节课,我们将深入分析 Netty 服务端的启动流程。启动服务的过程中我们可以了解到 Netty 各大核心组件的关系,这将是学习 Netty 源码一个非常好的切入点,让我们一起看看 Netty 的每个零件是如何运转起来的吧。

说明:本文参考的 Netty 源码版本为 4.1.42.Final。

从 Echo 服务器示例入手

在《引导器作用:客户端和服务端启动都要做些什么?》的课程中,我们介绍了如何使用引导器搭建服务端的基本框架。在这里我们实现了一个最简单的 Echo 服务器,用于调试 Netty 服务端启动的源码。

public class EchoServer {
    public void startEchoServer(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO)) // 设置ServerSocketChannel 对应的 Handler
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 设置 SocketChannel 对应的 Handler
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
                            ch.pipeline().addLast(new ResponseSampleEncoder());
                            ch.pipeline().addLast(new RequestSampleHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

我们以引导器 ServerBootstrap 为切入点,开始深入分析 Netty 服务端的启动流程。在服务端启动之前,需要配置 ServerBootstrap 的相关参数,这一步大致可以分为以下几个步骤:

  • 配置 EventLoopGroup 线程组;
  • 配置 Channel 的类型;
  • 设置 ServerSocketChannel 对应的 Handler;
  • 设置网络监听的端口;
  • 设置 SocketChannel 对应的 Handler;
  • 配置 Channel 参数。

配置 ServerBootstrap 参数的过程非常简单,把参数值保存在 ServerBootstrap 定义的成员变量里就可以了。我们可以看下 ServerBootstrap 的成员变量定义,基本与 ServerBootstrap 暴露出来的配置方法是一一对应的。如下所示,我以注释的形式说明每个成员变量对应的调用方法。

volatile EventLoopGroup group; // group()
volatile EventLoopGroup childGroup; // group()
volatile ChannelFactory<? extends C> channelFactory; // channel()
volatile SocketAddress localAddress; // localAddress
Map<ChannelOption<?>, Object> childOptions = new ConcurrentHashMap<ChannelOption<?>, Object>(); // childOption()
volatile ChannelHandler childHandler; // childHandler()
ServerBootstrapConfig config = new ServerBootstrapConfig(this);

关于 ServerBootstrap 如何为每个成员变量保存参数的过程,我们就不一一展开了,你可以理解为这部分工作只是一个前置准备,课后你可以自己跟进下每个方法的源码。今天我们核心聚焦在 b.bind().sync() 这行代码,bind() 才是真正进行服务器端口绑定和启动的入口,sync() 表示阻塞等待服务器启动完成。接下来我们对 bind() 方法进行展开分析。

在开始源码分析之前,我们带着以下几个问题边看边思考:

  • Netty 自己实现的 Channel 与 JDK 底层的 Channel 是如何产生联系的?
  • ChannelInitializer 这个特殊的 Handler 处理器的作用是什么?
  • Pipeline 初始化的过程是什么样的?

服务端启动全过程

首先我们来看下 ServerBootstrap 中 bind() 方法的源码实现:

public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

由此可见,doBind() 方法是我们需要分析的重点。我们再一起看下 doBind() 具体做了哪些事情:

  1. 调用 initAndRegister() 初始化并注册 Channel,同时返回一个 ChannelFuture 实例 regFuture,所以我们可以猜测出 initAndRegister() 是一个异步的过程。
  2. 接下来通过 regFuture.cause() 方法判断 initAndRegister() 的过程是否发生异常,如果发生异常则直接返回。
  3. regFuture.isDone() 表示 initAndRegister() 是否执行完毕,如果执行完毕则调用 doBind0() 进行 Socket 绑定。如果 initAndRegister() 还没有执行结束,regFuture 会添加一个 ChannelFutureListener 回调监听,当 initAndRegister() 执行结束后会调用 operationComplete(),同样通过 doBind0() 进行端口绑定。

doBind() 整个实现结构非常清晰,其中 initAndRegister() 负责 Channel 初始化和注册,doBind0() 用于端口绑定。这两个过程最为重要,下面我们分别进行详细的介绍。

服务端 Channel 初始化及注册

initAndRegister() 方法顾名思义,主要负责初始化和注册的相关工作,我们具体看下它的源码实现:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel(); // 创建 Channel
        init(channel); // 初始化 Channel
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    ChannelFuture regFuture = config().group().register(channel); // 注册 Channel
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

initAndRegister() 可以分为三步:创建 Channel、初始化 Channel 和注册 Channel,接下来我们一步步进行拆解分析。

创建服务端 Channel

首先看下创建 Channel 的过程,直接跟进 channelFactory.newChannel() 的源码。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Constructor<? extends T> constructor;
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }
    @Override
    public T newChannel() {
        try {
            return constructor.newInstance(); // 反射创建对象
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
    // 省略其他代码
}

在前面 Echo 服务器的示例中,我们通过 channel(NioServerSocketChannel.class) 配置 Channel 的类型,工厂类 ReflectiveChannelFactory 是在该过程中被创建的。从 constructor.newInstance() 我们可以看出,ReflectiveChannelFactory 通过反射创建出 NioServerSocketChannel 对象,所以我们重点需要关注 NioServerSocketChannel 的构造函数。

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER)); 
}
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT); // 调用父类方法
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel(); // 创建 JDK 底层的 ServerSocketChannel
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

SelectorProvider 是 JDK NIO 中的抽象类实现,通过 openServerSocketChannel() 方法可以用于创建服务端的 ServerSocketChannel。而且 SelectorProvider 会根据操作系统类型和版本的不同,返回不同的实现类,具体可以参考 DefaultSelectorProvider 的源码实现:

public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}

在这里我们只讨论 Linux 操作系统的场景,在 Linux 内核 2.6版本及以上都会默认采用 EPollSelectorProvider。如果是旧版本则使用 PollSelectorProvider。对于目前的主流 Linux 平台而言,都是采用 Epoll 机制实现的。

创建完 ServerSocketChannel,我们回到 NioServerSocketChannel 的构造函数,接着它会通过 super() 依次调用到父类的构造进行初始化工作,最终我们可以定位到 AbstractNioChannel 和 AbstractChannel 的构造函数:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    // 省略其他代码
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        // 省略其他代码
    }
}
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId(); // Channel 全局唯一 id 
    unsafe = newUnsafe(); // unsafe 操作底层读写
    pipeline = newChannelPipeline(); // pipeline 负责业务处理器编排
}

首先调用 AbstractChannel 的构造函数创建了三个重要的成员变量,分别为 id、unsafe、pipeline。id 表示全局唯一的 Channel,unsafe 用于操作底层数据的读写操作,pipeline 负责业务处理器的编排。初始化状态,pipeline 的内部结构只包含头尾两个节点,如下图所示。三个核心成员变量创建好之后,会回到 AbstractNioChannel 的构造函数,通过 ch.configureBlocking(false) 设置 Channel 是非阻塞模式。

netty17图.png

创建服务端 Channel 的过程我们已经讲完了,简单总结下其中几个重要的步骤:

  1. ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例;
  2. 创建 JDK 底层的 ServerSocketChannel;
  3. 为 Channel 创建 id、unsafe、pipeline 三个重要的成员变量;
  4. 设置 Channel 为非阻塞模式。
初始化服务端 Channel

回到 ServerBootstrap 的 initAndRegister() 方法,继续跟进用于初始化服务端 Channel 的 init() 方法源码:

void init(Channel channel) {
    setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger); // 设置 Socket 参数
    setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0))); // 保存用户自定义属性
    ChannelPipeline p = channel.pipeline();
    // 获取 ServerBootstrapAcceptor 的构造参数
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions =
            childOptions.entrySet().toArray(newOptionArray(0));
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    // 添加特殊的 Handler 处理器
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

init() 方法的源码比较长,我们依然拆解成两个部分来看:

第一步,设置 Socket 参数以及用户自定义属性。在创建服务端 Channel 时,Channel 的配置参数保存在 NioServerSocketChannelConfig 中,在初始化 Channel 的过程中,Netty 会将这些参数设置到 JDK 底层的 Socket 上,并把用户自定义的属性绑定在 Channel 上。

第二步,添加特殊的 Handler 处理器。首先 ServerBootstrap 为 Pipeline 添加了一个 ChannelInitializer,ChannelInitializer 是实现了 ChannelHandler 接口的匿名类,其中 ChannelInitializer 实现的 initChannel() 方法用于添加 ServerSocketChannel 对应的 Handler。然后 Netty 通过异步 task 的方式又向 Pipeline 一个处理器 ServerBootstrapAcceptor,从 ServerBootstrapAcceptor 的命名可以看出,这是一个连接接入器,专门用于接收新的连接,然后把事件分发给 EventLoop 执行,在这里我们先不做展开。此时服务端的 pipeline 内部结构又发生了变化,如下图所示。

图片1.png

思考一个问题,为什么需要 ChannelInitializer 处理器呢?ServerBootstrapAcceptor 的注册过程为什么又需要封装成异步 task 呢?因为我们在初始化时,还没有将 Channel 注册到 Selector 对象上,所以还无法注册 Accept 事件到 Selector 上,所以事先添加了 ChannelInitializer 处理器,等待 Channel 注册完成后,再向 Pipeline 中添加 ServerBootstrapAcceptor 处理器。

服务端 Channel 初始化的过程已经结束了。整体流程比较简单,主要是设置 Socket 参数以及用户自定义属性,并向 Pipeline 中添加了两个特殊的处理器。接下来我们继续分析,如何将初始化好的 Channel 注册到 Selector 对象上?

注册服务端 Channel

回到 initAndRegister() 的主流程,创建完服务端 Channel 之后,继续一层层跟进 register() 方法的源码:

// MultithreadEventLoopGroup#register
public ChannelFuture register(Channel channel) {
    return next().register(channel); // 选择一个 eventLoop 注册
}
// AbstractChannel#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略其他代码
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) { // Reactor 线程内部调用
        register0(promise);
    } else { // 外部线程调用
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // 省略其他代码
        }
    }
}

Netty 会在线程池 EventLoopGroup 中选择一个 EventLoop 与当前 Channel 进行绑定,之后 Channel 生命周期内的所有 I/O 事件都由这个 EventLoop 负责处理,如 accept、connect、read、write 等 I/O 事件。可以看出,不管是 EventLoop 线程本身调用,还是外部线程用,最终都会通过 register0() 方法进行注册:

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister(); // 调用 JDK 底层的 register() 进行注册
        neverRegistered = false;
        registered = true;
        pipeline.invokeHandlerAddedIfNeeded(); // 触发 handlerAdded 事件
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered(); // 触发 channelRegistered 事件
        // 此时 Channel 还未注册绑定地址,所以处于非活跃状态
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive(); // Channel 当前状态为活跃时,触发 channelActive 事件
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // 省略其他代码
    }
}

register0() 主要做了四件事:调用 JDK 底层进行 Channel 注册、触发 handlerAdded 事件、触发 channelRegistered 事件、Channel 当前状态为活跃时,触发 channelActive 事件。我们对它们逐一进行分析。

首先看下 JDK 底层注册 Channel 的过程,对应 doRegister() 方法的实现逻辑。

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 调用 JDK 底层的 register() 进行注册
            return;
        } catch (CancelledKeyException e) {
            // 省略其他代码
        }
    }
}
public final SelectionKey register(Selector sel, int ops,
                                   Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        // 省略其他代码
        SelectionKey k = findKey(sel);
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }
        if (k == null) {
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}

javaChannel().register() 负责调用 JDK 底层,将 Channel 注册到 Selector 上,register() 的第三个入参传入的是 Netty 自己实现的 Channel 对象,调用 register() 方法会将它绑定在 JDK 底层 Channel 的 attachment 上。这样在每次 Selector 对象进行事件循环时,Netty 都可以从返回的 JDK 底层 Channel 中获得自己的 Channel 对象。

完成 Channel 向 Selector 注册后,接下来就会触发 Pipeline 一系列的事件传播。在事件传播之前,用户自定义的业务处理器是如何被添加到 Pipeline 中的呢?答案就在pipeline.invokeHandlerAddedIfNeeded() 当中,我们重点看下 handlerAdded 事件的处理过程。invokeHandlerAddedIfNeeded() 方法的调用层次比较深,推荐你结合上述 Echo 服务端示例,使用 IDE Debug 的方式跟踪调用栈,如下图所示。

图片2.png

我们首先抓住 ChannelInitializer 中的核心源码,逐层进行分析。

// ChannelInitializer
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) {
        try {
            initChannel((C) ctx.channel()); // 调用 ChannelInitializer 实现的 initChannel() 方法
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this); // 将 ChannelInitializer 自身从 Pipeline 中移出
            }
        }
        return true;
    }
    return false;
}

可以看出 ChannelInitializer 首先会调用 initChannel() 抽象方法,然后 Netty 会把 ChannelInitializer 自身从 Pipeline 移出。其中 initChannel() 抽象方法是在哪里实现的呢?这就要跟踪到 ServerBootstrap 之前的 init() 方法,其中有这么一段代码:

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

在前面我们已经分析了 initChannel() 方法的实现逻辑,首先向 Pipeline 中添加 ServerSocketChannel 对应的 Handler,然后通过异步 task 的方式向 Pipeline 添加 ServerBootstrapAcceptor 处理器。其中有一个点不要混淆,handler() 方法是添加到服务端的Pipeline 上,而 childHandler() 方法是添加到客户端的 Pipeline 上。所以对应 Echo 服务器示例中,此时被添加的是 LoggingHandler 处理器。

因为添加 ServerBootstrapAcceptor 是一个异步过程,需要 EventLoop 线程负责执行。而当前 EventLoop 线程正在执行 register0() 的注册流程,所以等到 register0() 执行完之后才能被添加到 Pipeline 当中。完成 initChannel() 这一步之后,ServerBootstrapAcceptor 并没有被添加到 Pipeline 中,此时 Pipeline 的内部结构变化如下图所示。
图片3.png

我们回到 register0() 的主流程,接着向下分析。channelRegistered 事件是由 fireChannelRegistered() 方法触发,沿着 Pipeline 的 Head 节点传播到 Tail 节点,并依次调用每个 ChannelHandler 的 channelRegistered() 方法。然而此时 Channel 还未注册绑定地址,所以处于非活跃状态,所以并不会触发 channelActive 事件。

执行完整个 register0() 的注册流程之后,EventLoop 线程会将 ServerBootstrapAcceptor 添加到 Pipeline 当中,此时 Pipeline 的内部结构又发生了变化,如下图所示。
图片4.png

整个服务端 Channel 注册的流程我们已经讲完,注册过程中 Pipeline 结构的变化值得你再反复梳理,从而加深理解。目前服务端还是不能工作的,还差最后一步就是进行端口绑定,我们继续向下分析。

端口绑定

回到 ServerBootstrap 的 bind() 方法,我们继续跟进端口绑定 doBind0() 的源码。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    // 省略其他代码
    boolean wasActive = isActive();
    try {
        doBind(localAddress); // 调用 JDK 底层进行端口绑定
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive(); // 触发 channelActive 事件
            }
        });
    }
    safeSetSuccess(promise);
}

bind() 方法主要做了两件事,分别为调用 JDK 底层进行端口绑定;绑定成功后并触发 channelActive 事件。下面我们逐一进行分析。

首先看下调用 JDK 底层进行端口绑定的 doBind() 方法:

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

Netty 会根据 JDK 版本的不同,分别调用 JDK 底层不同的 bind() 方法。我使用的是 JDK8,所以会调用 JDK 原生 Channel 的 bind() 方法。执行完 doBind() 之后,服务端 JDK 原生的 Channel 真正已经完成端口绑定了。

完成端口绑定之后,Channel 处于活跃 Active 状态,然后会调用 pipeline.fireChannelActive() 方法触发 channelActive 事件。我们可以一层层跟进 fireChannelActive() 方法,发现其中比较重要的部分:

// DefaultChannelPipeline#channelActive
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}
// AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp); // 注册 OP_ACCEPT 事件到服务端 Channel 的事件集合
    }
}

可以看出,在执行完 channelActive 事件传播之后,会调用 readIfIsAutoRead() 方法触发 Channel 的 read 事件,而它最终调用到 AbstractNioChannel 中的 doBeginRead() 方法,其中 readInterestOp 参数就是在前面初始化 Channel 所传入的 SelectionKey.OP_ACCEPT 事件,所以 OP_ACCEPT 事件会被注册到 Channel 的事件集合中。

到此为止,整个服务端已经真正启动完毕。我们总结一下服务端启动的全流程,如下图所示。
图片5.png

  • 创建服务端 Channel:本质是创建 JDK 底层原生的 Channel,并初始化几个重要的属性,包括 id、unsafe、pipeline 等。
  • 初始化服务端 Channel:设置 Socket 参数以及用户自定义属性,并添加两个特殊的处理器 ChannelInitializer 和 ServerBootstrapAcceptor。
  • 注册服务端 Channel:调用 JDK 底层将 Channel 注册到 Selector 上。
  • 端口绑定:调用 JDK 底层进行端口绑定,并触发 channelActive 事件,把 OP_ACCEPT 事件注册到 Channel 的事件集合中。

加餐:服务端如何处理客户端新建连接

Netty 服务端完全启动后,就可以对外工作了。接下来 Netty 服务端是如何处理客户端新建连接的呢?主要分为四步:

  1. Boss NioEventLoop 线程轮询客户端新连接 OP_ACCEPT 事件;
  2. 构造 Netty 客户端 NioSocketChannel;
  3. 注册 Netty 客户端 NioSocketChannel 到 Worker 工作线程中;
  4. 注册 OP_READ 事件到 NioSocketChannel 的事件集合。

下面我们对每个步骤逐一进行简单的介绍。

Netty 中 Boss NioEventLoop 专门负责接收新的连接,关于 NioEventLoop 的核心源码我们下节课会着重介绍,在这里我们只先了解基本的处理流程。当客户端有新连接接入服务端时,Boss NioEventLoop 会监听到 OP_ACCEPT 事件,源码如下所示:

// NioEventLoop#processSelectedKey
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

NioServerSocketChannel 所持有的 unsafe 是 NioMessageUnsafe 类型,我们看下 NioMessageUnsafe.read() 方法中做了什么事。

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); 
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {            do {
                int localRead = doReadMessages(readBuf);  // while 循环不断读取 Buffer 中的数据
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i)); // 传播读取事件
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete(); // 传播读取完毕事件
        // 省略其他代码
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

可以看出 read() 方法的核心逻辑就是通过 while 循环不断读取数据,然后放入 List 中,这里的数据其实就是新连接。需要重点跟进一下 NioServerSocketChannel 的 doReadMessages() 方法。

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}

这时就开始执行第二个步骤:构造 Netty 客户端 NioSocketChannel。Netty 先通过 JDK 底层的 accept() 获取 JDK 原生的 SocketChannel,然后将它封装成 Netty 自己的 NioSocketChannel。新建 Netty 的客户端 Channel 的实现原理与上文中我们讲到的创建服务端 Channel 的过程是类似的,只是服务端 Channel 的类型是 NioServerSocketChannel,而客户端 Channel 的类型是 NioSocketChannel。NioSocketChannel 的创建同样会完成几件事:创建核心成员变量 id、unsafe、pipeline;注册 SelectionKey.OP_READ 事件;设置 Channel 的为非阻塞模式;新建客户端 Channel 的配置。

成功构造客户端 NioSocketChannel 后,接下来会通过 pipeline.fireChannelRead() 触发 channelRead 事件传播。对于服务端来说,此时 Pipeline 的内部结构如下图所示。
图片6.png

上文中我们提到了一种特殊的处理器 ServerBootstrapAcceptor,在这里它就发挥了重要的作用。channelRead 事件会传播到 ServerBootstrapAcceptor.channelRead() 方法,channelRead() 会将客户端 Channel 分配到工作线程组中去执行。具体实现如下:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    // 在客户端 Channel 中添加 childHandler,childHandler 是用户在启动类中通过 childHandler() 方法指定的
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        // 注册客户端 Channel
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

ServerBootstrapAcceptor 开始就把 msg 强制转换为 Channel。难道不会有其他类型的数据吗?因为 ServerBootstrapAcceptor 是服务端 Channel 中一个特殊的处理器,而服务端 Channel 的 channelRead 事件只会在新连接接入时触发,所以这里拿到的数据都是客户端新连接。

ServerBootstrapAcceptor 通过 childGroup.register() 方法会完成第三和第四两个步骤,将 NioSocketChannel 注册到 Worker 工作线程中,并注册 OP_READ 事件到 NioSocketChannel 的事件集合。在注册过程中比较有意思的一点是,它会调用 pipeline.fireChannelRegistered() 方法传播 channelRegistered 事件,然后再调用 pipeline.fireChannelActive() 方法传播 channelActive 事件。兜了一圈,这又会回到之前我们介绍的 readIfIsAutoRead() 方法,此时它会将 SelectionKey.OP_READ 事件注册到 Channel 的事件集合。

关于服务端如何处理客户端新建连接的具体源码,我在此就不继续展开了。这里留一个小任务,建议你亲自动手分析下 childGroup.register() 的相关源码,从而加深对服务端启动以及新连接处理流程的理解。有了服务端启动源码分析的基础,再去理解客户端新建连接的过程会相对容易很多。

总结

本节课我们深入分析了 Netty 服务端启动的全流程,对其中涉及的核心组件有了基本的认识。Netty 服务端启动的相关源码层次比较深,推荐大家在读源码的时候,可以先把主体流程梳理清楚,开始时先不用纠结具体的方法是用来做什么,自顶而下先画出完整的调用链路图(如下图所示),然后再逐一击破。
图片7.png

下节课,我们将学习 Netty 最核心的 Reactor 线程模型的源码,推荐你把两节课放在一起再进行复习,可以解答你目前不少的疑问,如异步 task 是如何封装并执行的?事件注册之后是如何被处理的?


18 源码篇:解密 Netty Reactor 线程模型

通过第一章 Netty 基础课程的学习,我们知道 Reactor 线程模型是 Netty 实现高性能的核心所在,在 Netty 中 EventLoop 是 Reactor 线程模型的核心处理引擎,那么 EventLoop 到底是如何实现的呢?又是如何保证高性能和线程安全性的呢?今天这节课让我们一起一探究竟。

说明:本文参考的 Netty 源码版本为 4.1.42.Final。

Reactor 线程执行的主流程

在《事件调度层:为什么 EventLoop 是 Netty 的精髓》的课程中,我们介绍了 EventLoop 的概貌,因为 Netty 是基于 NIO 实现的,所以推荐使用 NioEventLoop 实现,我们再次通过 NioEventLoop 的核心入口 run() 方法回顾 Netty Reactor 线程模型执行的主流程,并以此为基础继续深入研究 NioEventLoop 的逻辑细节。

protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false)); // 轮询 I/O 事件
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys(); // 处理 I/O 事件
                } finally {
                    runAllTasks(); // 处理所有任务
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys(); // 处理 I/O 事件
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 处理完 I/O 事件,再处理异步任务队列
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

NioEventLoop 的 run() 方法是一个无限循环,没有任何退出条件,在不间断循环执行以下三件事情,可以用下面这张图形象地表示。

Lark20201216-164824.png

  • 轮询 I/O 事件(select):轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件。
  • 处理 I/O 事件(processSelectedKeys):处理已经准备就绪的 I/O 事件。
  • 处理异步任务队列(runAllTasks):Reactor 线程还有一个非常重要的职责,就是处理任务队列中的非 I/O 任务。Netty 提供了 ioRatio 参数用于调整 I/O 事件处理和任务处理的时间比例。

下面我们对 NioEventLoop 的三个步骤进行详细的介绍。

轮询 I/O 事件

我们首先聚焦在轮询 I/O 事件的关键代码片段:

case SelectStrategy.CONTINUE:
    continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
    select(wakenUp.getAndSet(false));
    if (wakenUp.get()) {
        selector.wakeup();
    }

NioEventLoop 通过核心方法 select() 不断轮询注册的 I/O 事件。当没有 I/O 事件产生时,为了避免 NioEventLoop 线程一直循环空转,在获取 I/O 事件或者异步任务时需要阻塞线程,等待 I/O 事件就绪或者异步任务产生后才唤醒线程。NioEventLoop 使用 wakeUp 变量表示是否唤醒 selector,Netty 在每一次执行新的一轮循环之前,都会将 wakeUp 设置为 false。

Netty 提供了选择策略 SelectStrategy 对象,它用于控制 select 循环行为,包含 CONTINUE、SELECT、BUSY_WAIT 三种策略,因为 NIO 并不支持 BUSY_WAIT,所以 BUSY_WAIT 与 SELECT 的执行逻辑是一样的。在 I/O 事件循环的过程中 Netty 选择使用何种策略,具体的判断依据如下:

// DefaultSelectStrategy#calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
// NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
}
// NioEventLoop#selectNow
int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

如果当前 NioEventLoop 线程存在异步任务,会通过 selectSupplier.get() 最终调用到 selectNow() 方法,selectNow() 是非阻塞,执行后立即返回。如果存在就绪的 I/O 事件,那么会走到 default 分支后直接跳出,然后执行 I/O 事件处理 processSelectedKeys 和异步任务队列处理 runAllTasks 的逻辑。所以在存在异步任务的场景,NioEventLoop 会优先保证 CPU 能够及时处理异步任务。

当 NioEventLoop 线程的不存在异步任务,即任务队列为空,返回的是 SELECT 策略, 就会调用 select(boolean oldWakenUp) 方法,接下来我们看看 select() 内部是如何实现的:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // 计算 select 阻塞操作的最后截止时间
        long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
        if (nextWakeupTime != normalizedDeadlineNanos) {
            nextWakeupTime = normalizedDeadlineNanos;
        }
        for (;;) {
            // ------ 1. 检测 select 阻塞操作是否超过截止时间 ------
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            // ------ 2. 轮询过程中如果有任务产生,中断本次轮询
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            // ------ 3. select 阻塞等待获取 I/O 事件 ------
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }
            // ------ 4. 解决臭名昭著的 JDK epoll 空轮询 Bug ------
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
    }
}

Netty 为了解决臭名昭著的 JDK epoll 空轮询 Bug,造成整个 select() 方法是相对比较复杂的,我把它划分成四个部分逐一拆解来看。

第一步,检测 select 阻塞操作是否超过截止时间。 在进入无限循环之前,Netty 首先记录了当前时间 currentTimeNanos 以及定时任务队列中最近待执行任务的执行时间 selectDeadLineNanos,Netty 中定时任务队列是按照延迟时间从小到大进行排列的,通过调用 delayNanos(currentTimeNanos) 方法可以获得第一个待执行定时任务的延迟时间。然后代码会进入无限循环。首先判断 currentTimeNanos 是否超过 selectDeadLineNanos 0.5ms 以上,如果超过说明当前任务队列中有定时任务需要立刻执行,所以此时会退出无限循环。退出之前如果从未执行过 select 操作,那么会立即一次非阻塞的 selectNow 操作。那么这里有一个疑问,为什么会留出 0.5ms 的时间窗口呢?在任务队列为空的情况下,可能 select 操作没有获得到任何 I/O 事件就立即停止阻塞返回。

其中有一点容易混淆,Netty 的任务队列包括普通任务、定时任务以及尾部任务,hasTask() 判断的是普通任务队列和尾部队列是否为空,而 delayNanos(currentTimeNanos) 方法获取的是定时任务的延迟时间。

第二步,轮询过程中及时处理产生的任务。 Netty 为了保证任务能够及时执行,会立即一次非阻塞的 selectNow 操作后,立即跳出循环回到事件循环的主流程,确保接下来能够优先执行 runAllTasks。

第三步,select 阻塞等待获取 I/O 事件。 执行 select 阻塞操作,说明任务队列已经为空,而且第一个待执行定时任务还没有到达任务执行的截止时间,需要阻塞等待 timeoutMillis 的超时时间。假设一种极端情况,如果定时任务的截止时间非常久,那么 select 操作岂不是会一直阻塞造成 Netty 无法工作?所以 Netty 在外部线程添加任务的时候,可以唤醒 select 阻塞操作,具体源码如下:

// SingleThreadEventExecutor#execute
public void execute(Runnable task) {
  	  // 省略其他代码
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop); 
    }
}
// NioEventLoop#wakeup
protected void wakeup(boolean inEventLoop) {
    // 如果是外部线程,设置 wakenUp 为true,则唤醒 select 阻塞操作
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup(); 
    }
}

selector.wakeup() 操作的开销是非常大的,所以 Netty 并不是每次都直接调用,在每次调用之前都会先执行 wakenUp.compareAndSet(false, true),只有设置成功之后才会执行 selector.wakeup() 操作。

第四步,解决臭名昭著的 JDK epoll 空轮询 Bug。 在之前的课程中已经初步介绍了 Netty 的解决方案,在这里结合整体 select 操作我们再做一次回顾。实际上 Netty 并没有从根源上解决该问题,而是巧妙地规避了这个问题。Netty 引入了计数变量 selectCnt,用于记录 select 操作的次数,如果事件轮询时间小于 timeoutMillis,并且在该时间周期内连续发生超过 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 次空轮询,说明可能触发了 epoll 空轮询 Bug。Netty 通过重建新的 Selector 对象,将异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector,重建完成之后异常的 Selector 就可以废弃了。

NioEventLoop 轮询 I/O 事件 select 的过程已经讲完了,我们简单总结 select 过程所做的事情。select 操作也是一个无限循环,在事件轮询之前检查任务队列是否为空,确保任务队列中待执行的任务能够及时执行。如果任务队列中已经为空,然后执行 select 阻塞操作获取等待获取 I/O 事件。Netty 通过引入计数器变量,并统计在一定时间窗口内 select 操作的执行次数,识别出可能存在异常的 Selector 对象,然后采用重建 Selector 的方式巧妙地避免了 JDK epoll 空轮询的问题。

处理 I/O 事件

通过 select 过程我们已经获取到准备就绪的 I/O 事件,接下来就需要调用 processSelectedKeys() 方法处理 I/O 事件。在开始处理 I/O 事件之前,Netty 通过 ioRatio 参数控制 I/O 事件处理和任务处理的时间比例,默认为 ioRatio = 50。如果 ioRatio = 100,表示每次都处理完 I/O 事件后,会执行所有的 task。如果 ioRatio < 100,也会优先处理完 I/O 事件,再处理异步任务队列。所以不论如何 processSelectedKeys() 都是先执行的,接下来跟进下 processSelectedKeys() 的源码:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

处理 I/O 事件时有两种选择,一种是处理 Netty 优化过的 selectedKeys,另外一种是正常的处理逻辑。根据是否设置了 selectedKeys 来判断使用哪种策略,这两种策略使用的 selectedKeys 集合是不一样的。Netty 优化过的 selectedKeys 是 SelectedSelectionKeySet 类型,而正常逻辑使用的是 JDK HashSet 类型。下面我们逐一介绍两种策略的实现。

1. processSelectedKeysPlain

首先看下正常的处理逻辑 processSelectedKeysPlain 的源码:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();
        if (a instanceof AbstractNioChannel) {
            // I/O 事件由 Netty 负责处理
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 用户自定义任务
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (!i.hasNext()) {
            break;
        }
        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

Netty 会遍历依次处理已经就绪的 SelectionKey,SelectionKey 上面可以挂载 attachment。再根据 attachment 属性可以判断 SelectionKey 的类型,SelectionKey 的类型可能是 AbstractNioChannel 和 NioTask,这两种类型对应的处理方式也是不同的,AbstractNioChannel 类型由 Netty 框架负责处理,NioTask 是用户自定义的 task,一般不会是这种类型。我们着重看下 AbstractNioChannel 的处理场景,跟进 processSelectedKey() 的源码:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) { // 检查 Key 是否合法
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        unsafe.close(unsafe.voidPromise()); // Key 不合法,直接关闭连接
        return;
    }
    try {
        int readyOps = k.readyOps();
        // 处理连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        // 处理可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        // 处理可读事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

从上述源码可知,processSelectedKey 一共处理了 OP_CONNECT、OP_WRITE、OP_READ 三个事件,我们分别了解下这三个事件的处理过程。

OP_CONNECT 连接建立事件。表示 TCP 连接建立成功, Channel 处于 Active 状态。处理 OP_CONNECT 事件首先将该事件从事件集合中清除,避免事件集合中一直存在连接建立事件,然后调用 unsafe.finishConnect() 方法通知上层连接已经建立。可以跟进 unsafe.finishConnect() 的源码发现会底层调用的 pipeline().fireChannelActive() 方法,这时会产生一个 Inbound 事件,然后会在 Pipeline 中进行传播,依次调用 ChannelHandler 的 channelActive() 方法,通知各个 ChannelHandler 连接建立成功。

  • OP_WRITE,可写事件。表示上层可以向 Channel 写入数据,通过执行 ch.unsafe().forceFlush() 操作,将数据冲刷到客户端,最终会调用 javaChannel 的 write() 方法执行底层写操作。
  • OP_READ,可读事件。表示 Channel 收到了可以被读取的新数据。Netty 将 READ 和 Accept 事件进行了统一的封装,都通过 unsafe.read() 进行处理。unsafe.read() 的逻辑可以归纳为几个步骤:从 Channel 中读取数据并存储到分配的 ByteBuf;调用 pipeline.fireChannelRead() 方法产生 Inbound 事件,然后依次调用 ChannelHandler 的 channelRead() 方法处理数据;调用 pipeline.fireChannelReadComplete() 方法完成读操作;最终执行 removeReadOp() 清除 OP_READ 事件。

我们再次回到 processSelectedKeysPlain 的主流程,接下来会判断 needsToSelectAgain 决定是否需要重新轮询。如果 needsToSelectAgain == true,会调用 selectAgain() 方法进行重新轮询,该方法会将 needsToSelectAgain 再次置为 false,然后调用 selectorNow() 后立即返回。

我们回顾一下 Reactor 线程的主流程,会发现每次在处理 I/O 事件之前,needsToSelectAgain 都会被设置为 false,那么在什么场景下 needsToSelectAgain 会再次设置为 true 呢?我们通过查找变量的引用,最后定位到 AbstractChannel#doDeregister。该方法的作用是将 Channel 从当前注册的 Selector 对象中移除,方法内部可能会把 needsToSelectAgain 设置为 true,具体源码如下:

protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
}
void cancel(SelectionKey key) {
    key.cancel();
    cancelledKeys ++;
    // 当取消的 Key 超过默认阈值 256,needsToSelectAgain 设置为 true
    if (cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}

当 Netty 在处理 I/O 事件的过程中,如果发现超过默认阈值 256 个 Channel 从 Selector 对象中移除后,会将 needsToSelectAgai 设置为 true,重新做一次轮询操作,从而确保 keySet 的有效性。

2. processSelectedKeysOptimized

介绍完正常的 I/O 事件处理 processSelectedKeysPlain 之后,回过头我们再来分析 Netty 优化的 processSelectedKeysOptimized 就会轻松很多,Netty 是否采用 SelectedSelectionKeySet 类型的优化策略由 DISABLE_KEYSET_OPTIMIZATION 参数决定。那么到底 SelectedSelectionKeySet 是如何进行优化的呢?我们继续跟进下 processSelectedKeysOptimized 的源码:

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

可以发现 processSelectedKeysOptimized 与 processSelectedKeysPlain 的代码结构非常相似,其中最重要的一点就是 selectedKeys 的遍历方式是不同的,所以还是需要看下 SelectedSelectionKeySet 的源码一探究竟。

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    SelectionKey[] keys;
    int size;
    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }
    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }
        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }
        return true;
    }
    
    // 省略其他代码
}

因为 SelectedSelectionKeySet 内部使用的是 SelectionKey 数组,所以 processSelectedKeysOptimized 可以直接通过遍历数组取出 I/O 事件,相比 JDK HashSet 的遍历效率更高。SelectedSelectionKeySet 内部通过 size 变量记录数据的逻辑长度,每次执行 add 操作时,会把对象添加到 SelectionKey[] 尾部。当 size 等于 SelectionKey[] 的真实长度时,SelectionKey[] 会进行扩容。相比于 HashSet,SelectionKey[] 不需要考虑哈希冲突的问题,所以可以实现 O(1) 时间复杂度的 add 操作。

那么 SelectedSelectionKeySet 是什么时候生成的呢?通过查找 SelectedSelectionKeySet 的引用定位到 NioEventLoop#openSelector 方法,摘录核心源码片段如下:

private SelectorTuple openSelector() {
    // 省略其他代码
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);
                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                }
                // 省略其他代码
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });    
    // 省略其他代码
}

Netty 通过反射的方式,将 Selector 对象内部的 selectedKeys 和 publicSelectedKeys 替换为 SelectedSelectionKeySet,原先 selectedKeys 和 publicSelectedKeys 这两个字段都是 HashSet 类型。这真是很棒的一个小技巧,对于 JDK 底层的优化一般是很少见的,Netty 在细节优化上追求极致的精神值得我们学习。

到这里,Reactor 线程主流程的第二步。处理 I/O 事件 processSelectedKeys 已经讲完了,简单总结一下 processSelectedKeys 的要点。处理 I/O 事件时有两种选择,一种是处理 Netty 优化过的 selectedKeys,另外一种是正常的处理逻辑,两种策略的处理逻辑是相似的,都是通过获取 SelectionKey 上挂载的 attachment 判断 SelectionKey 的类型,不同的 SelectionKey 的类型又会调用不同的处理方法,然后通过 Pipeline 进行事件传播。Netty 优化过的 selectedKeys 是使用数组存储的 SelectionKey,相比于 JDK 的 HashSet 遍历效率更高效。processSelectedKeys 还做了更多的优化处理,如果发现超过默认阈值 256 个 Channel 从 Selector 对象中移除后,会重新做一次轮询操作,以确保 keySet 的有效性。

处理异步任务队列

继续分析 Reactor 线程主流程的最后一步,处理异步任务队列 runAllTasks。为什么 Netty 能够保证 Channel 的操作都是线程安全的呢?这要归功于 Netty 的任务机制。下面我们从任务添加和任务执行两个方面介绍 Netty 的任务机制。

  • 任务添加

NioEventLoop 内部有两个非常重要的异步任务队列,分别为普通任务队列和定时任务队列。NioEventLoop 提供了 execute() 和 schedule() 方法用于向不同的队列中添加任务,execute() 用于添加普通任务,schedule() 方法用于添加定时任务。

首先我们看下如何添加普通任务。NioEventLoop 继承自 SingleThreadEventExecutor,SingleThreadEventExecutor 提供了 execute() 用于添加普通任务,源码如下:

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
            }
            if (reject) {
                reject();
            }
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}
final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}

我们一步步跟进 addTask(task),发现最后是将任务添加到了 taskQueue,SingleThreadEventExecutor 中 taskQueue 就是普通任务队列。taskQueue 默认使用的是 Mpsc Queue,可以理解为多生产者单消费者队列,关于 Mpsc Queue 我们会有一节课程单独介绍,在这里不详细展开。此外,在任务处理的场景下,inEventLoop() 始终是返回 true,始终都是在 Reactor 线程内执行,既然在 Reactor 线程内都是串行执行,可以保证线程安全,那为什么还需要 Mpsc Queue 呢?我们继续往下看。

这里举一种很常见的场景,比如在 RPC 业务线程池里处理完业务请求后,可以根据用户请求拿到关联的 Channel,将数据写回客户端。那么对于外部线程调用 Channel 的相关方法 Netty 是如何操作的呢?我们一直跟进下 channel.write() 的源码:

// #AbstractChannel#write
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}
// AbstractChannelHandlerContext#write
private void write(Object msg, boolean flush, ChannelPromise promise) {
    // 省略其他代码
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) { // Reactor 线程内部调用
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else { // 外部线程调用会走到该分支
        final AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            task.cancel();
        }
    }
}
// AbstractChannelHandlerContext#safeExecute
private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
        return true;
    } catch (Throwable cause) {
        try {
            promise.setFailure(cause);
        } finally {
            if (msg != null) {
                ReferenceCountUtil.release(msg);
            }
        }
        return false;
    }
}

如果是 Reactor 线程发起调用 channel.write() 方法,inEventLoop() 返回 true,此时直接在 Reactor 线程内部直接交由 Pipeline 进行事件处理。如果是外部线程调用,那么会走到 else 分支,此时会将写操作封装成一个 WriteTask,然后通过 safeExecute() 执行,可以发现 safeExecute() 就是调用的 SingleThreadEventExecutor#execute() 方法,最终会将任务添加到 taskQueue 中。因为多个外部线程可能会并发操作同一个 Channel,这时候 Mpsc Queue 就可以保证线程的安全性。

接下来我们再分析定时任务的添加过程。与普通任务类似,定时任务也会有 Reactor 线程内和外部线程两种场景,我们直接跟进到 AbstractScheduledEventExecutor#schedule() 源码的深层,发现如下核心代码:

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) { // Reactor 线程内部
        scheduledTaskQueue().add(task.setId(nextTaskId++));
    } else { // 外部线程
        executeScheduledRunnable(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task.setId(nextTaskId++));
            }
        }, true, task.deadlineNanos());
    }
    return task;
}
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                SCHEDULED_FUTURE_TASK_COMPARATOR,
                11);
    }
    return scheduledTaskQueue;
}
void executeScheduledRunnable(Runnable runnable,
                                        @SuppressWarnings("unused") boolean isAddition,
                                        @SuppressWarnings("unused") long deadlineNanos) {
    execute(runnable);
}

AbstractScheduledEventExecutor 中 scheduledTaskQueue 就是定时任务队列,可以看到 scheduledTaskQueue 的默认实现是优先级队列 DefaultPriorityQueue,这样可以方便队列中的任务按照时间进行排序。但是 DefaultPriorityQueue 是非线程安全的,如果是 Reactor 线程内部调用,因为是串行执行,所以不会有线程安全问题。如果是外部线程添加定时任务,我们发现 Netty 把添加定时任务的操作又再次封装成一个任务交由 executeScheduledRunnable() 处理,而 executeScheduledRunnable() 中又再次调用了普通任务的 execute() 的方法,巧妙地借助普通任务场景中 Mpsc Queue 解决了外部线程添加定时任务的线程安全问题。

  • 任务执行

介绍完 Netty 中不同任务的添加过程,回过头我们再来分析 Reactor 线程是如何执行这些任务的呢?通过 Reactor 线程主流程的分析,我们知道处理异步任务队列有 runAllTasks() 和 runAllTasks(long timeoutNanos) 两种实现,第一种会处理所有任务,第二种是带有超时时间来处理任务。之所以设置超时时间是为了防止 Reactor 线程处理任务时间过长而导致 I/O 事件阻塞,我们着重分析下 runAllTasks(long timeoutNanos) 的源码:

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue(); // 1. 合并定时任务到普通任务队列
    // 2. 从普通任务队列中取出任务并处理
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    // 计算任务处理的超时时间
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task); // 执行任务
        runTasks ++;
        // 每执行 64 个任务检查一下是否超时
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask(); // 继续取出下一个任务
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // 3. 收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

异步任务处理 runAllTasks 的过程可以分为三步:合并定时任务到普通任务队列,然后从普通任务队列中取出任务并处理,最后进行收尾工作。我们分别看看三个步骤都是如何实现的。

第一步,合并定时任务到普通任务队列,对应的实现是 fetchFromScheduledTaskQueue() 方法。

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime); // 从定时任务队列中取出截止时间小于等于当前时间的定时任务
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {
            // 如果普通任务队列已满,把定时任务放回
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}
protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    // 如果定时任务的 deadlineNanos 小于当前时间就取出
    if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
        return null;
    }
    scheduledTaskQueue.remove();
    return scheduledTask;
}

定时任务只有满足截止时间 deadlineNanos 小于当前时间,才可以取出合并到普通任务。由于定时任务是按照截止时间 deadlineNanos 从小到大排列的,所以取出的定时任务不满足合并条件,那么定时任务队列中剩下的所有任务都不会满足条件,合并操作完成并退出。

第二步,从普通任务队列中取出任务并处理,可以回过头再看 runAllTasks(long timeoutNanos) 第二部分的源码,我已经用注释标明。真正处理任务的 safeExecute() 非常简单,就是直接调用的 Runnable 的 run() 方法。因为异步任务处理是有超时时间的,所以 Netty 采取了定时检测的策略,每执行 64 个任务的时候就会检查一下是否超时,这也是出于对性能的折中考虑,如果异步队列中有大量的短时间任务,每一次执行完都检测一次超时性能会有所降低。

第三步,收尾工作,对应的是 afterRunningAllTasks() 方法实现。

protected void afterRunningAllTasks() {
    runAllTasksFrom(tailTasks);
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}

这里的尾部队列 tailTasks 相比于普通任务队列优先级较低,可以理解为是收尾任务,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。可以看出 afterRunningAllTasks() 就是把尾部队列 tailTasks 里的任务以此取出执行一遍。尾部队列并不常用,一般用于什么场景呢?例如你想对 Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等,都可以向尾部队列添加一个收尾任务完成统计数据的实时更新。

到这里,Netty 处理异步任务队列的流程就讲完了,再做一个简单的总结。异步任务主要分为普通任务和定时任务两种,在任务添加和任务执行时,都需要考虑 Reactor 线程内和外部线程两种情况。外部线程添加定时任务时,Netty 巧妙地借助普通任务的 Mpsc Queue 解决多线程并发操作时的线程安全问题。Netty 执行任务之前会将满足条件的定时任务合并到普通任务队列,由普通任务队列统一负责执行,并且每执行 64 个任务的时候就会检查一下是否超时。

总结

Reactor 线程模型是 Netty 最核心的内容,本节课我也花了大量的篇幅对其进行讲解。NioEventLoop 作为 Netty Reactor 线程的实现,它的设计原理是非常精妙的,值得我们反复阅读和思考。我们始终需要记住 NioEventLoop 的无限循环中所做的三件事:轮询 I/O 事件,处理 I/O 事件,处理异步任务队列。

关于 Netty Reactor 线程模型经常会遇到几个高频的面试问题,读完本节课之后你是否都已经清楚了呢?

  • Netty 的 NioEventLoop 是如何实现的?它为什么能够保证 Channel 的操作是线程安全的?
  • Netty 如何解决 JDK epoll 空轮询 Bug?
  • NioEventLoop 是如何实现无锁化的?

欢迎你在评论区留言,期待看到你分享关于 Reactor 线程模型更多的认识和思考。


19 源码篇:一个网络请求在 Netty 中的旅程

通过前面两节源码课程的学习,我们知道 Netty 在服务端启动时会为创建 NioServerSocketChannel,当客户端新连接接入时又会创建 NioSocketChannel,不管是服务端还是客户端 Channel,在创建时都会初始化自己的 ChannelPipeline。如果把 Netty 比作成一个生产车间,那么 Reactor 线程无疑是车间的中央管控系统,ChannelPipeline 可以看作是车间的流水线,将原材料按顺序进行一步步加工,然后形成一个完整的产品。本节课我将带你完整梳理一遍网络请求在 Netty 中的处理流程,从而加深对前两节课内容的理解,并着重讲解 ChannelPipeline 的工作原理。

说明:本文参考的 Netty 源码版本为 4.1.42.Final。

事件处理机制回顾

首先我们以服务端接入客户端新连接为例,并结合前两节源码课学习的知识点,一起复习下 Netty 的事件处理流程,如下图所示。

Drawing 0.png

Netty 服务端启动后,BossEventLoopGroup 会负责监听客户端的 Accept 事件。当有客户端新连接接入时,BossEventLoopGroup 中的 NioEventLoop 首先会新建客户端 Channel,然后在 NioServerSocketChannel 中触发 channelRead 事件传播,NioServerSocketChannel 中包含了一种特殊的处理器 ServerBootstrapAcceptor,最终通过 ServerBootstrapAcceptor 的 channelRead() 方法将新建的客户端 Channel 分配到 WorkerEventLoopGroup 中。WorkerEventLoopGroup 中包含多个 NioEventLoop,它会选择其中一个 NioEventLoop 与新建的客户端 Channel 绑定。

完成客户端连接注册之后,就可以接收客户端的请求数据了。当客户端向服务端发送数据时,NioEventLoop 会监听到 OP_READ 事件,然后分配 ByteBuf 并读取数据,读取完成后将数据传递给 Pipeline 进行处理。一般来说,数据会从 ChannelPipeline 的第一个 ChannelHandler 开始传播,将加工处理后的消息传递给下一个 ChannelHandler,整个过程是串行化执行。

在前面两节课中,我们介绍了服务端如何接收客户端新连接,以及 NioEventLoop 的工作流程,接下来我们重点介绍 ChannelPipeline 是如何实现 Netty 事件驱动的,这样 Netty 整个事件处理流程已经可以串成一条主线。

Pipeline 的初始化

我们知道 ChannelPipeline 是在创建 Channel 时被创建的,它是 Channel 中非常重要的一个成员变量。回到 AbstractChannel 的构造函数,以此为切入点,我们一起看下 ChannelPipeline 是如何一步步被构造出来的。

// AbstractChannel
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
// AbstractChannel#newChannelPipeline
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
// DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

当 ChannelPipeline 初始化完成后,会构成一个由 ChannelHandlerContext 对象组成的双向链表,默认 ChannelPipeline 初始化状态的最小结构仅包含 HeadContext 和 TailContext 两个节点,如下图所示。

Drawing 1.png

HeadContext 和 TailContext 属于 ChannelPipeline 中两个特殊的节点,它们都继承自 AbstractChannelHandlerContext,根据源码看下 AbstractChannelHandlerContext 有哪些实现类,如下图所示。除了 HeadContext 和 TailContext,还有一个默认实现类 DefaultChannelHandlerContext,我们可以猜到 DefaultChannelHandlerContext 封装的是用户在 Netty 启动配置类中添加的自定义业务处理器,DefaultChannelHandlerContext 会插入到 HeadContext 和 TailContext 之间。

图片3.png

接着我们比较一下上述三种 AbstractChannelHandlerContext 实现类的内部结构,发现它们都包含当前 ChannelPipeline 的引用、处理器 ChannelHandler。有一点不同的是 HeadContext 节点还包含了用于操作底层数据读写的 unsafe 对象。对于 Inbound 事件,会先从 HeadContext 节点开始传播,所以 unsafe 可以看作是 Inbound 事件的发起者;对于 Outbound 事件,数据最后又会经过 HeadContext 节点返回给客户端,此时 unsafe 可以看作是 Outbound 事件的处理者。

接下来我们继续看下用户自定义的处理器是如何加入 ChannelPipeline 的双向链表的。

Pipeline 添加 Handler

在 Netty 客户端或者服务端启动时,就需要用户配置自定义实现的业务处理器。我们先看一段服务端启动类的代码片段:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new SampleInboundA());
                ch.pipeline().addLast(new SampleInboundB());
                ch.pipeline().addLast(new SampleOutboundA());
                ch.pipeline().addLast(new SampleOutboundB());
            }
        });

我们知道 ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,它们都会被 ChannelHandlerContext 封装,不管是哪种处理器,最终都是通过双向链表连接,代码示例中构成的 ChannelPipeline 的结构如下。

图片4.png

那么 ChannelPipeline 在添加 Handler 时是如何区分 Inbound 和 Outbound 类型的呢?我们一起跟进 ch.pipeline().addLast() 方法源码,定位到核心代码如下。

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1. 检查是否重复添加 Handler
        checkMultiplicity(handler);
        // 2. 创建新的 DefaultChannelHandlerContext 节点
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3. 添加新的 DefaultChannelHandlerContext 节点到 ChannelPipeline
         addLast0(newCtx);
        // 省略其他代码
    }
    // 4. 回调用户方法
    callHandlerAdded0(newCtx);
    return this;
}

addLast() 主要做了以下四件事:

  1. 检查是否重复添加 Handler。
  2. 创建新的 DefaultChannelHandlerContext 节点。
  3. 添加新的 DefaultChannelHandlerContext 节点到 ChannelPipeline。
  4. 回调用户方法。

前三个步骤通过 synchronized 加锁完成的,为了防止多线程并发操作 ChannelPipeline 底层双向链表。下面我们一步步进行拆解介绍。

首先在添加 Handler 时,ChannelPipeline 会检查该 Handler 有没有被添加过。如果一个非线程安全的 Handler 被添加到 ChannelPipeline 中,那么当多线程访问时会造成线程安全问题。Netty 具体检查重复性的逻辑由 checkMultiplicity() 方法实现:

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

用户自定义实现的处理一般都继承于 ChannelHandlerAdapter,ChannelHandlerAdapter 中使用 added 变量标识该 Handler 是否被添加过。如果当前添加的 Handler 是非共享且已被添加过,那么就会抛出异常,否则将当前 Handler 标记为已添加。

h.isSharable() 用于判断 Handler 是否是共享的,所谓共享就是这个 Handler 可以被重复添加到不同的 ChannelPipeline 中,共享的 Handler 必须要确保是线程安全的。如果我们想实现一个共享的 Handler,只需要在 Handler 中添加 @Sharable 注解即可,如下所示:

@ChannelHandler.Sharable
public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {}

接下来我们分析 addLast() 的第二步,创建新的 DefaultChannelHandlerContext 节点。在执行 newContext() 方法之前,会通过 filterName() 为 Handler 创建一个唯一的名称,一起先看下 Netty 生成名称的策略是怎样的。

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}
private String generateName(ChannelHandler handler) {
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}
private static String generateName0(Class<?> handlerType) {
    return StringUtil.simpleClassName(handlerType) + "#0";
}

Netty 会使用 FastThreadLocal 缓存 Handler 和名称的映射关系,在为 Handler 生成默认名称的之前,会先从缓存中查找是否已经存在,如果不存在,会调用 generateName0() 方法生成默认名称后,并加入缓存。可以看出 Netty 生成名称的默认规则是 “简单类名#0”,例如 HeadContext 的默认名称为 “DefaultChannelPipeline$HeadContext#0”。

为 Handler 生成完默认名称之后,还会通过 context0() 方法检查生成的名称是否和 ChannelPipeline 已有的名称出现冲突,查重的过程很简单,就是对双向链表进行线性搜索。如果存在冲突现象,Netty 会将名称最后的序列号截取出来,一直递增直至生成不冲突的名称为止,例如 “简单类名#1” “简单类名#2” “简单类名#3” 等等。

接下来回到 newContext() 创建节点的流程,可以定位到 AbstractChannelHandlerContext 的构造函数:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.executionMask = mask(handlerClass);
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

AbstractChannelHandlerContext 中有一个 executionMask 属性并不是很好理解,它其实是一种常用的掩码运算操作,看下 mask() 方法是如何生成掩码的呢?

private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            // 如果是 ChannelInboundHandler 实例,所有 Inbound 事件置为 1
            mask |= MASK_ALL_INBOUND;
            // 排除 Handler 不感兴趣的 Inbound 事件
            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_UNREGISTERED;
            }
            if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_ACTIVE;
            }
            if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_INACTIVE;
            }
            if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_CHANNEL_READ;
            }
            if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_READ_COMPLETE;
            }
            if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
            }
            if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_USER_EVENT_TRIGGERED;
            }
        }
        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            // 如果是 ChannelOutboundHandler 实例,所有 Outbound 事件置为 1
            mask |= MASK_ALL_OUTBOUND;
            // 排除 Handler 不感兴趣的 Outbound 事件
            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_BIND;
            }
            if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_CONNECT;
            }
            if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DISCONNECT;
            }
            if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_CLOSE;
            }
            if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DEREGISTER;
            }
            if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                mask &= ~MASK_READ;
            }
            if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                    Object.class, ChannelPromise.class)) {
                mask &= ~MASK_WRITE;
            }
            if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                mask &= ~MASK_FLUSH;
            }
        }
        if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
            mask &= ~MASK_EXCEPTION_CAUGHT;
        }
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    return mask;
}

Netty 中分别有多种 Inbound 事件和 Outbound 事件,如 Inbound 事件有 channelRegistered、channelActive、channelRead 等等。Netty 会判断 Handler 的类型是否是 ChannelInboundHandler 的实例,如果是会把所有 Inbound 事件先置为 1,然后排除 Handler 不感兴趣的方法。同理,Handler 类型如果是 ChannelOutboundHandler,也是这么实现的。

那么如何排除 Handler 不感兴趣的事件呢?Handler 对应事件的方法上如果有 @Skip 注解,Netty 认为该事件是需要排除的。大部分情况下,用户自定义实现的 Handler 只需要关心个别事件,那么剩余不关心的方法都需要加上 @Skip 注解吗?Netty 其实已经在 ChannelHandlerAdapter 中默认都添加好了,所以用户如果继承了 ChannelHandlerAdapter,默认没有重写的方法都是加上 @Skip 的,只有用户重写的方法才是 Handler 关心的事件。

回到 addLast() 的主流程,接着需要将新创建的 DefaultChannelHandlerContext 节点添加到 ChannelPipeline 中,跟进 addLast0() 方法的源码。

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

addLast0() 非常简单,就是向 ChannelPipeline 中双向链表的尾部插入新的节点,其中 HeadContext 和 TailContext 一直是链表的头和尾,新的节点被插入到 HeadContext 和 TailContext 之间。例如代码示例中 SampleOutboundA 被添加时,双向链表的结构变化如下所示。

Drawing 4.png

最后,添加完节点后,就到了回调用户方法,定位到 callHandlerAdded() 的核心源码:

final void callHandlerAdded() throws Exception {
    if (setAddComplete()) {
        handler().handlerAdded(this);
    }
}
final boolean setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE) {
            return false;
        }        if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return true;
        }
    }
}

Netty 会通过 CAS 修改节点的状态直至 REMOVE_COMPLETE 或者 ADD_COMPLETE,如果修改节点为 ADD_COMPLETE 状态,表示节点已经添加成功,然后会回调用户 Handler 中实现的 handlerAdded() 方法。

至此,Pipeline 添加 Handler 的实现原理我们已经讲完了,下面接着看下 Pipeline 删除 Handler 的场景。

Pipeline 删除 Handler

在《源码篇:从 Linux 出发深入剖析服务端启动流程》的课程中我们介绍了一种特殊的处理器 ChannelInitializer,ChannelInitializer 在服务端 Channel 注册完成之后会从 Pipeline 的双向链表中移除,我们一起回顾下这段代码:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) {
        try {
            initChannel((C) ctx.channel()); // 调用 ChannelInitializer 实现的 initChannel() 方法
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this); // 将 ChannelInitializer 自身从 Pipeline 中移出
            }
        }
        return true;
    }
    return false;
}

继续跟进 pipeline.remove() 的源码。

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    // 1. getContextOrDie 用于查找需要删除的节点
    remove(getContextOrDie(handler));
    return this;
}
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;
    synchronized (this) {
        // 删除双向链表中的 Handler 节点
        atomicRemoveFromHandlerList(ctx);
        if (!registered) {
            callHandlerCallbackLater(ctx, false);
            return ctx;
        }
        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    // 3. 回调用户函数
    callHandlerRemoved0(ctx);
    return ctx;
}

整个删除 Handler 的过程可以分为三步,分别为:

  1. 查找需要删除的 Handler 节点;
  2. 然后删除双向链表中的 Handler 节点;
  3. 最后回调用户函数。

我们对每一步逐一进行拆解。

第一步查找需要删除的 Handler 节点,我们自然可以想到通过遍历双向链表实现。一起看下 getContextOrDie() 方法的源码:

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    // 遍历双向链表查找
    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == null) {
            return null;
        }
        // 如果 Handler 相同,返回当前的 Context 节点
        if (ctx.handler() == handler) { 
            return ctx;
        }
        ctx = ctx.next;
    }
}

Netty 确实是从双向链表的头结点开始依次遍历,如果当前 Context 节点的 Handler 要被删除的 Handler 相同,那么便找到了要删除的 Handler,然后返回当前 Context 节点。

找到需要删除的 Handler 节点之后,接下来就是将节点从双向链表中删除,再跟进atomicRemoveFromHandlerList() 方法的源码:

private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}

删除节点和添加节点类似,都是基本的链表操作,通过调整双向链表的指针即可实现。假设现在需要删除 SampleOutboundA 节点,我们以一幅图来表示删除时指针的变化过程,如下所示。

图片6.png

删除完节点之后,最后 Netty 会回调用户自定义实现的 handlerRemoved() 方法,回调的实现过程与添加节点时是类似的,在这里我就不赘述了。

到此为止,我们已经学会了 ChannelPipeline 内部结构的基本操作,只需要基本的链表操作就可以实现 Handler 节点的添加和删除,添加时通过掩码运算的方式排出 Handler 不关心的事件。 ChannelPipeline 是如何调度 Handler 的呢?接下来我们继续学习。

数据在 Pipeline 中的运转

我们知道,根据数据的流向,ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器。Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。今天我们就以客户端和服务端请求-响应的场景,深入研究 ChannelPipeline 的事件传播机制。

Inbound 事件传播

当客户端向服务端发送数据时,服务端是如何接收的呢?回顾下之前我们所学习的 Netty Reactor 线程模型,首先 NioEventLoop 会不断轮询 OP_ACCEPT 和 OP_READ 事件,当事件就绪时,NioEventLoop 会及时响应。首先定位到 NioEventLoop 中源码的入口:

// NioEventLoop#processSelectedKey
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

可以看出 unsafe.read() 会触发后续事件的处理,有一点需要避免混淆,在服务端 Channel 和客户端 Channel 中绑定的 unsafe 对象是不一样的,因为服务端 Channel 只关心如何接收客户端连接,而客户端 Channel 需要关心数据的读写。这里我们重点分析一下客户端 Channel 读取数据的过程,跟进 unsafe.read() 的源码:

public final void read() {
    final ChannelConfig config = config();
    // 省略其他代码
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);
    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator); // 分配 ByteBuf
            allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 将 Channel 中的数据读到 ByteBuf 中
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }
            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf); // 传播 ChannelRead 事件
            byteBuf = null;
        } while (allocHandle.continueReading());
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete(); // 传播 readComplete 事件
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

Netty 会不断从 Channel 中读取数据到分配的 ByteBuf 中,然后通过 pipeline.fireChannelRead() 方法触发 ChannelRead 事件的传播,fireChannelRead() 是我们需要重点分析的对象。

// DefaultChannelPipeline
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) { // 当前在 Reactor 线程内部,直接执行
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() { // 如果是外部线程,则提交给异步任务队列
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

Netty 首先会以 Head 节点为入参,直接调用一个静态方法 invokeChannelRead()。如果当前是在 Reactor 线程内部,会直接执行 next.invokeChannelRead() 方法。如果是外部线程发起的调用,Netty 会把 next.invokeChannelRead() 调用封装成异步任务提交到任务队列。通过之前对 NioEventLoop 源码的学习,我们知道这样可以保证执行流程全部控制在当前 NioEventLoop 线程内部串行化执行,确保线程安全性。我们抓住核心逻辑 next.invokeChannelRead() 继续跟进。

// AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

可以看出,当前 ChannelHandlerContext 节点会取出自身对应的 Handler,执行 Handler 的 channelRead 方法。此时当前节点是 HeadContext,所以 Inbound 事件是从 HeadContext 节点开始进行传播的,看下 HeadContext.channelRead() 是如何实现的。

// HeadContext
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);
}
// AbstractChannelHandlerContext
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // 找到下一个节点,执行 invokeChannelRead
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

我们发现 HeadContext.channelRead() 并没有做什么特殊操作,而是直接通过 fireChannelRead() 方法继续将读事件继续传播下去。接下来 Netty 会通过 findContextInbound(MASK_CHANNEL_READ), msg) 找到 HeadContext 的下一个节点,然后继续执行我们之前介绍的静态方法 invokeChannelRead(),从而进入一个递归调用的过程,直至某个条件结束。以上 channelRead 的执行过程我们可以梳理成一幅流程图:

Drawing 6.png

Netty 是如何判断 InboundHandler 是否关心 channelRead 事件呢?这就涉及findContextInbound(MASK_CHANNEL_READ), msg) 中的一个知识点,和上文中我们介绍的 executionMask 掩码运算是息息相关的。首先看下 findContextInbound() 的源码:

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}

MASK_CHANNEL_READ 的值为 1 << 5,表示 channelRead 事件所在的二进制位已被置为 1。在代码示例中,SampleInboundA 是我们添加的 Inbound 类型的自定义处理器,它所对应的 executionMask 掩码和 MASK_CHANNEL_READ 进行与运算的结果如果不为 0,表示 SampleInboundA 对 channelRead 事件感兴趣,需要触发执行 SampleInboundA 的 channelRead() 方法。

Inbound 事件在上述递归调用的流程中什么时候能够结束呢?有以下两种情况:

  1. 用户自定义的 Handler 没有执行 fireChannelRead() 操作,则在当前 Handler 终止 Inbound 事件传播。
  2. 如果用户自定义的 Handler 都执行了 fireChannelRead() 操作,Inbound 事件传播最终会在 TailContext 节点终止。

接下来,我们着重看下 TailContext 节点做了哪些工作。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    onUnhandledInboundMessage(ctx, msg);
}
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

可以看出 TailContext 只是日志记录了丢弃的 Inbound 消息,并释放 ByteBuf 做一个兜底保护,防止内存泄漏。

到此为止,Inbound 事件的传播流程已经介绍完了,Inbound 事件在 ChannelPipeline 中的传播方向是 Head -> Tail。Netty 会从 ChannelPipeline 中找到对传播事件感兴趣的 Inbound 处理器,执行事件回调方法,然后继续向下一个节点传播,整个事件传播流程是一个递归调用的过程。

Outbound 事件传播

分析完 Inbound 事件的传播流程之后,再学习 Outbound 事件传播就会简单很多。Outbound 事件传播的方向是从 Tail -> Head,与 Inbound 事件的传播方向恰恰是相反的。Outbound 事件最常见的就是写事件,执行 writeAndFlush() 方法时就会触发 Outbound 事件传播。我们直接从 TailContext 跟进 writeAndFlush() 源码:

@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

继续跟进 tail.writeAndFlush() 的源码,最终会定位到 AbstractChannelHandlerContext 中的 write 方法。该方法是 writeAndFlush 的核心逻辑,具体源码如下。

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // ...... 省略部分非核心代码 ......    // 找到 Pipeline 链表中下一个 Outbound 类型的 ChannelHandler 节点
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    // 判断当前线程是否是 NioEventLoop 中的线程
    if (executor.inEventLoop()) {
        if (flush) {
            // 因为 flush == true,所以流程走到这里
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            task.cancel();
        }
    }
}

在《数据传输:writeAndFlush 处理流程剖析》的课程中,我们已经对 write() 方法做了深入分析,这里抛开其他技术细节,我们只分析 Outbound 事件传播的过程。

假设我们在代码示例中 SampleOutboundB 调用了 writeAndFlush() 方法,那么 Netty 会调用 findContextOutbound() 方法找到 Pipeline 链表中下一个 Outbound 类型的 ChannelHandler,对应上述代码示例中下一个 Outbound 节点是 SampleOutboundA,然后调用 next.invokeWriteAndFlush(m, promise),我们跟进去:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

我们发现,invokeWriteAndFlush() 方法最终会它会执行下一个 ChannelHandler 节点的 write 方法。一般情况下,用户在实现 outBound 类型的 ChannelHandler 时都会继承 ChannelOutboundHandlerAdapter,一起看下它的 write() 方法是如何处理 outBound 事件的。

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

ChannelOutboundHandlerAdapter.write() 只是调用了 AbstractChannelHandlerContext 的 write() 方法,是不是似曾相识?与之前介绍的 Inbound 事件处理流程类似,此时流程又回到了 AbstractChannelHandlerContext 中重复执行 write 方法,继续寻找下一个 Outbound 节点,也是一个递归调用的过程。

编码器是用户经常需要自定义实现的处理器,然而为什么用户的编码器里并没有重写 write(),只是重写一个 encode() 方法呢?在《Netty 如何实现自定义通信协议》课程中,我们所介绍的 MessageToByteEncoder 源码,用户自定义的编码器基本都会继承 MessageToByteEncoder,MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其中会调用子类实现的 encode 方法完成数据编码,这里我们不再赘述了。

那么 OutBound 事件什么时候传播结束呢?也许你已经猜到了,OutBound 事件最终会传播到 HeadContext 节点。所以 HeadContext 节点既是 Inbound 处理器,又是 OutBound 处理器,继续看下 HeadContext 是如何拦截和处理 write 事件的。

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

HeadContext 最终调用了底层的 unsafe 写入数据,数据在执行 write() 方法时,只会写入到一个底层的缓冲数据结构,然后等待 flush 操作将数据冲刷到 Channel 中。关于 write 和 flush 是如何操作缓存数据结构的,快去复习一遍《数据传输:writeAndFlush 处理流程剖析》吧,将知识点形成一个完整的体系。

到此为止,outbound 事件传播也介绍完了,它的传播方向是 Tail -> Head,与 Inbound 事件的传播是相反的。MessageToByteEncoder 是用户在实现编码时经常用到的一个抽象类,MessageToByteEncoder 中已经重写了 ChanneOutboundHandler 的 write() 方法,大部分情况下用户只需要重写 encode() 即可。

异常事件传播

在《服务编排层:Pipeline 如何协调各类 Handler》中,我们已经初步介绍了 Netty 实现统一异常拦截和处理的最佳实践,首先回顾下异常拦截器的简单实现。

public class ExceptionHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof RuntimeException) {
            System.out.println("Handle Business Exception Success.");
        }
    }
}

异常处理器 ExceptionHandler 一般会继承 ChannelDuplexHandler,ChannelDuplexHandler 既是一个 Inbound 处理器,又是一个 Outbound 处理器。ExceptionHandler 应该被添加在自定义处理器的尾部,如下图所示:

图片8.png

那么异常处理器 ExceptionHandler 什么时候被执行呢?我们分别从 Inbound 异常事件传播和 Outbound 异常事件传播两种场景进行分析。

首先看下 Inbound 异常事件的传播。还是从数据读取的场景入手,发现 Inbound 事件传播的时候有异常处理的相关逻辑,我们再一起重新分析下数据读取环节的源码。

// AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
// AbstractChannelHandlerContext
private void notifyHandlerException(Throwable cause) {
    // 省略其他代码
    invokeExceptionCaught(cause);
}
// AbstractChannelHandlerContext
private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause); // 调用 Handler 实现的 exceptionCaught 方法
        } catch (Throwable error) {
            // 省略其他代码
        }
    } else {
        fireExceptionCaught(cause);
    }
}

如果 SampleInboundA 在读取数据时发生了异常,invokeChannelRead 会捕获异常,并执行 notifyHandlerException() 方法进行异常处理。我们一步步跟进,发现最终会调用 Handler 的 exceptionCaught() 方法,所以用户可以通过重写 exceptionCaught() 实现自定义的异常处理。

我们知道,统一异常处理器 ExceptionHandler 是在 ChannelPipeline 的末端,SampleInboundA 并没有重写 exceptionCaught() 方法,那么 SampleInboundA 产生的异常是如何传播到 ExceptionHandler 中呢?用户实现的 Inbound 处理器一般都会继承 ChannelInboundHandlerAdapter 抽象类,果然我们在 ChannelInboundHandlerAdapter 中发现了 exceptionCaught() 的实现:

// ChannelInboundHandlerAdapter
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
    ctx.fireExceptionCaught(cause);
}
// AbstractChannelHandlerContext
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
    return this;
}

ChannelInboundHandlerAdapter 默认调用 fireExceptionCaught() 方法传播异常事件,而 fireExceptionCaught() 执行时会先调用 findContextInbound() 方法找到下一个对异常事件关注的 Inbound 处理器,然后继续向下传播异常。所以这里应该明白为什么统一异常处理器 ExceptionHandler 为什么需要添加在 ChannelPipeline 的末端了吧?这样 ExceptionHandler 可以接收所有 Inbound 处理器发生的异常。

接下来,我们分析 Outbound 异常事件传播。你可能此时就会有一个疑问,Outbound 事件的传播方向与 Inbound 事件是相反的,为什么统一异常处理器 ExceptionHandler 没有添加在 ChannelPipeline 的头部呢?我们通过 writeAndFlush() 的调用过程再来一探究竟。

// AbstractChannelHandlerContext
private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

我们发现,flush 发送数据时如果发生异常,那么异常也会被捕获并交由同样的 notifyHandlerException() 方法进行处理。因为 notifyHandlerException() 方法中会向下寻找 Inbound 处理器,此时又会回到 Inbound 异常事件的传播流程。所以说,异常事件的传播方向与 Inbound 事件几乎是一样的,最后一定会传播到统一异常处理器 ExceptionHandler。

到这里,整个异常事件的传播过程已经分析完了。你需要记住的是,异常事件的传播顺序与 ChannelHandler 的添加顺序相同,会依次向后传播,与 Inbound 事件和 Outbound 事件无关。

总结

这节点我们学习了数据在 Netty 中的完整处理流程,其中重点分析了数据是如何在 ChannelPipeline 中流转的。我们做一个知识点总结:

  • ChannelPipeline 是双向链表结构,包含 ChannelInboundHandler 和 ChannelOutboundHandler 两种处理器。
  • Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。
  • 异常事件的处理顺序与 ChannelHandler 的添加顺序相同,会依次向后传播,与 Inbound 事件和 Outbound 事件无关。

再整体回顾下 ChannelPipeline 中事件传播的实现原理:

  • Inbound 事件传播从 HeadContext 节点开始,Outbound 事件传播从 TailContext 节点开始。
  • AbstractChannelHandlerContext 抽象类中实现了一系列 fire 和 invoke 方法,如果想让事件想下传播,只需要调用 fire 系列的方法即可。fire 和 invoke 的系列方法结合 findContextInbound() 和 findContextOutbound() 可以控制 Inbound 和 Outbound 事件的传播方向,整个过程是一个递归调用。

20 技巧篇:Netty 的 FastThreadLocal 究竟比 ThreadLocal 快在哪儿?

在前面几篇源码解析的课程中,我们都有在源码中发现 FastThreadLocal 的身影。顾名思义,Netty 作为高性能的网络通信框架,FastThreadLocal 是比 JDK 自身的 ThreadLocal 性能更高的通信框架。FastThreadLocal 到底比 ThreadLocal 快在哪里呢?这节课我们就一起来探索 FastThreadLocal 高性能的奥秘。

说明:本文参考的 Netty 源码版本为 4.1.42.Final。

JDK ThreadLocal 基本原理

JDK ThreadLocal 不仅是高频的面试知识点,而且在日常工作中也是常用一种工具,所以首先我们先学习下 Java 原生的 ThreadLocal 的实现原理,可以帮助我们更好地对比和理解 Netty 的 FastThreadLocal。

如果你需要变量在多线程之间隔离,或者在同线程内的类和方法中共享,那么 ThreadLocal 大显身手的时候就到了。ThreadLocal 可以理解为线程本地变量,它是 Java 并发编程中非常重要的一个类。ThreadLocal 为变量在每个线程中都创建了一个副本,该副本只能被当前线程访问,多线程之间是隔离的,变量不能在多线程之间共享。这样每个线程修改变量副本时,不会对其他线程产生影响。

接下来我们通过一个例子看下 ThreadLocal 如何使用:

public class ThreadLocalTest {
    private static final ThreadLocal<String> THREAD_NAME_LOCAL = ThreadLocal.withInitial(() -> Thread.currentThread().getName());
    private static final ThreadLocal<TradeOrder> TRADE_THREAD_LOCAL = new ThreadLocal<>();
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            int tradeId = i;
            new Thread(() -> {
                TradeOrder tradeOrder = new TradeOrder(tradeId, tradeId % 2 == 0 ? "已支付" : "未支付");
                TRADE_THREAD_LOCAL.set(tradeOrder);
                System.out.println("threadName: " + THREAD_NAME_LOCAL.get());
                System.out.println("tradeOrder info:" + TRADE_THREAD_LOCAL.get());
            }, "thread-" + i).start();
        }
    }
    static class TradeOrder {
        long id;
        String status;
        public TradeOrder(int id, String status) {
            this.id = id;
            this.status = status;
        }
        @Override
        public String toString() {
            return "id=" + id + ", status=" + status;
        }
    }
}

在上述示例中,构造了 THREAD_NAME_LOCAL 和 TRADE_THREAD_LOCAL 两个 ThreadLocal 变量,分别用于记录当前线程名称和订单交易信息。ThreadLocal 是可以支持泛型的,THREAD_NAME_LOCAL 和 TRADE_THREAD_LOCAL 存放 String 类型和 TradeOrder 对象类型的数据,你可以通过 set()/get() 方法设置和读取 ThreadLocal 实例。一起看下示例代码的运行结果:

threadName: thread-0
threadName: thread-1
tradeOrder info:id=1, status=未支付
tradeOrder info:id=0, status=已支付

可以看出 thread-1 和 thread-2 虽然操作的是同一个 ThreadLocal 对象,但是它们取到了不同的线程名称和订单交易信息。那么一个线程内如何存在多个 ThreadLocal 对象,每个 ThreadLocal 对象是如何存储和检索的呢?

接下来我们看看 ThreadLocal 的实现原理。既然多线程访问 ThreadLocal 变量时都会有自己独立的实例副本,那么很容易想到的方案就是在 ThreadLocal 中维护一个 Map,记录线程与实例之间的映射关系。当新增线程和销毁线程时都需要更新 Map 中的映射关系,因为会存在多线程并发修改,所以需要保证 Map 是线程安全的。那么 JDK 的 ThreadLocal 是这么实现的吗?答案是 NO。因为在高并发的场景并发修改 Map 需要加锁,势必会降低性能。JDK 为了避免加锁,采用了相反的设计思路。以 Thread 入手,在 Thread 中维护一个 Map,记录 ThreadLocal 与实例之间的映射关系,这样在同一个线程内,Map 就不需要加锁了。示例代码中线程 Thread 和 ThreadLocal 的关系可以用以下这幅图表示。

Drawing 0.png

那么在 Thread 内部,维护映射关系的 Map 是如何实现的呢?从源码中可以发现 Thread 使用的是 ThreadLocal 的内部类 ThreadLocalMap,所以 Thread、ThreadLocal 和 ThreadLocalMap 之间的关系可以用下图表示:

Drawing 1.png

为了更加深入理解 ThreadLocal,了解 ThreadLocalMap 的内部实现是非常有必要的。ThreadLocalMap 其实与 HashMap 的数据结构类似,但是 ThreadLocalMap 不具备通用性,它是为 ThreadLocal 量身定制的。

ThreadLocalMap 是一种使用线性探测法实现的哈希表,底层采用数组存储数据。如下图所示,ThreadLocalMap 会初始化一个长度为 16 的 Entry 数组,每个 Entry 对象用于保存 key-value 键值对。与 HashMap 不同的是,Entry 的 key 就是 ThreadLocal 对象本身,value 就是用户具体需要存储的值。

Drawing 2.png

当调用 ThreadLocal.set() 添加 Entry 对象时,是如何解决 Hash 冲突的呢?这就需要我们了解线性探测法的实现原理。每个 ThreadLocal 在初始化时都会有一个 Hash 值为 threadLocalHashCode,每增加一个 ThreadLocal, Hash 值就会固定增加一个魔术 HASH_INCREMENT = 0x61c88647。为什么取 0x61c88647 这个魔数呢?实验证明,通过 0x61c88647 累加生成的 threadLocalHashCode 与 2 的幂取模,得到的结果可以较为均匀地分布在长度为 2 的幂大小的数组中。有了 threadLocalHashCode 的基础,下面我们通过下面的表格来具体讲解线性探测法是如何实现的。

图片2.png

为了便于理解,我们采用一组简单的数据模拟 ThreadLocal.set() 的过程是如何解决 Hash 冲突的。

  1. threadLocalHashCode = 4,threadLocalHashCode & 15 = 4;此时数据应该放在数组下标为 4 的位置。下标 4 的位置正好没有数据,可以存放。
  2. threadLocalHashCode = 19,threadLocalHashCode & 15 = 4;但是下标 4 的位置已经有数据了,如果当前需要添加的 Entry 与下标 4 位置已存在的 Entry 两者的 key 相同,那么该位置 Entry 的 value 将被覆盖为新的值。我们假设 key 都是不相同的,所以此时需要向后移动一位,下标 5 的位置没有冲突,可以存放。
  3. threadLocalHashCode = 33,threadLocalHashCode & 15 = 3;下标 3 的位置已经有数据,向后移一位,下标 4 位置还是有数据,继续向后查找,发现下标 6 没有数据,可以存放。

ThreadLocal.get() 的过程也是类似的,也是根据 threadLocalHashCode 的值定位到数组下标,然后判断当前位置 Entry 对象与待查询 Entry 对象的 key 是否相同,如果不同,继续向下查找。由此可见,ThreadLocal.set()/get() 方法在数据密集时很容易出现 Hash 冲突,需要 O(n) 时间复杂度解决冲突问题,效率较低。

下面我们再聊聊 ThreadLocalMap 中 Entry 的设计原理。Entry 继承自弱引用类 WeakReference,Entry 的 key 是弱引用,value 是强引用。在 JVM 垃圾回收时,只要发现了弱引用的对象,不管内存是否充足,都会被回收。那么为什么 Entry 的 key 要设计成弱引用呢?我们试想下,如果 key 都是强引用,当 ThreadLocal 不再使用时,然而 ThreadLocalMap 中还是存在对 ThreadLocal 的强引用,那么 GC 是无法回收的,从而造成内存泄漏。

虽然 Entry 的 key 设计成了弱引用,但是当 ThreadLocal 不再使用被 GC 回收后,ThreadLocalMap 中可能出现 Entry 的 key 为 NULL,那么 Entry 的 value 一直会强引用数据而得不到释放,只能等待线程销毁。那么应该如何避免 ThreadLocalMap 内存泄漏呢?ThreadLocal 已经帮助我们做了一定的保护措施,在执行 ThreadLocal.set()/get() 方法时,ThreadLocal 会清除 ThreadLocalMap 中 key 为 NULL 的 Entry 对象,让它还能够被 GC 回收。除此之外,当线程中某个 ThreadLocal 对象不再使用时,立即调用 remove() 方法删除 Entry 对象。如果是在异常的场景中,记得在 finally 代码块中进行清理,保持良好的编码意识。

关于 JDK 的 ThreadLocal 的基本原理我们已经介绍完了,既然 ThreadLocal 已经非常成熟,而且在日常开发中也被广泛使用,Netty 为什么还要自己实现一个 FastThreadLocal 呢?性能真的比 ThreadLocal 高很多吗?我们接下来一起一探究竟。

FastThreadLocal 为什么快

FastThreadLocal 的实现与 ThreadLocal 非常类似,Netty 为 FastThreadLocal 量身打造了 FastThreadLocalThread 和 InternalThreadLocalMap 两个重要的类。下面我们看下这两个类是如何实现的。

FastThreadLocalThread 是对 Thread 类的一层包装,每个线程对应一个 InternalThreadLocalMap 实例。只有 FastThreadLocal 和 FastThreadLocalThread 组合使用时,才能发挥 FastThreadLocal 的性能优势。首先看下 FastThreadLocalThread 的源码定义:

public class FastThreadLocalThread extends Thread {
    private InternalThreadLocalMap threadLocalMap;
    // 省略其他代码
}

可以看出 FastThreadLocalThread 主要扩展了 InternalThreadLocalMap 字段,我们可以猜测到 FastThreadLocalThread 主要使用 InternalThreadLocalMap 存储数据,而不再是使用 Thread 中的 ThreadLocalMap。所以想知道 FastThreadLocalThread 高性能的奥秘,必须要了解 InternalThreadLocalMap 的设计原理。

上文中我们讲到了 ThreadLocal 的一个重要缺点,就是 ThreadLocalMap 采用线性探测法解决 Hash 冲突性能较慢,那么 InternalThreadLocalMap 又是如何优化的呢?首先一起看下 InternalThreadLocalMap 的内部构造。

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;
    private static final int STRING_BUILDER_INITIAL_SIZE;
    private static final int STRING_BUILDER_MAX_SIZE;
    public static final Object UNSET = new Object();
    private BitSet cleanerFlags;    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }
    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[32];
        Arrays.fill(array, UNSET);
        return array;
    }    public static int nextVariableIndex() {
        int index = nextIndex.getAndIncrement();
        if (index < 0) {
            nextIndex.decrementAndGet();
            throw new IllegalStateException("too many thread-local indexed variables");
        }
        return index;
    }
    // 省略其他代码
}
class UnpaddedInternalThreadLocalMap {
    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    static final AtomicInteger nextIndex = new AtomicInteger();    Object[] indexedVariables;
    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
        this.indexedVariables = indexedVariables;
    }
    // 省略其他代码
}

从 InternalThreadLocalMap 内部实现来看,与 ThreadLocalMap 一样都是采用数组的存储方式。但是 InternalThreadLocalMap 并没有使用线性探测法来解决 Hash 冲突,而是在 FastThreadLocal 初始化的时候分配一个数组索引 index,index 的值采用原子类 AtomicInteger 保证顺序递增,通过调用 InternalThreadLocalMap.nextVariableIndex() 方法获得。然后在读写数据的时候通过数组下标 index 直接定位到 FastThreadLocal 的位置,时间复杂度为 O(1)。如果数组下标递增到非常大,那么数组也会比较大,所以 FastThreadLocal 是通过空间换时间的思想提升读写性能。下面通过一幅图描述 InternalThreadLocalMap、index 和 FastThreadLocal 之间的关系。

Drawing 3.png

通过上面 FastThreadLocal 的内部结构图,我们对比下与 ThreadLocal 有哪些区别呢?FastThreadLocal 使用 Object 数组替代了 Entry 数组,Object[0] 存储的是一个Set<FastThreadLocal<?>> 集合,从数组下标 1 开始都是直接存储的 value 数据,不再采用 ThreadLocal 的键值对形式进行存储。

假设现在我们有一批数据需要添加到数组中,分别为 value1、value2、value3、value4,对应的 FastThreadLocal 在初始化的时候生成的数组索引分别为 1、2、3、4。如下图所示。

Drawing 4.png

至此,我们已经对 FastThreadLocal 有了一个基本的认识,下面我们结合具体的源码分析 FastThreadLocal 的实现原理。

FastThreadLocal 源码分析

在讲解源码之前,我们回过头看下上文中的 ThreadLocal 示例,如果把示例中 ThreadLocal 替换成 FastThread,应当如何使用呢?

public class FastThreadLocalTest {
    private static final FastThreadLocal<String> THREAD_NAME_LOCAL = new FastThreadLocal<>();
    private static final FastThreadLocal<TradeOrder> TRADE_THREAD_LOCAL = new FastThreadLocal<>();
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            int tradeId = i;
            String threadName = "thread-" + i;
            new FastThreadLocalThread(() -> {
                THREAD_NAME_LOCAL.set(threadName);
                TradeOrder tradeOrder = new TradeOrder(tradeId, tradeId % 2 == 0 ? "已支付" : "未支付");
                TRADE_THREAD_LOCAL.set(tradeOrder);
                System.out.println("threadName: " + THREAD_NAME_LOCAL.get());
                System.out.println("tradeOrder info:" + TRADE_THREAD_LOCAL.get());
            }, threadName).start();
        }
    }
}

可以看出,FastThreadLocal 的使用方法几乎和 ThreadLocal 保持一致,只需要把代码中 Thread、ThreadLocal 替换为 FastThreadLocalThread 和 FastThreadLocal 即可,Netty 在易用性方面做得相当棒。下面我们重点对示例中用得到 FastThreadLocal.set()/get() 方法做深入分析。

首先看下 FastThreadLocal.set() 的源码:

public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) { // 1. value 是否为缺省值
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 2. 获取当前线程的 InternalThreadLocalMap
        setKnownNotUnset(threadLocalMap, value); // 3. 将 InternalThreadLocalMap 中数据替换为新的 value
    } else {
        remove();
    }
}

FastThreadLocal.set() 方法虽然入口只有几行代码,但是内部逻辑是相当复杂的。我们首先还是抓住代码主干,一步步进行拆解分析。set() 的过程主要分为三步:

  1. 判断 value 是否为缺省值,如果等于缺省值,那么直接调用 remove() 方法。这里我们还不知道缺省值和 remove() 之间的联系是什么,我们暂且把 remove() 放在最后分析。
  2. 如果 value 不等于缺省值,接下来会获取当前线程的 InternalThreadLocalMap。
  3. 然后将 InternalThreadLocalMap 中对应数据替换为新的 value。

首先我们看下 InternalThreadLocalMap.get() 方法,源码如下:

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) { // 当前线程是否为 FastThreadLocalThread 类型
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 获取 FastThreadLocalThread 的 threadLocalMap 属性
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; 
    InternalThreadLocalMap ret = slowThreadLocalMap.get(); // 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMap
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

InternalThreadLocalMap.get() 逻辑很简单,为了帮助你更好地理解,下面使用一幅图描述 InternalThreadLocalMap 的获取方式。

Drawing 5.png

如果当前线程是 FastThreadLocalThread 类型,那么直接通过 fastGet() 方法获取 FastThreadLocalThread 的 threadLocalMap 属性即可。如果此时 InternalThreadLocalMap 不存在,直接创建一个返回。关于 InternalThreadLocalMap 的初始化在上文中已经介绍过,它会初始化一个长度为 32 的 Object 数组,数组中填充着 32 个缺省对象 UNSET 的引用。

那么 slowGet() 又是什么作用呢?从代码分支来看,slowGet() 是针对非 FastThreadLocalThread 类型的线程发起调用时的一种兜底方案。如果当前线程不是 FastThreadLocalThread,内部是没有 InternalThreadLocalMap 属性的,Netty 在 UnpaddedInternalThreadLocalMap 中保存了一个 JDK 原生的 ThreadLocal,ThreadLocal 中存放着 InternalThreadLocalMap,此时获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。

获取 InternalThreadLocalMap 的过程已经讲完了,下面看下 setKnownNotUnset() 如何将数据添加到 InternalThreadLocalMap 的。

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    if (threadLocalMap.setIndexedVariable(index, value)) { // 1. 找到数组下标 index 位置,设置新的 value
        addToVariablesToRemove(threadLocalMap, this); // 2. 将 FastThreadLocal 对象保存到待清理的 Set 中
    }
}

setKnownNotUnset() 主要做了两件事:

  1. 找到数组下标 index 位置,设置新的 value。
  2. 将 FastThreadLocal 对象保存到待清理的 Set 中。

首先我们看下第一步 threadLocalMap.setIndexedVariable() 的源码实现:

public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object oldValue = lookup[index]; 
        lookup[index] = value; // 直接将数组 index 位置设置为 value,时间复杂度为 O(1)
        return oldValue == UNSET;
    } else {
        expandIndexedVariableTableAndSet(index, value); // 容量不够,先扩容再设置值
        return true;
    }
}

indexedVariables 就是 InternalThreadLocalMap 中用于存放数据的数组,如果数组容量大于 FastThreadLocal 的 index 索引,那么直接找到数组下标 index 位置将新 value 设置进去,事件复杂度为 O(1)。在设置新的 value 之前,会将之前 index 位置的元素取出,如果旧的元素还是 UNSET 缺省对象,那么返回成功。

如果数组容量不够了怎么办呢?InternalThreadLocalMap 会自动扩容,然后再设置 value。接下来看看 expandIndexedVariableTableAndSet() 的扩容逻辑:

private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    int newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;
    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    indexedVariables = newArray;
}

上述代码的位移操作是不是似曾相识?我们去翻阅下 JDK HashMap 中扩容的源码,其中有这么一段代码:

static final int tableSizeFor(int cap) {
    int n = cap - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

可以看出 InternalThreadLocalMap 实现数组扩容几乎和 HashMap 完全是一模一样的,所以多读源码还是可以给我们很多启发的。InternalThreadLocalMap 以 index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省对象 UNSET,最终把新数组赋值给 indexedVariables。

为什么 InternalThreadLocalMap 以 index 为基准进行扩容,而不是原数组长度呢?假设现在初始化了 70 个 FastThreadLocal,但是这些 FastThreadLocal 从来没有调用过 set() 方法,此时数组还是默认长度 32。当第 index = 70 的 FastThreadLocal 调用 set() 方法时,如果按原数组容量 32 进行扩容 2 倍后,还是无法填充 index = 70 的数据。所以使用 index 为基准进行扩容可以解决这个问题,但是如果 FastThreadLocal 特别多,数组的长度也是非常大的。

回到 setKnownNotUnset() 的主流程,向 InternalThreadLocalMap 添加完数据之后,接下就是将 FastThreadLocal 对象保存到待清理的 Set 中。我们继续看下 addToVariablesToRemove() 是如何实现的。

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); // 获取数组下标为 0 的元素
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); // 创建 FastThreadLocal 类型的 Set 集合
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); // 将 Set 集合填充到数组下标 0 的位置
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v; // 如果不是 UNSET,Set 集合已存在,直接强转获得 Set 集合
    }
    variablesToRemove.add(variable); // 将 FastThreadLocal 添加到 Set 集合中
}

variablesToRemoveIndex 是采用 static final 修饰的变量,在 FastThreadLocal 初始化时 variablesToRemoveIndex 被赋值为 0。InternalThreadLocalMap 首先会找到数组下标为 0 的元素,如果该元素是缺省对象 UNSET 或者不存在,那么会创建一个 FastThreadLocal 类型的 Set 集合,然后把 Set 集合填充到数组下标 0 的位置。如果数组第一个元素不是缺省对象 UNSET,说明 Set 集合已经被填充,直接强转获得 Set 集合即可。这就解释了 InternalThreadLocalMap 的 value 数据为什么是从下标为 1 的位置开始存储了,因为 0 的位置已经被 Set 集合占用了。

为什么 InternalThreadLocalMap 要在数组下标为 0 的位置存放一个 FastThreadLocal 类型的 Set 集合呢?这时候我们回过头看下 remove() 方法。

public final void remove() {
    remove(InternalThreadLocalMap.getIfSet());
}
public static InternalThreadLocalMap getIfSet() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        return ((FastThreadLocalThread) thread).threadLocalMap();
    }
    return slowThreadLocalMap.get();
}
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }
    Object v = threadLocalMap.removeIndexedVariable(index); // 删除数组下标 index 位置对应的 value
    removeFromVariablesToRemove(threadLocalMap, this); // 从数组下标 0 的位置取出 Set 集合,并删除当前 FastThreadLocal
    if (v != InternalThreadLocalMap.UNSET) {
        try {
            onRemoval((V) v); // 空方法,用户可以继承实现
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}

在执行 remove 操作之前,会调用 InternalThreadLocalMap.getIfSet() 获取当前 InternalThreadLocalMap。有了之前的基础,理解 getIfSet() 方法就非常简单了,如果是 FastThreadLocalThread 类型,直接取 FastThreadLocalThread 中 threadLocalMap 属性。如果是普通线程 Thread,从 ThreadLocal 类型的 slowThreadLocalMap 中获取。

找到 InternalThreadLocalMap 之后,InternalThreadLocalMap 会从数组中定位到下标 index 位置的元素,并将 index 位置的元素覆盖为缺省对象 UNSET。接下来就需要清理当前的 FastThreadLocal 对象,此时 Set 集合就派上了用场,InternalThreadLocalMap 会取出数组下标 0 位置的 Set 集合,然后删除当前 FastThreadLocal。最后 onRemoval() 方法起到什么作用呢?Netty 只是留了一处扩展,并没有实现,用户需要在删除的时候做一些后置操作,可以继承 FastThreadLocal 实现该方法。

至此,FastThreadLocal.set() 的完成过程已经讲完了,接下来我们继续 FastThreadLocal.get() 方法的实现就易如反掌拉。FastThreadLocal.get() 的源码实现如下:

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index); // 从数组中取出 index 位置的元素
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    return initialize(threadLocalMap); // 如果获取到的数组元素是缺省对象,执行初始化操作
}
public Object indexedVariable(int index) {
    Object[] lookup = indexedVariables;
    return index < lookup.length? lookup[index] : UNSET;
}
private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

首先根据当前线程是否是 FastThreadLocalThread 类型找到 InternalThreadLocalMap,然后取出从数组下标 index 的元素,如果 index 位置的元素不是缺省对象 UNSET,说明该位置已经填充过数据,直接取出返回即可。如果 index 位置的元素是缺省对象 UNSET,那么需要执行初始化操作。可以看到,initialize() 方法会调用用户重写的 initialValue 方法构造需要存储的对象数据,如下所示。

private final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>() {
    @Override
    protected String initialValue() {
        return "hello world";
    }
};

构造完用户对象数据之后,接下来就会将它填充到数组 index 的位置,然后再把当前 FastThreadLocal 对象保存到待清理的 Set 中。整个过程我们在分析 FastThreadLocal.set() 时都已经介绍过,就不再赘述了。

到此为止,FastThreadLocal 最核心的两个方法 set()/get() 我们已经分析完了。下面有两个问题我们再深入思考下。

  1. FastThreadLocal 真的一定比 ThreadLocal 快吗?答案是不一定的,只有使用FastThreadLocalThread 类型的线程才会更快,如果是普通线程反而会更慢。
  2. FastThreadLocal 会浪费很大的空间吗?虽然 FastThreadLocal 采用的空间换时间的思路,但是在 FastThreadLocal 设计之初就认为不会存在特别多的 FastThreadLocal 对象,而且在数据中没有使用的元素只是存放了同一个缺省对象的引用,并不会占用太多内存空间。

总结

本节课我们对比介绍了 ThreadLocal 和 FastThreadLocal,简单总结下 FastThreadLocal 的优势。

  • 高效查找。FastThreadLocal 在定位数据的时候可以直接根据数组下标 index 获取,时间复杂度 O(1)。而 JDK 原生的 ThreadLocal 在数据较多时哈希表很容易发生 Hash 冲突,线性探测法在解决 Hash 冲突时需要不停地向下寻找,效率较低。此外,FastThreadLocal 相比 ThreadLocal 数据扩容更加简单高效,FastThreadLocal 以 index 为基准向上取整到 2 的次幂作为扩容后容量,然后把原数据拷贝到新数组。而 ThreadLocal 由于采用的哈希表,所以在扩容后需要再做一轮 rehash。
  • 安全性更高。JDK 原生的 ThreadLocal 使用不当可能造成内存泄漏,只能等待线程销毁。在使用线程池的场景下,ThreadLocal 只能通过主动检测的方式防止内存泄漏,从而造成了一定的开销。然而 FastThreadLocal 不仅提供了 remove() 主动清除对象的方法,而且在线程池场景中 Netty 还封装了 FastThreadLocalRunnable,FastThreadLocalRunnable 最后会执行 FastThreadLocal.removeAll() 将 Set 集合中所有 FastThreadLocal 对象都清理掉,

FastThreadLocal 体现了 Netty 在高性能方面精益求精的设计精神,FastThreadLocal 仅仅是其中的冰山一角,下节课我们继续探索 Netty 中其他高效的数据结构技巧。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/499234.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【单调栈】力扣84.柱状图中最大的矩形

上篇文章我们介绍了使用 无重复值 单调栈代码解决 含有重复值 的问题&#xff0c;在文章的最后&#xff0c;留下了一道考察相同思想的题目&#xff0c;今天我们来看看如何套路解决该题。 &#xff08;还没看过前几篇介绍的小伙伴赶快关注&#xff0c;在 「单调栈」 集合里查看…

通过node 后端实现颜色窃贼 (取出某个图片的主体rgb颜色 )

1.需求 我前端轮播图的背景色 想通过每一张轮播图片的颜色作为背景色 这样的话 需要通过一张图片 取出图片的颜色 这个工作通过前端去处理 也可以通过后端去处理 前端我试了试 color-thief 的插件 但是 这个插件是基于canvas 的模式来的 我需要在小程序中使用这个插件 而且是…

HarmonyOS-如何使用ArkTS声明式语法和基础组件,实现待办列表。

介绍 本篇Codelab将介绍如何使用ArkTS声明式语法和基础组件&#xff0c;实现简易待办列表。效果为点击某一事项&#xff0c;替换标签图片、虚化文字。效果如图所示&#xff1a; 相关概念 ArkTS语法&#xff1a;ArkTS是HarmonyOS的主要应用开发语言。ArkTS基于TypeScript&…

2024/3/29(MybatisPlus插件代码生成,静态工具,逻辑删除,枚举处理器.JSON处理器,分页插件,通用分页实体)

jdbc:mysql://localhost:3306/mp?useUnicodetrue&characterEncodingutf8&serverTimezoneUTC 需要这样 日志查看级别

【C++杂货铺】内管管理

目录 &#x1f308;前言&#x1f308; &#x1f4c1; C/C中内存分布 &#x1f4c1; new 和 delete的使用 &#x1f4c1; new 和 delete的优点 &#x1f4c1; new 和 delete的原理 &#x1f4c2; operator new 和 operator delete函数 &#x1f4c2; 内置类型 &#x1f4c2…

代码随想录-DAY4|leetcode-24,19,142,面试题 02.07

文章目录 22. 两两交换链表中的节点19. 删除链表的倒数第N个节点size-n方式删除双指针方式&#xff08;推荐&#xff09; 面试题 02.07. 链表相交142. 环形链表II暴力解法快慢指针&#xff08;推荐&#xff09; 22. 两两交换链表中的节点 leetcode链接&#xff1a;两两交换链表…

怎样一次性给多篇word文档标注拼音?一键批量注音

随着办公自动化的普及&#xff0c;我们经常会遇到需要处理大量Word文档的情况。在这些文档中&#xff0c;有时需要将文字标注上拼音&#xff0c;特别是在处理一些包含生僻字或需要拼音辅助阅读的文档时。然而&#xff0c;手动一篇篇地给Word文档标注拼音不仅效率低下&#xff0…

Docker搭建LNMP环境实战(08):安装php-fpm

1、编写php测试文件 在文件夹&#xff1a;/mnt/hgfs/dockers/test_site/www目录下创建文件&#xff1a;test.php&#xff0c;内容为&#xff1a; <?phpecho "hello world!!!!!! From test.php"; ?>2、编写php-fpm部署配置文件 在文件夹&#xff1a;/mnt/h…

mars3d兼容老版本Chrome 浏览器的附件参考记录

问题 源代码里面是es5的写法&#xff0c;怎么在浏览器上就转换了。 mars3d会将es5转es6吗&#xff1f; 看加载的Cesium.js源代码没有问题&#xff0c;但是模块里面的源代码已经转换了&#xff0c;再低版本浏览器上面会无法运行“Uncaught SyntaxError: Unexpected token ?”…

JVM(一)——内存结构

一. 前言 1、什么是 JVM? 1&#xff09;定义&#xff1a; Java Virtual Machine - java 程序的运行环境&#xff08;java 二进制字节码的运行环境&#xff09; 2&#xff09;好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收功能数组下标越…

测试员再也不怕漏测!花2年总结的这个测试模板太全了!

作为一个测试&#xff0c;最尴尬的莫过于分给你的task&#xff0c;别人做交叉兼容测试的时候&#xff0c;在你负责的内容里找出了很多你没有测试出来的bug。 我也曾因为测试不全被组长在工作群里艾特。说实话&#xff0c;真的恨不得找个地方躲起来。 为了避免自己再次出现类似…

用友BI告诉你,分析指标计算也可以很简单

分析数据&#xff0c;特别是分析财务数据&#xff0c;要计算得分析指标都非常多&#xff0c;涉及的数据来源也是各有不同&#xff0c;一旦哪个环节出了错就一切都得重来。难道分析指标的计算就没有更快更简单的办法了&#xff1f;奥威-用友BI告诉你&#xff0c;分析指标计算有别…

【JDBC编程】基于MySql的Java应用程序中访问数据库与交互数据的技术

꒰˃͈꒵˂͈꒱ write in front ꒰˃͈꒵˂͈꒱ ʕ̯•͡˔•̯᷅ʔ大家好&#xff0c;我是xiaoxie.希望你看完之后,有不足之处请多多谅解&#xff0c;让我们一起共同进步૮₍❀ᴗ͈ . ᴗ͈ აxiaoxieʕ̯•͡˔•̯᷅ʔ—CSDN博客 本文由xiaoxieʕ̯•͡˔•̯᷅ʔ 原创 CSDN …

新家装修选中央空调如何选?认准约克VRF中央空调

在现代家居生活中,追求舒适和健康生活环境的家庭越来越倾向于选择中央空调系统。面对市场上琳琅满目的中央空调品牌,如何挑选一款合适的家用中央空调成为许多消费者的一大难题。今天,我们以约克VRF中央空调为例,深入探讨其特点和优势,为广大家庭提供一个舒适的选择答案。 首先…

IP可以申请SSL证书吗?

目录 背景&#xff1a; 申请IP证书的基本条件&#xff1a; 支持IP地址的证书类型&#xff1a; 为什么要申请IP地址证书&#xff1f; 如何申请IP地址证书 背景&#xff1a; IP地址是可以实现https加密需求的&#xff0c;且IP SSL证书可以完美的解决企业对于IP地址实现http…

标准库不带操作系统移植FreeModbus到STM32

添加FreeModbus代码 首先准备一个空白的标准库项目。 下载FreeModbus源码。 将源码中的modbus文件夹复制到项目路径下&#xff0c;并把demo->BARE->port文件夹的内容也添加进来。 新建一个文件port.c备用。然后打开项目&#xff0c;将上述文件添加至项目&#xff0c;…

Sectigo多域名ssl证书1200元

多域名SSL证书是可以同时保护多个域名的域名型数字证书之一&#xff0c;为个人和企事业单位提供了多样化的数字证书方案。各个正规的CA认证机构所颁发的多域名费SSL证书产品中&#xff0c;Sectigo旗下的多域名SSL证书是使用范围比较广的一款。今天就随SSL盾小编了解Sectigo旗下…

2024三掌柜赠书活动第十九期:DevOps企业级CI/CD实战

目录 目录 前言 关于CI/CD 企业级CI/CD实战 关于《DevOps企业级CI/CD实战》 编辑推荐 内容简介 作者简介 图书目录 书中前言/序言 《DevOps企业级CI/CD实战》全书速览 结束语 前言 作为开发者&#xff0c;对于编程语言并不陌生&#xff0c;随着技术圈的不断进步和发…

EI、Scopus双检索 | 2024年第四届控制理论与应用国际会议

会议简介 Brief Introduction 2024年第四届控制理论与应用国际会议(ICoCTA 2024) 会议时间&#xff1a;2024年10月18 -20日 召开地点&#xff1a;中国杭州 大会官网&#xff1a;www.icocta.org 控制理论作为一门科学技术&#xff0c;已经广泛地运用于我们社会生活方方面面。随着…

java-pytorch 使用手动下载FashionMNIST数据集进行测试

java-pytorch 使用手动下载FashionMNIST数据集进行测试 先定义训练数据和测试数据的位置查看一下读取到的标签数据格式使用loc和iloc访问下数据&#xff0c;便于下面操作使用read_image函数查看下图片的数据大小开始写数据集使用DataLoader去加载我们自己的数据看下加载后的dat…