Reactor 模式全解:实现非阻塞 I/O 多路复用

4302f48a32e14b49bdb6314e2b0af1ba.png

8d21465ea8f14802aced98dcb5f9bb43.png

6ed7c2d2f8f64ce898ef83d0c1aed539.png

Reactor网络模式是什么?

Reactor网络模式时目前网络最常用的网络模式。如果你使用Netty,那么你在使用Reactor;如果你使用Twisted,那么你子啊使用Reactor;如果你使用netpoll,那么你在使用Reactor。

这里先给出答案:
Reactor = I/O多路复用+非阻塞I/O。

什么是I/O多路复用?

我们还是先使用文字拆解来看看每个词是什么意思吧。

拆词解释

I/O

I/O表示输入和输出,英文为Input/Output。I为输入,O为输出。我们日常编程中操作最多的无非就是网络和文件了,这两类就属于I/O,我们通常称为网络I/O和文件I/O。

下面是两个Java和Go操作文件I/O的例子:
java按行读取文件:

  try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

 

Go按行读取文件:

    file, _ := os.Open(fileName)
    defer file.Close()

    reader := bufio.NewReader(file)
    for {
        line,err := reader.ReadString('\n')
        if err != nil {
            break
        }
    fmt.Print(line)

 

好的,I/O搞清楚了我们就是搞清楚多路。

多路

多路字面意思就是多条路,放在计算机网络编程中的话,一般是指多个通道或者数据源。比如:你的进程或者需要打开很多的文件或者有很多网络连接,并监控这些文件或者网络连接是否发生变化(也就是是否产生一些事件)以进行必要的处理。

复用

复用的字面意思就是重复使用。我们把多路放到计算机网络编程中的话,一般是指重复使用一个或者几个线程,这里的关键是线程一定要很少而且重复使用它来完成I/O+多路。
总的来说就是:重复使用一个或者几个献策会给你来完成多路I/O的变化(事件)处理。

给I/O多路复用下个定义

好了,有了以上的背景或许你已经对I/O多路复用有了自己的理解和定义。这儿我根据自己的理解来对I/O多路复用进行定义:
I/O多路复用就是使用一个或者几个进程或者线程来完成大量通道或者数据源的事件监控和处理。

I/O多路复用复用了什么?

复用了线程。
在没有I/O多路复用之前,可能客户端每创建一个连接服务端都需要新建一个线程来处理事件,这样10K个客户端连接就需要10K个线程,服务端应对这些连接很吃力,因为创建线程有开销,切换线程有开销,还有同步,锁,死锁等问题。

那有了I/O多路复用之后,服务端可能1个线程就可以应对10K个连接的事件。

一般使用什么技术实现I/O多路复用

I/O多路复用技术实现依赖于操作系统,但是主流操作系统都是支持,下面是三大操作系统对I/O多路复用的支持:

  1. Linux: 这个是目前的大哥,Linux使用epoll,当然还有select, poll,目前网络上基本都用epoll
  2. MacOS: Kqueue
  3. Windows: IOCP I/O完成端口

I/O多路复用就告一段落,看下非阻塞I/O。

什么是非阻塞I/O

非阻塞I/O是相对于阻塞I/O而言的,它们之间的区别就是你进行I/O操作时是否阻塞你后续的执行。非阻塞不会阻塞后续执行,而阻塞会。这就好比:
你用某App网上下单到店取一样。假设你直接到店里面用手机下单,你必须在店里等待食物准备好。在这个过程中,你不能去做其他任何事情,直到拿到东西后,你才离开。

而非阻塞I/O就像是你网上下单起手配送,在起手配送期间你可以和你的朋友或者同时唠唠嗑,等骑手把东西送到给你打电话的时候你就下去拿。

总的来说:阻塞I/O中的程序在等待I/O完成时会一直停留在相应操作上,不会执行后续的代码。与之相反,在非阻塞I/O模式下,程序会立即返回一个状态值,如果I/O尚未完成,则程序可以继续执行其他任务,然后随后再次检查I/O状态。
阻塞I/O: 死等
非阻塞I/O:立即返回,下次重试

I/O多路复用和非阻塞I/O组合到一起擦出什么样的火花?

Reactor设计模式结合了非阻塞I/O和I/O多路复用,使得单个线程就能高效地处理多个网络通信。这种结合擦出的“火花”就是使事件驱动的网络服务器变得可能,这种服务器可以以非常轻量级的方式支持大规模并发连接。

在Reactor模式中,一个中央分派器(Reactor)负责监听所有I/O事件(使用select、poll、epoll等系统调用),并且当某个事件发生时,它将调用预先注册的回调函数来处理这些事件。由于采用了非阻塞I/O,这个中央分派器在等待I/O事件时不会被阻塞,这使得它可以在任何给定时间处理上千甚至上万个不同的I/O请求。
下面是单线程的Reactor模型:

单线程Reactor模式

e47b5644f5874f69998ce30f1f2099e7.png

快速实现一个Reactor

  1. 代码关键点1:Reactor线程创建一个事件循环(可能这会勾起你想起Netty的Boss)
 // 创建线程,执行事件循环
    pthread_t accept_threads[2];
    for (int i = 0; i < 1; i++) {
        printf("create acceptor thread. index: %d\n", i);
        // run_event_loop为事件的处理函数,循环处理
        pthread_create(&accept_threads[i], NULL, run_event_loop, &server_fd);
        pthread_detach(accept_threads[i]);
    }

 

  1. 代码关键点2: 事件处理线程(可能这会勾起你想起Netty的Worker)
// 事件发生后的处理函数: handle_io_event
pthread_create(&worker_thread, NULL, handle_io_event, &client_fd);

 

  1. 代码关键点3: 非阻塞I/O
// 设置文件描述符为非阻塞I/O
fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);

