Netty NioEventLoop详解

文章目录

  • 前言
  • 类图
  • 主要功能
  • NioEventLoop如何实现事件循环
  • NioEventLoop如何处理多路复用
  • Netty如何管理Channel和Selector
      • 管理Channel
      • 管理Selector
      • 注意事项


前言

Netty通过事件循环机制(EventLoop)处理IO事件和异步任务,简单来说,就是通过一个死循环,不断处理当前已发生的IO事件和待处理的异步任务。
① NioEventLoop是一个基于JDK NIO的异步事件循环类,它负责处理一个Channel的所有事件在这个Channel的生命周期期间。

② NioEventLoop的整个生命周期只会依赖于一个单一的线程来完成。一个NioEventLoop可以分配给多个Channel,NioEventLoop通过JDK Selector来实现I/O多路复用,以对多个Channel进行管理。

③ 如果调用Channel操作的线程是EventLoop所关联的线程,那么该操作会被立即执行。否则会将该操作封装成任务放入EventLoop的任务队列中。

④ 所有提交到NioEventLoop的任务都会先放入队列中,然后在线程中以有序(FIFO)/连续的方式执行所有提交的任务。

⑤ NioEventLoop的事件循环主要完成了:

  • 已经注册到Selector的Channel的监控,并在感兴趣的事件可执行时对其进行处理;
  • 完成任务队列(taskQueue)中的任务,以及对可执行的定时任务和周期性任务的处理(scheduledTaskQueue中的可执行的任务都会先放入taskQueue中后,再从taskQueue中依次取出执行)。

类图

示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。

主要功能

NioEventLoop 主要负责以下几个方面的功能:

  1. 事件循环NioEventLoop 是一个单线程的事件循环,它不断地轮询注册在其上的 Channel,看是否有就绪的 I/O 事件(如读、写、连接等),然后进行相应的处理。
  2. 任务执行:除了处理 I/O 事件外,NioEventLoop 还负责执行定时任务和普通任务。这些任务可以是用户自定义的,也可以是 Netty 框架内部的。
  3. Channel 的注册与注销:Netty 中的 Channel 在创建后需要注册到一个 EventLoop 上,这样它才能开始处理 I/O 事件。同样地,当 Channel 不再需要时,也需要从 EventLoop 上注销。
  4. Selector 的管理NioEventLoop 内部使用 Java NIO 的 Selector 来轮询 Channel 的就绪状态。它负责 Selector 的创建、销毁以及选择操作。

在 Netty 中,通常会有多个 NioEventLoop 实例,它们通常与多线程模型结合使用,以实现真正的并发处理。每个 NioEventLoop 负责处理一组 Channel 的 I/O 事件,这样可以将不同的 Channel 分布到不同的线程上,从而实现并发处理。

总的来说,NioEventLoop 是 Netty 中负责处理 I/O 事件和任务执行的核心组件,它使得 Netty 能够高效地处理大量的网络连接和 I/O 操作。

NioEventLoop如何实现事件循环

NioEventLoop在Netty中实现事件循环的过程是Netty网络框架的核心机制之一,它结合了Java NIO的非阻塞特性和事件驱动模型,以高效地处理网络事件。以下是NioEventLoop如何实现事件循环的详细过程:

  1. 轮询就绪事件
    在事件循环中,NioEventLoop调用Selectorselect()方法来等待Channel上就绪的事件。select()方法会阻塞,直到至少有一个Channel的事件就绪,或者超时。

  2. 处理就绪事件
    一旦select()方法返回,NioEventLoop会获取就绪的Channel集合,并遍历这些Channel。对于每个就绪的Channel,NioEventLoop会根据其感兴趣的事件类型调用相应的处理器(通常是ChannelPipeline中的ChannelInboundHandler)来处理这些事件。这可能包括读取数据、写入数据、处理连接等。

  3. 执行非I/O任务
    除了处理I/O事件外,NioEventLoop还负责执行提交给它的非I/O任务。这些任务可能包括用户自定义的业务逻辑、回调方法等。NioEventLoop通常有一个任务队列,用于存储待执行的任务。在事件循环中,它会检查任务队列,并依次执行其中的任务。

  4. 定时任务调度
    NioEventLoop还支持定时任务的调度。用户可以将定时任务提交给NioEventLoop,并指定任务的执行时间或执行间隔。NioEventLoop内部会维护一个定时任务列表,并根据任务的调度信息在合适的时间点执行这些任务。

