一、Selector
1.1 Selector简介
1.1.1 Selector 和 Channel的关系
Selector 一般称为选择器 ,也可以翻译为 多路复用器 。
它是 Java NIO 核心组件中的一个,用于检查一个或多个 NIO Channel(通道)的状态是否处于可读、可写。由此可以实现单线程管理多个 channels,也就是可以管理多个网络链接。
使用 Selector 的好处: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。
1.1.2 可选择通道
-
不是所有的 Channel 都可以被 Selector 复用的。
-
判断 Channel 能被 Selector 复用的前提:是否继承了抽象类 SelectableChannel
-
如果继承了 SelectableChannel,则可以被复用,否则不能
-
-
SelectableChannel 类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类
-
所有 socket 通道,都继承了 SelectableChannel 类,都是可选择的,包括从管道(Pipe)对象的中获得的通道
-
FileChannel 类,没有继承 SelectableChannel,因此是不可选择通道,不能被选择器复用
-
-
一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次
- 通道和选择器之间的关系,使用注册的方式完成
- SelectableChannel 可以被注册到 Selector 对象上,在注册的时候,需要指定通道的哪些操作是 Selector 感兴趣的
1.1.3 Channel 注册到 Selector
-
使用 Channel.register(Selector sel,int ops) 方法,将一个通道注册到一个选择器
- Selector sel 指定通道要注册的选择器
- int ops 指定选择器需要查询的通道操作。可以供选择器查询的通道操作,从类型来分,包括以下四种
- 可读 : SelectionKey.OP_READ
- 可写 : SelectionKey.OP_WRITE
- 连接 : SelectionKey.OP_CONNECT
- 接收 : SelectionKey.OP_ACCEPT
- 如果 Selector 对通道的多操作类型感兴趣,可以用“位或”操作符来实现:比如:int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ;
-
选择器查询的不是通道的操作,而是通道的某个操作的一种就绪状态
什么是操作的就绪状态?
一旦通道具备完成某个操作的条件,表示该通道的某个操作已经就绪,就可以被 Selector 查询到,程序可以对通道进行对应的操作
比如:
- 某个SocketChannel 通道可以连接到一个服务器,则处于“连接就绪”(OP_CONNECT)
- 一个 ServerSocketChannel 服务器通道准备好接收新进入的连接,则处于“接收就绪”(OP_ACCEPT)状态
- 一个有数据可读的通道,可以说是“读就绪”(OP_READ)
- 一个等待写数据的通道,可以说是“写就绪”(OP_WRITE)
1.1.4 选择键(SelectionKey)
-
Channel 注册后,并且一旦通道处于某种就绪的状态,就可以被选择器查询到。这个工作用选择器 Selector 的 select() 方法完成。select 方法的作用:对感兴趣的通道操作,进行就绪状态的查询
-
Selector 可以不断的查询 Channel 中发生的操作的就绪状态,并且挑选感兴趣的操作就绪状态。一旦通道有操作的就绪状态达成,并且是 Selector 感兴趣的操作,就会被 Selector 选中,放入 选择键集合 中
-
一个选择键,首先是包含了注册在 Selector 的通道操作的类型,比如:SelectionKey.OP_READ。也包含了特定的通道与特定的选择器之间的注册关系
- 开发应用程序时,选择键是编程的关键。NIO 的编程,就是根据对应的选择键,进行不同的业务逻辑处理
-
选择键的概念,和事件的概念比较相似
- 一个选择键类似监听器模式里边的一个事件。由于 Selector 不是事件触发的模式,而是主动去查询的模式,所以不叫事件Event,而是叫 SelectionKey 选择键
1.2 Selector使用方法
1、Selector 的创建。
通过调用 Selector.open()方法创建一个 Selector 对象
// 创建选择器
Selector selector = Selector.open();
2、注册 Channel 到 Selector
- 要实现 Selector 管理 Channel,需要将 Channel 注册到相应的 Selector 上。通过调用通道的 register() 方法会将它注册到一个选择器上
// 创建选择器
Selector selector = Selector.open();
// 创建通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 设置通道为非阻塞模式
ssc.configureBlocking(false);
// 为通道绑定连接
ssc.bind(new InetSocketAddress(9999));
// 将通道注册到选择器,关注接收状态
ssc.register(selector, SelectionKey.OP_ACCEPT);
注意点:
-
与 Selector 一起使用时,Channel 必须处于非阻塞模式下,否则将抛出异常
IllegalBlockingModeException。FileChannel 不能切换到非阻塞模式,不能与 Selector 一起使用,而套接字相关的所有的通道都可以
-
一个通道,并非一定要支持所有的四种操作
- 比如:服务器通道 ServerSocketChannel 支持 Accept 接收操作,而 SocketChannel 客户端通道则不支持。
- 可以通过通道上的 validOps() 方法,来获取特定通道下所有支持的操作集合,返回int值
3、轮询查询就绪操作
-
通过 Selector 的 select() 方法,可以查询出已经就绪的通道操作,这些就绪的状态集合,保存在 SelectionKey 对象的 Set 集合中。Selector 几个重载的查询 select()方法:
- select():阻塞到至少有一个通道的注册状态就绪
- select(long timeout):和 select()一样,但最长阻塞时间为 timeout 毫秒
- selectNow():非阻塞,不管通道的注册状态是否就绪就立刻返回
-
select() 方法返回的 int 值,表示有多少通道已经就绪
- 即两次 select() 方法之间的时间段上,有多少通道变成就绪状态。
- 例如:首次调用 select()方法,如果有一个通道变成就绪状态,返回了 1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回 1。如果对第一个就绪的 Channel 没有做任何操作,现在就有两个就绪的通道,但在每次 select() 方法调用之间,只有一个通道就绪了
-
一旦调用 select() 方法,并且返回值不为 0 时,在 Selector 中有一个 selectedKeys() 方法,返回选择键集合,迭代集合的每一个选择键元素,根据就绪操作的类型,完成对应的操作
// 查询是否有状态就绪
//int n = selector.select();
//int n = selector.select(2000);
int n = selector.selectNow();
System.out.println(n);
// 获取选择键集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历选择键集合(1)
for (SelectionKey key : selectionKeys) {
if (key.isAcceptable()) {
// 可接收
System.out.println("可接收");
} else if (key.isConnectable()) {
// 可连接
System.out.println("可连接");
} else if (key.isReadable()) {
// 可读
System.out.println("可读");
} else if (key.isWritable()) {
// 可写
System.out.println("可写");
}
}
// 遍历选择键集合(2)
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
// 可接收
System.out.println("可接收");
} else if (key.isConnectable()) {
// 可连接
System.out.println("可连接");
} else if (key.isReadable()) {
// 可读
System.out.println("可读");
} else if (key.isWritable()) {
// 可写
System.out.println("可写");
}
//iterator.remove();
}
4、停止选择的方法
选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,可以有以下两种方式可以唤醒在 select() 方法中阻塞的线程
-
wakeup():通过调用 Selector 对象的 wakeup() 方法让处在阻塞状态的select() 方法立刻返回。该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对 select() 方法的一次调用将立即返回
-
close():关闭 Selector。该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似 wakeup()),同时使得注册到该 Selector 的所有 Channel 被注销,所有的键将被取消,但是 Channel 本身并不会关闭
1.3 代码示例
服务端:
@Test
public void ServerDemo() throws Exception {
// 1、获取服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2、切换非阻塞模式
serverSocketChannel.configureBlocking(false);
// 3、绑定端口号
serverSocketChannel.bind(new InetSocketAddress(8080));
// 4、获取selector选择器
Selector selector = Selector.open();
// 5、通道注册到选择器,进行监听
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6、选择器进行轮询,后续操作
while (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
while (selectionKeyIterator.hasNext()) {
// 获取就绪操作
SelectionKey selectionKey = selectionKeyIterator.next();
// 判断什么操作
if (selectionKey.isAcceptable()) {
// 获取连接
SocketChannel accept = serverSocketChannel.accept();
// 切换非阻塞模式
accept.configureBlocking(false);
// 注册
accept.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel)selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 读取数据
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
byteBuffer.flip();
System.out.println(new java.lang.String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
}
}
selectionKeyIterator.remove();
}
}
客户端:
public static void main(String[] args) throws Exception{
// 1、获取通道,绑定主机和端口号
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));
// 2、切换到非阻塞模式
socketChannel.configureBlocking(false);
// 3、创建buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 自定义输入
Scanner scanner = new Scanner(System.in);
java.lang.String d = "";
System.out.println("请输入发送内容...");
while (scanner.hasNext()) {
// 5、向缓冲区写入数据
d = "[" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-mm-dd hh:mm:ss")) + "] " + scanner.nextLine();
byteBuffer.put(d.getBytes());
// 6、缓冲区读写模式切换
byteBuffer.flip();
// 7、把数据从缓冲区写入通道
socketChannel.write(byteBuffer);
// 清空缓冲区
byteBuffer.clear();
}
}
注意点:
- 先启动服务端,再启动客户端
- 设置非阻塞模式、连接或绑定、创建缓冲区;这三步顺序可以调换
- 客户端
- 使用connect()连接,不是bind()绑定
- 数据写入后不要关闭通道,否则服务端得不到客户端通道,也就接收不到数据
- 关闭操作应在发生IO异常,或接收到服务端关闭通知后再进行
- 服务端
- 借用select(int timeout)方法,实现服务端的关闭操作;在 while() 轮论后关闭
- isAcceptable() 分支中不要关闭客户端通道,否则后续读取操作无法进行
- isReadable() 分支中也不要关闭客户端通道,否则将不能进行下次读取或接收
- 一次轮询查询后,要清理选择键集合,为下次做准备;selectionKeys.clear();
1.4 NIO 编程步骤总结
服务端:
- 创建 ServerSocketChannel 通道,并绑定监听端口
- 设置 Channel 通道是非阻塞模式
- 创建 Selector 选择器
- 把 Channel 注册到 Socketor 选择器上,监听就绪状态
- 调用 Selector 的 select 方法(循环调用),监测通道的就绪状况
- 调用 selectKeys 方法获取就绪 channel 集合
- 遍历就绪 channel 集合,判断就绪事件类型,实现具体的业务操作
- 根据业务,决定是否需要再次注册监听事件,重复执行第三步操作
二、Pipe和FileLock
- Java NIO 管道是两个线程之间的单向数据连接
- Pipe 有一个 source 通道和一个sink 通道
- 数据会被写到 sink 通道,从 source 通道读取
2.1 Pipe
-
创建管道:
// 1、获取管道 Pipe pipe = Pipe.open();
-
写入管道。可以循环写入,类似文件复制
// 2、获取sink通道,用来传送数据 Pipe.SinkChannel sink = pipe.sink(); // 3、创建发送缓冲区,并写入数据 ByteBuffer buf1 = ByteBuffer.allocate(1024); buf1.put("Hello World!测试".getBytes()); // 4、读写模式反转 buf1.flip(); // 5、sink发送数据:把buf1中的数据写入通道 sink.write(buf1);
-
从管道读取数据。可以循环读取,类似文件复制
// 6、获取source通道 Pipe.SourceChannel source = pipe.source(); // 7、创建接收缓冲区 ByteBuffer buf2 = ByteBuffer.allocate(1024); // 8、读取数据,并输出 // 把通道中的数据读入buf2 int length = source.read(buf2); System.out.println(new String(buf2.array(),0,length));
-
代码示例:
public class PipeDemo { public static void main(String[] args) throws IOException { // 1、获取管道 Pipe pipe = Pipe.open(); // 2、获取sink通道,用来传送数据 Pipe.SinkChannel sink = pipe.sink(); // 3、创建发送缓冲区,并写入数据 ByteBuffer buf1 = ByteBuffer.allocate(1024); buf1.put("Hello World!测试".getBytes()); // 4、读写模式反转 buf1.flip(); // 5、sink发送数据:把buf1中的数据写入通道 sink.write(buf1); // 6、获取source通道 Pipe.SourceChannel source = pipe.source(); // 7、创建接收缓冲区 ByteBuffer buf2 = ByteBuffer.allocate(1024); // 8、读取数据,并输出 // 把通道中的数据读入buf2 int length = source.read(buf2); System.out.println(new String(buf2.array(),0,length)); // 9、关闭 source.close(); sink.close(); } }
2.2 FileLock
2.2.1 FileLock简介
- 文件锁在 OS 中很常见,如果多个程序同时访问、修改同一个文件,很容易因为文件数据不同步而出现问题。给文件加一个锁,同一时间,只能有一个程序修改此文件,或者程序都只能读此文件,这就解决了同步问题
- 文件锁是进程级别的,不是线程级别的。文件锁可以解决多个进程并发访问、修改同一个文件的问题,但不能解决多线程并发访问、修改同一文件的问题。使用文件锁时,同一进程内的多个线程,可以同时访问、修改此文件
- 文件锁是当前程序所属的 JVM 实例持有的,一旦获取到文件锁(对文件加锁),要调用 release(),或者关闭对应的 FileChannel 对象,或者当前 JVM 退出,才会释放这个锁
- 一旦某个进程(比如说 JVM 实例)对某个文件加锁,则在释放这个锁之前,此进程不能再对此文件加锁,就是说 JVM 实例在同一文件上的文件锁是不重叠的(进程级别不能重复在同一文件上获取锁)
文件锁分类:
-
排它锁(独占锁):对文件加排它锁后,该进程可以对此文件进行读写,该进程独占此文件,其他进程不能读写此文件,直到该进程释放文件锁
-
共享锁:某个进程对文件加共享锁,其他进程也可以访问此文件,但这些进程都只能读此文件,不能写。线程是安全的。只要还有一个进程持有共享锁,此文件就只能读,不能写
-
如果指定为共享锁,则其它进程可读此文件,所有进程均不能写此文件,如果某进程试图对此文件进行写操作,会抛出异常
2.2.2 FileLock的使用
相关方法:
- lock():对整个文件加锁,默认为排它锁
- lock(long position, long size, booean shared)
-
前 2 个参数指定要加锁的部分(可以只对此文件的部分内容加锁)
-
第 3 个参数值指定是否是共享锁
-
- tryLock():对整个文件加锁,默认为排它锁
- tryLock(long position, long size, booean shared)
- 前 2 个参数指定要加锁的部分(可以只对此文件的部分内容加锁)
- 第 3 个参数值指定是否是共享锁
- boolean isShared()
- 判断此文件锁是否是共享锁
- boolean isValid()
- 判断此文件锁是否还有效
lock 与 tryLock的区别:
- lock 是阻塞式的,如果未获取到文件锁,会一直阻塞当前线程,直到获取文件锁
- tryLock 和 lock 的作用相同,只不过 tryLock 是非阻塞式的,tryLock 尝试获取文件锁,获取成功就返回锁对象,否则返回 null,不会阻塞当前线程
FileLock使用:
文件锁只能通过FileChannel对象来使用
// 创建FileChannel对象;文件锁只能通过FileChannel对象来使用
FileChannel fc = new FileOutputStream(".\\1.txt").getChannel();
// 对文件加锁
FileLock lock = fc.lock();
// 对文件进行一些读写操作
// 释放锁
lock.release();
2.2.3 完整示例
public class FileLockDemo {
public static void main(String[] args) throws IOException {
String input = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")) + " 测试\n";
System.out.println("输入:" + input);
// 创建缓冲区,并存入数据
ByteBuffer buf = ByteBuffer.wrap(input.getBytes());
// 文件路径
String filePathStr = "file/05.txt";
Path path = Paths.get(filePathStr);
// 文件通道
FileChannel fc = FileChannel.open(path, StandardOpenOption.APPEND);
// position位置
if(fc.size()!=0){
fc.position(fc.size() - 1);
}
// 文件锁
// 1、阻塞模式;默认为独占锁
FileLock lock = fc.lock();
// 2、阻塞模式;共享锁;只能读,不能写;写时会抛异常
// FileLock lock = fc.lock(0,Long.MAX_VALUE,true);
// 3、非阻塞模式;默认为独占锁;获取不到锁时,返回null,不阻塞
// FileLock lock = fc.tryLock();
// 4、非阻塞模式;共享锁;只能读,不能写;写时会抛异常
// FileLock lock = fc.tryLock(0, Long.MAX_VALUE, true);
System.out.println("是否为共享锁:" + lock.isShared());
// 把buf中的数据写入文件通道
fc.write(buf);
// 关闭
fc.close();
System.out.println("写操作完成!");
// 读取数据
System.out.println("=========流读取==========");
readFileByStream(filePathStr);
System.out.println("=========通道读取==========");
readFileByChannel(filePathStr);
}
/**
* 流读取
* @param filePathStr
* @throws IOException
*/
private static void readFileByStream(String filePathStr) throws IOException {
FileReader fr = new FileReader(filePathStr);
BufferedReader br = new BufferedReader(fr);
String line = "";
while((line = br.readLine()) !=null){
System.out.println(line);
}
br.close();
}
/**
* 通道读取
* @param filePathStr
* @throws IOException
*/
private static void readFileByChannel(String filePathStr) throws IOException {
FileInputStream fis = new FileInputStream(filePathStr);
FileChannel fc = fis.getChannel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int length = 0;
while((length = fc.read(buf)) > 0 ){
// buf.flip();
System.out.println(new String(buf.array(),0,length));
buf.clear();
}
fc.close();
}
}
三、其它
3.1 Path
3.1.1 Path简介
- Java Path 接口是 Java NIO 更新的一部分,同 Java NIO 一起已经包括在 Java6 和 Java7 中。Java Path 接口是在 Java7 中添加到 Java NIO 的。Path 接口位于java.nio.file 包中,所以 Path 接口的完全限定名称为java.nio.file.Path
- Java Path 实例表示文件系统中的路径。一个路径可以指向一个文件或一个目录。路径可以是绝对路径,也可以是相对路径。绝对路径包含从文件系统的根目录到它指向的文件或目录的完整路径。相对路径包含相对于其他路径的文件或目录的路径
- 在许多方面,java.nio.file.Path 接口类似于 java.io.File 类,但是有一些差别。不过,在许多情况下,可以使用 Path 接口来替换 File 类的使用
3.1.2 创建Path实例
-
使用 java.nio.file.Path 实例必须创建一个 Path 实例。可以使用 Paths 类 (java.nio.file.Paths) 中的静态方法 Paths.get() 来创建路径实例。Paths.get() 方法相当于是 Path 实例的工厂方法
Path path = Paths.get(".\\src\\main\\resources\\06.txt");
创建绝对路径:
// windows系统
Path path1 = Paths.get("d:\\01.txt");
// Linux、MacOS系统
Path path2 = Paths.get("/home/jakobjenkov/myfile.txt");
在 Java 字符串中, \是一个转义字符,需要编写\,告诉 Java 编译器在字符串中写入一个\字符。如果在 Windows 机器上使用了从/开始的路径,那么路径将被解释为相对于当前驱动器
创建相对路径:
- 使用 Paths.get(basePath,relativePath)方法创建一个相对路径
// 相对路径:d:\abc\xyz
Path path2 = Paths.get("d:\\abc", "xyz");
// 相对路径:d:\abc\xyz\03.txt
Path path3 = Paths.get("d:\\abc", "xyz\\03.txt");
3.2 Files
- Java NIO Files 类(java.nio.file.Files)提供了几种操作文件系统中的文件的方法。通常与java.nio.file.Path 实例一起工作
常用方法:
Modifier and Type | Method and Description |
---|---|
static long | copy(InputStream in, Path target, CopyOption... options) 将输入流中的所有字节复制到文件。 |
static long | copy(Path source, OutputStream out) 将文件中的所有字节复制到输出流。 |
static Path | copy(Path source, Path target, CopyOption... options) 将文件复制到目标文件。 |
static Path | createDirectories(Path dir, FileAttribute<?>... attrs) 首先创建所有不存在的父目录来创建目录。 |
static Path | createDirectory(Path dir, FileAttribute<?>... attrs) 创建一个新的目录。 |
static Path | createFile(Path path, FileAttribute<?>... attrs) 创建一个新的和空的文件,如果该文件已存在失败。 |
static Path | walkFileTree(Path start, FileVisitor<? super Path> visitor) 走一个文件树。 |
static Path | walkFileTree(Path start, Set<FileVisitOption> options, int maxDepth, FileVisitor<? super Path> visitor) 走一个文件树。 |
3.2.1 Files.createDirectory()
- Files.createDirectory()方法,用于根据 Path 实例创建一个新目录
@Test
public void testCreateDirectory() {
// createDirectory
Path path = Paths.get("./src/main/resources/abc");
// 创建能目录
try {
Files.createDirectory(path);
}catch (FileAlreadyExistsException e) {
System.out.println("目录已存在"+ e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
第一行创建表示要创建的目录的 Path 实例。在 try-catch 块中,用路径作为参数调用Files.createDirectory()方法。如果创建目录成功,将返回一个 Path 实例,该实例指向新创建的路径。
如果该目录已经存在,则是抛出一个 java.nio.file.FileAlreadyExistsException。如果出现其他错误,可能会抛出 IOException。例如,如果想要的新目录的父目录不存在,则可能会抛出 IOException
3.2.2 Files.copy()
copy(InputStream in, Path target, CopyOption... options)
将输入流中的所有字节复制到文件。
不覆盖:
-
从一个路径拷贝一个文件到另外一个目录如果目标文件已经存在,则抛出一个 java.nio.file.FileAlreadyExistsException 异常。如果有其他错误,则会抛出一个 IOException。例如,如果将该文件复制到不存在的目录,则会抛出 IOException
Path path1 = Paths.get(".\\src\\main\\resources\\1.wav"); Path path2 = Paths.get(".\\src\\main\\resources\\abc\\1.wav"); try { Path copy = Files.copy(path1, path2); } catch (FileAlreadyExistsException e) { // 目录已存在 } catch (IOException e) { e.printStackTrace(); }
覆盖:
- **Path copy = Files.copy(path1, path2, StandardCopyOption.REPLACE_EXISTING);**如果目标文件已经存在,这个参数指示 copy() 方法覆盖现有的文件
3.2.3 Files.move()
- 移动文件 或者 重命名,move() 之后原文件不存在。Files.move()的第三个参数。这个参数告诉 Files.move()方法来覆盖目标路径上的任何现有文件
@Test
public void testMove() {
Path path1 = Paths.get("./src/main/resources/01.txt");
Path path2 = Paths.get("./src/main/resources/11.txt");
try {
Path move = Files.move(path1, path2);
} catch (FileAlreadyExistsException e) {
// 文件已存在
} catch (IOException e) {
e.printStackTrace();
}
}
3.2.4 Files.delete()
- 删除一个文件或者目录
@Test
public void testDelete() {
Path path = Paths.get("./src/main/resources/01.txt");
try {
Files.delete(path);
} catch (IOException e) {
e.printStackTrace();
}
}
3.2.5 Files.walkFileTree()
-
Files.walkFileTree() 方法包含递归遍历目录树功能,将 Path 实例和 FileVisitor 作为参数。Path 实例指向要遍历的目录,FileVisitor 在遍历期间被调用
-
FileVisitor 是一个接口,必须自己实现 FileVisitor 接口,并将实现的实例传递给 walkFileTree() 方法。在目录遍历过程中,FileVisitor 实现的每个方法都将被调用。如果不需要实现所有这些方法,那么可以扩展 SimpleFileVisitor 类,它包含FileVisitor 接口中所有方法的默认实现(适配器模式)
-
FileVisitor 接口的方法中,每个都返回一个 FileVisitResult 枚举实例。
FileVisitResult 枚举包含以下四个选项:
- CONTINUE:继续
- TERMINATE:终止
- SKIP_SIBLING:跳过同级
- SKIP_SUBTREE:跳过子级
@Test
public void walkFileTree(){
// 要查找的目录范围
Path path = Paths.get("./src/main/resources");
// 要查找的目标文件
//String findFile = File.separator + "03.txt";
String findFile = "01.txt";
// 是否找到
final boolean[] isExist = {false};
try {
Path path1 = Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
/**
* 重载内部类方法
* @param file
* @param attrs
* @return
* @throws IOException
*/
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
String fileString = file.toAbsolutePath().toString();
// 如果目标文件和路径结尾相等,即为找到
if (fileString.endsWith(findFile)) {
// 把查找标记为true
isExist[0] = true;
System.out.println("文件已找到!路径为:" + fileString);
// 中止退出
return FileVisitResult.TERMINATE;
}
// 继续下次查找
return FileVisitResult.CONTINUE;
}
});
if(!isExist[0]){
System.out.println("没有找到!");
}
} catch (IOException e) {
e.printStackTrace();
}
}
3.3 AsynchronousFileChannel
在 Java 7 中,Java NIO 中添加了 AsynchronousFileChannel,也就是是异步地将数据写入文件。
3.3.1 创建AsynchronousFileChannel
- 通过静态方法 open()创建,AsynchronousFileChannel.open(path,StandardOpenOption.READ);
// 1、得到文件路径
Path path = Paths.get(".\\src\\main\\resources\\03.txt");
// 2、创建AsynchronousFileChannel
AsynchronousFileChannel afc = null;
try {
afc = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
e.printStackTrace();
}
3.3.2 通过Future读取数据
**abstract Future<Integer>
read(ByteBuffer dst, long position)
**从给定的文件位置开始,从该通道读取一个字节序列到给定的缓冲区。
@Test
public void testReadAsyncFileChannel() throws IOException {
// 1、得到文件路径
Path path = Paths.get("./src/main/resources/03.txt");
// 2、创建AsynchronousFileChannel
AsynchronousFileChannel afc = null;
try {
afc = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
e.printStackTrace();
}
// 3、创建缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 4、把通道中的数据读到缓冲区,得到Future
Future<Integer> future = afc.read(buf, 0);
// 5、判断是否读取完成
while (!future.isDone()) {
}
// 6、输出
buf.flip();
System.out.println(new String(buf.array(), 0, buf.limit()));
buf.clear();
// 7、关闭
afc.close();
}
- 创建了一个 AsynchronousFileChannel
- 创建一个 ByteBuffer,它被传递给 read() 方法作为参数,以及一个 0 的位置
- 在调用 read() 之后,循环,直到返回的 isDone()方法返回 true
- 读取操作完成后,数据读取到 ByteBuffer 中,然后打印到 System.out 中
3.3.3 通过CompletionHandler读取数据
**abstract <A> void read(ByteBuffer dst, long position, A attachment, CompletionHandler<Integer,? super A> handler)
**从给定的文件位置开始,从该通道读取一个字节序列到给定的缓冲区。
@Test
public void testCompletionHandlerRead() {
// 1、得到文件路径
Path path = Paths.get("./src/main/resources/03.txt");
// 2、创建AsynchronousFileChannel
AsynchronousFileChannel afc = null;
try {
afc = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
e.printStackTrace();
}
// 3、创建缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 4、把通道中的数据读到缓冲区
afc.read(buf, 0, buf, new CompletionHandler<Integer, ByteBuffer>() {
/**
* 读取完成
* @param result
* @param attachment
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("读取到:" + result);
attachment.flip();
System.out.println(new String(attachment.array(),0,attachment.limit()));
attachment.clear();
}
/**
* 读取失败
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("读取失败!");
}
});
}
- 读取操作完成,将调用 CompletionHandler 的 completed() 方法
- 对于 completed() 方法的参数传递一个整数,它告诉我们读取了多少字节,以及传递给 read() 方法的“附件”。“附件”是 read()方法的第三个参数。在本代码中,它是 ByteBuffer,数据也被读取
- 如果读取操作失败,则将调用 CompletionHandler 的 failed() 方法
3.3.4 通过Future写数据
@Test
public void testWriteAsyncFileFuture() throws IOException {
// 1、得到文件路径
Path path = Paths.get("./src/main/resources/04.txt");
// 2、创建AsynchronousFileChannel
AsynchronousFileChannel afc = null;
try {
afc = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
} catch (IOException e) {
e.printStackTrace();
}
// 3、创建缓冲区
// ByteBuffer buf = ByteBuffer.allocate(1024);
// buf.put("测试abc".getBytes());
// buf.flip();
ByteBuffer buf = ByteBuffer.wrap("测试testWriteAsyncFileFuture".getBytes());
// 4、把缓冲区中的数据写入通道,得到Future
Future<Integer> future = afc.write(buf, 0);
// 5、判断是否写入完成
while (!future.isDone()) {
}
// 6、输出
System.out.println("写入完成!");
// 7、关闭
afc.close();
}
- AsynchronousFileChannel 以写模式打开
- 创建一个 ByteBuffer,并将一些数据写入其中
- ByteBuffer 中的数据被写入到文件中
- 检查返回的 Future,以查看写操作完成时的情况
注意,文件必须已经存在。如果该文件不存在,那么 write()方法将抛出一个
java.nio.file.NoSuchFileException
3.3.5 通过CompletionHandler写数据
@Test
public void testWriteAsyncFileCompletion() {
// 1、得到文件路径
Path path = Paths.get("./src/main/resources/04.txt");
// 2、创建AsynchronousFileChannel
AsynchronousFileChannel afc = null;
try {
afc = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
} catch (IOException e) {
e.printStackTrace();
}
// 3、创建缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put("测试testWriteAsyncFileCompletion".getBytes());
buf.flip();
// 4、把缓冲区中的数据写入通道
afc.write(buf, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
/**
* 写入完成
* @param result
* @param attachment
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("写入:" + result);
}
/**
* 写入失败
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("写入失败!");
}
});
}
- 写操作完成时,将会调用 CompletionHandler 的 completed()方法
- 如果写失败,则会调用 failed()方法
3.4 字符集(Charset)
public static void main(String[] args) throws CharacterCodingException {
// 1、获取Charset对象
Charset charset = Charset.forName("utf8");
// 2、建立缓冲区,准备数据
CharBuffer buf = CharBuffer.allocate(1024);
buf.put("测试abc");
buf.flip();
// 3、获取新的编码器
CharsetEncoder charsetEncoder = charset.newEncoder();
// 4、编码
ByteBuffer byteBuffer = charsetEncoder.encode(buf);
System.out.println("编码后:");
for (int i = 0; i < byteBuffer.limit(); i++) {
System.out.println(byteBuffer.get());
}
// 5、获取新的解码器
byteBuffer.flip();
CharsetDecoder charsetDecoder = charset.newDecoder();
CharBuffer decoderBuf = charsetDecoder.decode(byteBuffer);
System.out.println("解码后:");
System.out.println(decoderBuf);
// 7、用其它字符格式解码
Charset gbkChar = Charset.forName("gbk");
byteBuffer.flip();
System.out.println("使用其他编码进行解码:");
System.out.println(gbkChar.decode(byteBuffer));
// 8、获取Charset所支持的字符编码
Map<String, Charset> scssm = Charset.availableCharsets();
Set<Map.Entry<String, Charset>> entries = scssm.entrySet();
for (Map.Entry<String, Charset> entry : entries) {
System.out.println(entry + " : " + entry.getValue());
}
}
四、多人聊天室
4.1 服务端
超过 timeout 毫秒没有连接,关闭服务端
public class ChatServer {
public static void main(String[] args) throws IOException {
new ChatServer().startServer();
}
/**
* 服务器端启动的方法
*/
public void startServer() throws IOException {
// 1、创建Selector选择器
Selector selector = Selector.open();
// 2、创建服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3、为channel通道绑定端口、设置为非阻塞模式
serverSocketChannel.bind(new InetSocketAddress(8000));
serverSocketChannel.configureBlocking(false);
// 4、把channel通道注册到选择器
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器已启动成功!");
// 5、循环查询就绪状态
while(selector.select() > 0){
// 6、得到选择键集合,并遍历
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey sk : selectionKeys) {
// 6.1、可接收状态:表示服务端已做好准备,可以接收客户的连接了
if(sk.isAcceptable()){
// 处理可接收时的操作
acceptOperator(serverSocketChannel,selector);
}
// 6.2、可读状态:表示客户端已发送完毕,可以在服务端读取数据了
if(sk.isReadable()){
// 处理可读时的操作
readOperator(selector,sk);
}
}
// 清理选择键集合:为下次轮询查询做准备
selectionKeys.clear();
}
}
/**
* 可接收状态时的处理操作
* @param serverSocketChannel 服务端通道
* @param selector 选择器
* @throws IOException
*/
private static void acceptOperator(ServerSocketChannel serverSocketChannel,Selector selector) throws IOException {
// 1、获取客户端通道 SocketChannel
SocketChannel sc = serverSocketChannel.accept();
// 2、设置为非阻塞模式
sc.configureBlocking(false);
// 3、把通道注册到选择器上,监听可读状态
sc.register(selector,SelectionKey.OP_READ);
// 4、回复客户端
ByteBuffer replyStr = Charset.forName("utf8").encode("欢迎进入聊天室!");
sc.write(replyStr);
}
/**
* 可读状态时的处理操作
* @param selector 选择器
* @param sk 选择键
* @throws IOException
*/
private static void readOperator(Selector selector,SelectionKey sk) throws IOException {
// 1、从选择键SelectionKey获取已经就绪的客户端通道
SocketChannel socketChannel = (SocketChannel) sk.channel();
// 2、创建Buffer
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3、循环读取客户端消息
String message = "";
while(socketChannel.read(buf) > 0){
// 切换buf读写模式
// 调用 flip()方法会将 position 设回 0,并将 limit 设置成之前 position 的值:即readLength
buf.flip();
message += Charset.forName("utf8").decode(buf);
}
// 4、把通道再次注册到选择器,监听可读状态
socketChannel.register(selector,SelectionKey.OP_READ);
// 5、把客户端消息广播到其它客户端
if(message.length() > 0){
System.out.println(message);
castOtherClient(message,selector,socketChannel);
}
}
/**
* 给其它客户端广播消息
* @param message 消息
* @param selector 选择器
* @param sc 自已的通道
* @throws IOException
*/
private static void castOtherClient(String message,Selector selector,SocketChannel sc) throws IOException {
// 1、获取所有已经接入的通道的选择键
Set<SelectionKey> keys = selector.keys();
// 2、循环遍历:找出除了自已之外的其它客户端通道,并发送消息
for (SelectionKey key : keys) {
//System.out.println(key);
// 获取当前选择键的通道
Channel targetChannel = key.channel();
// 向除了自已之外的其它客户端通道发送消息
if(targetChannel instanceof SocketChannel && targetChannel != sc){
// 发送消息
((SocketChannel)targetChannel).write(Charset.forName("utf8").encode(message));
}
}
}
}
4.2 客户端
- ChatClient.java
Scanner 会阻塞等待
public class ChatClient {
/**
* 启动方法
*/
public void startClient(String name) {
System.out.print(name + ",你好,");
SocketChannel sc = null;
try {
// 1、创建选择器
Selector selector = Selector.open();
// 2、创建客户端通道,连接服务端
sc = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8000));
// 3、设置为非阻塞模式
sc.configureBlocking(false);
// 4、把通道注册到选择器
sc.register(selector, SelectionKey.OP_READ);
// 5、创建新线程,接收消息
new Thread(new ClientThread(selector)).start();
// 6、向服务端发送消息
Scanner scanner = new Scanner(System.in);
String msg = "";
while (scanner.hasNextLine()) {
msg = scanner.nextLine();
if (msg.length() > 0) {
sc.write(Charset.forName("utf8").encode(name + ":" + msg));
}
}
} catch (IOException e) {
//e.printStackTrace();
} finally {
// 关闭
try {
if (sc != null) {
sc.close();
}
System.out.println("客户端已关闭!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- ClientThread.java
public class ClientThread implements Runnable{
/**
* 选择器
*/
private Selector selector;
/**
* 构造器
* @param selector
*/
public ClientThread(Selector selector){
this.selector = selector;
}
@Override
public void run() {
try{
while(selector.select() > 0){
// 得到选择键集合,并遍历
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey sk : selectionKeys) {
// 可读状态:表示可以读取服务器端发送的数据了
if(sk.isReadable()){
// 处理可读时的操作
readOperator(selector,sk);
}
}
// 清理选择键集合:为下次轮询查询做准备
selectionKeys.clear();
}
}catch (IOException e){
e.printStackTrace();
}
}
/**
* 可读状态时的处理操作
* @param selector 选择器
* @param sk 选择键
* @throws IOException
*/
private void readOperator(Selector selector,SelectionKey sk) throws IOException {
// 1、从选择键SelectionKey获取已经就绪的客户端通道
SocketChannel sc = (SocketChannel) sk.channel();
// 2、创建Buffer
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3、循环读取客户端消息
String message = "";
while(sc.read(buf) > 0){
// 切换buf读写模式
// 调用 flip()方法会将 position 设回 0,并将 limit 设置成之前 position 的值:即readLength
buf.flip();
message += Charset.forName("utf8").decode(buf);
}
// 4、把通道再次注册到选择器,监听可读状态。好像不用再次注册
sc.register(selector,SelectionKey.OP_READ);
// 5、输出消息
if(message.length() > 0){
System.out.println(message);
}
}
}
-
模拟聊天用户,每个用户是独立的;startClient() 不能用静态方法,否则将共用客户端通道
-
用户A
public class AClient { public static void main(String[] args) { new ChatClient().startClient("lucy"); } }
-
用户B
public class BClient { public static void main(String[] args) { new ChatClient().startClient("mack"); } }
-