// 设置非阻塞
fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

 

  1. 完整代码
#include <stdio.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>

#define PORT 12345
#define MAX_EVENTS 10
#define BUFF_SIZE 1024
#define WORKER_SIZE 4

// epoll file descriptor
int epoll_fd;

// handlers
void* run_event_loop(void* arg);
void* handle_io_event(void* arg);
void wait_to_death();

int main() {
    int server_fd;
    struct sockaddr_in server_addr;

    // 创建server
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    // 设置非阻塞
    fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

    // 绑定
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);

    printf("binding\n");
    bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));

    printf("listen\n");
    // 监听
    listen(server_fd, MAX_EVENTS);

    printf("epoll create\n");
    // epoll创建
    epoll_fd = epoll_create1(0);

    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = server_fd;
    printf("epoll add\n");
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event);

    // 创建线程,执行事件循环
    pthread_t accept_threads[2];
    for (int i = 0; i < 1; i++) {
        printf("create acceptor thread. index: %d\n", i);
        pthread_create(&accept_threads[i], NULL, run_event_loop, &server_fd);
        pthread_detach(accept_threads[i]);
    }

    wait_to_death();

    close(epoll_fd);
    close(server_fd);

    return 0;
}

void* run_event_loop(void* arg) {
    int server_fd = *(int*)arg;

    while (1) {
        struct epoll_event events[MAX_EVENTS];
        int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == server_fd) {
                // 新连接
                int client_fd = accept(server_fd, NULL, NULL);

                // 设置非阻塞
                fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);

                // worker线程负责处理这个事件
                pthread_t worker_thread;
                pthread_create(&worker_thread, NULL, handle_io_event, &client_fd);
            }
        }
    }
}

void* handle_io_event(void* arg) {
    int client_fd = *(int*)arg;

    while (1) {
        char buff[BUFF_SIZE] = {0};
        int len = read(client_fd, buff, BUFF_SIZE);
        if (len <= 0) {
            close(client_fd);

            struct epoll_event event;
            event.events = EPOLLIN;
            event.data.fd = client_fd;
            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, &event);

            break;
        }
        else {
            printf("Received %s from client\n", buff);
        }
    }

    return NULL;
}

void wait_to_death() {
    sigset_t allset;
    sigemptyset(&allset);
    sigaddset(&allset, SIGINT); // Ctrl+C
    sigaddset(&allset, SIGQUIT); // Ctrl+\

    int sig;
    for (;;) {
        int err = sigwait(&allset, &sig);
        if (err == 0) {
            printf("received signal %d, prepare to exit\n", sig);
            break;
        }
    }
}

 

