【Netty】nio处理acceptreadwrite事件

       📝个人主页:五敷有你      

 🔥系列专栏:Netty

⛺️稳中求进,晒太阳

1.处理accept

1.1客户端代码

public class Client {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 8080)) {
            System.out.println(socket);
            socket.getOutputStream().write("world".getBytes());
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

1.2 服务端代码

@Slf4j
public class ChannelDemo {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
//                int count = selector.selectNow();
                log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }

                // 获取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍历所有事件,逐一处理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判断事件类型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必须处理
                        SocketChannel sc = c.accept();
                        log.debug("{}", sc);
                    }
                    // 处理完毕,必须将事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

💡 事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

水平触发是什么

参考这篇文章:细说八股 | NIO的水平触发和边缘触发到底有什么区别? - 掘金 (juejin.cn)

2. 处理read事件

2.1 服务端代码

@Slf4j
public class ChannelDemo {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
//                int count = selector.selectNow();
                log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }

                // 获取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍历所有事件,逐一处理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判断事件类型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必须处理
                        SocketChannel sc = c.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                        log.debug("连接已建立: {}", sc);
                    } else if (key.isReadable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(128);
                        int read = sc.read(buffer);
                        if(read == -1) {
                            key.cancel();
                            sc.close();
                        } else {
                            buffer.flip();
                            debug(buffer);
                        }
                    }
                    // 处理完毕,必须将事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

💡 为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey

  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

💡 cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件。

注意:

关闭远程连接会发送一次可读事件,返回值为-1,需要cancel和close和remove

⚠️ 不处理边界的问题

服务端

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket ss=new ServerSocket(9000);
        while (true) {
            Socket s = ss.accept();
            InputStream in = s.getInputStream();
            // 这里这么写,有没有问题
            byte[] arr = new byte[4];
            while(true) {
                int read = in.read(arr);
                // 这里这么写,有没有问题
                if(read == -1) {
                    break;
                }
                System.out.println(new String(arr, 0, read));
            }
        }
    }
}

客户端

public class Client {
    public static void main(String[] args) throws IOException {
        Socket max = new Socket("localhost", 9000);
        OutputStream out = max.getOutputStream();
        out.write("hello".getBytes());
        out.write("world".getBytes());
        out.write("你好".getBytes());
        max.close();
    }
}

输出

为什么呢???????

处理消息的边界(3种)

  1. 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽

  2. 另一种思路是按分隔符拆分,缺点是效率低

  3. TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

服务端

下述代码:按分隔符拆分,然后动态扩容

private static void split(ByteBuffer source) {
    source.flip();
    for (int i = 0; i < source.limit(); i++) {
        // 找到一条完整消息
        if (source.get(i) == '\n') {
            int length = i + 1 - source.position();
            // 把这条完整消息存入新的 ByteBuffer
            ByteBuffer target = ByteBuffer.allocate(length);
            // 从 source 读,向 target 写
            for (int j = 0; j < length; j++) {
                target.put(source.get());
            }
            debugAll(target);
        }
    }
    source.compact(); // 0123456789abcdef  position 16 limit 16
}

public static void main(String[] args) throws IOException {
    // 1. 创建 selector, 管理多个 channel
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    // 2. 建立 selector 和 channel 的联系(注册)
    // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
    SelectionKey sscKey = ssc.register(selector, 0, null);
    // key 只关注 accept 事件
    sscKey.interestOps(SelectionKey.OP_ACCEPT);
    log.debug("sscKey:{}", sscKey);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
        // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
        // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
        selector.select();
        // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
            iter.remove();
            log.debug("key: {}", key);
            // 5. 区分事件类型
            if (key.isAcceptable()) { // 如果是 accept
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                // 将一个 byteBuffer 作为附件关联到 selectionKey 上
                SelectionKey scKey = sc.register(selector, 0, buffer);
                scKey.interestOps(SelectionKey.OP_READ);
                log.debug("{}", sc);
                log.debug("scKey:{}", scKey);
            } else if (key.isReadable()) { // 如果是 read
                try {
                    SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                    // 获取 selectionKey 上关联的附件
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                    if(read == -1) {
                        key.cancel();
                    } else {
                        split(buffer);
                        // 需要扩容
                        if (buffer.position() == buffer.limit()) {
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                            buffer.flip();
                            newBuffer.put(buffer); // 0123456789abcdef3333\n
                            key.attach(newBuffer);
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
                }
            }
        }
    }
}

客户端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();

ByteBuffer 大小分配
  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。

    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

3.处理Write事件

