Reactor模型:网络线程模型演进

一,阻塞IO线程池模型(BIO)


这是传统的网络编程方案所采用的线程模型。
即有一个主循环,socket.accept阻塞等待,当建立连接后,创建新的线程/从线程池中取一个,把该socket连接交由新线程全权处理。
这种方案优点即实现简单,缺点则是方案的伸缩性受到线程数的限制。
// 循环监听
while (true) {
    // 阻塞监听客户端请求
    client = server.accept();
    System.out.println(client.getRemoteSocketAddress() + "客户端连接成功!");
    // 将该客户端请求通过线程池放入HandlMsg线程中进行处理
    executorService.execute(new HandleMsg(client));
}

二,Reactor单线程模型


有了NIO后,可以采用IO多路复用机制了。
这是一个单Reactor单线程模型,时序图见下文,该方案只有一个线程,所有Channel的连接均注册在了该Reactor上,由一个线程全权负责所有的任务。
这种方案实现简单,且不受线程数的限制,但受限于使用场景,仅适合于IO密集的应用,不太适合CPU密集的应用,且适合于CPU资源紧张的应用上。

三,Reactor线程池模型


Reactor负责全部IO任务(包括每个Channel的连接和读写),线程池负责业务逻辑的处理。
虽然该方案可以充分利用CPU资源,但是这个方案比单线程版本多了进出Thread Pool的两次上下文切换。

