Netty 启动源码阅读

文章目录

  • 1. 入门
  • 2. Netty 代码实例
  • 3. Netty bind
    • 3.1 initAndRegister
      • 3.1.1 newChannel, 创建 NioServerSocketChannel
      • 3.1.2 init(channel); 初始化 NioServerSocketChannel
      • 3.1.3 register 注册channel
    • 3.2 doBind0 绑定端口
    • 3.3 ServerBootstrapAcceptor

1. 入门

主从Reactor模型 :Acceptor 接收到客户端TCP连接请求并处理完成后, 将新创建的SocketChannel 注册到 I/O线程池 (sub Reactor)传送门

主要步骤:

  • Acceptor 创建 ServerSocketChannel
  • Acceptor ServerSocketChannel 绑定端口
  • Acceptor ServerSocketChannel 设置非阻塞
  • Acceptor 创建 Selector,将 ServerSocketChannel 注册到 Selector 上,监听 SelectionKey.OP_ACCEPT 事件
  • Acceptor ServerSocketChannel 设置分发处理器,处理监听到的SelectionKey.OP_ACCEPT 事件,新创建的 SocketChannel 分发到到 I/O线程池 (sub Reactor)
  • I/O线程池 (sub Reactor)不断轮训处理 SocketChannel 上的读写请求

2. Netty 代码实例

传送门

ServerBootstrap:

  • 配置 EventLoopGroup 线程组:
    需要注意的是, 只绑定一个端口, bossEventLoopGroup 1个就够了, 2个会有一个闲置
   NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(2);
   NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);
  • 设置 Channel 类型: NioServerSocketChannel
  • 设置 childHandlerChannelPipeline
  • 绑定端口: 创建 NioServerSocketChannel, 注册, 绑定端口, ServerSocketChannel 添加分发到子线程组的 handler
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

public class NettyServer01 {
    public static void main(String[] args) {
        // 创建BossGroup和WorkerGroup,分别处理连接接受和数据读写
        NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(2);
        NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);

        new ServerBootstrap() // 初始化ServerBootstrap
                .group(bossEventLoopGroup, workerEventLoopGroup) // 设置EventLoopGroup
                .channel(NioServerSocketChannel.class) // 指定服务器通道类
                .childHandler(new ChannelInitializer<NioSocketChannel>() { // 设置通道初始化器
                    /**
                     * 初始化通道,添加处理器到通道的管道中
                     * @param ch 当前初始化的通道
                     */
                    protected void initChannel(NioSocketChannel ch) {
                        // 添加多个处理器,分别处理入站和出站事件
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            /**
                             * 处理入站数据
                             * @param ctx 通道上下文
                             * @param msg 接收到的消息对象
                             */
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = inbound((ByteBuf) msg, "1");
                                ctx.fireChannelRead(byteBuf);
                            }
                        });
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws CharacterCodingException {
                                ByteBuf byteBuf = inbound((ByteBuf) msg, "2");
                                ctx.fireChannelRead(byteBuf);
                            }
                        });
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            /**
                             * 处理入站数据,将处理后的数据写回通道
                             * @param ctx 通道上下文
                             * @param msg 接收到的消息对象
                             */
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = inbound((ByteBuf) msg, "3");
                                ctx.channel().write(byteBuf);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            /**
                             * 处理出站数据,在数据写出前进行加工
                             * @param ctx 通道上下文
                             * @param msg 要写出的消息对象
                             * @param promise 写操作的承诺
                             */
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                                ByteBuf byteBuf = outbound((ByteBuf) msg, "4");
                                ctx.writeAndFlush(msg);
                                ctx.write(byteBuf, promise);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                                ByteBuf byteBuf = outbound((ByteBuf) msg, "5");
                                ctx.write(byteBuf, promise);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                                ByteBuf byteBuf = outbound((ByteBuf) msg, "6");
                                ctx.write(byteBuf, promise);
                            }
                        });
                    }
                })
                .bind(8080); // 绑定端口并启动服务器
    }

        /**
     * 对出站数据进行处理
     * @param msg 待处理的ByteBuf对象
     * @param no 数据标识号
     * @return 处理后的ByteBuf对象
     */
    private static ByteBuf outbound(ByteBuf msg, String no) {
        ByteBuf byteBuf = msg;
        String output = byteBufToString(byteBuf);
        System.out.printf("\n\noutbound%s output: %s", no, output);
        stringWriteToByteBuf(byteBuf, String.format("\noutbound%s 已处理", no));
        return byteBuf;
    }

    /**
     * 对入站数据进行处理
     * @param msg 待处理的ByteBuf对象
     * @param no 数据标识号
     * @return 处理后的ByteBuf对象
     */
    private static ByteBuf inbound(ByteBuf msg, String no) {
        String input = byteBufToString(msg);
        System.out.printf("\n\ninbound%s input: %s\n", no, input);
        stringWriteToByteBuf(msg, String.format("\ninbound%s 已处理", no));
        return msg;
    }

    /**
     * 将ByteBuf对象转换为字符串
     * @param msg 待转换的ByteBuf对象
     * @return 字符串表示的数据
     */
    private static String byteBufToString(ByteBuf msg) {
        return msg.toString(StandardCharsets.UTF_8);
    }

    /**
     * 将字符串写入ByteBuf对象
     * @param byteBuf 待写入的ByteBuf对象
     * @param msg 要写入的字符串数据
     */
    private static void stringWriteToByteBuf(ByteBuf byteBuf, String msg) {
        byteBuf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
    }
}

