NIO基础
三大组件
Channel and Buffer
常用的只有ByteBuffer
Selector(选择器)
结合服务器的设计演化来理解Selector
多线程版设计
最早在nio设计出现前服务端程序的设计是多线程版设计
,即一个客户端对应一个socket连接,一个连接用一个线程处理,每个线程专管一个连接
由此造成的缺点有
- 内存占用高, 如果一个线程默认1m大小,1000个链接就是1g
- CPU在线程之间的上下文切换成本高
- 只适合连接数少的场景
线程池版设计
socketAPI工作在阻塞模式下,同一时刻内一个线程只能处理一个客户端的socket连接,切必须等待线程处理完成当前socket连接,且必须等断开旧的连接后才能处理新的socket连接,即使旧的连接没有任何读写请求,线程没有得到充分的利用
早期的Tomcat就采用的线程池版设计阻塞式io,比较适合http请求
Selector设计
Channel代表服务器和客户端连接,数据读写的通道,
将客户端连接服务器的各种事件操作通过Channel细致化,Selector是负责监听Channel请求的工具,将监听到的请求交给线程,如果Channel的流量太高,其他Channel会被搁置,所以只适合流量低连接数多的场景
ByteBuffer
基本使用
内存有限,缓存区不能跟文件一样大小增大,所以分多次读取
@Slf4j
public class AppTest {
public static void main(String[] args) {
// 获取fileChannel的方法 1.输入输出流 2.RandomAccessFile
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
// 准备缓冲区 获取一块内存,大小由allocate决定
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true){
//从Channel读取数据向buffer写入
int len = channel.read(buffer);
log.debug("读取到的字节数{}", len);
if (len == -1){ //没有内容了
break;
}
//打印buffer的内容
buffer.flip();// 切换至读模式
while (buffer.hasRemaining()) {// 是否还有未读数据
byte b = buffer.get();
log.debug("实际字节{}",(char) b);
}
buffer.clear();// 切换为写模式
}
} catch (IOException e) {
}
}
}
ByteBuffer结构
ByteBuffer有以下重要属性
- capacity(容量)
- position(读写指针)
- limit(读写限制)
常见方法
分配空间
获取数据
byte b = buffer.get();
byte b = buffer.get(1);// 不会改变position值
字符串转ByteBuffer方法
后两种会直接切换到读模式
Scattering Reads 分散读集中写
黏包、半包
public class TestByteBufferExam {
public static void main(String[] args) {
/*
网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
Hello,world\n
I'm zhangsan\n
How are you?\n
变成了下面的两个 byteBuffer (黏包,半包)
Hello,world\nI'm zhangsan\nHo
w are you?\n
现在要求你编写程序,将错乱的数据恢复成原始的按 \n 分隔的数据
*/
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split2(source);
source.put("w are you?\n".getBytes());//追加值
split2(source);
}
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整消息
if (source.get(i) == '\n') {//不改变position值
int length = i + 1 - source.position();
// 把这条完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());//改变了position值
}
debugAll(target);
}
}
source.compact();//未读完部分向前压缩并切换为写模式
}
}
文件编程
重点是网络编程所以文件编程了解就行
FileChannel
传输数据
Path
Files
FilesWalkFileTree
更便捷的遍历文件的API
private static void m1() throws IOException {
AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91"), new SimpleFileVisitor<Path>(){
//进入目录
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("进入目录====>"+dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
//操作文件
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}
//退出目录
@Override
public FileVisitResult postVisitDirectory(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});
System.out.println("dir count:" +dirCount);
System.out.println("file count:" +fileCount);
}
网络编程
阻塞模式
阻塞模式下,服务器每次只能执行一个连接,单线程模式运行,必须等待客户端连接和客户端信息,否则阻塞,线程停止运行等待客户端,资源效率利用低
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
//使用nio来理解阻塞模式,单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建的了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept建立与客户端连接,socketChannel用来与客户端之间通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 如果客户端没启动,将会阻塞在这里等待客户端启动连接的请求
log.debug("connected... {}", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接受客户端发生的数据
log.debug("before read... {}",channel);
channel.read(buffer);//如果客户端没有数据返回,将会阻塞等待客户端返回数据
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read... {}",channel);
}
}
}
}
-------
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
System.out.println("waiting...");
}
}
非阻塞模式
非阻塞模式下,不管客户端是否有链接或返回数据,线程都不会停止,但会一直轮训尝试获取连接和数据,资源浪费
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
//使用nio来理解阻塞模式,单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建的了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
//非阻塞模式下 循环一直进行,轮训等待连接,还是有资源浪费
while (true) {
SocketChannel sc = ssc.accept(); // 非阻塞模式,如果没有客户端连接,sc返回null 线程继续运行
if (sc != null){
// 4. accept建立与客户端连接,socketChannel用来与客户端之间通信
log.debug("connected... {}", sc);
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接受客户端发生的数据
log.debug("before read... {}",channel);
int read = channel.read(buffer);// 非阻塞,线程继续运行,如果没有返回数据read返回0
if (read > 0){
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read... {}",channel);
}
}
}
}
}
Selector
Selector底层有两个keys的集合,keys表示注册在selector中的Channel集合,selectedKeys表示上述Channel参数的事件集合,selectedKeys集合中的事件必须及时消费处理,并及时移除,否则selector会认为该事件未处理造成重复处理死循环
accept
serverSocket独有事件,客户端发起连接请求就会触发此事件connect
是客户端连接建立后触发的事件read
数据可读事件write
可写事件
第三步,把Channel注册在selector
中时,在内部注册SelectionKey的set集合,它存储了各个事件key,当事件被处理时应当从SelectionKey集合中移除它,否则有key而没有待处理的事件会报错空指针,所以使用Iterator
迭代器循环,能移除循环到的key
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 1. 创建 Selector,管理多个 Channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立Selector和Channel的联系(注册)
// selectionKey 绑定一个事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// 这个key绑定accept事件,只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3. select方法,没有事件发生则线程阻塞,有事件发生则线程恢复运行
// select 在事件未处理时不会阻塞,事件发生后要么取消要么处理,不能置之不理
selector.select();
// 4. 处理事件,selectKeys 内部包含了所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//处理key时,要从selectKeys集合中删除,否则有key而没有事件则会报错空指针
iterator.remove();
log.debug("key: {}", key);
// 5. 区分事件类型
if (key.isAcceptable()) {//如果是链接
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
//若果不处理链接,则第三步 selector 会认为事件未处理,未完成连接的消费,不会阻塞
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
log.debug("scKey:{}", scKey);
} else if (key.isReadable()) {//如果是 read
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
buffer.flip();
debugRead(buffer);
}
//key.cancel(); 取消事件
}
}
}
}
处理客户端断开
如果客户端异常断开,在catch块中删除key,因为无论是否有读数据,都会产生read返回值,正常断开的返回值是-1
消息边界问题
当Buffer小于消息长度时,消息接受不完整会造成消息乱码或失效
可能会出现的情况,10字节大小的Buffer在接受13字节大小的消息时,只打印了后3个字节的的消息,因为当Buffer一次读没有接受完,服务器会自动触发两次读事件,将没有读完的内容再次读取,第一次读到Buffer就被覆盖了。
Buffer应该重新设计成,可自动扩容和非局部变量
附件与扩容
在向selector中注册keys时,可以为此key添加att参数,即附件参数,附件参数在双方交互时都可以获取修改
处理内容过多问题
如果发生数据大于Buffer 缓冲区,会触发多次读事件,这时如果有其他的客户端事件是处理不了的,相当于被一个客户端连接阻塞了,一直在处理那个客户端的内容发送,因为发送缓冲区被占满暂时无法发送,这时要充分利用可以选择去读
服务端的写操作会尝试写入尽可能多的字节,但是写入缓冲区满了是无法写的,这时候write返回0,我们优化让他关注可写事件,不要做无谓的尝试,可写的时候再写入。
最终改造
服务器在向客户端第一次写操作没写完时,给当前只关注写操作SelectionKey事件追加加上 读操作事件的值,这样大数据量一次没写完,服务器自动触发的写操作可以被捕获,能写操作再写操作,不会因为缓冲区满产生不必要的写操作造成阻塞
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8088));
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
// 1. 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 5000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 2. 返回代表实际写入的字节数
int write = sc.write(buffer);
System.out.println(write);
// 3. 判断是否有剩余字节未写完
if (buffer.hasRemaining()) {
// 4. 关键改造,让客户端关注可写事件,能写入时再写入 在原有关注数值上加上对应可写事件的数值常量
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
// 5. 把未写完的数据挂到scKey的附件上
scKey.attach(buffer);
}
} else if (key.isWritable()) {//关注写事件
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);
// 6. 清理操作
if (!buffer.hasRemaining()) {
// 清理Buffer 将附件设为空
key.attach(null);
// 写完毕 不再需要关注写事件
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}
}
总结
多线程优化
boss线程的selector只处理连接事件,worker线程的selector则专门处理读写事件
线程队列优化
boss线程内调用 worker 的 register方法,其还是由boss线程调用,只有run方法中的方法才回另开线程,实现了 boss线程的selector只处理连接事件,worker线程的selector则专门处理读写事件。
其中利用线程队列的方法延迟select的注册,因为队列中的sc注册只消费一次,所以完成注册后select只监听read事件
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);// 非阻塞模式
Selector bossSelect = Selector.open();
SelectionKey bossKey = ssc.register(bossSelect, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8008));
// 1. 创建固定数量的worker
Worker worker = new Worker("worker-0");
while (true) {
log.debug("1 bossSelect before ...,{}", bossSelect);
bossSelect.select(); // select 方法 没有事件则阻塞
log.debug("2 bossSelect after ...,{}", bossSelect);
Iterator<SelectionKey> iterator = bossSelect.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("3 connected ...,{}", sc.getRemoteAddress());
// 2. 关联selector
log.debug("4 before ...,{}", sc.getRemoteAddress());
worker.register(sc);// 初始 selector 启动 worker 线程
log.debug("5 after ...,{}", sc.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable {
private Thread thread;
private Selector workerSelect;
private String name;
private volatile boolean start = false;//还未初始化
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
public void register(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this, name);
thread.start();
workerSelect = Selector.open();
start = true;
}
// 向队列添加任务,但这个任务没有立刻执行 boss
queue.add(()->{
log.debug("6 sc register ...,{}", workerSelect);
try {
sc.register(workerSelect, SelectionKey.OP_READ, null);// 在select阻塞时会注册失败
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
workerSelect.wakeup();// wakeup方法是一次性方法,无论在select阻塞前后执行 都会唤醒一次select让select不被阻塞
}
@Override
public void run() {
while (true) {
try {
log.debug("7 workerSelect before ...,{}", workerSelect);
workerSelect.select();
Runnable task = queue.poll();
if (task != null){
log.debug("8 queue run ...,{}", workerSelect);
task.run();
}
log.debug("9 workerSelect after...,{}", workerSelect);
Iterator<SelectionKey> iterator = workerSelect.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("10 read ...,{}", channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
还可以去掉队列的操作,只调用select的唤醒
因为wakeup方法是一次性方法,无论在select阻塞前后执行 都会唤醒一次select让select不被阻塞
而客户端连接事件只在boss线程执行一次所以调用了一次register方法,而start方法在唤醒前执行了,此时可能阻塞了,而wakeup方法的特殊性,能保证select完成注册,所以只需一次唤醒一次select完成注册,即可保证客户端被正常注册,消息被正常接受而不会被先运行的workerSelect.select();
方法阻塞
多worker改造
worker开几个线程比较合适,若要充分发挥服务器的多核优势,可以设置至少物理核的数量
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
NIO vs BIO
stream vs channel
IO模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(不存在)、异步非阻塞
零拷贝
切换了三次,数据复制了4次
直接内存第一次优化
DMA是一个独立的硬件。
零拷贝指不在Java JVM内存中进行拷贝动作
AIO
Async异步IO
由主线程处产生的异步线程叫守护线程,如果主线程结束了,那么守护线程也会结束,哪怕守护线程还在工作中
所以我们使主线程阻塞,等待异步返回结果
@Slf4j
public class AioFileChannel {
public static void main(String[] args) throws IOException {
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {
// 参数1 ByteBuffer
// 参数2 读取的起始位置
// 参数3 附件
// 参数4 回调对象 CompletionHandler
ByteBuffer buffer = ByteBuffer.allocate(16);
log.debug("read begin...");
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override // read 成功
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...{}", result);
attachment.flip();
debugAll(attachment);
}
@Override // read 失败
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
log.debug("read end...");
} catch (IOException e) {
e.printStackTrace();
}
System.in.read();//使主线程阻塞不结束
}
}