write就是将buffer中的内容通过channel传输过去。

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上

    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册

    • 如果不取消,会每次可写均会触发 write 事件

public class WriteServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    // 1. 向客户端发送内容
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = sc.write(buffer);
                    // 3. write 表示实际写了多少字节
                    System.out.println("实际写入字节:" + write);
                    // 4. 如果有剩余未读字节,才需要关注写事件
                    if (buffer.hasRemaining()) {
                        // read 1  write 4
                        // 在原有关注事件的基础上,多关注 写事件
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        // 把 buffer 作为附件加入 sckey
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    int write = sc.write(buffer);
                    System.out.println("实际写入字节:" + write);
                    if (!buffer.hasRemaining()) { // 写完了
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }
}

客户端

public class WriteClient {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        sc.connect(new InetSocketAddress("localhost", 8080));
        int count = 0;
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isConnectable()) {
                    System.out.println(sc.finishConnect());
                } else if (key.isReadable()) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                    count += sc.read(buffer);
                    buffer.clear();
                    System.out.println(count);
                }
            }
        }
    }
}
💡 write 为何要取消

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/719069.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

泛微开发修炼之旅--19ecode获取用户人员信息方案汇总及代码示例(含pc端和移动端)

文章详情链接&#xff1a;19ecode获取用户人员信息方案汇总及代码示例&#xff08;含pc端和移动端&#xff09;

Web前端开发实战:HTML5+CSS3+JavaScript+Vue+Bootstrap

&#x1f482; 个人网站:【 摸鱼游戏】【神级代码资源网站】【工具大全】&#x1f91f; 一站式轻松构建小程序、Web网站、移动应用&#xff1a;&#x1f449;注册地址&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交…

C#语言中的Async/await最佳实践

自从 C# 5 中引入 async/await 以来&#xff0c;开发人员之间一直对 async/await 关键字的最佳实践以及幕后实际发生的事情感到困惑。 让我们先从基础开始。 在 Windows 窗体的早期&#xff0c;UI 延迟与 I/O 操作所花费的时间成正比。这意味着&#xff0c;如果您尝试将数据保…

ResNet——Deep Residual Learning for Image Recognition(论文阅读)

论文名&#xff1a;Deep Residual Learning for Image Recognition 论文作者&#xff1a;Kaiming He et.al. 期刊/会议名&#xff1a;CVPR 2016 发表时间&#xff1a;2015-10 ​论文地址&#xff1a;https://arxiv.org/pdf/1512.03385 1.什么是ResNet ResNet是一种残差网络&a…

【八股系列】介绍React高阶组件,适用于什么场景?

文章目录 1. HOC的工作原理2. 返回的新组件3. 适用场景4. 注意事项5. 示例代码 React高阶组件&#xff08; Higher-Order Components&#xff0c;简称HOC&#xff09;是 React中一种高级的 复用组件逻辑的技术。 HOC自身不是 React API的一部分&#xff0c;而是基于 Reac…

【Spine学习05】之网格权重概念,以及让小臂动起来

上一节绑定好骨骼设置好了父级之后呢&#xff0c; 会发现操纵只有大臂能摆动&#xff0c;但是小臂以及手部无法K帧动起来。 当然如果你拿到的图片分层非常细&#xff0c;大小腿分开 例如这种的&#xff0c;铠甲勇士&#xff0c;那么其实绑不绑定权重意义不大&#xff0c; 因为骨…

职教本科人工智能工程技术教学解决方案

前言 随着人工智能技术的迅猛发展&#xff0c;其在各行各业的应用日益广泛&#xff0c;对高层次技术技能型人才的需求也愈发迫切。在这一背景下&#xff0c;职业教育本科层次的人工智能工程技术专业应运而生&#xff0c;旨在培养能够从事人工智能数据处理、模型构建、系统应用研…

echarts legend 背景色渐变

问题与本文无关&#xff1a;如果检测软件显示loadsh.js 的版本是4.17.10 装element-ui 2.15.8版本以下&#xff0c;2.15.6经过测试可以 代码&#xff1a; <template><div class"levelMain"><div class"survey-head"><div class"…

【思考】Vue2响应丢失、$set

【思考】Vue2响应丢失、$set vue2响应丢失情况复现原因解决总结 vue2响应丢失情况复现 场景&#xff1a;直接通过数组下标去修改数组造成响应丢失 <template><div><p v-for"(item, index) in list" :key"index">{{item}}</p><…

同三维T80004EHL-W-4K30 4K HDMI编码器,支持WEBRTC协议

