庖丁解牛:NIO核心概念与机制详解 06 _ 连网和异步 I/O

文章目录

  • Pre
  • 概述
  • 异步 I/O
  • Selectors
  • 打开一个 ServerSocketChannel
  • 选择键
  • 内部循环
  • 监听新连接
  • 接受新的连接
  • 删除处理过的 SelectionKey
  • 传入的 I/O
  • 回到主循环

在这里插入图片描述


Pre

庖丁解牛:NIO核心概念与机制详解 01

庖丁解牛:NIO核心概念与机制详解 02 _ 缓冲区的细节实现

庖丁解牛:NIO核心概念与机制详解 03 _ 缓冲区分配、包装和分片

庖丁解牛:NIO核心概念与机制详解 04 _ 分散和聚集

庖丁解牛:NIO核心概念与机制详解 05 _ 文件锁定


概述

在 Java NIO 中,连网操作与其他操作一样,依赖于通道(Channel)和缓冲区(Buffer)。通道是用于读取和写入数据的途径,而缓冲区则用于暂存数据。

与传统的同步 I/O 不同,Java NIO 中的通道操作是非阻塞的,这意味着在发起 IO 请求后,进程可以继续执行其他任务,而不需要等待 IO 操作完成。当 IO 操作完成后,进程会收到通知,此时再进行相应的处理。


异步 I/O

异步 I/O 是一种 没有阻塞地 读写数据的方法。通常,在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样, write() 调用将会阻塞直至数据能够写入。

另一方面,异步 I/O 调用不会阻塞。相反,你将注册对特定 I/O 事件的兴趣 ― 可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,系统将会告诉你。

异步 I/O 的一个优势在于,它允许你同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,你可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。

来看个Demo

这个程序就像传统的 echo server,它接受网络连接并向它们回响它们可能发送的数据。不过它有一个附加的特性,就是它能同时监听多个端口,并处理来自所有这些端口的连接。并且它只在单个线程中完成所有这些工作。


import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class MultiPortEcho {
    private int ports[];
    private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);

    public MultiPortEcho(int ports[]) throws IOException {
        this.ports = ports;

        go();
    }

    private void go() throws IOException {
        // Create a new selector
        Selector selector = Selector.open();

        // Open a listener on each port, and register each one
        // with the selector
        for (int i = 0; i < ports.length; ++i) {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ServerSocket ss = ssc.socket();
            InetSocketAddress address = new InetSocketAddress(ports[i]);
            ss.bind(address);

            SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("Going to listen on " + ports[i]);
        }

        while (true) {
            int num = selector.select();

            Set selectedKeys = selector.selectedKeys();
            Iterator it = selectedKeys.iterator();

            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();

                if ((key.readyOps() & SelectionKey.OP_ACCEPT)
                        == SelectionKey.OP_ACCEPT) {
                    // Accept the new connection
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);

                    // Add the new connection to the selector
                    SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
                    it.remove();

                    System.out.println("Got connection from " + sc);
                } else if ((key.readyOps() & SelectionKey.OP_READ)
                        == SelectionKey.OP_READ) {
                    // Read the data
                    SocketChannel sc = (SocketChannel) key.channel();

                    // Echo data
                    int bytesEchoed = 0;
                    while (true) {
                        echoBuffer.clear();

                        int r = sc.read(echoBuffer);

                        if (r <= 0) {
                            break;
                        }

                        echoBuffer.flip();

                        sc.write(echoBuffer);
                        bytesEchoed += r;
                    }

                    System.out.println("Echoed " + bytesEchoed + " from " + sc);

                    it.remove();
                }

            }

//System.out.println( "going to clear" );
//      selectedKeys.clear();
//System.out.println( "cleared" );
        }
    }

    public static void main(String args[]) throws Exception {
        if (args.length <= 0) {
            System.err.println("Usage: java MultiPortEcho port [port port ...]");
            System.exit(1);
        }

        int ports[] = new int[args.length];

        for (int i = 0; i < args.length; ++i) {
            ports[i] = Integer.parseInt(args[i]);
        }

        new MultiPortEcho(ports);
    }
}

Selectors

我们来基于 MultiPortEcho 的源代码中的 go() 方法的实现,因此应该看一下源代码,以便对所发生的事情有个更全面的了解。

异步 I/O 中的核心对象名为 SelectorSelector 就是你注册对各种 I/O 事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉你所发生的事件。

所以,我们需要做的第一件事就是创建一个 Selector

// Create a new selector
Selector selector = Selector.open();

