文章目录
- 1. 概述
- 2. TCP 阻塞式IO 网络编程实例
- 2.1 TCP网络编程服务端
- 2.2 ByteBufferUtil
- 2.3 客户端代码
- 2.4 运行截图
- 3. TCP 非阻塞式IO 网络编程实例
- 3.1 服务端
- 3.2 客户端
- 3.3 运行截图
- 4. 多路复用
- 4.1 服务器端
- 4.2 客户端
- 4.3 运行截图
- 5. AIO
- 5.1 AIO 服务端
- 5.2 客户端
- 5.3 运行截图
- 6. Channel / Buffer
- 6.1 Channel
- 6.2 ByteBuffer
- 参考文献
1. 概述
- 网络编程, 就是编写程序, 使两台联网的电脑可以交换数据,
- 套接字是网络数据传输用的软件设备, 用来连接网络的工具
- 在 linux中 socket被认为是文件中的一种, 在网络数据传输过程中, 使用文件I/O的相关函数
- socket 帮助程序员封装了网络的底层细节,如:错误检测、包大小、包分解、包重传、网络地址等,让程序员将网络连接看作可以读/写字节的流
- 套接字常用网络协议: TCP、UDP
之前还有一篇文章: Linux C++ Socket 套接字、select、poll、epoll 实例
套接字进行网络连接流程, 如下图:
服务器端:
- 创建服务器套接字
socket()
- 绑定端口
bind()
- 监听端口
listen()
- 接受客户端请求
accept()
- 读取客户端请求的数据
read()
- 返回客户端要响应的数据
write()
- …
- 关闭与客户端的连接
close()
- 关闭服务器套接字
close()
客户端:
- 创建客户端套接字
socket()
- 连接服务端
connect()
- 请求服务端数据, 发送操作数和操作符到服务器
write()
- 从服务器读取操作结果
read()
- …
- 关闭客户端套接字
close()
流程图如下, 具体代码示例可以看下面的 2. TCP 阻塞式IO 网络编程实例
2. TCP 阻塞式IO 网络编程实例
accept 和 read 都是阻塞的, 当 accept 到新连接, 或者 read 到数据程序才往下走
为了提高服务端处理能力, 一个客户端连接一个线程处理
不能一个线程处理多个客户端, 某个客户端会阻塞这个线程处理其他客户端
2.1 TCP网络编程服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
public class BlockServer {
public static void main(String[] args) throws IOException {
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
System.out.println("等待客户端连接...");
SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行
System.out.println("接收到客户端连接: " + sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
System.out.println("开始读取客户端中的数据:" + channel);
channel.read(buffer); // 阻塞方法,线程停止运行
buffer.flip();
String request = ByteBufferUtil.read(buffer);
System.out.println(request);
buffer.clear();
System.out.println("已经读取完客户端中的数据:" + channel);
}
}
}
}
2.2 ByteBufferUtil
public class ByteBufferUtil {
public static String read(ByteBuffer byteBuffer) throws CharacterCodingException {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);
return charBuffer.toString();
}
public static ByteBuffer read(String string) throws CharacterCodingException {
return StandardCharsets.UTF_8.encode(string);
}
public static void main(String[] args) throws CharacterCodingException {
System.out.println(ByteBufferUtil.read(ByteBufferUtil.read("test")));
}
}
2.3 客户端代码
public class BlockClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
System.out.println("开始连接服务端...");
sc.connect(new InetSocketAddress("localhost", 8080));
String str = "test";
System.out.println("连接服务端成功,写入数据: " + str);
sc.write(ByteBufferUtil.read(str));
}
}
2.4 运行截图
3. TCP 非阻塞式IO 网络编程实例
不停的轮询, 看看有没有accept 到新连接, 没有连接不阻塞等待, 继续去看看已经建立的连接有没有read到客户端的新数据, read到新数据处理, read不到不处理
为了提高服务端处理能力, 可以一个客户端连接一个线程处理, 线程不停的轮询自己要处理的客户端
也可以一个线程处理多个客户端, 相较于上面的阻塞I/O模型, 非阻塞不至于某个客户端阻塞这个线程处理其他客户端
3.1 服务端
ssc.configureBlocking(false);
设置为非阻塞模式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class NonBlockServer {
public static void main(String[] args) throws IOException, InterruptedException {
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
if (sc != null) {
System.out.println("接收到客户端连接: " + sc);
sc.configureBlocking(false); // 非阻塞模式
channels.add(sc);
}
for (SocketChannel channel : channels) {
System.out.println("开始读取客户端中的数据:" + channel);
// 5. 接收客户端发送的数据
int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
if (read > 0) {
buffer.flip();
System.out.println((ByteBufferUtil.read(buffer)));
buffer.clear();
System.out.println("已经读取完客户端中的数据:" + channel);
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
}
}
}
}
3.2 客户端
客户端同上
3.3 运行截图
4. 多路复用
可以调用 select/poll/epoll , 阻塞在select/poll/epoll, select/poll/epoll 监听多个客户端连接事件或写入的数据, 然后这些事件可再有多个线程分一分处理掉
4.1 服务器端
打开选择器并将其与通道注册,监听接受连接操作:
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
监听选择器上的事件,返回已就绪的通道数量:
int count = selector.select();
获取所有事件(连接、读取):
Set<SelectionKey> keys = selector.selectedKeys();
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
public class SelectorServer {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
// 绑定端口并打印通道信息
channel.bind(new InetSocketAddress(6666));
System.out.println(channel);
// 打开选择器并将其与通道注册,监听接受连接操作
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
// 无限循环,等待选择器上的事件
while (true) {
// 监听选择器上的事件,返回已就绪的通道数量
int count = selector.select();
System.out.println("select count: " + count);
// 如果没有就绪的通道,则继续循环等待
if (count <= 0) {
continue;
}
// 获取并迭代处理所有就绪的事件
// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理接受连接事件
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println("连接已建立:" + sc);
}
// 处理读取数据事件
else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if (read == -1) {
// 如果读取返回-1,表示连接已关闭
key.cancel();
sc.close();
} else {
// 否则,将缓冲区反转并打印读取的数据
buffer.flip();
System.out.println(new String(buffer.array(), StandardCharsets.UTF_8));
}
}
// 事件处理完毕后,从迭代器中移除,避免重复处理
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
// 打印IO异常堆栈跟踪
e.printStackTrace();
}
}
}
4.2 客户端
import netty.ByteBufferUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class SelectorClient {
public static void main(String[] args) throws IOException {
// 创建Socket通道并连接到服务器
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 6666));
// 初始化输入和输出ByteBuffer
ByteBuffer inputBuffer = ByteBuffer.allocate(512);
ByteBuffer serverOutput = ByteBuffer.allocate(512);
// 循环接收用户输入并发送给服务器
while (true) {
// 使用Scanner获取用户输入
Scanner in = new Scanner(System.in);
String input = in.nextLine();
System.out.println("user input: " + input);
// 清空输入缓冲区,放入用户输入,然后反转准备写入
inputBuffer.clear();
inputBuffer.put(input.getBytes(StandardCharsets.UTF_8));
inputBuffer.flip();
// 将输入数据写入Socket通道
sc.write(inputBuffer);
System.out.println("send to server " + input);
// 循环读取服务器响应
while (true) {
// 清空服务器响应缓冲区,准备读取数据
serverOutput.clear();
// 从Socket通道读取数据
sc.read(serverOutput);
// 如果没有读取到数据,继续尝试读取
if (!serverOutput.hasRemaining()) {
continue;
}
// 反转缓冲区,读取数据并打印
serverOutput.flip();
System.out.println("server response " + ByteBufferUtil.read(serverOutput));
// 读取完成后退出内层循环
break;
}
}
}
}
4.3 运行截图
5. AIO
异步I/O模型
告诉内核启动某个操作, 并且把数据copy到用户缓冲区再通知我们
5.1 AIO 服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
/**
* AIO服务器类,用于演示异步IO的服务器端实现。
* 使用AsynchronousServerSocketChannel处理客户端连接和数据传输。
*/
public class AIOServer {
/**
* 程序入口,初始化并启动AIO服务器。
* 绑定服务器端口并等待客户端连接。
*
* @param args 命令行参数
* @throws IOException 如果绑定端口失败
*/
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(6666));
ssc.accept(null, new AcceptHandler(ssc));
while (true) ;
}
/**
* 关闭客户端通道的方法。
* 用于处理读取或写入操作失败时关闭通道。
*
* @param sc 客户端通道
*/
private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 读取数据的完成处理器,实现读取客户端数据并响应的逻辑。
*/
private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
/**
* 当读取操作完成时被调用。
* 解析读取的数据并写回响应到客户端。
*
* @param result 读取操作的结果
* @param attachment 读取操作的附加上下文
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
String request = Charset.defaultCharset().decode(attachment).toString();
System.out.println(request.toString());
attachment.clear();
attachment.put(("你好:" + request).getBytes());
attachment.flip();
sc.write(attachment);
attachment.clear();
// 读取下一个读时间
sc.read(attachment, attachment, new ReadHandler(sc));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 当读取操作失败时被调用。
* 关闭客户端通道并打印异常堆栈跟踪。
*
* @param exc 引发的异常
* @param attachment 读取操作的附加上下文
*/
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}
/**
* 接受连接的完成处理器,用于处理客户端的连接请求。
*/
private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;
public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}
/**
* 当接受操作完成时被调用。
* 设置读取缓冲区并开始读取客户端发送的数据。
*
* @param sc 接受到的客户端通道
* @param attachment 接受操作的附加上下文
*/
@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读事件由 ReadHandler 处理
System.out.println("开始读");
sc.read(buffer, buffer, new ReadHandler(sc));
System.out.println("读完成");
// 处理完第一个 accept 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}
/**
* 当接受操作失败时被调用。
* 打印异常堆栈跟踪。
*
* @param exc 引发的异常
* @param attachment 接受操作的附加上下文
*/
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}
5.2 客户端
同 4.2
5.3 运行截图
6. Channel / Buffer
6.1 Channel
Channel: 传输数据的通道
其实和数据流挺像的,不过数据流是单向的而Channel 是双向的,可以向channel中写数据,也可以从channel中读取数据
NIO 基础组件之 Channel
6.2 ByteBuffer
ByteBuffer是Buffer子类,是字节缓冲区,特点如下所示。
大小不可变。一旦创建,无法改变其容量大小,无法扩容或者缩容;
读写灵活。内部通过指针移动来实现灵活读写;
支持堆上内存分配和直接内存分配
一文搞懂ByteBuffer使用与原理
参考文献
- UNIX 网络编程 卷1: 套接字联网API
- TCP/IP网络编程 尹圣雨 著 金国哲 译
- Linux IO模式及 select、poll、epoll详解
- 浅谈select,poll和epoll的区别
- 黑马 Netty 课程