搞定收工,如有错误请指正,谢谢

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/491102.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python与供应链-2预测误差及指数平滑需求预测模型

主要介绍预测误差和指数平滑模型的相关理论,然后再通过Python的statsmodels封装的指数平滑函数预测需求。 1预测误差 预测误差是指预测结果与预测对象发展变化的真实结果之间的差距。这种误差分为绝对误差和相对误差。绝对误差是预测值与实际观测值的绝对差距,而相对误差则…

Spring学习——什么是循环依赖及其解决方式

文章目录 前言一、什么是循环依赖二、解决思路1、循环依赖分类2、对象初始化步骤及对象分类3、spring是如何解决的4、图解5、三级缓存1、区别2、ObjectFactory是什么 三、源码debug1、spring创建对象过程1、dubug第一步——找到getBean2、dubug第二步——getBean与doGetBean3、…

35.基于SpringBoot + Vue实现的前后端分离-在线考试系统(项目 + 论文)

项目介绍 本站是一个B/S模式系统&#xff0c;采用SpringBoot Vue框架&#xff0c;MYSQL数据库设计开发&#xff0c;充分保证系统的稳定性。系统具有界面清晰、操作简单&#xff0c;功能齐全的特点&#xff0c;使得基于SpringBoot Vue技术的在线考试系统设计与实现管理工作系统…

Uniapp三种常用提示框

具体参数方法可参考: uniapp交互反馈 uni.showToast(OBJECT) //显示消息提示框。 uni.hideToast() //隐藏消息提示框。 //具体使用 uni.showToast({title: 新增成功,duration: 2000 }); uni.showLoading(OBJECT) //显示 loading 提示框, 需主动调用 uni.hideLoading 才能关闭提…

区块链安全之DDoS防护的重要性及其实施策略

随着区块链技术的不断发展和广泛应用&#xff0c;其安全问题也日益凸显。其中&#xff0c;分布式拒绝服务(DDoS)攻击是对区块链网络稳定性和效率构成潜在威胁的重要因素之一。本文旨在深入探讨区块链为何需要采取DDoS高防措施&#xff0c;并提出相应的防护策略。 一、区块链面…

毕马威:量子计算成未来3-5年重大挑战

毕马威&#xff08;KPMG&#xff09;是一家全球性的专业服务网络&#xff0c;其历史可追溯到19世纪末。作为“四大”会计师事务所之一&#xff0c;毕马威在审计、税务和咨询服务领域享有盛誉。公司在全球范围内拥有多个办事处&#xff0c;服务遍及各个行业&#xff0c;包括金融…

【大数据】Flink学习笔记

认识Flink Docker安装Flink version: "2.1" services:jobmanager:image: flinkexpose:- "6123"ports:- "20010:8081"command: jobmanagerenvironment:- JOB_MANAGER_RPC_ADDRESSjobmanagertaskmanager:image: flinkexpose:- "6121"- …

Visio Viewer for Mac(Visio文件查看工具)

Visio Viewer for Mac是一款专为Mac用户设计的Microsoft Visio文件查看器。它拥有直观易用的用户界面&#xff0c;使得用户可以快速加载和显示Visio文件&#xff0c;无需安装完整的Microsoft Visio软件。 软件下载&#xff1a;Visio Viewer for Mac3.1.0激活版 Visio Viewer fo…

java Web线上网游商品交易平台用eclipse定制开发mysql数据库BS模式java编程jdbc

一、源码特点 jsp线上网游商品交易平台是一套完善的web设计系统&#xff0c;对理解JSP java SERLVET mvc编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,eclipse开发&#xff0c;数据库为Mysql5.0…

spring使用内置jetty创建提供http接口服务

1、添加pom文件依赖 <dependency><groupId>org.eclipse.jetty</groupId><artifactId>jetty-server</artifactId><version>9.4.22.v20191022</version> </dependency> <dependency><groupId>org.eclipse.jetty<…