然后,我们将对不同的通道对象调用 register() 方法,以便注册我们对这些对象中发生的 I/O 事件的兴趣。register() 的第一个参数总是这个 Selector。


打开一个 ServerSocketChannel

为了接收连接,我们需要一个 ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。

对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );
  
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress( ports[ii] );
ss.bind( address );

第一行创建一个新的 ServerSocketChannel ,最后三行将它绑定到给定的端口。
第二行将 ServerSocketChannel 设置为 非阻塞的 。我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。


选择键

下一步是将新打开的 ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:

SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );

register() 的第一个参数总是这个 Selector
第二个参数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。

请注意对 register() 的调用的返回值。 SelectionKey 代表这个通道在此 Selector 上的这个注册。当某个 Selector 通知你某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。


内部循环

现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用 Selectors 的几乎每个程序都像下面这样使用内部循环:

int num = selector.select();
  
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
  
while (it.hasNext()) {
     SelectionKey key = (SelectionKey)it.next();
     // ... deal with I/O event ...
}

首先,我们调用 Selectorselect() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。

接下来,我们调用 SelectorselectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的一个 集合 。

我们通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每一个 SelectionKey,你必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。


监听新连接

程序执行到这里,我们仅注册了 ServerSocketChannel,并且仅注册它们“接收”事件。为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:

if ((key.readyOps() & SelectionKey.OP_ACCEPT)
     == SelectionKey.OP_ACCEPT) {
  
     // Accept the new connection
     // ...
}

可以肯定地说, readOps() 方法告诉我们该事件是新的连接。


接受新的连接

因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:

ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();

下一步是将新连接的 SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:

sc.configureBlocking( false );
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );

注意我们使用 register()OP_READ 参数,将 SocketChannel 注册用于 读取 而不是 接受 新连接。


删除处理过的 SelectionKey

在处理 SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey

it.remove();

现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。


传入的 I/O

当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用 Selector.select(),并返回一个或者多个 I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,如下所示:

} else if ((key.readyOps() & SelectionKey.OP_READ)
     == SelectionKey.OP_READ) {
     // Read the data
     SocketChannel sc = (SocketChannel)key.channel();
     // ...
}

与以前一样,我们取得发生 I/O 事件的通道并处理它。在本例中,由于这是一个 echo server,我们只希望从套接字中读取数据并马上将它发送回去。


回到主循环

每次返回主循环,我们都要调用 selectSelector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除 SelectionKey,然后返回主循环的顶部。

这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,我们需要通过将通道从 Selector 中删除来处理关闭的通道。而且我们可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/168997.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# 监测 Windows 设备变动事件

本程序通过WPF窗口的 WindowProc 函数处理Windows的硬件或配置改变的事件。开发环境为VS 2022。 基础信息 硬件或配置改变的基础有以下内容&#xff1a; 消息: WM_DEVICECHANGE 要实现的WindowProc 函数参数&#xff1a; protected IntPtr WndProc(IntPtr hwnd, int msg, In…

React 中 react-i18next 切换语言( 项目国际化 )

背景 平时中会遇到需求&#xff0c;就是切换语言&#xff0c;语种等。其实总的来说都是用i18n来实现的 思路 首先在项目中安装i18n插件&#xff0c;然后将插件引入到项目&#xff0c;然后配置语言包&#xff08;语言包需要你自己来进行配置&#xff0c;自己编写语言包&#xff…

C++初阶 | [四] 类和对象(下)

摘要&#xff1a;初始化列表&#xff0c;explicit关键字&#xff0c;匿名对象&#xff0c;static成员&#xff0c;友元&#xff0c;内部类&#xff0c;编译器优化 类是对某一类实体(对象)来进行描述的&#xff0c;描述该对象具有哪些属性、哪些方法&#xff0c;描述完成后就形成…

【zabbix监控三】zabbix之部署代理服务器

一、部署代理服务器 分布式监控的作用&#xff1a; 分担server的几种压力解决多机房之间的网络延时问题 1、搭建proxy主机 1.1 关闭防火墙&#xff0c;修改主机名 systemctl disbale --now firewalld setenforce 0 hostnamectl set-hostname zbx-proxy su1.2 设置zabbix下…

【C++ Primer Plus学习记录】for循环

很多情况下都需要程序执行重复的任务&#xff0c;C中的for循环可以轻松地完成这种任务。 我们来从程序清单5.1了解for循环所做的工作&#xff0c;然后讨论它是如何工作的。 //forloop.cpp #if 1 #include<iostream> using namespace std;int main() {int i;for (i 0; …

百云齐鲁 | 云轴科技ZStack成功实践精选(山东)