四,主从Reactor模型(Netty的线程模型)


  • MainReactor负责连接任务,SubReactor负责IO读写、业务计算。
  • MainReactor和每个SubReactor都是单独的线程,可以调整SubReactor的数量适应CPU资源紧张的应用。
  • 该方案有一个不太明显的缺点,即Session没有分优先级,所有Session平等对待均分到所有的线程中,这样可能会导致优先级低耗资源的Session堵塞高优先级的Session。( TODO 看下Netty的优化

五,主从Reactor线程池模型


和主从Reactor模型相比, 只是把业务计算放到线程池里了,IO读写还是在SubReactor线程里。
该模型可以更为灵活的适应大多应用场景,通过:调整SubReactor数量、调整Thread Pool参数等。
注意:
  1. 如果将IO读写放到线程池里,可能会出现问题:SubReactor选中读就绪事件立马交给线程池,但线程还没来得及read,Channel由于仍然读就绪被select出来重复执行。
  2. 上图这样把Channel的读写放在SubReactor,那么此SubReactor上不同Channel的读写会阻塞,但可能效率很高也问题不大。
主从Reactor线程池模型代码示例(调试过了,注意细节见注释)
客户端
public class ReactorClient {

    public static void main(String[] args) throws IOException, InterruptedException {
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                try {
                    send();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    static void send() throws IOException {
        // 阻塞模式读写
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9090));
        ByteBuffer writeBuff = ByteBuffer.allocate(20);

        /**
         * 分配太小时,客户端表现:接收数据不完整,但正常退出;服务端表现:业务读写正常,但业务结束后会收到2次读就绪事件,一次读返-1,关闭channel,一次读就会报java.io.IOException: Connection reset by peer
         * TODO 研究下这个原理和如何分配大小
         */
        ByteBuffer readBuff = ByteBuffer.allocate(2000);

        writeBuff.put(("i am client " + Thread.currentThread().getName()).getBytes());
        writeBuff.flip();

        new Thread(new Runnable() {
            @Override
            @SneakyThrows
            public void run() {
                socketChannel.write(writeBuff);
                System.out.println(Thread.currentThread().getName() + " 已发送数据,等待返回");
                readBuff.clear();

                // 阻塞等服务端消息
                socketChannel.read(readBuff);

                readBuff.flip();
                System.out.println(Thread.currentThread().getName() + " 接受服务端消息:" + new String(readBuff.array()));

                // 正常来讲应放入finally
                socketChannel.close();
            }
        }).start();
    }
}

服务端

/**
 * 主从Reactor多线程模型
 */
public class MainSubReactorMultiThread {

    private static final int SUB_COUNT = 4;

    public static void main(String[] args) {
        MainSubReactorMultiThread.MainReactor mainReactor = new MainSubReactorMultiThread.MainReactor(9090);
        mainReactor.run();
    }

    /**
     * 选择就绪的连接事件
     */
    public static class MainReactor implements Runnable {
        ServerSocketChannel serverSocketChannel;
        Selector selector;

        public MainReactor(int port) {
            try {
                serverSocketChannel = ServerSocketChannel.open();
                selector = Selector.open();
                serverSocketChannel.socket().bind(new InetSocketAddress(port));
                serverSocketChannel.configureBlocking(false);
                // 注册了连接事件
                SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                // 并且在selectionKey对象附加了一个Acceptor对象,这是用来处理连接请求的类
                selectionKey.attach(new MainSubReactorMultiThread.Acceptor(serverSocketChannel));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "mainSelector, 开始监听");
                    selector.select();
                    System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "mainSelector, 监听到连接件");
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        // 这里因为是通过attach附加了事件响应的Runnable,所以不用区分事件类型
                        dispatcher(selectionKey);
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private void dispatcher(SelectionKey selectionKey) {
            Runnable runnable = (Runnable) selectionKey.attachment();
            // 同线程执行
            runnable.run();
        }
    }

    /**
     * 选择就绪的读写事件
     */
    public static class SubReactor implements Runnable {
        Selector subSelector;
        int index;

        public SubReactor(Selector subSelector, int index) {
            this.subSelector = subSelector;
            this.index = index;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 开始监听");
                    int selectNum = subSelector.select();
                    if (selectNum != 0) {
                        System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 监听到就绪事件:" + JSON.toJSONString(subSelector.selectedKeys()));
                    } else {
                        System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 未监听到事件,继续轮训");
                        continue;
                    }
                    Set<SelectionKey> selectionKeys = subSelector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        // 这里因为是通过attach附加了事件响应的Runnable,所以不用区分事件类型
                        dispatcher(selectionKey);
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @SneakyThrows
        private void dispatcher(SelectionKey selectionKey) {
            while (true) {
                Runnable runnable = (Runnable) selectionKey.attachment();
                if (runnable != null) {
                    // 同线程执行
                    runnable.run();
                    return;
                }
                System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", runnable对象未添加完成,等待10ms");
                Thread.sleep(10);
            }

            /**
             * 可能在Acceptor里刚注册channel到selector就被reactor选中执行了,这时注册channel的地方还没执行attach方法,runnable会报NPE,所以要判空
             */
//            Runnable runnable = (Runnable) selectionKey.attachment();
//            runnable.run();
        }
    }

    /**
     * 处理连接
     */
    public static class Acceptor implements Runnable {

        private static Selector[] subSelector = new Selector[SUB_COUNT];

        private ServerSocketChannel serverSocketChannel;

        /**
         * 单线程不会冲突
         */
        private int index = -1;

        @SneakyThrows
        public Acceptor(ServerSocketChannel serverSocketChannel) {
            for (int i = 0; i < SUB_COUNT; i++) {
                subSelector[i] = Selector.open();
                SubReactor subReactor = new SubReactor(subSelector[i], i);
                new Thread(subReactor).start();
            }
            this.serverSocketChannel = serverSocketChannel;
        }

        @Override
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                int ind = getNextIndex();
                /**
                 * 本来以为没必要的,但如果不wakeup,会在下一步register阻塞!底层在等待synchronized同步锁
                 * TODO 研究下原理
                 */
                subSelector[ind].wakeup();
                SelectionKey selectionKey = socketChannel.register(subSelector[ind], SelectionKey.OP_READ);
                selectionKey.attach(new MainSubReactorMultiThread.ThreadPollWorkHandler(socketChannel));
                System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "客户端已连接:" + socketChannel.getRemoteAddress());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private int getNextIndex() {
            if (index++ == SUB_COUNT - 1) {
                index = 0;
            }
            return index;
        }
    }

    /**
     * 处理读写
     */
    public static class ThreadPollWorkHandler implements Runnable {

        private static ExecutorService executorService = Executors.newCachedThreadPool();

        private SocketChannel socketChannel;

        public ThreadPollWorkHandler(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

       
        @Override
        public void run() {
            try {
                System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "开始处理socket读");

                /**
                 * 读数据
                 */
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readLength = socketChannel.read(byteBuffer);
                if (readLength == -1) {
                    System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "客户端已关闭,关闭此通道");
                    socketChannel.close();
                    return;
                }
                String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
                System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", socket读完成: " + message);

                /**
                 * 线程池处理业务计算
                 */
                TaskHandler taskHandler = new TaskHandler(socketChannel, message);
                Future<String> taskResult = executorService.submit(taskHandler);

                /**
                 * 写数据
                 */
                ByteBuffer writeBuffer = ByteBuffer.wrap((socketChannel.getRemoteAddress() + ":" + taskResult.get()).getBytes(StandardCharsets.UTF_8));
                socketChannel.write(writeBuffer);
                System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "已返回客户端数据,请求处理最终完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class TaskHandler implements Callable<String> {

        private SocketChannel socketChannel;

        private String parameter;

        public TaskHandler(SocketChannel socketChannel, String parameter) {
            this.socketChannel = socketChannel;
            this.parameter = parameter;
        }

        @Override
        public String call() throws Exception {
            System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】线程池Thread:" + Thread.currentThread().getName() + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", 开始处理业务计算 参数: " + parameter);
            Thread.sleep(1000);
            String result = String.format("response(%s) for (%s)", RandomStringUtils.randomAlphanumeric(30), parameter).trim();
            System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】线程池Thread:" + Thread.currentThread().getName() + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", 业务计算完成 返回: " + result);
            return result;
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/755151.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++课程设计——演讲比赛系统】

文章目录 前言一、演讲比赛程序需求二、每个功能模块的实现1. 创建管理类(.h文件)2.1. 创建管理类(.cpp文件)3.创建参赛选手类(.h)4.将整体逻辑进行封装 测试项目总结 前言 在学习完C的stl容器后&#xff0c;我们来写一下小项目对其进行应用&#xff01; 项目名称为&#xff1…

常见的反爬手段和解决思路(爬虫与反爬虫)

常见的反爬手段和解决思路&#xff08;爬虫与反爬虫&#xff09; 学习目标1 服务器反爬的原因2 服务器长反什么样的爬虫&#xff08;1&#xff09;十分低级的应届毕业生&#xff08;2&#xff09;十分低级的创业小公司&#xff08;3&#xff09;不小心写错了没人去停止的失控小…

排序算法(1)之插入排序----直接插入排序和希尔排序

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 排序之插入排序----直接插入排序和希尔排序(1) 收录于专栏【数据结构初阶】 本专栏旨在分享学习数据结构学习的一点学习笔记&#xff0c;欢迎大家在评论区交流讨…

JavaParser抽取测试用例对应的被测方法

背景介绍 博主目前要做的工作需要将一个java项目的所有RD手写的测试用例和被测方法对应起来&#xff0c;最后将得到的结果存入一个json文件。 本教程以项目GitHub - binance/binance-connector-java 为例。 结果展示 最终会得到一个 funcTestMap.json&#xff0c;里面存放着…

昇思25天学习打卡营第6天|linchenfengxue

​​​​​​SSD目标检测 SSD&#xff0c;全称Single Shot MultiBox Detector&#xff0c;是Wei Liu在ECCV 2016上提出的一种目标检测算法。使用Nvidia Titan X在VOC 2007测试集上&#xff0c;SSD对于输入尺寸300x300的网络&#xff0c;达到74.3%mAP(mean Average Precision)以…

kafka-Stream详解篇(附案例)

文章目录 Kafka Stream 概述Kafka Stream 概念Kafka Stream 数据结构入门案例一需求描述与分析配置KafkaStream定义处理流程声明Topic接收处理结果发送消息测试 入门案例二需求描述与分析定义处理流程接收处理结果声明Topic 更多相关内容可查看 Kafka Stream 概述 Kafka Strea…

脉冲同步器(快到慢)

目录 描述 输入描述&#xff1a; 输出描述&#xff1a; 参考代码 描述 sig_a 是 clka&#xff08;300M&#xff09;时钟域的一个单时钟脉冲信号&#xff08;高电平持续一个时钟clka周期&#xff09;&#xff0c;请设计脉冲同步电路&#xff0c;将sig_a信号同步到时钟域 cl…

Excel 宏录制与VBA编程 —— 15、MsgBox参数详解

Msgbox参数具体如下 Msgbox参数使用1 Msgbox参数使用2&#xff08;返回值示例&#xff09; &ensp ;###### 关注 笔者 - jxd

Vue 项目运行时,报错Error: Cannot find module ‘node:path‘

Vue 项目运行时&#xff0c;报错Error: Cannot find module ‘node:path’ internal/modules/cjs/loader.js:883throw err;^Error: Cannot find module node:path Require stack: - D:\nodejs\node_modules\npm\node_modules\node_modules\npm\lib\cli.js - D:\nodejs\node_mo…

GMSB文章七:微生物整合分析

欢迎大家关注全网生信学习者系列&#xff1a; WX公zhong号&#xff1a;生信学习者Xiao hong书&#xff1a;生信学习者知hu&#xff1a;生信学习者CDSN&#xff1a;生信学习者2 介绍 本文通过多元方差分析和典型相关分析研究微生物&#xff08;species&#xff09;、细胞因子…

【面试干货】与的区别:位运算符与逻辑运算符的深入探讨

【面试干货】&与&&的区别&#xff1a;位运算符与逻辑运算符的深入探讨 1、&&#xff1a;位运算符2、&&&#xff1a;逻辑运算符3、&与&&的区别 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; & 和 …

NIVision-LabVIEW在灰度图上画圆

问题来源 在csdn上看到的这样一个问题&#xff0c;好像也没个正经答案&#xff0c;都用chatGPT回答&#xff0c;挺没劲的。不说提供个vi源代码&#xff0c;至少也来张截图嘛。我想着问题也不难&#xff0c;就自己动动手吧。 代码展示1 1、首先使用imaq ArrayToImage.vi创建了一…

《昇思25天学习打卡营第01天|sun65535》

开始 昇思25天打卡训练营&#xff0c;让我第一次了解了华为昇思的平台&#xff0c;之前也有自己本地使用4060训练了一些“小模型”&#xff0c;但是都是比较皮毛的知识&#xff0c;只是根据教程去搭建。很少了解到具体的过程。昇思25天打卡训练营给了一个比较全面的训练课程。…

IoTDB Committer+Ratis PMC Member:“两全其美”的秘诀是?

IoTDB & Ratis 双向深耕&#xff01; 还记得一年前我们采访过拥有 IoTDB 核心研发 Ratis Committer “双重身份”的社区成员宋子阳吗&#xff1f;&#xff08;点此阅读&#xff09; 我们高兴地发现&#xff0c;一年后&#xff0c;他在两个项目都更进一步&#xff0c;已成为…

Firefox 编译指南2024 Windows10- 定制化您的Firefox(四)

1. 引言 定制化您的Firefox浏览器是一个充满乐趣且富有成就感的过程。在2024年&#xff0c;Mozilla进一步增强了Firefox的灵活性和可定制性&#xff0c;使得开发者和高级用户能够更深入地改造和优化浏览器以满足个人需求。从界面的微调到功能的增强&#xff0c;甚至是核心代码…

vscode 生成项目目录结构 directory-tree 实用教程

1. 安装插件 directory-tree 有中文介绍&#xff0c;极其友好&#xff01; 2. 用 vscode 打开目标项目 3. 快捷键 Ctrl Shift p&#xff0c;输入 Directory Tree 后回车 会在 README.md 文件的底部生成项目目录&#xff08;若项目中没有 README.md 文件&#xff0c;则会自动创…

casefold()方法——所有大写字符转换为小写

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 casefold()方法是Python3.3版本之后引入的&#xff0c;其效果和lower()方法非常相似&#xff0c;都可以转换字符串中所有大写字符为小写。…

windows MSVC编译安装libcurl

$ git clone https://github.com/curl/curl.git $ cd curl/winbuild依照curl/winbuild/README.md的指示&#xff0c; 启动visual studio的命令行工具&#xff0c;这里要注意别选错. 如果要编译出x64版本的libcurl&#xff0c;就用x64的命令行工具&#xff1b;如果要编译出x86…

怎么修改钻孔表的大小?

导入 在Cadence中最后要生成Gerber文件交由板厂制版时&#xff0c;其中有个提取钻孔表的过程。以往的过程并没有对钻孔表要求&#xff0c;今天却要修改钻孔表的大小了&#xff0c;如何做呢&#xff1f;这是一个非常罕见的操作&#xff0c;特此记录。 原理 1、先来复习一下如何…

使用AI的100种方法#翻译神器N3

Text "100 ways to" and "use AI" in the poster center .A cozy desk setup with an open notebook featuring notes and drawings, a cup of coffee, a white pen, and dried flowers. Warm, earthy tones create a calming, aesthetic vibe. 第 3 种可能…