通过这个事件循环机制,NioEventLoop能够高效地处理大量的网络连接和I/O事件,同时保持单线程的事件处理模型,简化了并发控制和线程安全的处理。此外,Netty还通过多线程模型和多个NioEventLoop实例的配合使用,实现了真正的并发处理,从而能够处理更大规模的并发连接和事件。

    @Override
    protected void run() {
        int selectCnt = 0;
        // 必须使用死循环不断进行事件轮询,获取任务和通道的 IO 事件
        for (;;) {
            try {
                int strategy;
                try {
                    /**
                     * 返回处理策略,就分为两种:
                     * 有任务 hasTasks() == true,就不能等待IO事件了,先直接调用 selectNow() 方法,
                     * 获取当前准备好IO 的通道channel 的数量(0 表示一个都没有),处理 IO 事件 和任务。
                     *
                     * 没有任务 hasTasks() == false,返回 SelectStrategy.SELECT (是负数),
                     * 没有要及时处理的任务,先阻塞等待 IO 事件
                     */
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        // 返回下一个计划任务准备运行的截止时间纳秒值
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            // 返回 -1,说明没有下一个计划任务,
                            // 将 curDeadlineNanos 设置为 NONE,
                            // 调用 selector.select 方法时,就没有超时,
                            // 要无限等待了,除非被唤醒或者有准备好的 IO 事件。
                            curDeadlineNanos = NONE;
                        }
                        // 设置 超时等待时间
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                // 当前没有任务,那么就通过 selector 查看有没有 IO 事件
                                // 并设置超时时间,超时时间到了那么就要执行计划任务了
                                // 如果 curDeadlineNanos 是 NONE,就没有超时,无限等待。
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // 这个更新只是为了帮助阻止不必要的选择器唤醒,
                            // 所以使用lazySet是可以的(没有竞争条件)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // 如果我们在这里接收到IOException,那是因为Selector搞错了。
                    // 让我们重新构建选择器并重试。
                    // https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
                /**
                 * 代码走到这里,
                 * 要么有 IO 事件,即 strategy >0
                 * 要么就是有任务要运行。
                 * 如果两个都不是,那么就有可能是 JDK 的 epoll 的空轮询 BUG
                 */

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    // 如果 ioRatio
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // 确保运行了所有待执行任务,包括当前时间已经过期的计划任务
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    // strategy > 0 说明有 IO 事件,
                    // 那么需要调用 processSelectedKeys() 方法,执行 IO 时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // 计算 IO 操作花费的时间
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 按照比例计算可以运行任务的超时时间 ioTime * (100 - ioRatio) / ioRatio,
                        // 超时时间到了,即使还有任务没有运行,也直接返回了,等下一个周期在运行这些任务
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    // strategy == 0 说明没有 IO 事件,不用处理 IO 了
                    // 调用 runAllTasks(0) 方法,超时时间为0,这将运行最小数量的任务
                    ranTasks = runAllTasks(0);
                }

                if (ranTasks || strategy > 0) {
                    // 要么有任务运行,要么有 IO 事件处理
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) {
                    // 即没有任务运行,也没有IO 事件处理,就有可能是 JDK 的 epoll 的空轮询 BUG
                    // 调用 unexpectedSelectorWakeup(selectCnt) 方法处理。
                    // 可能会重新建立 Select

                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            } finally {
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        // 如果事件轮询器开始 shutdown,就要关闭 IO 资源
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

NioEventLoop如何处理多路复用

NioEventLoop 在 Netty 中处理多路复用的方式主要是基于 Java NIO 的非阻塞特性和选择器(Selector)机制。多路复用允许单个线程同时处理多个通道(Channel)的 I/O 事件,从而提高了系统的吞吐量和响应速度。

以下是 NioEventLoop 如何处理多路复用的具体步骤:

  1. 初始化与注册

    • NioEventLoop 在初始化时创建一个 Selector 对象,用于监控多个 Channel 的状态。
    • 当一个 Channel 需要处理 I/O 事件时,它会被注册到 NioEventLoopSelector 上,并指定感兴趣的事件类型(如读、写等)。
  2. 轮询就绪事件

    • NioEventLoop 进入事件循环,调用 Selectorselect() 方法等待通道上就绪的事件。
    • select() 方法会阻塞,直到至少有一个 Channel 的事件就绪,或者超时。
  3. 处理就绪事件

    • select() 方法返回时,NioEventLoop 获取就绪的 Channel 集合。
    • 对于每个就绪的 ChannelNioEventLoop 根据其感兴趣的事件类型调用相应的处理器(如 ChannelInboundHandler)来处理这些事件。
  4. 多路复用核心

    • Selector 的核心作用在于它能够同时监控多个 Channel,并且只选择那些处于就绪状态的 Channel 进行处理。
    • 通过这种方式,NioEventLoop 可以使用单个线程高效地处理大量并发的 Channel,而无需为每个 Channel 创建一个独立的线程。
  5. 异步非阻塞通信

    • 由于 Netty 采用了异步通信模式,NioEventLoop 中的 IO 操作(如读、写)都是非阻塞的。
    • 这意味着当一个 IO 操作不能立即完成时(例如,等待数据从网络读取),线程不会被阻塞,而是可以继续处理其他任务或事件。
  6. 任务调度与执行

    • 除了处理 I/O 事件外,NioEventLoop 还负责执行提交给它的任务(如定时任务、用户自定义任务等)。
    • 这些任务与 I/O 事件一起,在 NioEventLoop 的事件循环中得到调度和执行。

Netty如何管理Channel和Selector

管理Channel

  1. 注册Channel
    当一个新的连接建立时,Netty会创建一个新的Channel实例(例如NioSocketChannelNioServerSocketChannel),并将其注册到NioEventLoop中的Selector上。注册过程包括将Channel添加到Selector的监控列表中,并设置其感兴趣的事件类型(如读、写、连接等)。

  2. 事件处理
    一旦Selector检测到某个Channel上有事件就绪(例如数据可读或可写),它会通知NioEventLoop。然后,NioEventLoop会调用相应的ChannelPipelineChannelHandler来处理这些事件。

  3. 关闭Channel
    当连接关闭时,Netty会注销Channel并从Selector的监控列表中移除它。同时,相关的资源也会被释放。

管理Selector

  1. 创建Selector
    NioEventLoop初始化时,它会创建一个Selector实例。这个Selector用于监控所有注册到该NioEventLoopChannel

  2. 轮询就绪事件
    NioEventLoop在事件循环中不断调用Selectorselect()方法来等待Channel上的事件就绪。一旦有事件就绪,Selector会返回这些事件的集合。

  3. 处理就绪事件
    NioEventLoop遍历Selector返回的就绪事件集合,并为每个事件调用相应的处理器。这通常涉及到调用ChannelPipeline中的ChannelHandler来处理这些事件。

  4. 优化Selector的使用
    Netty可能会使用多个SelectorNioEventLoop实例来优化性能,特别是在处理大量并发连接时。通过分散负载到多个线程和Selector上,Netty能够充分利用多核CPU的资源,提高吞吐量。

注意事项

  • 通常情况下,你不需要直接管理Selector。Netty的NioEventLoopChannel抽象为你提供了高级的API来处理I/O事件和连接。
  • 在编写自定义的ChannelHandler时,你需要关注如何处理事件,而不是如何管理ChannelSelector
  • 如果你需要执行定时任务或自定义的后台任务,可以使用NioEventLoop的任务队列来提交这些任务。这些任务会在事件循环的适当时候得到执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/528272.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

信息泄露漏洞的JS整改方案

引言 🛡️ 日常工作中,我们经常会面临线上环境被第三方安全厂商扫描出JS信息泄露漏洞的情况,这给我们的系统安全带来了潜在威胁。但幸运的是,对于这类漏洞的整改并不复杂。本文将介绍几种可行的整改方法,以及其中一种…

【C语言】:枚举和联合体

这里写自定义目录标题 1、枚举1.1 枚举类型的声明1.2 枚举类型的优点1.3 枚举类型的使用 2、联合体(共用体)2.1 联合体类型的声明2.2 联合体的特点2.3联合体大小的计算 1、枚举 1.1 枚举类型的声明 枚举顾名思义就是⼀⼀列举,把可能的取值⼀…

Tomcat以服务方式启动,无法访问网络共享目录问题

关于“Tomcat以服务方式启动,无法访问网络共享目录问题”解决方式如下: 1、通过doc命令【services.msc】打开本地服务找到,找到tomcat服务所在位置 2、右键打开Tomcat服务的属性 3、选择 登陆选项卡 4、选择“此账户”选项,并…

Nginx配置文件修改结合内网穿透实现公网访问多个本地web站点

文章目录 1. 下载windows版Nginx2. 配置Nginx3. 测试局域网访问4. cpolar内网穿透5. 测试公网访问6. 配置固定二级子域名7. 测试访问公网固定二级子域名 1. 下载windows版Nginx 进入官方网站(http://nginx.org/en/download.html)下载windows版的nginx 下载好后解压进入nginx目…

自动驾驶中的多目标跟踪_第二篇

自动驾驶中的多目标跟踪:第二篇 上一节介绍了多目标跟踪的定义、应用场景和类型以及面临的挑战;在这一节,我们回顾贝叶斯滤波,简单介绍运动模型和量测模型,卡尔曼滤波等。 附赠自动驾驶学习资料和量产经验:链接 贝叶…

Spring事务简介,事务角色,事务属性

1.Spring事务简介 事务作用:在数据层保障一系列的数据库操作同成功同失败Spring事务作用:在数据层或业务层保障一系列的数据操作同成功同失败 public interface PlatformTransactionManager{void commit(TransactionStatus status) throws TransactionE…

macU盘在电脑上读不出来 u盘mac读不出来怎么办 macu盘不能写入 Tuxera NTFS for Mac免费下载

对于Mac用户来说,使用U盘是很常见的操作,但有时候可能会遇到Mac电脑无法读取U盘的情况,这时候就需要使用一些特定的工具软件来帮助我们解决问题。本文就来告诉大家macU盘在电脑上读不出来是怎么回事,u盘mac读不出来怎么办。 一、m…

PaddleVideo:onnx模型导出

本文节介绍 PP-TSM 模型如何转化为 ONNX 模型,并基于 ONNX 引擎预测。 1:环境准备 安装 Paddle2ONNX python -m pip install paddle2onnx 安装 ONNXRuntime # 建议安装 1.9.0 版本,可根据环境更换版本号 python -m pip install onnxrunti…

C语言——关于指针运算的例题分析

1.指针运算中关于 sizeof 和 strlen 的例题分析 1. sizeof(数组名),这⾥的数组名表⽰整个数组,计算的是整个数组的⼤⼩。 2. &数组名,这⾥的数组名表⽰整个数组,取出的是整个数组的地址。 3. 除此之外所有的数组名都表⽰…

【算法每日一练]-动态规划(保姆级教程 篇17 状态压缩)

目录 今日知识点: 把状态压缩成j,dp每行i的布置状态,从i-1和i-2行进行不断转移 把状态压缩成j,dp每行i的布置状态,从i-1行进行状态匹配,然后枚举国王数转移 POJ1185:炮兵阵地 思路: 题目:互…

互联网大厂ssp面经之路:计算机网络part2

什么是 HTTP 和 HTTPS?它们之间有什么区别? a. HTTP(超文本传输协议)和HTTPS(安全超文本传输协议)是用于在Web上传输数据的协议。它们之间的区别在于安全性和数据传输方式。 b. HTTP是一种不安全的协议&…

算法训练营第二十一天(二叉树part7)

算法训练营第二十一天(二叉树part7) 530.二叉搜索树的最小绝对差 力扣题目链接(opens new window) 题目 给你一棵所有节点为非负值的二叉搜索树,请你计算树中任意两节点的差的绝对值的最小值。 示例: 提示:树中至…

原来进制题如此简单

进制相关 二进制数与其他进制数之间的转化 二进制数转其他进制数(十进制数除外)一般不能直接转化,一般需要过度至十进制数,再转化为其他进制数。同理,其他进制数(十进制数除外)转二进制数也需过…

flutter多入口点entrypoint

native中引擎对象本身消耗内存(每个引擎对象约莫消耗42MB内存) 多引擎:native多引擎>启动>flutter多入口点entrypoint>多main函数>多子包元素集>多(子)程序 单引擎(复用):native单引擎>复用启动>flutter多入口点entrypoint>多m…

error:LNK2005 已经在*.obj中定义 的原因分析及对策

LNK2005是一个重复定义错误,造成LNK2005主要有以下几种情况: 目录 全局变量的重复定义 情况A:全局变量在.cpp文件中的多次声明 情况B:变量名重复 头文件的包含重复 解决方案 #ifndef标识符宏定义 pragma once预编译 头文件…

使用yml文件配置python日志

新建一个logging.yml文件,内容如下: logging库提供了多个组件:Logger、Handler、Filter、Formatter: Logger 对象提供应用程序可直接使用的接口,供应用代码使用; Handler 发送日志到适当的目的地&#xff…

PMC管理中落实生产作业计划的思路与方法

在快节奏的现代商业环境中,PMC(生产及物料控制)管理对于确保企业生产流程的高效运转至关重要。生产作业计划的落实不仅关乎企业的生产效率和成本控制,更是企业竞争力的重要体现。那么,PMC管理中如何有效落实生产作业计…

上传应用程序到苹果应用商店的工具和要点

引言 在今天的移动应用市场中,将应用程序上传到苹果应用商店(App Store)是许多开发者的首要任务之一。然而,不同操作系统下的开发者可能需要使用不同的工具和遵循不同的要求来完成这一任务。本文将介绍在 macOS、Windows 和 Linu…

【C语言】:字符函数和字符串函数

这里写目录标题 1、strlen的使用和模拟实现2、strcpy的使用和模拟3、strcat 的使用和模拟实现4、strcmp 的使用和模拟实现5、strncpy 函数的使用6、strncat 函数的使用7、strncmp函数的使用8、strstr 的使用和模拟实现9、strtok 函数的使用10、strerror 函数的使用11、字符分类…

独家原创 | SCI 1区 高创新轴承故障诊断模型!

往期精彩内容: Python-凯斯西储大学(CWRU)轴承数据解读与分类处理 Python轴承故障诊断 (一)短时傅里叶变换STFT Python轴承故障诊断 (二)连续小波变换CWT_pyts 小波变换 故障-CSDN博客 Python轴承故障诊断 (三)经验模态分解EMD_轴承诊断 …