山东省作为我国重要的工业基地和北方地区经济发展的战略支点&#xff0c;在“十四五”规划中将数字强省建设分为数字基础设施、数字科技、数字经济、数字政府、数字社会、数字生态六大部分&#xff0c;涵盖政治、经济、民生等多个方面&#xff0c;并将大数据、云计算、人工智能…

数电实验-----实现74LS153芯片扩展为8选1数据选择器以及应用(Quartus II )

目录 一、74LS153芯片介绍 管脚图 功能表 二、4选1选择器扩展为8选1选择器 1.扩展原理 2.电路图连接&#xff08;Quartus II &#xff09; 3.仿真结果 三、8选1选择器的应用 1.三变量表决器 2.奇偶校验电路 一、74LS153芯片介绍 74ls153芯片是属于四选一选择器的芯片。…

Nginx-负载均衡-动静分离-虚拟主机

负载均衡 负载均衡基本使用 1 配置上游服务器 upstream myserver { #是server外层server ip1:8080;server ip1:8080; }2 配置代理 server {location / { proxy_pass http://myserver;#请求转向myserver 定义的服务器列表 注意这个http不能丢 pro…

VulnHub DC-7

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…

【数据结构与算法】Kadane‘s算法(动态规划、最大子数组和)

文章目录 一、算法原理二、例题2.1 最大子数组和2.2 环形子数组的最大和 一、算法原理 Kadanes算法是一种用于解决最大子数组和问题的动态规划算法。这类问题的目标是在给定整数数组中找到一个连续的子数组&#xff0c;使其元素之和最大&#xff08;数组含有负数&#xff09;。…

MySQL为什么选择了B+树

首先MySQL的数据**&#xff08;索引记录&#xff09;**是存在磁盘里的&#xff0c;磁盘读取非常慢&#xff0c;所以要尽可能减少磁盘操作&#xff0c;因此我们需要更好的利用索引。 首先索引按顺序排列了数据&#xff0c;那么很显然最好的查找方式是二分查找&#xff0c;数组自…

【Spring Boot】使用WebSocket协议完成来单提醒及客户催单功能

1 WebSocket介绍 WebSocket 是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工通信(双向传输)——浏览器和服务器只需要完成一次握手&#xff0c;两者之间就可以创建持久性的连接&#xff0c; 并进行双向数据传输。 1.1 HTTP协议和WebSocket协议对比 1、HTTP是短…

【10套模拟】【7】

关键字&#xff1a; 二叉排序树插入一定是叶子、单链表简单选择排序、子串匹配、层次遍历

【Python】问题描述:输入A、B,输出A+B。样例输入12 45样例输出57

1、问题描述 输入A、B&#xff0c;输出AB。 样例输入 12 45 样例输出 57 nums list(map(int,input().split(" "))) print(sum(nums))

Java 算法篇-链表的经典算法:判断回文链表、判断环链表与寻找环入口节点(“龟兔赛跑“算法实现)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 链表的创建 2.0 判断回文链表说明 2.1 快慢指针方法 2.2 使用递归方式实现反转链表方法 2.3 实现判断回文链表 - 使用快慢指针与反转链表方法 3.0 判断环链表说明…

基于人工水母算法优化概率神经网络PNN的分类预测 - 附代码

基于人工水母算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于人工水母算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于人工水母优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…

vue-admin-template改变接口地址

修改登录接口 1.f12查看请求接口 模仿返回数据写接口 修改方式1 1.在env.devolopment修改 修改方式2 vue.config.js 改成本地接口地址 配置转发 后端创建相应接口&#xff0c;使用map返回相同的数据 修改前端请求路径 修改前端返回状态码 utils里面的request.js

Iceberg学习笔记(1)—— 基础知识

Iceberg是一个面向海量数据分析场景的开放表格式&#xff08;Table Format&#xff09;&#xff0c;其设计的目的是解决数据存储和计算引擎之间的适配的问题 表格式&#xff08;Table Format&#xff09;可以理解为元数据以及数据文件的一种组织方式&#xff0c;处于计算框架&…

Positive Technologies 利用 PT Cloud Application Firewall 保护中小型企业的网络资源

云产品按月订购&#xff0c;无需购买硬件资源 PT Cloud Application Firewall 是 Positive Technologies 推出的首个用于保护网络应用程序的商用云产品。Web 应用层防火墙 (web application firewall, WAF) 现在可以通过 技术合作伙伴——授权服务商和云提供商以订购方式提供1…

浅析ChatGPT中涉及到的几种技术点

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…