输入&#xff1a;1路HDMI1路3.5音频&#xff0c;1路HDMI环出1路3.5音频解嵌输出 4K30超高清,支持U盘/移动硬盘/TF卡录制&#xff0c;支持WEBRTC协议&#xff0c;超低延时&#xff0c;支持3个点外网访问 1个主流1个副流输出&#xff0c;可定制选配POE供电模块&#xff0c;WEBR…

【Vue】——组件的注册与引用(二)

&#x1f4bb;博主现有专栏&#xff1a; C51单片机&#xff08;STC89C516&#xff09;&#xff0c;c语言&#xff0c;c&#xff0c;离散数学&#xff0c;算法设计与分析&#xff0c;数据结构&#xff0c;Python&#xff0c;Java基础&#xff0c;MySQL&#xff0c;linux&#xf…

VBA学习(4):一键生成Sheet表目录

当Sheet表非常多的时候&#xff0c;一般我们会在第一张工作表中做一张目录&#xff0c;方便快速查找定位相应表格&#xff0c;以下示例将介绍如何通过宏程序一键生成目录。 效果如下&#xff1a; 参考代码如下&#xff1a; Sub SheetList()Dim sht As Worksheet, i As Long, s…

uniapp canvas生成海报

效果 封装组件&#xff0c;父组件 ref 调用 downImgUrl()函数&#xff0c;其他根据自己需求改 <template><view><view class"bgpart"><canvas class"canvas-wrap" canvas-id"canvasID" type"2d"></canvas…

GPT3.5的PPO目标函数怎么来的:From PPO to PPO-ptx

给定当前优化的大模型 π \pi π&#xff0c;以及SFT模型 π S F T \pi_{SFT} πSFT​ 原始优化目标为: max ⁡ E ( s , a ) ∼ R L [ π ( s , a ) π S F T ( s , a ) A π S F T ( s , a ) ] \max E_{(s,a)\sim RL}[\frac{\pi(s,a)}{\pi_{SFT}(s,a)}A^{\pi_{SFT}}(s,a)] m…

编程精粹—— Microsoft 编写优质无错 C 程序秘诀 03:强化你的子系统

这是一本老书&#xff0c;作者 Steve Maguire 在微软工作期间写了这本书&#xff0c;英文版于 1993 年发布。2013 年推出了 20 周年纪念第二版。我们看到的标题是中译版名字&#xff0c;英文版的名字是《Writing Clean Code ─── Microsoft’s Techniques for Developing》&a…

windows无法完成格式化

方法. 使用CMD格式化 请将U盘连接到电脑&#xff0c;并确保电脑能够正常识别。 1. 在搜索框中输入“命令提示符”。在左侧结果中的“命令提示符”上点击右键&#xff0c;选择“以管理员身份运行”。 2. 在新窗口中&#xff0c;键入“diskpart”并按“回车”&#xff0c;然后…

如何通过小猪APP分发轻松实现Web封装APP

你有没有想过将你的网站或者Web应用变成一个真正的APP&#xff1f;这听起来可能有点复杂&#xff0c;但其实在今天的技术环境下&#xff0c;这已经变得非常简单了。特别是有了像小猪APP分发这样的工具&#xff0c;你可以轻松地将你的Web应用封装成一个APP。 为什么要将Web应用封…

Golang | Leetcode Golang题解之第164题最大间距

题目&#xff1a; 题解&#xff1a; type pair struct{ min, max int }func maximumGap(nums []int) (ans int) {n : len(nums)if n < 2 {return}minVal : min(nums...)maxVal : max(nums...)d : max(1, (maxVal-minVal)/(n-1))bucketSize : (maxVal-minVal)/d 1// 存储 (…

如何在不丢失数据的情况下解锁安卓手机密码

手机是我们生活中必不可少的工具&#xff0c;可以帮助我们与朋友和家人保持联系&#xff0c;了解最新消息&#xff0c;甚至经营我们的业务。然而&#xff0c;当我们在 Android 手机或 iPhone 上设置密码时&#xff0c;我们经常会忘记密码&#xff0c;或者根本没有设置密码。当这…

安卓多媒体(音频录播、传统摄制、增强摄制)

本章介绍App开发常用的一些多媒体处理技术&#xff0c;主要包括&#xff1a;如何录制和播放音频&#xff0c;如何使用传统相机拍照和录像&#xff0c;如何截取视频画面&#xff0c;如何使用增强相机拍照和录像。 音频录播 本节介绍Android对音频的录播操作&#xff0c;内容包…