Java BIO、NIO(通信/群聊系统、零拷贝)、AIO
BIO、NIO、AIO特点和场景
BIO(Blocking I/O)、NIO(Non-blocking I/O)、AIO(Asynchronous I/O)是Java中用于处理I/O操作的三种不同的I/O模型,它们具有不同的特点和适用场景。
-
BIO(Blocking I/O):
- BIO是传统的I/O模型,它是同步阻塞的,也就是说当一个I/O操作发生时,应用程序会被阻塞,直到操作完成。
- 在BIO中,每个I/O操作通常需要一个单独的线程来处理,这意味着如果有大量并发的I/O操作,就需要创建大量线程,这可能会导致资源消耗和性能问题。
- BIO适用于简单的网络通信场景,但在高并发情况下,性能较差。
-
NIO(Non-blocking I/O):
-
NIO是一种更现代的I/O模型,它引入了非阻塞的概念,允许一个线程处理多个I/O操作。NIO使用了选择器(Selector)来监听多个通道的事件,从而减少了线程的数量。
-
在NIO中,应用程序可以异步地发起I/O操作,然后继续执行其他任务,而不需要等待I/O操作完成。这提高了并发性能,特别适用于需要处理大量并发连接的网络应用程序。
-
NIO适用于构建高性能的网络服务器和客户端,如Netty等框架。
-
-
AIO(Asynchronous I/O):
- AIO是Java 7引入的一种异步I/O模型,它进一步提高了I/O操作的效率。与NIO不同,AIO的异步I/O操作可以在后台完成,应用程序不需要等待。
- AIO适用于那些需要大量I/O操作并且不希望阻塞主线程的应用程序,比如高性能的文件和网络服务器。
- 在AIO中,操作系统负责通知应用程序I/O操作的完成,而不需要应用程序不断地轮询或等待。
总结:
- BIO适用于简单的同步I/O操作,但在高并发场景下性能较差。
- NIO适用于需要处理大量并发连接的网络应用程序,提供了非阻塞I/O和选择器的支持。
- AIO适用于需要高性能异步I/O操作的应用程序,它可以在后台进行I/O操作而不阻塞主线程。
对比表
特点 | BIO (Blocking I/O) | NIO (Non-blocking I/O) | AIO (Asynchronous I/O) |
---|---|---|---|
阻塞 | 阻塞式 I/O | 非阻塞式 I/O | 异步 I/O |
并发处理 | 低并发处理 | 中等并发处理 | 高并发处理 |
编程模型 | 同步编程模型 | 同步和部分异步编程模型 | 异步编程模型 |
缓冲区 | 需要自己手动管理缓冲区 | 使用缓冲区进行数据传输 | 使用缓冲区进行数据传输 |
文件操作支持 | 支持 | 支持 | 支持 |
网络操作支持 | 支持 | 支持 | 支持 |
效率 | 低效率,线程池开销大 | 相对高效,线程开销小 | 高效率,异步处理 |
适用场景 | 低并发、少连接数 | 中等并发、中等连接数 | 高并发、大连接数 |
复杂性 | 相对简单 | 相对复杂 | 较复杂 |
主要类和接口 | InputStream /OutputStream | Selector /Channel | AsynchronousChannel |
典型应用 | 单线程或多线程处理少量连接 | 网络服务器、文件操作等 | 高性能服务器、文件操作等 |
总的来说,BIO适用于连接数较少的场景,NIO适用于中等并发场景,而AIO适用于高并发的异步操作场景,如网络服务器、文件传输等。选择适当的I/O模型取决于应用程序的性能和并发需求。
BIO 案例
telnet客户端连接测试
cmd telnet [ip] [端口]
可以输入字符发送,或者ctrl+],然后使用send命令,send hello
启动Telnet客户端
服务端逻辑
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BIOServer {
public static void main(String[] args) throws Exception {
//线程池机制思路
//1. 创建一个线程池
//2. 如果有客户端连接,就创建一个线程,与之通讯
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
//创建ServerSocket
ServerSocket serverSocket = new ServerSocket(8989);
System.out.println("服务器启动了");
while (true) {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
//监听,等待客户端连接
System.out.println("等待连接....");
//会阻塞在accept()
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//就创建一个线程,与之通讯(单独写一个方法)
newCachedThreadPool.execute(() -> {
//可以和客户端通讯
handler(socket);
});
}
}
/**
* 与客户端通讯
*
* @param socket
*/
public static void handler(Socket socket) {
try {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
//通过socket获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端发送的数据
while (true) {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
System.out.println("read....");
//没有可读数据时会阻塞
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("关闭和client的连接");
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
总结
- 每个请求都需要创建独立的线程,与对应的客户端进行数据
Read
,业务处理,数据Write
。 - 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
- 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在
Read
操作上,造成线程资源浪费。
NIO三大核心组件关系
Channel
Channel
是用于进行非阻塞I/O操作的抽象,它代表了一个可以进行读写操作的数据源或数据目的地。Channel
提供了一种通用的、统一的接口,用于与不同类型的I/O设备(如文件、套接字、管道等)进行交互。
以下是关于Channel
的一些主要特点和常见的类型:
-
不同类型的
Channel
:FileChannel
:用于文件的读写操作,支持文件的随机读写。SocketChannel
:用于TCP套接字通信,可以连接到远程服务器或接受客户端连接。ServerSocketChannel
:用于监听传入的TCP连接请求,允许服务器接受客户端连接。DatagramChannel
:用于UDP通信,支持无连接的数据包传输。Pipe.SinkChannel
和Pipe.SourceChannel
:用于在同一虚拟机内的两个线程之间进行通信,支持管道传输。
-
通用操作:
- 所有类型的
Channel
都支持通用的操作,如读取数据到Buffer
、从Buffer
写入数据、关闭通道等。 Channel
的读写操作通常是非阻塞的,因此可以在一个线程中同时处理多个Channel
。
- 所有类型的
-
FileChannel:
FileChannel
是用于文件的I/O操作的主要通道类型。- 可以使用
FileChannel
来读取、写入、映射文件以及文件锁定等。 FileChannel
支持文件的随机访问,可以通过position()
方法设置读写位置。
-
SocketChannel 和 ServerSocketChannel:
SocketChannel
用于客户端套接字通信,可以连接到远程服务器。ServerSocketChannel
用于服务器套接字通信,用于监听传入的连接请求并创建对应的SocketChannel
来处理客户端连接。
-
DatagramChannel:
DatagramChannel
用于UDP通信,支持无连接的数据包传输。- 通过
DatagramChannel
可以发送和接收UDP数据报。
-
Pipe:
Pipe.SinkChannel
和Pipe.SourceChannel
通常用于在同一虚拟机内的两个线程之间进行通信。- 一个线程将数据写入
SinkChannel
,另一个线程从SourceChannel
读取数据。
总之,Channel
是Java NIO中用于进行非阻塞I/O操作的关键组件之一,它提供了统一的接口,可以用于处理不同类型的I/O设备。开发者可以根据应用程序的需求选择合适类型的Channel
,并使用它们来进行数据的读写操作,从而实现高效的I/O通信。
Buffer
Buffer
是用于进行数据读取和写入的重要抽象。它是Java NIO(New I/O)库的一部分,用于处理非阻塞I/O操作。Buffer
提供了一种方便的方式来操作底层数据源(通常是字节数组)的数据,同时可以在不同的I/O通道之间传递数据。
以下是关于NIO中的Buffer
的主要特点和用法:
-
缓冲区类型:Java NIO提供了多种类型的缓冲区,主要有以下几种:
ByteBuffer
:用于处理字节数据。CharBuffer
:用于处理字符数据。ShortBuffer
、IntBuffer
、LongBuffer
:分别用于处理短整数、整数和长整数数据。FloatBuffer
、DoubleBuffer
:用于处理浮点数和双精度浮点数数据。- 这些不同类型的缓冲区都是
Buffer
的子类,提供了不同数据类型的读写方法。
-
容量(Capacity):每个缓冲区都有一个固定的容量,表示它可以存储的数据元素数量。容量在缓冲区创建时被指定,并且不能更改。
-
位置(Position):位置表示下一个读取或写入操作将从缓冲区的哪个位置开始。初始位置通常为0,并且通过读取或写入数据而逐渐增加。位置不能超过缓冲区的容量。
-
限制(Limit):限制是一个不可读写的索引,表示在读取或写入操作时不能超过的位置。初始限制通常等于容量,但可以通过设置限制来限制读取或写入的数据范围。
-
标记(Mark):标记是一个临时的索引,可以通过
mark()
方法设置,并在需要时通过reset()
方法回到标记的位置。标记的主要用途是在读取或写入一些数据后能够返回到之前的位置。 -
读写操作:
Buffer
提供了一系列读取和写入数据的方法,如get()
、put()
、read()
、write()
等。这些方法根据当前位置(Position)和限制(Limit)来确定读写的数据范围。 -
翻转(Flip):通过
flip()
方法,可以将缓冲区从写模式切换到读模式。这个操作会将限制设置为当前位置,并将位置重置为0,准备开始读取数据。 -
清空(Clear):通过
clear()
方法,可以将缓冲区从读模式切换到写模式。这个操作会将限制设置为容量,位置重置为0,准备开始写入数据。 -
压缩(Compact):通过
compact()
方法,可以将未读取的数据复制到缓冲区的开头,然后将位置设置为复制后的数据后面。这个操作用于在写模式下向缓冲区写入数据之前,压缩未读取的数据。 -
重绕(Rewind):通过
rewind()
方法,可以将位置重置为0,保持限制不变,进入写模式。这个操作用于重新读取已经读取过的数据。 -
标记和重置:
mark()
方法用于设置标记位置,reset()
方法用于将位置重置为标记位置。
Buffer
在Java NIO中是一个重要的基础构建块,它允许开发者高效地进行数据的读取和写入操作,特别适用于非阻塞I/O编程。在使用Buffer
时,需要注意正确管理位置、限制、标记和容量,以确保数据的正确读写。
Selector
Selector
是一个多路复用器,用于管理多个Channel
的事件,以便一个线程可以同时处理多个通道上的I/O操作。Selector
是非常重要的,因为它可以帮助实现非阻塞I/O,提高网络和文件I/O的效率。
以下是关于Selector
的一些主要特点和用法:
-
多路复用:
Selector
允许一个线程同时监视多个Channel
上的I/O事件,如读就绪、写就绪、连接就绪、接收就绪等。通过使用Selector
,一个线程可以有效地管理多个I/O通道,而不需要为每个通道创建一个单独的线程。 -
SelectableChannel
:SelectableChannel
是可以在Selector
上注册的Channel
的子类,例如SocketChannel
、ServerSocketChannel
、DatagramChannel
等。通过将Channel
注册到Selector
上,可以监视它们的I/O事件。 -
SelectionKey
:当一个Channel
被注册到Selector
上时,会返回一个SelectionKey
对象,它表示了该Channel
在Selector
上的注册状态和感兴趣的事件。SelectionKey
包括事件类型(如读、写、连接、接收)、关联的Channel
等信息。 -
select()
方法:Selector
提供了select()
方法,用于阻塞等待至少一个通道在感兴趣的事件上就绪。一旦有一个或多个通道就绪,select()
方法会返回,并返回就绪通道的数量。这个方法可以帮助减少CPU的轮询开销。 -
selectedKeys()
方法:Selector
还提供了selectedKeys()
方法,用于获取就绪的SelectionKey
集合,然后可以遍历这些SelectionKey
来处理相应的事件。 -
register()
方法:SelectableChannel
可以通过register()
方法将自己注册到Selector
上,并指定感兴趣的事件类型。通常在创建Channel
后,需要将其注册到一个Selector
上才能开始监视事件。 -
cancel()
方法:通过SelectionKey
的cancel()
方法可以取消注册的通道,停止监视相应的事件。 -
非阻塞操作:使用
Selector
可以实现非阻塞I/O操作,当一个通道就绪时,可以处理它的I/O事件,而不需要等待。这允许一个线程有效地管理多个通道的状态。
Selector
在高性能的网络服务器、文件I/O操作以及需要管理多个通道的应用程序中非常有用。通过结合Selector
、Channel
和Buffer
等NIO组件,可以实现高效的非阻塞I/O编程,提高应用程序的性能和响应性。但需要注意,正确使用Selector
需要处理一些细节,如注册和取消注册通道、处理事件等,以确保程序的正确性和可维护性。
Buffer 缓冲区
这四个字段 mark
、position
、limit
和 capacity
是 Buffer
类中非常重要的属性,它们用于管理缓冲区的状态和限制。以下是对每个字段的详细解释:
-
mark
:mark
是一个标记位置,用于记录缓冲区中的某个特定位置。初始值为-1,表示没有设置标记。你可以通过mark()
方法来设置标记位置,然后通过reset()
方法来恢复到标记位置。这对于在读写模式之间切换时定位到特定位置非常有用。 -
position
:position
表示当前读取或写入的位置。初始值为0,即缓冲区的起始位置。position
在读写操作中会随着操作的进行而递增,标记了下一个要读取或写入的位置。 -
limit
:limit
表示读取或写入操作的限制位置。它限制了可以读取或写入的最大位置。初始时,limit
通常等于缓冲区的容量,表示可以读取或写入整个缓冲区。当使用flip()
方法切换到读模式时,limit
会被设置为当前的position
,这样就限制了读取操作只能在已写入数据的范围内进行。limit
的值可以随时进行修改以改变读写操作的限制。 -
capacity
:capacity
表示缓冲区的容量,即它可以容纳的数据元素的数量。一旦分配,缓冲区的容量通常不会更改。
这些字段共同管理了缓冲区的状态,允许你在读取和写入操作之间进行切换,并确保操作在正确的范围内进行。例如,position
指示了下一个要读取或写入的位置,而 limit
限制了操作的范围,mark
允许你在需要时回到特定位置。
在使用 Buffer
类时,了解和管理这些字段的值对于正确操作缓冲区非常重要。通常,你会使用 flip()
、clear()
、rewind()
、mark()
、reset()
等方法来更改和管理这些字段的值,以满足你的读写需求。
Channel 通道
FileChannel 类
FileChannel
是 Java NIO 中用于文件 IO 操作的一个重要类,它提供了对文件的读取和写入操作。FileChannel
通常与 ByteBuffer
一起使用,以高效地处理文件的读写。
-
创建 FileChannel:
若要创建一个
FileChannel
对象,通常需要通过FileInputStream
或FileOutputStream
来获取。例如,要创建一个用于读取文件的FileChannel
,可以使用以下代码:javaCopy codeFileInputStream fileInputStream = new FileInputStream("example.txt"); FileChannel fileChannel = fileInputStream.getChannel();
同样,要创建一个用于写入文件的
FileChannel
,可以使用FileOutputStream
。 -
FileChannel 的读取和写入:
FileChannel
提供了一系列方法来执行读取和写入操作,其中最常见的包括:int read(ByteBuffer dst)
:从文件中读取数据到给定的ByteBuffer
。int write(ByteBuffer src)
:将ByteBuffer
中的数据写入到文件。long position()
和FileChannel position(long newPosition)
:获取或设置当前的文件位置。long size()
:获取文件的大小。int read(ByteBuffer dst, long position)
和int write(ByteBuffer src, long position)
:在指定位置读取或写入数据。
-
FileChannel 的文件锁定:
FileChannel
还支持文件锁定机制,允许你在多个线程或进程之间控制文件的访问。通过FileChannel
的lock()
和tryLock()
方法,你可以请求文件的共享锁或排它锁。这对于确保同时只有一个进程能够对文件进行写入操作非常有用。 -
FileChannel 的关闭:
当你完成文件操作后,应该及时关闭
FileChannel
以释放资源。使用close()
方法可以关闭FileChannel
。 -
适用场景:
FileChannel
主要用于文件 IO 操作,特别适用于需要高性能和大文件处理的场景。与传统的InputStream
和OutputStream
相比,FileChannel
更为灵活,对于高并发的文件读写操作也更高效。
应用实例
本地文件写入、读取、单缓冲区读写、拷贝案例
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.io.File;
import java.io.FileInputStream;
public class NIOFileChannel {
public static void main(String[] args) throws Exception {
NIOFileChannel nio = new NIOFileChannel();
// nio.writeFile("c:\\file01.txt");
// nio.readFile("c:\\file01.txt");
// nio.readAndWriteFile("C:\\tmp\\nio\\1.txt", "C:\\tmp\\nio\\2.txt");
nio.copyFile("C:\\tmp\\nio\\1.txt", "C:\\tmp\\nio\\22.txt");
}
/**
* 写入文件数据
*
* @param filePath 文件路径
*/
public void writeFile(String filePath) throws Exception {
String str = "hello, world";
//创建一个输出流 -> channel
FileOutputStream fileOutputStream = new FileOutputStream(filePath);
//通过 fileOutputStream 获取对应的 FileChannel
//这个 fileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//创建一个缓冲区 ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将 str 放入 byteBuffer
byteBuffer.put(str.getBytes());
//对 byteBuffer 进行 flip
byteBuffer.flip();
//将 byteBuffer 数据写入到 fileChannel
fileChannel.write(byteBuffer);
fileOutputStream.close();
}
/**
* 读取文件数据
*
* @param filePath 文件路径
*/
public void readFile(String filePath) throws Exception {
//创建文件的输入流
File file = new File(filePath);
FileInputStream fileInputStream = new FileInputStream(file);
//通过 fileInputStream 获取对应的 FileChannel -> 实际类型 FileChannelImpl
FileChannel fileChannel = fileInputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
//将通道的数据读入到 Buffer
fileChannel.read(byteBuffer);
//将 byteBuffer 的字节数据转成 String
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
}
/**
* 使用一个 Buffer 从通道中读取数据,并将数据写入到另一个通道中
*
* @param filePath1 读取文件路径
* @param filePath2 待写入文件路径
*/
public void readAndWriteFile(String filePath1, String filePath2) throws Exception {
FileInputStream fileInputStream = new FileInputStream(filePath1);
FileChannel fileChannel01 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream(filePath2);
FileChannel fileChannel02 = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
//循环读取
while (true) {
//清空 buffer
byteBuffer.clear();
int read = fileChannel01.read(byteBuffer);
System.out.println("read = " + read);
//读完
if (read == -1) {
break;
}
//将 buffer 中的数据写入到 fileChannel02--2.txt
byteBuffer.flip();
fileChannel02.write(byteBuffer);
}
//关闭相关的流
fileInputStream.close();
fileOutputStream.close();
}
/**
* 拷贝文件
*
* @param filePath1 源文件路径
* @param filePath2 待拷贝文件路径
*/
public static void copyFile(String filePath1, String filePath2) throws Exception {
//创建相关流
FileInputStream fileInputStream = new FileInputStream(filePath1);
FileOutputStream fileOutputStream = new FileOutputStream(filePath2);
//获取各个流对应的 FileChannel
FileChannel sourceCh = fileInputStream.getChannel();
FileChannel destCh = fileOutputStream.getChannel();
//使用 transferForm 完成拷贝
destCh.transferFrom(sourceCh, 0, sourceCh.size());
//关闭相关通道和流
sourceCh.close();
destCh.close();
fileInputStream.close();
fileOutputStream.close();
}
}
关于 Buffer 和 Channel 的注意事项和细节
ByteBuffer
支持类型化的 put
和 get
,put
放入的是什么数据类型,get
就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException
异常。
可以将一个普通 Buffer
转成只读 Buffer
NIO
还提供了 MappedByteBuffer
,可以让文件直接在内存(堆外的内存)中进行修改,而如何同步到文件由 NIO
来完成。
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
* MappedByteBuffer 可让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次
*/
public class MappedByteBufferTest {
public static void main(String[] args) throws IOException {
modifyFile("C:\\tmp\\nio\\1.txt");
}
// 直接在内存中修改文件
public static void modifyFile(String filePath) throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "rw");
//获取对应的通道
FileChannel channel = randomAccessFile.getChannel();
/**
* FileChannel.MapMode.READ_WRITE表示映射后的内存可以读取和写入。
* 0表示从文件的起始位置开始映射。
* 5表示映射到内存的大小,即将文件的前5个字节映射到内存中。这意味着我们可以在内存中修改这5个字节的内容。
*/
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0, (byte) 'H');
mappedByteBuffer.put(3, (byte) '9');
// 这段代码中,尝试将第6个字节的内容修改为字符 'Y',但是我们之前只映射了5个字节的数据,所以会抛出越界异常
// mappedByteBuffer.put(5, (byte) 'Y');
randomAccessFile.close();
System.out.println("修改成功");
}
}
前面的读写操作案例,都是通过一个 Buffer
完成的,NIO
还支持通过多个 Buffer
(即 Buffer
数组)完成读写操作,即 Scattering
和 Gathering
Selector 选择器
Selector
(选择器)是Java NIO中的一个关键类,用于实现非阻塞I/O操作。它允许一个线程同时管理多个通道(Channel),从而能够高效地处理多个客户端连接和请求。以下是关于Selector
的总结和要点:
基本介绍:
Selector
是Java NIO中的核心组件,用于实现非阻塞I/O。- 它能够监测多个注册的通道上是否有事件发生,并响应这些事件。
- 多个通道可以注册到同一个
Selector
中,实现多路复用,从而提高系统的并发性能。 - 使用
Selector
可以避免为每个连接创建一个线程,减少系统开销和线程切换的开销。
Selector的特点和用途:
Selector
允许单线程管理多个通道,即管理多个连接和请求。- 通过
Selector
,线程可以在某个通道上进行读写操作,如果没有数据可用,则线程可以在其他通道上执行IO操作,从而充分利用非阻塞I/O的特性。 - 非阻塞I/O避免了频繁的I/O阻塞,提高了线程的运行效率。
- 一个I/O线程可以并发处理多个客户端连接和读写操作,解决了传统同步阻塞I/O模型中一连接一线程的性能问题。
Selector的相关方法:
selector.select()
:阻塞方法,等待至少一个通道准备就绪,然后返回就绪通道的数量。selector.select(timeout)
:阻塞方法,等待一段时间(timeout毫秒)或至少一个通道准备就绪,然后返回就绪通道的数量。selector.wakeup()
:唤醒阻塞在select()
方法上的线程,用于强制使select()
方法立即返回。selector.selectNow()
:非阻塞方法,立即返回,不等待任何通道就绪。
总的来说,Selector
是Java NIO中非常重要的组件,用于实现高性能的非阻塞I/O操作。它可以让一个线程管理多个通道,处理多个客户端连接,提高系统的并发性能和可扩展性。Selector
的使用有助于避免多线程之间的上下文切换开销,提高系统的响应速度。
NIO 网络客户端/服务端案例
服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
public static void main(String[] args) throws IOException {
// 创建ServerSocketChannel 用于监听客户端连接的通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(7777));
serverSocketChannel.configureBlocking(false);
// 创建Selector
Selector selector = Selector.open();
// 将ServerSocketChannel注册到Selector,关注OP_ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 监听事件
int selectCount = selector.select();
if (selectCount == 0) {
continue;
}
// 获取触发事件的SelectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// 处理连接请求
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 处理读事件
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(buffer);
if (bytesRead == -1) {
socketChannel.close();
key.cancel();
} else if (bytesRead > 0) {
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
System.out.println("Received: " + new String(data));
}
}
keyIterator.remove();
}
}
}
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NIOClient {
public static void main(String[] args) throws IOException, InterruptedException {
for (int i = 0; i < 5; i++) {
int tmpI = i;
new Thread(() -> {
try {
sendMsg("hello" + tmpI);
} catch (IOException e) {
throw new RuntimeException(e);
}
}).start();
}
}
public static void sendMsg(String msg) throws IOException {
// 创建SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// 连接服务器
if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 7777))) {
while (!socketChannel.finishConnect()) {
System.out.println("连接服务器需要时间,可以做其他工作...");
}
}
// 发送数据给服务器
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
socketChannel.write(buffer);
// 接收服务器的响应
ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(responseBuffer);
if (bytesRead > 0) {
responseBuffer.flip();
byte[] responseData = new byte[responseBuffer.remaining()];
responseBuffer.get(responseData);
System.out.println("Server Response: " + new String(responseData));
}
// 关闭SocketChannel
socketChannel.close();
}
}
NIO 群聊系统
服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class GroupChatServer {
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6667;
public GroupChatServer() {
try {
// 创建Selector
selector = Selector.open();
// 创建ServerSocketChannel
listenChannel = ServerSocketChannel.open();
// 绑定服务器端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
// 设置为非阻塞模式
listenChannel.configureBlocking(false);
// 注册ServerSocketChannel到Selector,关注连接事件
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() {
try {
while (true) {
// 监听事件
int count = selector.select(2000);
if (count > 0) {
// 获取触发事件的SelectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
// 处理连接请求
SocketChannel clientChannel = listenChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println(clientChannel.getRemoteAddress() + " 上线");
}
if (key.isReadable()) {
// 处理读事件
readData(key);
}
iterator.remove();
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 处理异常后的操作
}
}
private void readData(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
if (count > 0) {
String message = new String(buffer.array());
System.out.println("收到来自 " + channel.getRemoteAddress() + " 的消息: " + message);
sendToOtherClients(message, channel);
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + " 离线");
key.cancel();
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
private void sendToOtherClients(String message, SocketChannel selfChannel) throws IOException {
System.out.println("服务器转发消息中...");
for (SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();
if (targetChannel instanceof SocketChannel && targetChannel != selfChannel) {
SocketChannel dest = (SocketChannel) targetChannel;
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
dest.write(buffer);
}
}
}
public static void main(String[] args) {
GroupChatServer server = new GroupChatServer();
server.listen();
}
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class GroupChatClient {
private final String HOST = "127.0.0.1";
private final int PORT = 6667;
private Selector selector;
private SocketChannel socketChannel;
private String username;
public GroupChatClient() {
try {
// 创建Selector
selector = Selector.open();
// 创建SocketChannel并连接服务器
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
// 设置为非阻塞模式
socketChannel.configureBlocking(false);
// 注册SocketChannel到Selector,关注读事件
socketChannel.register(selector, SelectionKey.OP_READ);
// 获取本地Socket地址作为用户名
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + " is online.");
} catch (IOException e) {
e.printStackTrace();
}
}
public void sendInfo(String info) {
info = username + " 说: " + info;
try {
// 发送消息给服务器
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
public void readInfo() {
try {
int count = selector.select();
if (count > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
if (len > 0) {
System.out.println(new String(buffer.array(), 0, len));
}
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
GroupChatClient chatClient = new GroupChatClient();
// 启动一个线程来接收服务器和其他客户端的消息
new Thread(() -> {
while (true) {
chatClient.readInfo();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 主线程用于发送消息到服务器
while (true) {
// 从控制台读取用户输入
java.util.Scanner scanner = new java.util.Scanner(System.in);
String message = scanner.nextLine();
chatClient.sendInfo(message);
}
}
}
NIO 结合零拷贝
NIO(New I/O)和零拷贝(Zero-Copy)是两个与I/O操作相关的概念,它们可以在高性能的数据传输和处理中发挥重要作用。
零拷贝(Zero-Copy):
零拷贝是一种技术,旨在在数据传输和处理过程中尽量减少或消除数据的拷贝操作。它通过直接将数据从一个位置传输到另一个位置,而不需要在中间进行复制。零拷贝通常与操作系统和硬件的支持相关,以提供最佳性能。
主要特点和优点:
- 减少数据拷贝:零拷贝技术避免了数据在内存之间的复制,从而减少了CPU和内存带宽的开销。
- 提高性能:通过减少数据拷贝和内存复制,零拷贝可以提高数据传输和处理的性能。
- 节省资源:减少了不必要的CPU和内存资源消耗,特别适用于大规模的数据传输和处理任务。
零拷贝通常与操作系统的底层API和硬件的支持密切相关,例如使用mmap
映射文件、sendfile
系统调用等。
在实际应用中,NIO和零拷贝可以结合使用,以提高网络应用程序的性能。例如,你可以使用NIO来管理多个网络连接,并且使用零拷贝来传输大文件或数据块,以避免不必要的数据复制和传输开销。这两个概念在高性能和高吞吐量的应用程序中都非常有用。
案例
服务端
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class NIOServer {
public static void main(String[] args) throws Exception {
InetSocketAddress address = new InetSocketAddress(7001);
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(address);
// 创建buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
int readcount = 0;
while (-1 != readcount) {
try {
readcount = socketChannel.read(byteBuffer);
} catch (Exception ex) {
// ex.printStackTrace();
break;
}
//
byteBuffer.rewind(); // 倒带 position = 0 mark 作废
}
if (readcount > 0) {
System.out.println("收到数据长度:" + readcount);
}
}
}
}
客户端
package com.zxbd.project.test;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class NIOClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 7001));
String filename = "C:\\Users\\22720\\Downloads\\历史数据.xls";
// 得到一个文件channel
FileChannel fileChannel = new FileInputStream(filename).getChannel();
// 准备发送
long startTime = System.currentTimeMillis();
// 在Linux下一个transferTo方法就可以完成传输
// 在Windows下一次调用transferTo只能发送8m,就需要分段传输文件
// transferTo底层使用到零拷贝
long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
System.out.println("发送的总的字节数 = " + transferCount + " 耗时: " + (System.currentTimeMillis() - startTime));
// 关闭
fileChannel.close();
}
}
AIO
Java AIO(Asynchronous I/O,异步 I/O)是一种在 Java 中进行非阻塞 I/O 操作的方式,用于处理大量的并发连接或文件操作。与传统的同步 I/O 不同,它允许应用程序在等待 I/O 操作完成时执行其他任务,而不会阻塞整个线程或进程。
以下是 Java AIO 的一些基本介绍:
-
异步操作:Java AIO 提供了异步操作的支持,允许应用程序发起 I/O 请求后继续执行其他任务,而不必等待 I/O 操作完成。当 I/O 操作完成时,系统会通知应用程序。
-
NIO(New I/O)基础:Java AIO 构建在 Java NIO 的基础之上。它使用了通道(Channel)和缓冲区(Buffer)的概念,允许应用程序将数据从通道读取到缓冲区,或将数据从缓冲区写入通道。
-
异步事件驱动:Java AIO 采用了事件驱动的方式来处理异步 I/O 操作。应用程序需要注册事件监听器,以便在 I/O 操作完成时接收通知。这种方式可以有效地管理大量的并发连接。
-
高性能:由于异步操作的特性,Java AIO 可以实现高性能的 I/O 处理,特别适用于处理大量并发连接或需要高吞吐量的应用场景,如网络服务器、文件传输等。
-
复杂性:相对于传统的同步 I/O,Java AIO 的编程模型可能更复杂一些,因为它涉及到事件监听、回调函数等概念。但一旦习惯了这种模型,它可以提供更好的性能和可伸缩性。
-
主要类和接口:Java AIO 的主要类和接口包括
AsynchronousSocketChannel
、AsynchronousServerSocketChannel
、AsynchronousFileChannel
等,以及与异步事件处理相关的回调接口。
示例代码如下所示:
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
public class AIOExample {
public static void main(String[] args) throws Exception {
// 打开异步文件通道
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(
Paths.get("C:\\tmp\\nio\\1.txt"),
StandardOpenOption.READ
);
// 创建读取操作的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 发起异步读取操作
fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("读取完成,读取字节数:" + result);
// 处理读取的数据
attachment.flip();
while (attachment.hasRemaining()) {
System.out.print((char) attachment.get());
}
System.out.println();
// 关闭文件通道
try {
fileChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("读取失败:" + exc.getMessage());
// 关闭文件通道
try {
fileChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
System.out.println("主线程111");
}
}
上面的示例演示了如何使用 Java AIO 进行异步文件读取操作。当读取操作完成时,completed
方法将被调用,而当操作失败时,failed
方法将被调用。这种异步编程模型可以用于处理各种类型的异步 I/O 操作。