一、基本概念
选择器提供一种选择执行已经就绪的任务的能力。selector选择器可以让单线程处理多个通道。如果程序打开了多个连接通道,每个连接的流量都比较低,可以使用Selector对通道进行管理。
二、如何创建选择器
1.创建Selector
Selector selector = Selector.open();
2.必须将通道设置为非阻塞模式才能注册到选择器上
Channel.configureBlocking(false);
3.把通道注册到选择器上,会返回一个选择键
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
SelectionKey的操作有:
- SelectionKey.OP_CONNECT,指某个通道连接到服务器
- SelectionKey.OP_ACCEPT,只有ServerSocketChannel有这个事件,查看是否有新的连接
- SelectionKey.OP_READ,是否有可读的通道就绪
- SelectionKey.OP_WRITE,写数据的通道是否就绪
注册完成后,可以调用select()方法轮询是否有就绪的通道
int count = selector.select();
select()方法,返回就绪的通道数量
三、服务器端模板
//服务器端模板代码
public static void Server_Standard_Code_template() {
try {
ServerSocketChannel ssc=ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost",80));
//只有设置为非阻塞才能注册到选择器中
ssc.configureBlocking(false);
//创建一个选择器
Selector selector = Selector.open();
//通道注册进选择器中---监听客户端连接事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
//获取以及就绪的通道数量
int select = selector.select();
//没有通道就绪
if(select==0)
{
continue;
}
//获取已经就绪的
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext())
{
SelectionKey selectionKey = iterator.next();
//客户端连接请求事件
if(selectionKey.isAcceptable())
{
//接收连接
}else if(selectionKey.isReadable())
{
//读取数据
}
else if(selectionKey.isWritable())
{
//写数据
}
//移除
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
四、NIO通讯实例
服务器端
public class NIOServer {
//通道管理器
private Selector selector;
/**
* 获取一个ServerSocket通道,并对该通道做一些初始化工作
* @param port 端口号
* @throws IOException
*/
public void initServer(int port) throws IOException {
//获取一个ServerSocket通道
ServerSocketChannel socketChannel = ServerSocketChannel.open();
//设置通道为非阻塞
socketChannel.configureBlocking(false);
//将通道对应的ServerSocket绑定到port端口
socketChannel.socket().bind(new InetSocketAddress(port));
//获取一个通道管理器
this.selector = Selector.open();
/**
* 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件
* 注册该事件后,当该事件到达时,selector.select()会返回
* 如果该事件没有到达,selector.select()会一直阻塞
*/
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public void listen() throws IOException {
while (true){
//当注册的事件到达时,方法返回,否则该方法一直阻塞
selector.select();
//获取selector中选项的迭代器
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//删除已经选择的key,防止重复处理
iterator.remove();
//客户端连接请求事件
if(key.isAcceptable()){
//接收连接
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//获取客户端连接的通道
SocketChannel channel = serverSocketChannel.accept();
//设置为非阻塞
channel.configureBlocking(false);
//向客户端发送数据源
ByteBuffer buf = ByteBuffer.allocate(1024);
String message = "你好我是服务器端,我接收到了你的消息";
buf.put(message.getBytes(StandardCharsets.UTF_8));
//把缓冲区切换成读取模式
buf.flip();
//将buffer写入channel
while (buf.hasRemaining()){
channel.write(buf);
}
//和客户端连接成功后,为了接收到客户端的信息,需要给通道设置读取权限
channel.register(this.selector,SelectionKey.OP_READ);
}else if(key.isReadable()){
//读取数据
read(key);
}
}
}
}
public void read(SelectionKey key) throws IOException {
//得到事件发生的socket通道
SocketChannel channel = (SocketChannel) key.channel();
//创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//将数据读取到缓冲区
channel.read(buffer);
// 4、把缓冲区切换成写出模式
buffer.flip();
String rs = new String(buffer.array(),0,buffer.remaining());
System.out.println(rs);
}
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8100);
server.listen();
}
}
客户端
public class NIOClient {
//通道管理器
private Selector selector;
public static void main(String[] args) throws IOException {
NIOClient client = new NIOClient();
client.initClick("127.0.0.1",8100);
client.listen();
}
public void initClick(String ip,int port) throws IOException {
//获取一个socket
SocketChannel channel = SocketChannel.open();
//设置通道为非阻塞
channel.configureBlocking(false);
//获取一个通道管理器
this.selector = Selector.open();
channel.connect(new InetSocketAddress(ip,port));
channel.register(this.selector, SelectionKey.OP_CONNECT);
}
public void listen() throws IOException {
while (true){
selector.select();
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isConnectable()){
SocketChannel channel = (SocketChannel) key.channel();
if(channel.isConnectionPending()){
channel.finishConnect();
}
//设置为阻塞
channel.configureBlocking(false);
//向客户端发送数据源
ByteBuffer buffer = ByteBuffer.allocate(1024);
String message = "服务器端你好,我是客户端";
buffer.put(message.getBytes(StandardCharsets.UTF_8));
//把缓冲区切换成读取模式
buffer.flip();
//将buffer写入channel
while (buffer.hasRemaining()){
channel.write(buffer);
}
channel.register(this.selector,SelectionKey.OP_READ);
}else if(key.isReadable()){
read(key);
}
}
}
}
public void read(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
byte[] data = buffer.array();
// 4、把缓冲区切换成写出模式
buffer.flip();
String rs = new String(data,0,buffer.remaining());
System.out.println(rs);
}
}