3. Netty bind

主要下面两个方法:

  • final ChannelFuture regFuture = initAndRegister();
    初始化channel,并且注册ServerSocketChannelbossEventLoopGroup 的 一个 EventLoopSelector
  • doBind0(regFuture, channel, localAddress, promise); 绑定端口号
    public ChannelFuture bind(InetAddress inetHost, int inetPort) {
        return bind(new InetSocketAddress(inetHost, inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

     private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

在这里插入图片描述

3.1 initAndRegister

io.netty.bootstrap.AbstractBootstrap#initAndRegister

初始化channel,并且注册ServerSocketChannelbossEventLoopGroup 的 一个 EventLoopSelector

inal ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

3.1.1 newChannel, 创建 NioServerSocketChannel

  • 创建 channel = channelFactory.newChannel();
  • io.netty.channel.ReflectiveChannelFactory#newChannel
  • io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel
    • NioServerSocketChannel处理连接事件: super(null, channel, SelectionKey.OP_ACCEPT);
    • AbstractNioChannel: 设置非阻塞 ch.configureBlocking(false);
    • AbstractChannel 初始化unsafe、pipeline: unsafe = newUnsafe(); pipeline = newChannelPipeline();
    • DefaultChannelPipeline: tail = new TailContext(this); head = new HeadContext(this);

3.1.2 init(channel); 初始化 NioServerSocketChannel

初始化 Channelio.netty.bootstrap.ServerBootstrap#init, pipeline 添加 ServerBootstrapAcceptor 是一个异步过程,需要 EventLoop 线程负责执行。而当前 EventLoop 线程该去执行 register0() 的注册流程,所以等到 register0() 执行完之后才能被添加到 Pipeline 当中

   p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
  • 注册 Channel : ChannelFuture regFuture = config().group().register(channel);
    • config().group() 就是 bossEventLoopGroup
  • executeio.netty.util.concurrent.SingleThreadEventExecutor#execute
    • addTask(task);
    • if (!inEventLoop) startThread();

3.1.3 register 注册channel

  • 注册 ServerSocketChannelbossEventLoopGroup 的一个 EventLoopSelector 上,监听 SelectionKey.OP_ACCEPT 事件
ChannelFuture regFuture = config().group().register(channel);
  • io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
  • io.netty.channel.AbstractChannel.AbstractUnsafe#register
if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
);
  • io.netty.channel.AbstractChannel.AbstractUnsafe#register0
  • io.netty.channel.nio.AbstractNioChannel#doRegister
    • 调用java 的 nio:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

3.2 doBind0 绑定端口

io.netty.bootstrap.AbstractBootstrap#doBind0

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
  • io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
  • io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
  • io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
  • io.netty.channel.AbstractChannelHandlerContext#invokeBind
  • io.netty.channel.DefaultChannelPipeline.HeadContext#bind
  • io.netty.channel.AbstractChannel.AbstractUnsafe#bind
  • io.netty.channel.socket.nio.NioServerSocketChannel#doBind 调用java 的 bind
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

3.3 ServerBootstrapAcceptor

child 就是 workerEventLoopGroupsocketChannel 注册到 workerEventLoopGroup 进行处理

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
          forceClose(child, t);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/780970.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MATLAB制作一个简单的函数绘制APP

制作一个函数绘制APP&#xff0c;输入函数以及左右端点&#xff0c;绘制出函数图像。 编写回调函数&#xff1a; 结果&#xff1a;

HTML 【实用教程】(2024最新版)

核心思想 —— 语义化 【面试题】如何理解 HTML 语义化 ?仅通过标签便能判断内容的类型&#xff0c;特别是区分标题、段落、图片和表格 增加代码可读性&#xff0c;让人更容易读懂对SEO更加友好&#xff0c;让搜索引擎更容易读懂 html 文件的基本结构 html 文件的文件后缀为 …

移动硬盘“需格式化”预警:专业数据恢复指南

移动硬盘“需格式化”危机&#xff1a;了解背后的真相 在日常的数字生活中&#xff0c;移动硬盘作为我们存储重要数据的“保险箱”&#xff0c;其稳定性与安全性直接关系到我们信息的完整与便捷访问。然而&#xff0c;当您尝试打开移动硬盘时&#xff0c;屏幕上赫然出现的“需…

科技赋能智慧应急:“数字孪生+无人机”在防汛救灾中的应用

近期&#xff0c;全国多地暴雨持续&#xff0c;“麻辣王子工厂停工”“水上派出所成水上的派出所了”等相关词条冲上热搜&#xff0c;让人们看到了全国各地城市内涝、洪涝带来的严重灾情。暴雨带来的影响可见一斑&#xff0c;潜在的洪水、泥石流、山体滑坡等地质灾害更应提高警…

aardio —— 今日减bug

打字就减bug 鼠标双击也减bug 看看有多少bug够你减的 使用方法&#xff1a; 1、将资源附件解压缩&#xff0c;里面的文件夹&#xff0c;放到aardio\plugin\plugins 目录 2、aardio 启动插件 → 插件设置 → 选中“今日减bug” → 保存。 3、重启 aardio&#xff0c;等aa…

BUUCTF[PWN][fastbin attack]

fastbin_attack例题 题目&#xff1a;[BUUCTF在线评测 (buuoj.cn)](https://buuoj.cn/challenges#[ZJCTF 2019]EasyHeap) 整体思路&#xff1a;利用编辑时edit_heap函数的栈溢出漏洞&#xff0c;覆盖heaparray中的栈指针指向free的got表&#xff0c;将其改为system的plt表&…

make工具

1、什么是make&#xff1f; make是个命令&#xff0c;是个可执行程序&#xff0c;是个工具&#xff0c;用来解析Makefile文件的命令&#xff0c;这个命令存放在/usr/bin/目录下 -rwxr-xr-x 1 root root 250K 2月 15 2022 make -rwxr-xr-x 1 root root 4.8K 2月 15 2022 ma…

Linux_实现简易日志系统

目录 1、认识可变参数 2、解析可变参数 3、打印可变参数 3.1 va_list 3.2 va_start 3.3 va_arg 3.4 va_end 3.5 小结 4、实现日志 4.1 日志左半部分 4.2 日志右半部分 4.3 日志的存档归类 结语 前言&#xff1a; 在Linux下实现一个日志系统&#xff0c;该日…

Open3D 删除点云中重叠的点(方法二)

目录 一、概述 1.1原理 1.2应用 二、代码实现 三、实现效果 3.1原始点云 3.2处理后点云 3.3数据对比 一、概述 在点云处理中&#xff0c;重叠点&#xff08;即重复点&#xff09;可能会对数据分析和处理的结果产生负面影响。因此&#xff0c;删除重叠点是点云预处理中常…

一招解决找不到d3dcompiler43.dll,无法继续执行代码问题

当您的电脑遇到d3dcompiler43.dll缺失问题时&#xff0c;首先需要了解d3dcompiler43.dll文件及其可能导致问题的原因&#xff0c;之后便可以选择合适的解决方案。在此&#xff0c;我们将会为您提供寻找d3dcompiler43.dll文件的多种处理方法。 一、d3dcompiler43.dll文件分析 d…

【C++第十课 - stack_queue】stack、queue的使用、适配器模型stack、queue和priority_queue的底层实现、deque

目录 一、stack使用1、push2、pop3、empty4、top题目1、最小栈2、栈的压入、弹出序3、逆波兰表达式求值 二、queue的使用priority_queue习题 三、适配器stack的底层实现queue的底层实现priority_queue的底层实现仿函数/函数对象函数指针 四、deque 一、stack使用 stack是个容器…

【74LS163做24进制计数器】2021-11-19

缘由用74LS163做24进制计数器-其他-CSDN问答,仿真multisim两个74LS163芯片如何构成47进制计数器-吐槽问答-CSDN问答 参考74ls163中文资料汇总&#xff08;74ls163引脚图及功能_内部结构图及应用电路&#xff09; - 电子发烧友网

1.pwn的汇编基础(提及第一个溢出:整数溢出)

汇编掌握程度 能看懂就行&#xff0c;绝大多数情况不需要真正的编程(shellcode题除外) 其实有时候也不需要读汇编&#xff0c;ida F5 通常都是分析gadget&#xff0c;知道怎么用&#xff0c; 调试程序也不需要分析每一条汇编指令&#xff0c;单步执行然后查看寄存器状态即可 但…

【Python机器学习】模型评估与改进——多分类指标

多分类问题的所有指标基本是上都来自于二分类问题&#xff0c;但是要对所有类别进行平均。多分类的精度被定义为正确分类的样本所占的比例。同样&#xff0c;如果类别是不平衡的&#xff0c;精度并不是很好的评估度量。 想象一个三分类问题&#xff0c;其中85%的数据点属于类别…

Java(七)——多态

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 ⚡开源项目&#xff1a; rich-vue3 &#xff08;基于 Vue3 TS Pinia Element Plus Spring全家桶 MySQL&#xff09; &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1…

Go语言如何入门,有哪些书推荐?

Go 语言之所以如此受欢迎&#xff0c;其编译器功不可没。Go 语言的发展也得益于其编译速度够快。 对开发者来说&#xff0c;更快的编译速度意味着更短的反馈周期。大型的 Go 应用程序总是能在几秒钟之 内完成编译。而当使用 go run编译和执行小型的 Go 应用程序时&#xff0c;其…

VMware虚拟机搭建CentOS7环境

相关资料 安装VMware 双击VMware-workstation(16.1.1软件安装包.exe安装文件,点下一步 激活码文件复制激活码激活安装linux 1、点击创建虚拟机

Open3D 删除点云中重叠的点(方法一)

目录 一、概述 二、代码实现 三、实现效果 3.1原始点云 3.2处理后的点云 3.3计算结果 一、概述 在点云处理中&#xff0c;重叠点&#xff08;即重复点&#xff09;可能会对数据分析和处理的结果产生负面影响。因此&#xff0c;删除重叠点是点云预处理中常见且重要的步骤。…

【网络安全】实验一(网络拓扑环境的搭建)

一、本次实验的实验目的 学习利用 VMware 创建虚拟环境 学习利用 VMware 搭建各自网络拓扑环境 二、创建虚拟机 三、克隆虚拟机 选择克隆的系统必须处于关机状态。 方法一&#xff1a; 方法二&#xff1a; 需要修改克隆计算机的名字&#xff0c;避免产生冲突。 四、按照要求完…

机器学习原理之 -- 神经网络:由来及原理详解

神经网络&#xff08;Neural Networks&#xff09;是受生物神经系统启发而设计的一类计算模型&#xff0c;广泛应用于图像识别、语音识别、自然语言处理等领域。其基本思想是通过模拟人脑神经元的工作方式&#xff0c;实现对复杂数据的自动处理和分类。本文将详细介绍神经网络的…