Channel(通道)
Channel 是一个通道,它建立了与数据源(如文件、网络套接字等)之间的连接。我们可以利用它来读取和写入数据,就像打开了一条自来水管,让数据在 Channel 中自由流动。
BIO 中的流是单向的,分为各种 InputStream
(输入流)和 OutputStream
(输出流),数据只是在一个方向上传输。通道与流的不同之处在于通道是双向的,它可以用于读、写或者同时用于读写。
Channel 与前面介绍的 Buffer 打交道,读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中。
另外,因为 Channel 是全双工的,所以它可以比流更好地映射底层操作系统的 API。特别是在 UNIX 网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。
Channel
的子类如下图所示。
其中,最常用的是以下几种类型的通道:
FileChannel
:文件访问通道;SocketChannel
、ServerSocketChannel
:TCP 通信通道;DatagramChannel
:UDP 通信通道;
Channel 最核心的两个方法:
read
:读取数据并写入到 Buffer 中。write
:将 Buffer 中的数据写入到 Channel 中。
这里我们以 FileChannel
为例演示一下是读取文件数据的。
RandomAccessFile reader = new RandomAccessFile("/Users/guide/Documents/test_read.in", "r"))
FileChannel channel = reader.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
Selector(选择器)
Selector(选择器) 是 NIO 中的一个关键组件,它允许一个线程处理多个 Channel。Selector 是基于事件驱动的 I/O 多路复用模型,主要运作原理是:通过 Selector 注册通道的事件,Selector 会不断地轮询注册在其上的 Channel。当事件发生时,比如:某个 Channel 上面有新的 TCP 连接接入、读和写事件,这个 Channel 就处于就绪状态,会被 Selector 轮询出来。Selector 会将相关的 Channel 加入到就绪集合中。通过 SelectionKey 可以获取就绪 Channel 的集合,然后对这些就绪的 Channel 进行相应的 I/O 操作。
一个多路复用器 Selector 可以同时轮询多个 Channel,由于 JDK 使用了 epoll()
代替传统的 select
实现,所以它并没有最大连接句柄 1024/2048
的限制。这也就意味着只需要一个线程负责 Selector 的轮询,就可以接入成千上万的客户端。
Selector 可以监听以下四种事件类型:
SelectionKey.OP_ACCEPT
:表示通道接受连接的事件,这通常用于ServerSocketChannel
。SelectionKey.OP_CONNECT
:表示通道完成连接的事件,这通常用于SocketChannel
。SelectionKey.OP_READ
:表示通道准备好进行读取的事件,即有数据可读。SelectionKey.OP_WRITE
:表示通道准备好进行写入的事件,即可以写入数据。
Selector
是抽象类,可以通过调用此类的 open()
静态方法来创建 Selector 实例。Selector 可以同时监控多个 SelectableChannel
的 IO
状况,是非阻塞 IO
的核心。
一个 Selector 实例有三个 SelectionKey
集合:
- 所有的
SelectionKey
集合:代表了注册在该 Selector 上的Channel
,这个集合可以通过keys()
方法返回。 - 被选择的
SelectionKey
集合:代表了所有可通过select()
方法获取的、需要进行IO
处理的 Channel,这个集合可以通过selectedKeys()
返回。 - 被取消的
SelectionKey
集合:代表了所有被取消注册关系的Channel
,在下一次执行select()
方法时,这些Channel
对应的SelectionKey
会被彻底删除,程序通常无须直接访问该集合,也没有暴露访问的方法。
简单演示一下如何遍历被选择的 SelectionKey
集合并进行处理:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key != null) {
if (key.isAcceptable()) {
// ServerSocketChannel 接收了一个新连接
} else if (key.isConnectable()) {
// 表示一个新连接建立
} else if (key.isReadable()) {
// Channel 有准备好的数据,可以读取
} else if (key.isWritable()) {
// Channel 有空闲的 Buffer,可以写入数据
}
}
keyIterator.remove();
}
Selector 还提供了一系列和 select()
相关的方法:
int select()
:监控所有注册的Channel
,当它们中间有需要处理的IO
操作时,该方法返回,并将对应的SelectionKey
加入被选择的SelectionKey
集合中,该方法返回这些Channel
的数量。int select(long timeout)
:可以设置超时时长的select()
操作。int selectNow()
:执行一个立即返回的select()
操作,相对于无参数的select()
方法而言,该方法不会阻塞线程。Selector wakeup()
:使一个还未返回的select()
方法立刻返回。- ……
使用 Selector 实现网络读写的简单示例:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioSelectorExample {
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
// 将 ServerSocketChannel 注册到 Selector 并监听 OP_ACCEPT 事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// 处理连接事件
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
// 将客户端通道注册到 Selector 并监听 OP_READ 事件
client.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 处理读事件
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = client.read(buffer);
if (bytesRead > 0) {
buffer.flip();
System.out.println("收到数据:" +new String(buffer.array(), 0, bytesRead));
// 将客户端通道注册到 Selector 并监听 OP_WRITE 事件
client.register(selector, SelectionKey.OP_WRITE);
} else if (bytesRead < 0) {
// 客户端断开连接
client.close();
}
} else if (key.isWritable()) {
// 处理写事件
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.wrap("Hello, Client!".getBytes());
client.write(buffer);
// 将客户端通道注册到 Selector 并监听 OP_READ 事件
client.register(selector, SelectionKey.OP_READ);
}
keyIterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
在示例中,我们创建了一个简单的服务器,监听 8080 端口,使用 Selector 处理连接、读取和写入事件。当接收到客户端的数据时,服务器将读取数据并将其打印到控制台,然后向客户端回复 "Hello, Client!"。