azure服务器通过手机客户端远程连接

下载客户端 输入ip地址 输入用户名和密码 连接成功 人工智能学习网站&#xff1a; https://chat.xutongbao.top

大鱼来客实景无人直播系统源码开发部署---支持OEM贴牌-无限开户

实景无人直播系统需要包含以下几个主要功能&#xff1a; 视频采集与编码&#xff1a;使用摄像头或其他视频设备进行视频采集&#xff0c;并对视频进行编码压缩&#xff0c;以便实现实时传输。 视频传输与接收&#xff1a;将编码后的视频通过网络传输至客户端&#xff0c;客户端…

探究 HTTPS 的工作过程

目录 1. HTTPS 协议原理 1.1. 为什么要有HTTPS协议 1.2. 如何理解安全 1.3. HTTPS 协议是什么 2. HTTPS 的前置概念 2.1. 什么是加密 && 解密 2.2. 为什么要加密 2.3. 常见的加密方式 2.3.1. 对称加密 2.3.2. 非对称加密 2.4. 数据摘要 && 数据指纹…

Avalonia11.0.2+.Net6.0支持多语言,国际化

Avalonia11.0.2+.Net6.0支持多语言,国际化 操作系统项目结构最终效果具体实现安装Prism.Avalonia准备多语言文件语言资源加载类界面标记扩展类界面中使用国际化VM具体实现VM里面使用多语言方法操作系统 项目结构 最

【jenkins+cmake+svn管理c++项目】windows修改jenkins的工作目录

jenkins默认的存放源码的workspace是&#xff1a; C:\Users\用户\AppData\Local\Jenkins\.jenkins\workspace。由于jenkins会拉取大量的源代码以及编译生成一些文件&#xff0c;我希望我能自己指定目录作为它的工作空间&#xff0c;放在这里显然不太合适。 那么修改目录的方式有…

【剑指offer】顺时针打印矩阵

题目链接 acwing leetcode 题目描述 输入一个矩阵&#xff0c;按照从外向里以顺时针的顺序依次打印出每一个数字。 数据范围矩阵中元素数量 [0,400]。 输入&#xff1a; [ [1, 2, 3, 4], [5, 6, 7, 8], [9,10,11,12] ] 输出&#xff1a;[1,2,3,4,8,12,11,10,9,5,6,7] 解题 …

移动端Web笔记day03

移动 Web 第三题 01-移动 Web 基础 谷歌模拟器 模拟移动设备&#xff0c;方便查看页面效果&#xff0c;移动端的效果是当手机屏幕发生了变化&#xff0c;页面和页面中的元素也要跟着等比例变化。 屏幕分辨率 分类&#xff1a; 硬件分辨路 -> 物理分辨率&#xff1a;硬件…

【软考】设计模式之状态模式

目录 1. 说明2. 应用场景3. 结构图4. 构成5. 优缺点5.1 优点5.2 缺点 6. java示例6.1 非状态模式6.1.1 问题分析6.1.2 接口类6.1.2 实现类6.1.3 客户端6.1.4 结果截图 6.2 状态模式6.2.1 抽象状态类6.2.2 状态类6.2.3 上下文类6.2.4 上下文类 1. 说明 1.允许一个对象在其内部状…

算法之美:二叉树演进之AVL平衡二叉树底层原理

在之前的文章中&#xff0c;我们初步了解了二叉查找树&#xff08;又称二叉排序树&#xff09;&#xff0c;这使我们意识到使用特定策略的查询可以显著提高查找效率。本文将进一步探讨二叉树的演进。由于树相关算法较多且相对复杂&#xff0c;因为我后续将拆解讲述&#xff0c;…

SUSE 15 SP5 一键安装 Oracle 19C(19.22)单机版

前言 Oracle 一键安装脚本&#xff0c;演示 SUSE 15 SP5 一键安装 Oracle 19C&#xff08;19.22&#xff09;单机版过程&#xff08;全程无需人工干预&#xff09;&#xff1a;&#xff08;脚本包括 ORALCE PSU/OJVM 等补丁自动安装&#xff09; ⭐️ 脚本下载地址&#xff1…