4.网络编程
4.1.非阻塞 VS 阻塞
在网络编程中,**阻塞(Blocking)和非阻塞(Non-blocking)**是两种不同的编程模型,描述了程序在进行网络通信时的行为方式。
- 阻塞(Blocking):
- 在阻塞模型中,当程序发起一个网络请求时,它会一直等待直到操作完成或者发生错误。
- 在网络通信过程中,如果数据没有到达,或者连接还没有建立,程序会被挂起,直到数据到达或者连接建立完成。
- 在阻塞模型中,通常一个线程只处理一个连接,因此需要为每个连接创建一个新的线程,这会增加系统开销,尤其在高并发环境下,可能导致资源耗尽和性能下降。
- 非阻塞(Non-blocking):
- 在非阻塞模型中,程序可以在发起网络请求后立即返回,不必等待操作完成。
- 如果数据没有到达或者连接尚未建立,程序不会被挂起,而是会立即返回一个状态,告诉调用者当前操作尚未完成。
- 在非阻塞模型中,程序可以不断轮询网络状态,不断尝试进行数据读取或者连接操作,直到操作完成或者发生错误。
- 通过使用非阻塞模型,一个线程可以同时处理多个连接,避免了为每个连接创建新线程的开销,提高了系统的性能和资源利用率。
在实际的网络编程中,可以根据具体的需求和系统性能要求选择合适的编程模型。阻塞模型通常更加简单直观,适用于连接数较少且并发要求不高的场景;而非阻塞模型更加灵活,适用于需要处理大量并发连接的高性能网络应用。
4.1.1.阻塞
阻塞模式 一个线程,可能会影响别的线程运行
accept 会影响 read , read 也会影响 accept
Server
/**
*
* Server 服务端
* @author 13723
* @version 1.0
* 2024/2/20 13:11
*/
public class Server {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
// 使用NIO来理解阻塞模式 单线程进行处理
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1.创建一个ServerSocketChannel 创建一个服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2.绑定监听端口
ssc.bind(new InetSocketAddress(9000));
// 3.建立一个练级的集合
List<SocketChannel> socketChannelList = new ArrayList<SocketChannel>();
while (true){
logger.error("--------------- connection start ----------------");
// 3.accept 建立和客户端之间的连接,说白了就是和客户端之间进行通信
// 这里会的方法会阻塞,线程会停止运行 (这里会等一个新的连接,如果没有新的连接建立会一直阻塞在这里)
SocketChannel sc = ssc.accept();
logger.error("--------------- connection {} ----------------",sc);
socketChannelList.add(sc);
// 5.介绍客户端发送的数据
for (SocketChannel socketChannel : socketChannelList) {
logger.error("--------------- before read ----------------");
socketChannel.read(buffer);
// 切换为读模式
buffer.flip();
ByteBufferUtil.debugAll(buffer);
// 切换为写模式 重新接收新的数据
buffer.clear();
logger.error("--------------- after read ----------------");
}
}
}
}
Client
/**
* 客户端
* @author 13723
* @version 1.0
* 2024/2/20 13:24
*/
public class Client {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
// 1.创建客户端连接
SocketChannel sc = SocketChannel.open();
// 2.设置连接信息
sc.connect(new InetSocketAddress("localhost",9000));
// 等待
logger.error("--------------- waiting ---------------");
}
}
启动的时候,Server正常进行启动,Client 以debug的方式进行启动
4.1.2.非阻塞
server
缺点:很明显,当我们没有数据的时候
accept 和 read 还再循环
public class Server {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
// 使用NIO来理解阻塞模式 单线程进行处理
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1.创建一个ServerSocketChannel 创建一个服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// TODO 设置为非阻塞模式
ssc.configureBlocking(false);
// 2.绑定监听端口
ssc.bind(new InetSocketAddress(9000));
// 3.建立一个练级的集合
List<SocketChannel> socketChannelList = new ArrayList<SocketChannel>();
while (true){
// logger.error("--------------- connection start ----------------");
// 3.accept 建立和客户端之间的连接,说白了就是和客户端之间进行通信
// 切换成非阻塞模式了,如果没有连接建立 返回的时一个null值
SocketChannel sc = ssc.accept();
if (sc != null){
logger.error("--------------- connection {} ----------------",sc);
sc.configureBlocking(false);
socketChannelList.add(sc);
}
// 5.介绍客户端发送的数据
for (SocketChannel socketChannel : socketChannelList) {
// logger.error("--------------- before read ----------------");
// 编程非阻塞,但是线程仍然会继续运行 如果没有读取到数据 read会返回0
int read = socketChannel.read(buffer);
if (read > 0){
// 切换为读模式
buffer.flip();
ByteBufferUtil.debugAll(buffer);
// 切换为写模式 重新接收新的数据
buffer.clear();
logger.error("--------------- after read ----------------");
}
}
}
}
}
client
客户端代码 和上面一样没有做额外改动
4.2.Selector
介绍选择器Selector之前,先介绍一个概念 IO事件
IO事件
- IO事件表示通道内的某种IO操作已经准备就绪
例如:在Server Scoket通道上发生的一个IO事件,代表一个新的连接已经准备好,这个事件就叫做接收就绪事件。或者说,一个通道内如果有数据可以读取,就会发生一个IO事件,代表该连接数据已经准备好,这个事件就叫做读就绪事件
JavaNIO将NIO事件做了简化,只定义了四个事件,他们用SelectionKey的4个常量来表示
- SelectionKey.OP_CONNECT
- 表示连接就绪事件,用于表示客户端连接建立后触发的事件。客户端的 SocketChannel 关注此事件,以便在连接建立后执行相应的操作。
- SelectionKey.OP_ACCEPT
- 表示接受连接就绪事件,用于表示服务器端有连接请求时触发的事件。服务器端的 ServerSocketChannel 关注此事件,以便在有新的连接请求时执行相应的操作。
- SelectionKey.OP_READ
- 表示读就绪事件,用于表示通道中有数据可以读取的事件。通常由 SocketChannel 关注,以便在通道中有数据可读时执行相应的读取操作。
- SelectionKey.OP_WRITE
- 表示写就绪事件,用于表示通道可以写入数据的事件。通常由 SocketChannel 关注,以便在通道可写入数据时执行相应的写入操作
管理多个channel , 可以发现channel是否有事件发生,有事件发生再去执行 防止cpu空转造成系统资源浪费
/**
*
* Server 服务端
* @author 13723
* @version 1.0
* 2024/2/20 13:11
*/
public class Server {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
// 创建一个Selector对象
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
// 创建channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 建立selector和Channel之间的连接(将Channel注册到Selector中)
// SelectionKey 将来事件发生后,通过它,可以知道哪种事件,是那个Channel发生的事件
// 0 表示不关注任何事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// ** 事件有四种类型
// ?? accept 会在有连接请求时触发 (SelectionKey关注)
// ?? connect 客户端连接建立后触发的事件
// ?? read 可读事件 (SocketChannel关注)
// ?? write 可写事件(SocketChannel关注)
// 设置具体的事件 (设置只关注 accept事件);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
// 2.绑定监听端口
ssc.bind(new InetSocketAddress(9000));
while (true){
// 3.调用selector的select方法(没有事件发生,那么还是阻塞的)
// !! 注意Selector在编程时,未处理时,不会阻塞会一直进行执行(要么处理,要么取消 不能不管)
selector.select();
// 4.处理事件(selectionKeys 中所有的可用的事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 遍历的时候 想要删除 必须使用迭代器遍历
while (iterator.hasNext()){
SelectionKey key = iterator.next();
logger.error("key : {}",key);
// 拿到对应channel
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
// 建立连接
SocketChannel accept = channel.accept();
logger.error("accept : {}",accept);
// 一个处理是accept 还可以进行取消
// key.cancel();
}
}
}
}
4.2.1处理read
读取数据 每个channel 里面 针对不同的事件类型 又创建了不同的channel进行维护
public class Server {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
// 创建一个Selector对象
Selector selector = Selector.open();
// 创建channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 建立selector和Channel之间的连接(将Channel注册到Selector中)
// SelectionKey 将来事件发生后,通过它,可以知道哪种事件,是那个Channel发生的事件
// 0 表示不关注任何事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// ** 事件有四种类型
// ?? accept 会在有连接请求时触发 (SelectionKey关注)
// ?? connect 客户端连接建立后触发的事件
// ?? read 可读事件 (SocketChannel关注)
// ?? write 可写事件(SocketChannel关注)
// 设置具体的事件 (设置只关注 accept事件);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
// 2.绑定监听端口
ssc.bind(new InetSocketAddress(9000));
while (true){
// 3.调用selector的select方法(没有事件发生,那么还是阻塞的)
// !! 注意Selector在编程时,未处理时,不会阻塞会一直进行执行(要么处理,要么取消 不能不管)
selector.select();
// 4.处理事件(selectionKeys 中所有的可用的事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 遍历的时候 想要删除 必须使用迭代器遍历
while (iterator.hasNext()){
SelectionKey key = iterator.next();
logger.error("key : {}",key);
// 5.区分事件类型
if (key.isAcceptable()) {
// accept事件
// 拿到对应channel
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
// 建立连接
SocketChannel sc = channel.accept();
// 设置channel为非阻塞的
sc.configureBlocking(false);
// 将管理权交给selector(负责管理当前处理的channel)
SelectionKey scKey = sc.register(selector, 0, null);
// 注意 这里是read
scKey.interestOps(SelectionKey.OP_READ);
logger.error("sc : {}",sc);
}else if (key.isReadable()){
// 读取数据的事件
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
// 切换为读模式
buffer.flip();
ByteBufferUtil.debugRead(buffer);
}
}
}
}
}
4.2.2.用完key之后为什么要remove
重点:SelectedKey 只会往里面添加 key ,但是不会进行删除(也就是事件处理完成后,会标记成处理,但是不会删除)
// TODO 删除key 一定要删除
// SelectedKey 只会往里面添加 key ,但是不会进行删除(也就是事件处理完成后,会标记成处理,但是不会删除)
// 不然下次进来还是上一个Key上一个Key是没有事件,所有会报空指针
// 这就是这里要使用迭代器的原因,迭代器可以边遍历边删除,forEach不行
iterator.remove();
4.2.3.处理客户端断开
else if (key.isReadable()){
try {
// 读取数据的事件
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
// 如果是正常断开。那么read返回的是-1 因为每次断开都会触发一次读事件
int read = channel.read(buffer);
if (read == -1){
// 删除key
key.cancel();
}else {
// 切换为读模式
buffer.flip();
ByteBufferUtil.debugRead(buffer);
// TODO 删除key 一定要删除
// SelectedKey 只会往里面添加 key ,但是不会进行删除(也就是事件处理完成后,会标记成处理,但是不会删除)
// 不然下次进来还是上一个Key上一个Key是没有事件,所有会报空指针
iterator.remove();
}
}catch (Exception e){
// 客户端关闭了,这里需要将key从SelectedKey集合中真正的删除
e.printStackTrace();
key.cancel();
}
}
正常断开
// 客户端 需要手动调用close
public class Client {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
// 1.创建客户端连接
SocketChannel sc = SocketChannel.open();
// 2.设置连接信息
sc.connect(new InetSocketAddress("localhost",9000));
// 等待
logger.error("--------------- waiting ---------------");
// 正常断开 不写就是异常断开
sc.close();
}
}
异常断开(强制断开)
4.2.4.处理消息边界
当客户端发动的服务端的中文信息过长时,就可能会出现乱码的情况
- 一种思路是,固定消息的长度,数据包的大小一样,服务器按照预定长度读取,缺点是浪费带宽
- 另一种思路是按照分隔符拆分,缺点是效率低下
- TLV格式,Type类型,Length长度,Value数据,类型和长度已知情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则会影响server吞吐量
- HTTP 1.1 是LTV格式
- HTTP 2.0 是LTV格式
server
每次将ByteBuffer作为参数进行传递 也就是 通过
服务端 注册
ByteBuffer buffer = ByteBuffer.allocate(15);
SelectionKey scKey = sc.register(selector, 0, buffer); 客户端 获取 重新设置
ByteBuffer buffer = (ByteBuffer) key.attachment();
key.attach(newByteBuffer);
public class Server {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
// 创建一个Selector对象
Selector selector = Selector.open();
// 创建channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 建立selector和Channel之间的连接(将Channel注册到Selector中)
// SelectionKey 将来事件发生后,通过它,可以知道哪种事件,是那个Channel发生的事件
// 0 表示不关注任何事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// ** 事件有四种类型
// ?? accept 会在有连接请求时触发 (SelectionKey关注)
// ?? connect 客户端连接建立后触发的事件
// ?? read 可读事件 (SocketChannel关注)
// ?? write 可写事件(SocketChannel关注)
// 设置具体的事件 (设置只关注 accept事件);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
// 2.绑定监听端口
ssc.bind(new InetSocketAddress(9000));
while (true){
// 3.调用selector的select方法(没有事件发生,那么还是阻塞的)
// !! 注意Selector在编程时,未处理时,不会阻塞会一直进行执行(要么处理,要么取消 不能不管)
selector.select();
// 4.处理事件(selectionKeys 中所有的可用的事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 遍历的时候 想要删除 必须使用迭代器遍历
while (iterator.hasNext()){
SelectionKey key = iterator.next();
logger.error("key : {}",key);
// 5.区分事件类型
if (key.isAcceptable()) {
// accept事件
// 拿到对应channel
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
// 建立连接
SocketChannel sc = channel.accept();
// 设置channel为非阻塞的
sc.configureBlocking(false);
// 将管理权交给selector(负责管理当前处理的channel)
//!! 1.将ByteBuffer 注册到SelectionKey中,这样保证每个人SelectionKey都有一个独有的ByteBuff
//!! 这种称为附件 attachment
//!! buffer不在作为局部变量了
ByteBuffer buffer = ByteBuffer.allocate(15);
SelectionKey scKey = sc.register(selector, 0, buffer);
// 注意 这里是read
scKey.interestOps(SelectionKey.OP_READ);
logger.error("sc : {}",sc);
// TODO 删除key 一定要删除
// SelectedKey 只会往里面添加 key ,但是不会进行删除(也就是事件处理完成后,会标记成处理,但是不会删除)
// 不然下次进来还是上一个Key上一个Key是没有事件,所有会报空指针
iterator.remove();
}else if (key.isReadable()){
try {
// 读取数据的事件
SocketChannel channel = (SocketChannel) key.channel();
// !!2.从读事件中 拿到附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 为了保证每个Channel都有一个独有的ByteBuffer
// 如果是正常断开。那么read返回的是-1 因为每次断开都会触发一次读事件
int read = channel.read(buffer);
if (read == -1){
// 删除key
key.cancel();
}else {
// 切换为读模式
buffer.flip();
split(buffer);
// !!3.判断一次是否读取完全
// 如果position 和 limit一样 说明没有读取完成,需要扩容
if (buffer.position() == buffer.limit()){
ByteBuffer newByteBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
// 新的bytebuffer 是旧的两倍 将旧的ByteBuffer内容设置的到新的中
buffer.flip();
newByteBuffer.put(buffer);
// 新的buffer替换原来的buffer
key.attach(newByteBuffer);
}
iterator.remove();
}
}catch (Exception e){
// 客户端关闭了,这里需要将key从SelectedKey集合中真正的删除
e.printStackTrace();
key.cancel();
}
}
}
}
}
private static void split(ByteBuffer source) {
// 找到一个完整消息, \n
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {
// 计算消息的长度 (换行符合 + 1 - 起始索引(就是ByteBuffer的Position))
int length = i + 1 - source.position();
// 找到一个完整消息了(get(i)不会移动指针)
ByteBuffer target = ByteBuffer.allocate(length);
// 从source读取,向target写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
// 打印拆出来的信息
ByteBufferUtil.debugAll(target);
}
}
// 因为可能还有没有读取完成的数据,比如一半的数据,留给下次读取
source.compact();
}
}
4.2.5.ByteBuffer大小的分配
- 每个channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个channel共同使用,因此需要为每个channel维护—个独立的 ByteBuffer
- ByteBuffer 不能太大,比如一个ByteBuffer 1Mb的话,要支持百万连接就要1Tb内存,因此需要设计大小可变的 ByteBuffer
- 一种思路是首先分配一个较小的buffer,例如4k,如果发现数据不够,再分配8k的buffer,将4kbuffer内容拷贝至8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现http://tutorials,jenkov.com/java-performance/resizable-array.html
- 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
4.2.6.写入内容过多问题
服务端一次向客户端写入太大的数据
服务端
public class WriteServer {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
// 直接关注 accept事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(9000));
while (true){
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 向客户端发送大量数据
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 通过Channel写入数据 并不能保证一次将所有数据写入到 客户端
// 返回值代表实际写入的字节数
while (buffer.hasRemaining()){
int write = sc.write(buffer);
logger.error("实际写入的字节数:{}",write);
}
}
}
}
}
}
客户端
public class WriteClient {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",9000));
// 3.接收数据
int count = 0;
while (true){
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
logger.error("接收的字节数:{}",count);
buffer.compact();
}
}
}
问题
改进
思路就是:先尝试写一次 如果一次没写完,那么就在关联一个SelectionKey,继续写,就不用while循环一直在那里尝试写了,注意的是,SelectionKey 是可以 进行相加的,比如 既可以读 也可以 ,通过附件 attach传递没有发送完的数据。
注意 读取完成后 记得把数据释放掉。
while (true){
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
// !! 这里可能原来的是读取事件
scKey.interestOps(SelectionKey.OP_READ);
// 向客户端发送大量数据
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 30000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 通过Channel写入数据 并不能保证一次将所有数据写入到 客户端
// 返回值代表实际写入的字节数
int write = sc.write(buffer);
logger.error("实际写入的字节数:{}",write);
// 先尝试写了一次,然后观察是否还有剩余内容
if(buffer.hasRemaining()){
// 关注一个写事件
// !! 这里又加了一个写事件,为了防止把原先的事件覆盖,所以这里需要加上原来事件
// 读事件 1 写事件 4 加一起 等于5 说明 又关注读又关注写
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
// 要把未写完的数据 放到SelectionKey中
scKey.attach(buffer);
}
} else if (key.isWritable()) {
// 把上一次 buffer取出来, 关注的socketChannel拿出来
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
// 继续写(数据量很多 就会反复进入可写事件)
int write = sc.write(buffer);
logger.error("实际写入的字节数:{}",write);
// 写完清理附件
if (!buffer.hasRemaining()){
// 内容写完了 清楚buffer 可写事件 也不需要进行关联了
key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}