一、概念
NIO, 即new io,也叫非阻塞io
二、NIO三个核心组件:
- Buffer数据缓冲区
- Channel通道
- Selector选择器
1、Buffer缓冲区
缓冲区本质上是一个可以存放数据的内存块(类似数组),可以在这里进行数据写入和读取。此内存块包含在NIO Buffer对象中,并且有一些列API,可以对内存块中的数据进行操作,相比较直接对数组的操作,Buffer API更加容易操作和管理。
1) Buffer进行数据写入与读取的步骤
- 将数据写入缓冲区
- 切换为读取模式,调用API: buffer.flip(),此时position位置会变为0,即从头开始读
- 读取缓冲区数据
- 调用API: buffer.cler()清除整个缓冲区数据或者buffer.compact()清除已读的数据
2) Buffer工作原理
三个重要属性
- capacity容量: 即内存块的大小
- position位置: 内存块当前数据操作的位置,即读数据的位置或者写数据的位置
- limit 限制: 写入模式,等于capacity。 读取模式,limit等于写入的数据量
3) ByteBuffer内存类型
ByteBuffer为性能关键型代码提供了"直接内存(堆外内存,off-heap memory)" 和 "非直接内存(堆内内存,on-heap memory)"两种实现
// 堆内内存 ByteBuffer buffer = ByteBuffer.allocate(4);
// 堆外内存 ByteBuffer buffer = ByteBuffer.allocateDirect(4);
堆外内存好处:
1、进行网络IO或者文件IO时,堆外内存会比堆内内存少一次拷贝。(file/socket --- OS memory --- jvm heap), GC会移动对象内存,在写file或socket的过程中,JVM的实现中,会先把数据复制到堆外,再进行写入。
2、GC范围之外,降低了GC压力,但实现了自动管理。 DirectByteBuffer中有一个Cleaner对象(PhantomReference),Cleaner被GC前会执行clean方法,触发DirectByteBuffer中定义的Deallocator
建议:
1、性能确实可观的时候才去使用,分配给大型、长寿命(网络传输、文件读写场景)
2、通过虚拟机参数MaxDirectMemortSieze限制堆外内存大小,防止耗尽整个机器的内存。
什么是堆外内存?
正常的java非空对象,都是在jvm中的,而且受垃圾收集器的管理,即堆内内存(on-heap memory) 在某种特定的场景下,例如在文件读取写入的时候,如果一份数据写在堆内存的某个位置,假如此时在位置A,而此时jvm垃圾回收器进行了一次垃圾回收,此时的数据在内存中的位置可能会发生变化,变成了位置B,操作系统在读取这个数据的时候,通过A去查找,会导致读取到错误的数据,这种场景的解决方案: 堆外内存(off-heap memory),jvm在会先将写入的数据,复制一份到堆外,操作系统直接从堆外读取。
示例:
// 初始化buffer,此时每个元素会被初始化为0
ByteBuffer buffer = ByteBuffer.allocate(4);
System.out.println(String.format("初始化: capacity容量: %s, limit限制: %s, position位置: %s",
buffer.capacity(), buffer.limit(), buffer.position()));
>> 输出: 初始化: capacity容量: 4, limit限制: 4, position位置: 0
// 写入3个字节数据
buffer.put((byte) 1);
buffer.put((byte) 2);
buffer.put((byte) 2);
System.out.println(String.format("初始化: capacity容量: %s, limit限制: %s, position位置: %s",
buffer.capacity(), buffer.limit(), buffer.position()));
>> 输出: 初始化: capacity容量: 4, limit限制: 4, position位置: 3
//读数据, 不用buffer.flip()切换,position为4,应该是读到第4个字节的数据,但第4个字节没有,则默认为0,即buffer中有了4个数据, 所以读取模式下,limit也为4
byte b1 = buffer.get();
System.out.println(String.format("写入两个字节数据后,capacity容量: %s,limit限制:%s, position位置:%s",
buffer.capacity(), buffer.limit(), buffer.position()));
System.out.println(b1);
// >> 输出 capacity容量: 4,limit限制:4, position位置:4
// >> 0
// 调用buffer.flip(), 将position变为0
buffer.flip();
System.out.println(String.format("capacity容量: %s,limit限制:%s, position位置:%s",
buffer.capacity(), buffer.limit(), buffer.position()));
// >> 输出 capacity容量: 4,limit限制:4, position位置:0
// 从position=1开始读取数据
byte b2 = buffer.get();
System.out.println(String.format("capacity容量: %s,limit限制:%s, position位置:%s",
buffer.capacity(), buffer.limit(), buffer.position()));
System.out.println(b2);
// >> 输出 capacity容量: 4,limit限制:4, position位置:1
// >> 1
// 清除已经读过的数据, 转为写入模式
buffer.compact();
System.out.println(String.format("capacity容量: %s,limit限制:%s, position位置:%s",
buffer.capacity(), buffer.limit(), buffer.position()));
// >> 输出 capacity容量: 4,limit限制:4, position位置:3
//清除所有数据
buffer.clear();
System.out.println(String.format("capacity容量: %s,limit限制:%s, position位置:%s",
buffer.capacity(), buffer.limit(), buffer.position()));
// >> 输出 capacity容量: 4,limit限制:4, position位置:0
buffer.compact()
方法: 将buffer的position设置为capicity - buffer中剩余未读取的数据索引, 即: position = capicity - compact之前的position
buffer.put((byte)1);
buffer.put((byte)2);
System.out.println();
System.out.format("capacity: %s, limit: %s, position: %s", buffer.capacity(), buffer.limit(), buffer.position());
// capacity: 4, limit: 4, position: 2
buffer.compact();
System.out.println();
System.out.format("capacity: %s, limit: %s, position: %s", buffer.capacity(), buffer.limit(), buffer.position());
// capacity: 4, limit: 4, position: 2
byte ss = buffer.get();
System.out.println();
System.out.println(ss);
System.out.format("capacity: %s, limit: %s, position: %s", buffer.capacity(), buffer.limit(), buffer.position());
// ss = 0
// capacity: 4, limit: 4, position: 3
2、Channel通道
buffer缓冲区数据传输的渠道,类似BIO中的sokcet+io流
1)客户端通道: SocketChannel
NIO的客户端通道,SocketChannel用于建立TCP网络连接,类似jav.net.Socket
示例:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class NIOClient {
public static void main(String[] args) throws IOException {
SocketChannel client = SocketChannel.open();
// 设置SocketChannel为非阻塞模式,默认的socket相关都是阻塞的
client.configureBlocking(false);
client.connect(new InetSocketAddress("127.0.0.1", 8888));
// 没连接上则一直等待
while (!client.finishConnect()){
Thread.yield();
}
Scanner scanner = new Scanner(System.in);
System.out.println("连接服务端成功,请输入需要发送的内容:" );
// 发送内容
String msg = scanner.nextLine();
// 将数据包装为buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
// 是否还有数据剩余,有就继续写
while (buffer.hasRemaining()) {
client.write(buffer);
}
// 读取服务端响应数据
ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
while (client.isOpen() && client.read(responseBuffer) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束
// (此处做一个简单的判断: 超过0字节就认为请求结束了)
// position > 0, 代表该请求有数据在写入,否则一直阻塞等待
if (responseBuffer.position() > 0)
break;
}
responseBuffer.flip();
byte[] content = new byte[responseBuffer.limit()];
ByteBuffer buffer1 = responseBuffer.get(content);
System.out.println("收到服务端响应: " + new String(content));
scanner.close();
client.close();
}
}
2)服务端通道: ServerSocketChannel
NIO的服务端通道,类似java.net.ServerSocket
示例:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class NIOServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8888));
System.out.println("服务端启动成功!等待新连接...");
while (true) {
// 在nio中,accept方法是非阻塞的,如果没有接受到请求,会返回null
SocketChannel accept = serverSocketChannel.accept();
// 有新的连接,读取数据
if (accept != null) {
System.out.println("收到新连接:" + accept.getRemoteAddress());
accept.configureBlocking(false);
// 读取数据
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (accept.isOpen() && accept.read(requestBuffer) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束
// (此处做一个简单的判断: 超过0字节就认为请求结束了)
// position > 0, 代表该请求有数据在写入,否则一直阻塞等待
if (requestBuffer.position() > 0)
break;
}
// 没数据了,不再继续后面的操作
if (requestBuffer.position() == 0)
continue;
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println("收到数据来自:" + accept.getRemoteAddress());
System.out.println(new String(content));
// 响应结果
String response = "HTTP/1.1 200 OK \r\n" +
"Content-Length: 11 \r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
accept.write(buffer);
}
}
}
}
}
3、Selector选择器
Selector是一个Java NIO组件,可以监听一个或多个NIO通道,并根据绑定的事件,确定哪些通道已准备好进行相应的操作,如某个通道注册了读取事件,Selector就可以确定该通道什么时候可以进行读取。
实现了单个线程可以管理多个通道,从而管理多个网络连接,节约内存开销。
一个线程使用Selector监听多个channel的不同事件,4个事件分别对应SelectionKey的4个常量:
- Connect连接事件(SelectionKey.OP_CONNECT)
- Accept准备就绪事件(SelectionKey.OP_ACCEPT)
- Read读取事件(SelectionKey.OP_READ)
- Write写入事件(SelectionKey.OP_WRITE)
实现一个线程处理多个通道的核心概念理解: 事件驱动机制 非阻塞的网络通道下,开发者通过Selector注册对于通道感兴趣的事件类型,线程通过监听时间来处理相应的代码执行(拓展:更底层是操作系统的多路复用机制)
示例:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class NIOServer2 {
private static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) throws IOException {
// 1、创建网络服务端ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2、构建Selector一个选择器,将channel注册上去
Selector selector = Selector.open();
// 将serverSocketChannel注册到selector, 注册感兴趣事件:ACCEPT事件
SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
// 3、绑定端口
serverSocketChannel.bind(new InetSocketAddress(8888));
System.out.println("服务端启动成功!等待新连接...");
while (true) {
// 不再轮询通道,改用下面轮询事件的方式.select方法有阻塞效果,直到有事件通知才会有返回
selector.select();
// 获取选择器事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()) {
SelectionKey next = iter.next();
iter.remove();
// 关注READ和ACCEPT事件
if (next.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) next.attachment();
// 将客户端注册到Selector上,并关注READ事件
SocketChannel clientChannel = server.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ, clientChannel);
System.out.println("收到新连接: " + clientChannel.getRemoteAddress());
}
if (next.isReadable()) {
SocketChannel socketChannel = (SocketChannel) next.attachment();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(buffer) != -1) {
if (buffer.position() > 0)
break;
}
if (buffer.position() == 0)
continue;
buffer.flip();
byte[] content = new byte[buffer.limit()];
buffer.get(content);
System.out.println("收到消息,来自:"+ socketChannel.getRemoteAddress());
System.out.println(new String(content));
String res = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer byteBuffer = ByteBuffer.wrap(res.getBytes());
while (byteBuffer.hasRemaining()) {
socketChannel.write(byteBuffer);
}
// 取消时间订阅
next.cancel();
}
}
}
}
}