NIO-Selector详解
Selector概述
Selector选择器,也可以称为多路复⽤器。它是Java NIO的核⼼组件之⼀,⽤于检查⼀个或多个Channel的状态是否处于可读、可写、可连接、可接收等。通过⼀个Selector选择器管理多个Channel,可以实现⼀个线程管理多个Channel对应的⽹络连接。使⽤单线程管理多个Channel可以避免多线程的线程上下⽂切换带来的额外开销。
SelectableChannel可选择通道
只有SelectableChannel才能被Selector管理,⽐如所有的Socket通道。⽽FileChannel并没有继承SelectableChannel,因此不能被Selector管理。
Channel注册到Selector上
Channel通过注册的⽅式关联Selector。⼀个Channel可以注册到多个Selector上,但在某⼀个Selector上只能注册⼀次。注册时需要告知Selector,Selector需要对通道的哪个操作感兴趣。
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException{ return register(sel, ops, null); }
通道的操作如下:
-
可读:SelectionKey.OP_READ
-
可写:SelectionKey.OP_WRITE
-
可连接:SelectionKey.OP_CONNECT
-
可接收:SelectionKey.OP_ACCEPT
⽐如channel调⽤register⽅法进⾏注册到Selector,并告知Selector对哪个操作感兴趣:
channel.register(selector, SelectionKey.OP_READ);
也可以同时注册多个操作:
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
选择器会查询每个⼀个channel的操作事件,如果是该channel注册的操作已就绪,则进⾏响应。注意,这⾥channel的操作指的是channel完成某个操作的条件,表示该channel对于该操作已处于就绪状态。⽐如ServerSocketChannel已准备好接收新的连接,那么它注册的 SelectionKey.OP_ACCEPT
操作就处于就绪状态。⼜⽐如SocketChannel已准备好去连接Server服务器,那么它注册的SelectionKey.OP_CONNECT
操作就处于就绪状态。于是Selector就可以触发之后的动作。
SelectionKey选择键
SelectionKey封装了Channel和注册的操作。
当Selector调⽤select()⽅法时,会轮询所有注册在它身上的Channel,查看是否有处于某个操作(已注册到selector上的)就绪状态的Channel,把这些Channel放⼊到SelectionKey的集合中。
Selector的使用
-
创建Selector 通过Selector的open⽅法创建Selector对象。
// 创建Selector Selector selector = Selector.open();
-
Channel注册到Selector上 Channel必须处于非阻塞模式才能注册到Selector上
package com.my.io.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; /** * @author zhupanlin * @version 1.0 * @description: selector的使用:注册channel到selector上 * @date 2024/1/26 11:02 */ public class Demo1 { public static void main(String[] args) throws IOException { // 1.创建Selector Selector selector = Selector.open(); // 2.获得Channel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 3.设置成非阻塞的模式 serverSocketChannel.configureBlocking(false); // 4.绑定端口 serverSocketChannel.bind(new InetSocketAddress(9001)); // 5.注册channel到selector上 SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } }
-
Selector轮询就绪状态的Channel
Selector通过调⽤select⽅法轮询已就绪的通道操作。select⽅法是阻塞的,直到⾄少有⼀个通道的注册操作已就绪。当完成select⽅法调⽤后,被选中的已就绪的所有channel通过Selector的selectedKeys()⽅法获得,该⽅法获得的是⼀个SelectionKey集合,其中每⼀个SelectionKey都表示⼀个Channel。于是可以根据SelectionKey的注册操作来做具体的业务处理。
package com.my.io.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.Iterator; import java.util.Set; /** * @author zhupanlin * @version 1.0 * @description: Selector轮询就绪状态的Channel * @date 2024/1/26 11:10 */ public class Demo2 { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); // serverSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 绑定端口 serverSocketChannel.bind(new InetSocketAddress(9001)); // 注册 SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // selector轮询 while (true){ // 阻塞等待某个操作就绪状态的channel selector.select(); // 获得一个集合,里面包含了这次selector执行select方法获得的发生就绪状态的多个channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍历 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); if (key.isReadable()){ // 处理读状态的业务 }else if (key.isAcceptable()){ // 处理接收状态的业务 }else if (key.isConnectable()){ // 处理连接状态的业务 }else if (key.isWritable()){ // 处理写状态的业务 } // 保证下次channel有就绪状态的操作发生时可以被selector轮询到 iterator.remove(); } } } }
Selector示例
-
实现NIO通信的服务端
package com.my.io.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; /** * @author zhupanlin * @version 1.0 * @description: 服务端demo * @date 2024/1/26 11:45 */ public class ServerDemo { public static void main(String[] args) throws IOException { // 获得Channel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 设置成非阻塞 serverSocketChannel.configureBlocking(false); // 绑定端口号 serverSocketChannel.bind(new InetSocketAddress(9001)); // 获得Selector Selector selector = Selector.open(); // 把channel注册到selector上面 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 让selector轮询监听 while (true){ // 阻塞直到有通道就绪 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取有动作的selectionKey == channel Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); handle(selectionKey); // 删除key,表示处理完成 iterator.remove(); } } } private static void handle(SelectionKey selectionKey) throws IOException { if (selectionKey.isAcceptable()){ // 当服务端处于接收的就绪状态 // 获得selectionKey中的channel ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel(); // 接收客户端连接,获得socketChannel SocketChannel socketChannel = serverSocketChannel.accept(); // 设置成非阻塞状态,否则无法被selector复用 socketChannel.configureBlocking(false); // 把socketChannel注册到selector上,让selector对socketChannel的read操作感兴趣 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ); }else if (selectionKey.isReadable()){ // 当socketChannel处于读数据的就绪状态 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 读取socketChannel中的数据 //设置成非阻塞 socketChannel.configureBlocking(false); // 创建Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 读数据 int len = 0; while ((len = socketChannel.read(buffer)) > 0){ // 翻转 buffer.flip(); System.out.println(new String(buffer.array(), 0, len)); // 清除buffer中的数据 buffer.clear(); } socketChannel.register(selectionKey.selector(), SelectionKey.OP_WRITE); }else if (selectionKey.isWritable()){ } } }
-
客户端
package com.my.io.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** * @author zhupanlin * @version 1.0 * @description: 客户端demo * @date 2024/1/26 11:28 */ public class ClientDemo { public static void main(String[] args) throws IOException { // 创建Channel SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9001)); // 设置成非阻塞模式 socketChannel.configureBlocking(false); // 得到buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 把数据写入到buffer中 buffer.put("hello selector".getBytes()); // 反转buffer buffer.flip(); // 把buffer中的数据写入到channel中 socketChannel.write(buffer); // 关闭 socketChannel.close(); } }