Linux学习之高级IO

之前的内容我们基本掌握了基础IO,如套接字,文件描述符,重定向,缓冲区等知识都是文的基本认识,而高级IO则是指更加高效的IO。

对于应用层,在读写的时候,本质就是把数据写给OS,若一方为空,就会进行阻塞式等待,读写的本质也就是数据的拷贝。

所以高效的 IO就需要在多线程的情况下,单位时间的的拷贝的数据更多,等待的比重越小。

目录

五种IO模型

非阻塞式IO

多路复用(多路转接)

IO多路转接之select

初识select

 select函数原型

IO多路转接之epoll

1.认识有关epoll的接口

2.epoll原理

3.编写epoll

4.epoll的工作模式

注意:

读写的改进 reactor


五种IO模型

 当前有五种IO模型:

我们以钓鱼佬钓鱼为例:(鱼竿:文件描述符) (上钩:读写就绪)(鱼:数据)

1.钓鱼佬张三,在完成打窝,之后选择好地点就开始钓鱼,在等待的过程中,雷打不动,任何人都干扰不到他,他死死的盯着鱼漂,一有动静,钩被咬了就提竿上鱼。这种等待的方式我们称为阻塞式IO,这也是大部分常用的读写接口的方式

2.钓鱼佬李四是一个资深的钓鱼佬,在完成准备工作时,李四就先观察了鱼漂,发现没动静,就拿起了手机刷视频,看一会儿之后再检查鱼漂发现上钩了,于是提杆上鱼。李四每隔一段时间进行检查,如果满足条件就读写,反之就干自己的事。这种方式称为非阻塞式IO--非阻塞轮询

3.钓鱼佬王五,拥有较为先进的鱼竿,完成准备工作后,直接就把鱼竿插入地上,一心一意的干其它事,鱼竿上有报警装置,一旦上鱼,立刻发送警报告知王五,所以王五只需要看有没有报警信号。这种方式被称为信号驱动式IO

4.赵六,身家万贯的热爱钓鱼的钓鱼佬,秉持着杆多鱼多的准则,直接准备了一卡车鱼竿,依次完成准备工作,入水后准备开钓,由于鱼竿数量多,赵六在岸上来回检查看哪只鱼竿上货了。这种方式称为多路复用(多路转接)。

5.田七,以吃鱼为爱好的世界五百强上市公司CEO,司机小王拉着田七来钓鱼,刚准备钓鱼,由于公司业务繁忙,对钓鱼不是很感冒,田七又回到了公司处理,且告知小王车上装备齐全,你在这里钓鱼,掉满了之后再给我打电话,我来接你。对于田七来说,这种方式称为异步IO

首先我们来看看阻塞式与非阻塞式 IO:

两者在效率上基本一样(有人觉得非阻塞式效率会高一点,这指的是非阻塞可以去干别的事,但是对于IO效率(等+拷贝)是一样的),区别是等的方式不同,一个一直等,一个间隔等一会儿。

再看同步与异步:

同步:参与等或者参数与了拷贝,都可以是同步IO。异步:两者都没参与,只拿数据。

那么综上:哪一种效率更高呢?

实际上就是田七,多路复用IO。我们学习的重点也就是多路复用.

非阻塞式IO

fcntl
一个文件描述符 , 默认都是阻塞 IO,通过函数fcntl可以修改文件描述符的属性。
函数原型如下
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
传入的 cmd 的值不同 , 后面追加的参数也不相同 .
fcntl 函数有 5 种功能 :
复制一个现有的描述符( cmd=F_DUPFD .
获得 / 设置文件描述符标记 (cmd=F_GETFD F_SETFD).
获得 / 设置文件状态标记 (cmd=F_GETFL F_SETFL).
获得 / 设置异步 I/O 所有权 (cmd=F_GETOWN F_SETOWN).
获得 / 设置记录锁 (cmd=F_GETLK,F_SETLK F_SETLKW).
我们此处只是用第三种功能 , 获取 / 设置文件状态标记 , 就可以将一个文件描述符设置为非阻塞
实现函数 SetNoBlock
基于 fcntl, 我们实现一个 SetNoBlock 函数 , 将文件描述符设置为非阻塞 .
void SetNoBlock(int fd) { 
 int fl = fcntl(fd, F_GETFL); 
 if (fl < 0) { 
 perror("fcntl");
 return; 
 }
 fcntl(fd, F_SETFL, fl | O_NONBLOCK); 
}
使用 F_GETFL 将当前的文件描述符的属性取出来 ( 这是一个位图 ).
然后再使用 F_SETFL 将文件描述符设置回去 . 设置回去的同时 , 加上一个 O_NONBLOCK 参数

了解到这两个主要的接口,我们就可以简单的实现一下非阻塞:

#include<iostream>
#include<string.h>
#include<unistd.h>
#include<fcntl.h>
#include<cstdio>
using namespace std;



//默认阻塞,我们设置为非阻塞
void setNonBlock(int fd)
{
  int f1=fcntl(fd,F_GETFL);
  if(f1<0)
  {
    cerr<<"get failed"<<endl;
  }
  //设置标记位,实际上是在原基础上新增一个标记位
  fcntl(fd,F_SETFL,f1 | O_NONBLOCK);//以非阻塞式打开
  cout<<"set O_NONBLOCK succed"<<endl;
}



int main()
{
  char buffer[1024];//缓冲区
  while(true)
  {
    printf("please enter#");
    fflush(stdout);
    setNonBlock(0);
    sleep(1);
    ssize_t size=read(0,buffer,sizeof(buffer)-1);//从标准输入中读取
    if(size>0)
    {
      buffer[size-1]=0;
      cout<<"读取到数据 :"<<buffer<<endl;
    }else if(size==0)
    {
      cout<<"read done"<<endl;
      break;
    }else{
      cerr<<errno<<":"<<strerror(errno)<<endl;
    }
  }

  return 0;
}

一般阻塞式,系统就会等待着我们输入数据后,才进行下一步工作(这里是打印输入的字符)。

我们设置为非阻塞式有几点需要注意:

1.设置为非阻塞,系统不会等待着我们输出,才打印,而是以读取到数据<0打印错误信息。

2,出错非两种情况:第一个就是fd真的异常 ,第二个就是底层还没就绪,你就返回了。(这里是第二种)。

3.非阻塞也能获取到数据并打印,不过给你就绪的时间非常短。

对于第二点,如何区分是还没就绪还是fd异常,可以通过打印错误码而知晓。

多路复用(多路转接)

实际上对于阻塞式,非阻塞式,非阻塞式轮询这些等待和拷贝一个函数就会搞定了,但是对于多路转接,因为要提高IO效率,因此等待与拷贝是分开的的,多路复用的本质就是非阻塞轮询,因此我们需要将非阻塞和轮询逐一实现。

IO多路转接之select

初识select

系统提供 select 函数来实现多路复用输入 / 输出模型 .
select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的 ;
程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

 select函数原型

 select的函数原型如下: #include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

其中有三个参数类型为fd_set,位图类型,为输入输出型参数。

为输入型参数时:用户告诉内核,我给一些fd,你帮我注意观察一些fd的读事件,如果读事件就绪,你就告诉我

为输出型参数时:内核告诉用户,你给我传来的fd,我已经帮你知晓了有哪些已经就绪(进行位图修改),你可以读了。

至于为什么用位图,因为位图本质就是设置某个数的第几位为0/1,举例:如果此时内核发现0号和1号文件被描述符已经就绪,就会把第0位和第1位比特位置为1, (比特位的位置代表对应文件描述符,从右向左)。比特位的内容就可以表示内核是否要关心。

作为输入型位图:

比特位的位置代表文件描述符

比特位的内容代表是否内核需要关心

 作为输出型位图:

比特位的位置还是文件描述符

比特位的内容代表用户关心的fd的读事件就绪了

于是就提供了一系列位图的接口用来修改为位图。

  其中timeout为一个结构体,里面有两个成员:一个代表秒,一个代表微秒。、

如果设置了timeout,在该时间内如果有已经就绪的,立即返回,之后的timeout表示剩下的时间。

如果没设置,只要有一个就绪了,就返回。

select的返回值:

  1. 返回值

    • 正常情况下,返回已准备好的文件描述符的个数。
    • 经过超时时长后仍无设备准备好,返回值为 0。
    • 如果 select 被某个信号中断,返回 -1 并设置 errno 为 EINTR
    • 如果出错,返回 -1 并设置相应的 errno

对于服务端来说,使用多路复用可以提高IO效率,当没有用访问服务端使,此时select的个数为零,但一有客户端访问,就会有对应的文件描述符(接收缓冲区)被创建,并且添加进行状态检测。

select.hpp

#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>

const int max_fd = (sizeof(fd_set)) * 8; // 8个位图最多存储的bit位
static const uint32_t defaultport = 8080;
class SelectServer
{
public:
  SelectServer(const uint32_t Port = defaultport) : port(Port)
  {
    // 对辅助数组初始化为-1
    for (int i = 0; i < 1024; i++)
    {
      fd_arry[i] = defaultfd;
    }
  }
  ~SelectServer()
  {
    _listenfd.Close();
  }
  bool Init()
  {
    _listenfd.Createsockfd();
    _listenfd.Bind(port);
    _listenfd.Listen();
    return true;
  }
  void Accept()
  {
     // 连接就绪,获取新连接可以通信了
        std::string clientip;
        uint16_t clientport;
        int sockfd = _listenfd.Accept(&clientip, &clientport); // 就绪了,不在阻塞
        if (sockfd < 0)
          return;
        std::cout << "获取连接成功"
                  << "描述符:" << sockfd << std::endl;
        // 获取之后不能进行读取,而是设置select,不然还是会阻塞在select中
        int pos = 1;
        for (; pos < max_fd; pos++)
        {
          if (fd_arry[pos] != defaultfd)
          {
            continue;
          }
          else
          {
            break;
          }
        }
        if (pos == max_fd)
        {
          std::cout << "can not handle" << std::endl;
          close(sockfd); // 无法处理就关掉接收的文件描述符
        }
        else
        {
          // 找到了空闲位置,此时放到fd_arry里面管理
          fd_arry[pos] = sockfd;
        }

  }
  void recver(int fd,int i)
  {
        // 不在位图中代表的是接收后新的fd
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
          std::cout << "read meesage:" << buffer << std::endl;
        }else if(n==0)
        {
          std::cout<<"client closed" << std::endl;
          //如果为空,说明要关闭连接了
          close(fd);
          fd_arry[i]=defaultfd;

        }else{
          std::cout<<"read warning"<<std::endl;
          close(fd);
          fd_arry[i]=defaultfd;
        }
  }
  void Disapacher(fd_set fds)
  {
    // 为了确定就绪的文件描述符。需要遍历整个数组
    for (int i = 0; i < max_fd; i++)
    {
      int fd = fd_arry[i];
      if (fd == defaultfd)
      {
        continue;
      }
      // 先判断文件描述符是否在位图中
      if (fd == _listenfd.Fd() && FD_ISSET(fd, &fds)) // 判断是否是位图中的,是就代表就绪
      {
       Accept();
      }
      else
      {
        recver(fd,i);
      }
    }
  }
  void Start()
  {
    int listensock = _listenfd.Fd(); // 获取套接字的文件描述符
    fd_arry[0] = listensock;
    fd_set rfds;                     // 读的文件描述符位图
    FD_ZERO(&rfds);                  // 先进行清空
    struct timeval timeout = {10, 0}; // 设置时间戳,每隔五秒检验一次
    int fdmax = fd_arry[0];
    while (true)
    {
      for (int i = 0; i < max_fd; i++)
      {
        if (fd_arry[i] == defaultfd)
        {
          continue;
        }
        else
        {
          // 如果不为-1,说明该位置有要检测的文件描述符
          FD_SET(fd_arry[i], &rfds); // 将指定的文件描述符添加进来(交给select进行检测)
          // 同时找出最大的文件描述符
          if (fd_arry[i] > fdmax)
          {
            fdmax = fd_arry[i];
            std::cout << "max has updated:" << fdmax << std::endl;
          }
        }
      }

      // 这里不能直接进行accept,accept的本质就是检测并获取listenfd上面的事件,accept只能阻塞等一个
      // 因为我们要实现多路复用,所以这里是要去等待多个套接字并检测他们的状态,使用select
      int n = select(fdmax + 1, &rfds, nullptr, nullptr, &timeout); // 通过内核的select帮我进行检测,刚开始启动只有一个listenfd
      switch (n)
      {
      case 0:
        std::cout << "time out" << std::endl;
        break;
      case -1:
        std::cout << "select error" << std::endl;
      default:
        // 有链接了,如果对链接不处理,select会一直通知你,需要处理并设置位图
        // 新连接到来我们认为是读事件,
        Disapacher(rfds);//名为事件派发器
        break;
      }
    }
  }

private:
  Sock _listenfd;
  uint32_t port;
  int fd_arry[max_fd]; // 辅助数组,统计所有描述符
};

 main.cc

#include"SelectSever.hpp"
#include<memory>
#include<iostream>


void usehelp(char*s)
{
  printf("use correct code:%s port[1024]\n",s);
}
int main(int argc,char*argv[])
{

  std::unique_ptr<SelectServer>  server(new(SelectServer));
  server->Init();
  server->Start();
  return 0;
}

编写select的思路:

1.首先创建,绑定,监听套接字,之后创建文件描述符数组对所有文件描述符管理。

2.插入最开始的listenfd到数组里,遍历数组找出最大的文件描述符,设置位图用来标识就绪的文件描述符,之后select数组最大fd+1文。

2.select之后判断是否有新链接,有新的连接(这里我们默认是读事件)就会有新的文件描述符,对事件进行处理。

3.处理时,数组里的文件描述符分两种,一种使位图中的就绪的fd,一种是就绪后进行accept的新的fd,因此对整个数组进行判断,是位图中的,就行新接受新连接,是accept的进行读事件,反之是-1,continue。

但是select是有很明显的缺陷:

1.等待的fd是有上限的--1024

2.输入输出参数较多,数据拷贝频率高

3.输入输出参数多,每一次都对需要关心的fd重置以达到复用

4.管理数组fd,有太多次进行遍历

为了解决种种情况,我们使用了下一种接口(poll):

poll 函数接口
说明
fds 是一个 poll 函数监听的结构列表 . 每一个元素中 , 包含了三部分内容 : 文件描述符 , 监听的事件集合 , 返 回的事件集合.
nfds 表示 fds 数组的长度 .
timeout 表示 poll 函数的超时时间 , 单位是毫秒 (ms).
返回结果
返回值小于 0, 表示出错 ;
返回值等于 0, 表示 poll 函数等待超时 ;
返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回

这里我们介绍一下,重点放在select_poll上。

IO多路转接之epoll

epoll的说明是:为了处理大批量句柄而改进的poll,

1.认识有关epoll的接口

epoll_create 创建一个epoll模型

 epoll_wait  等待多长时间进行一次事件检查 返回就绪的fd与event

 返回值位已经准备就绪的个数。对于event的宏表示的就绪时间,类型如下:

EPOLLIN : 表示对应的文件描述符可以读 ( 包括对端 SOCKET 正常关闭 );
EPOLLOUT : 表示对应的文件描述符可以写 ;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 ( 这里应该表示有带外数据到来 );
EPOLLERR : 表示对应的文件描述符发生错误 ;
EPOLLHUP : 表示对应的文件描述符被挂断 ;
EPOLLET : EPOLL 设为边缘触发 (Edge Triggered) 模式 , 这是相对于水平触发 (Level Triggered) 来说的 .
EPOLLONESHOT :只监听一次事件 , 当监听完这次事件之后 , 如果还需要继续监听这个 socket 的话 , 需要
再次把这个 socket 加入到 EPOLL 队列里 .

epoll_ctl  作用是对系统的文件描述符的事件增加,删除,修改。即对文件描述符做管理

 第一个参数就是epoll_create的返回值,op代表三个选项(增加,修改,删除),第三个代表那个fd对应的哪个事件。

对于epoll的使用时要使用这三个系统接口的。

2.epoll原理

当某一进程调用 epoll_create 方法时, Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成 员与epoll 的使用方式密切相关
每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来 的事件.
这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来 ( 红黑树的插 入时间效率是lgn ,其中 n 为树的高度 ).
而所有添加到 epoll 中的事件都会与设备 ( 网卡 ) 驱动程序建立回调关系,也就是说,当响应的事件发生时 会调用这个回调方法.
这个回调方法在内核中叫 ep_poll_callback, 它会将发生的事件添加到 rdlist 双链表中 .
epoll 中,对于每一个事件,都会建立一个 epitem结构体

实际上epoll和poll是两个不同的模型,没太多关系,epoll是专门为用户设计的一个底层事件调度的模型。

3.编写epoll

对于select来说,我们使用一个系统调用接口加上文件描述符数组来进行非阻塞式的等待。

那么对于epoll来说就是通过三个系统调用接口,1.创建模型,3.进行非阻塞式等待(时间自己设置),2.使用epoll_ctl管理事件(本质上时使用红黑树的节点绑定事件进行高效的管理)。

每一个事件和她的描述符都被封装在一个结构体epoll_event当中,之后通过一个数组管理这里epol_event。

第一个不太完美的版本的eopp如下:

epoler.hpp

//封装epoll接口
#pragma once
#include <sys/epoll.h>
#include <unistd.h>
#include<iostream>
#include<string.h>
#include"Nocopy.hpp"

uint32_t EVENT_IN=(EPOLLIN); //读事件
uint32_t EVENT_OUT=(EPOLLOUT);//写事件

//因为我们不想该类被拷贝,所以继承不被拷贝的类
class Epoll :public nocopy
{
    public:
    static const int size=128;
       Epoll():_timeout(3000)     //设置检测时间3s一次,若为零就是非阻塞等待
       {
         //创建epoll模型
         _epfd=epoll_create(size);
         if(_epfd==-1)
         {
            std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;
         }else
         {
            std::cout<<"create succed ,epfd:"<<_epfd<<std::endl;
         }

       }
       //使用epoll接口进行等待
       int Epollwait(struct epoll_event events[],int num) //输入参数events表示就序的事件 num表示个数
       {
         int ret=epoll_wait(_epfd,events,num,_timeout);//就绪了就从这里拿
       }

       int Epollctl(int oper,int sock,uint32_t event)
       {
         //向指定模型根据oper添加修改删除事件
         if(oper==EPOLL_CTL_DEL)
         {
            //这里对删除事件进行单独处理
            int ret=epoll_ctl(_epfd,oper,sock,nullptr);
         }else
         {
            //注册事件
            struct epoll_event ev; //创建事件结构体并设置
            ev.events=event;//设置事件
            ev.data.fd=sock;//设置fd
            int ret=epoll_ctl(_epfd,oper,sock,&ev);//进行什么操作呢  本质就是向红黑树新增节点,节点绑定事件
            if(ret==-1)
            {
               std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;
            }else
            {
               std::cout<<"add succed ,epfd:"<<_epfd<<std::endl;

            }
            return ret;
         }
       }
       ~Epoll()
       {
        if(_epfd==-1)
        {
         close(_epfd);
         std::cout<<"close succed ,epfd:"<<_epfd<<std::endl;
        }
       }
    private:
        int _epfd;
        int _timeout; 
};

epollserver.hpp

#pragma once
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Nocopy.hpp"
#include<iostream>
#include<memory>
#include<sys/epoll.h>

//设置你要关心的时事件

const uint16_t defalutport=8080;
class EpollServer :public nocopy
{
     public:
         static const int num=64;//每次从就绪队列中取64个就绪事件
        EpollServer(uint16_t port=defalutport):_port(port),_listensock(new Sock),_epoller(new Epoll)
        {
        }
        bool Init()
        {
         _listensock->Createsockfd();
         _listensock->Bind(_port);
         _listensock->Listen();
         return true;
        }
        void Start()
        {
          //将listensock添加到epoll中检测你关心的读写事件,这里就是添加
        _epoller->Epollctl(EPOLL_CTL_ADD,_listensock->Fd(),EVENT_IN);
        
         struct epoll_event evns[num];//num代表最多就绪的个数,若果等待的个数太多,就会分配批次取wait
         while(true)
         {
          int n=_epoller->Epollwait(evns,num); //返回值代表就绪的个数
          if(n>0)
          {
            //在事件插入之后,有事件就绪了
            std::cout<<"检测对应的fd的事件,fd:"<<evns[0].data.fd<<std::endl; 
            //既然就绪,那就开始处理事件
            HandlerEvent(evns,n);

          }else if(n==0)
          {
            std::cout<<"time out"<<std::endl;
          }else
          {
            std::cout<<"wait error"<<std::endl;
          }
         }

        }
        void Accepter()
        {
          std::string clientip;
                uint16_t clientport;
               int sock =_listensock->Accept(&clientip,&clientport);
               if(sock>0)
               {
                //获取到新连接再插入到红黑树当中
                _epoller->Epollctl(EPOLL_CTL_ADD,sock,EVENT_IN);
               }else
               {
                  std::cout<<"accept error"<<std::endl;
               }
        }
        void Dispatcher(int fd)
        {
          char buffer[1024];
          // 已经获取新连接,开始read
          int n = read(fd, buffer, sizeof(buffer) - 1);
          if (n > 0)
          {
            buffer[n] = 0;
            std::cout << "读取到数据:" << buffer << std::endl;
            //读完之后写回给客户端
            std::string echo_string="server get message";
            std::string content=echo_string+std::string(buffer);
            write(fd,content.c_str(),content.size());

          }else if(n==0)
          {
            std::cout << "client close:"<< std::endl;
            //删除对应的文件描述符 
            _epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fd
            close(fd);
          }else
          {
            std::cout << "read error!!" << std::endl;
            _epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fd
            close(fd);
          }
        }

        //这里的事件假设就两种,读 写
        void HandlerEvent( struct epoll_event evns[],int num)
        {
          for(int i=0;i<num;i++)
          {
            uint32_t event=evns[i].events;
            int fd=evns[i].data.fd; 
            //判断是哪一种事件
            if(event&EVENT_IN)
            {
              //判断fd是等待的fd,还是已经获取新连接的fd
              if(fd==_listensock->Fd())
              {
                Accepter();
              }else
              {
                Dispatcher(fd);
              }
              
            }else if(event & EVENT_OUT)
            {
              //写事件就绪
            }
            
           }
        }


        ~EpollServer()
        {
            _listensock->Close();
        }


     private:
        std::shared_ptr<Sock> _listensock;
        std::shared_ptr<Epoll> _epoller;
        uint16_t _port;

};

4.epoll的工作模式

epoll的工作模式有利各种:ET 和LT

LT(level triggered)工作模式--水平工作模式

epoll 默认状态下就是 LT 工作模式 .
1.当 epoll 检测到 socket 上事件就绪的时候 , 可以不立刻进行处理 . 或者只处理一部分 .
如上面的例子 , 由于只读了 1K 数据 , 缓冲区中还剩 1K 数据 , 在第二次调用 epoll_wait , epoll_wait 仍然会立刻返回并通知socket 读事件就绪 .
直到缓冲区上所有的数据都被处理完 , epoll_wait 才不 会立刻返回 .
2.支持阻塞读写和非阻塞读写

 通俗点说,如果上层有新的数据,你不拿走,上层就一直通知你让你取数据,只要有就拿走。

ET(Edge Triggered)工作模式  --边缘工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.

epoll 检测到 socket 上事件就绪时 , 必须立刻处理 .
如上面的例子 , 虽然只读了 1K 的数据 , 缓冲区还剩 1K 的数据 , 在第二次调用 epoll_wait 的时候 ,
epoll_wait 不会再返回了 .
也就是说 , ET 模式下 , 文件描述符上的事件就绪后 , 只有一次处理机会 .
ET 的性能比 LT 性能更高 ( epoll_wait 返回的次数少了很多 ). Nginx 默认采用 ET 模式使用 epoll.
只支持非阻塞的读写

通俗点说,只有发生变化(从没链接到有新连接,从一个变多个,从多个变没),就会进行通知取走数据,只有变化,才会拿走。

由于通知的次数少,且通知是有限的,所以一般而言ET的工作效率更高一些,但也因为之中特性,程序员必须每次把数据都取完(循环读取,直到读取出错),否则就会遗漏。

但是读完数据,我们此时的read是阻塞的,我们不能让她阻塞,这会导致服务器挂起(这是不好的),所以我们需要设置读是非阻塞的。直到非阻塞状态下还读完了,就会返回错误码。

且由于我们把缓冲区的数据都把走了,这一位则TCP通信时,窗口就更大了,一次拿去的数据也多了。

注意:

即使如此,一般情况ET比LT高效,但是你是一直比LT高效吗,而且既然你ET都能一次性非阻塞的读取完,我LT不行吗?只是我们一般去使用ET,如果可以,你也可以去使用LT修改。

读写的改进 reactor

所以我们的读写是不合理的,首先没有协议的定制,其次读取并不是非阻塞的。

这里就还是以编写计算器的客户端与服务端为例:

重要的读写部分在TcpServer.hpp中:

TcpServer.hpp

#pragma once
#include<string>
#include<iostream>
#include<memory>
#include<functional>
#include<unordered_map>
#include <arpa/inet.h>
#include"Nocopy.hpp"
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Comen.hpp"
#include"Protocol.hpp"
#include"Calculator.hpp"

class Connection;
class TcpServer;
using funct=std::function<void (std::shared_ptr<Connection>)>; //定义了返回值为void,参数为td::shared_ptr<Connection>指针的包装器
const static int buffersize=1024;
const uint16_t defaultport=8080;                                                                                                    
const int num=64; 
class Connection
{
    public:
      void SetHandle(funct &recv_cb,funct &send_cb,funct &except_cb)
      {
          _recver_callback=recv_cb;
          _send_callback=send_cb;
          _except_callback=except_cb;
      }
      void Appendinbuffer(std::string buffer)
      {
        _inbuffer+=buffer;//输入缓冲区

      }
      void Appendoubuffer(std::string buffer)
      {
        _outbuffer+=buffer;
      }
      const std::string &inbuffer()
      {
        return _inbuffer;
      }

      Connection(int sock,std::shared_ptr<TcpServer> tcpserver_ptr):_sock(sock),_tcpserver_callback(tcpserver_ptr)//回值指针用来传递TCPserver
      {
      }
      int getsock()
      {
        return _sock;
      }
      std::string& getinbuf()
      {
        return _inbuffer;
      }
      std::string& getoutbuf()
      {
        return _outbuffer;
      }
      ~Connection(){}
    private:
    int _sock;
    private:
    std::string _inbuffer;//输入缓冲区
    std::string _outbuffer;//输出缓冲区
    public:
    funct _recver_callback; //接收回调
    funct _send_callback;   //发送回调
    funct _except_callback; //异常回调

    //tcpserver回值指针
    std::shared_ptr<TcpServer> _tcpserver_callback;
};

class TcpServer
{
    public:
        TcpServer(funct OnMessage,uint16_t port=defaultport):_epoll_server(new Epoll),_listensocket(new Sock),_port(port),_quit(true)//回调函数用来设置处理事件
        {
            _OnMessage=OnMessage;
            _listensocket->Createsockfd();
            _listensocket->Bind(_port);
            _listensocket->Listen();
            SetNonBlock(_listensocket->Fd()); //将文件设置为非阻塞等待
            AddConnection(_listensocket->Fd(),EVENT_IN,std::bind(&TcpServer::Accepter,this,std::placeholders ::_1),nullptr,nullptr);
        }
        void Init()
        {
           
        }
        void Accepter(std::shared_ptr<Connection> connection)
        {
          //连接到来开始接受
          while(true)
          {
            struct sockaddr_in peer;
            socklen_t len=sizeof(peer);
            int fd=::accept(connection->getsock(),(sockaddr*)&peer,&len); //获取新链接
            if(fd>0)
            {
              uint16_t clientport=ntohs(peer.sin_port);
              char ipbuffer[10];
              inet_ntop(AF_INET,&peer.sin_addr,ipbuffer,sizeof(ipbuffer));
              std::string ip=std::string(ipbuffer);
              std::cout<<"get new link,ip:"<<ip<<",fd :"<<fd<<",port"<<clientport<<std::endl;
              //获取的新连接设置非阻塞,现在是ET模式
              SetNonBlock(fd);
              //之后再添加到哈希表中,epoll_event中,
               AddConnection(fd,EVENT_IN,\
                   std::bind(&TcpServer::Recver,this,std::placeholders::_1),   //根据函数地址与参数bind成一个包装器对象
                   std::bind(&TcpServer::Sender,this,std::placeholders::_1),\
                   std::bind(&TcpServer::Exepter,this,std::placeholders::_1));
            }else
            {
              //直到非阻塞被读到没有新的连接了
              if(errno==EWOULDBLOCK)
              {
                break;
              }else if(errno==EINTR)
              {
                continue;
              }else{
                //读出错
                break;
              }
              
            }
          }

        }
        void AddConnection(int sock,uint32_t event,funct recv_cb,funct send_cb,funct except_cb)
        {
            // 将listensock添加到epoll中检测你关心的读写事件,这里就是添加 并且设置为边缘工作模式
            _epoll_server->Epollctl(EPOLL_CTL_ADD, sock, event);
            //c除此之外,每一个fd都要构建成Connetcion对象,通过connect管理事件的处理
            std::shared_ptr<Connection> new_connection=std::make_shared<Connection>(sock,std::shared_ptr<TcpServer>(this));
            new_connection->SetHandle(recv_cb,send_cb,except_cb);
            //第三步,就是添加到unordered_map
            _connection_map[sock]=new_connection;
        }

        //事件管理,进行事件的处理
        void Recver(std::shared_ptr<Connection> connection)
        {
          int sock=connection->getsock();
          //ET模式读
          while(true)
          {
            char buffer[buffersize];
            memset(buffer,0,sizeof(buffer));
            ssize_t n=recv(sock,buffer,sizeof(buffer)-1,0); //这里的数据默认为字符流,如果还有二进制,需要换为vector 
            if(n>0)
            {
              //读取成功
              connection->Appendinbuffer(buffer);
            }else if(n==0)
            {
              //连接关闭
              std::cout<<"link closed !"<<std::endl;
              connection->_except_callback(connection);


            }else
            {
             //读取错误
              if(errno==EWOULDBLOCK)
              {
                break;
              }else if(errno==EINTR)
              {
                continue;
              }else{

              std::cout<<"read error!"<<std::endl;
              connection->_except_callback(connection);
              break;
              }
            }
          }
            //读完之后,将数据交给上层
            _OnMessage(connection); //协议的定制
        }
        void Sender(std::shared_ptr<Connection> connection)
        {
          while(true)
          {
            std::string &buffer=connection->getoutbuf();
            int sockfd=connection->getsock();
            ssize_t n=send(sockfd,buffer.c_str(),buffer.size(),0);
            if(n>0)
            {
              //清空缓冲区
              buffer.erase(0,n);
              if(buffer.empty())
                break;
            }else if(n==0)
            {
              return ;
            }else 
            {
              if(errno==EWOULDBLOCK)
              {
                break;
              }else if(errno==EINTR)
              {
                continue;//信号中断,跳过本次
              }else 
              {
                //发失败了,进入异常处理
                std::cout<<"send error"<<std::endl;
                connection->_except_callback(connection);
                return;
              }
            }
           if(!buffer.empty())
           {
              //对写事件的关心
              
              EnableEvent(connection->getsock(),true,true);
           }else
           {
             //关闭对写事件的处理
              EnableEvent(connection->getsock(),true,false);
           }
          }
        }
        void EnableEvent(int fd,bool readable,bool writeable )
        {
          uint32_t event=0;
          event |=((readable ? EPOLLIN:0)|(writeable ? EPOLLOUT : 0)|EPOLLET);
          _epoll_server->Epollctl(EPOLL_CTL_MOD,fd,event);//修改事件的读写,如果没发完,就继续发,通过修改事件为写
        }
        void Exepter(std::shared_ptr<Connection> connection)
        {
          //处理链接异常或着断开
          std::cout<<"事件异常,断开连接!"<<std::endl;
          //首先移除链接
          //EnableEvent(connection->getsock(),false,false);//读写都设置为False
          _epoll_server->Epollctl(EPOLL_CTL_DEL,connection->getsock(),0);//移出事件 
          //关闭异常的文件描述符
          close(connection->getsock());
          std::cout<<"已移除connection,已关闭文件描述符:"<<connection->getsock()<<std::endl;
          //把map中的也删了
        }

        bool isConnectionsafe(int fd)
        {
          auto it=_connection_map.find(fd);
          if(it==_connection_map.end())
          {
            return false;
          }
          return true;
        }
        void Disptcher(int timeout)
        {
          //进行等待就绪时间
          int n=_epoll_server->Epollwait(events,num,timeout);
          for(int i=0;i<n;i++)
          {
            uint32_t event=events[i].events;
            int sock=events[i].data.fd;
            //进行事件判断,并转化成读写事件
            if((event & EVENT_IN)&& isConnectionsafe(sock))
            {
              //读就绪
              if(_connection_map[sock]->_recver_callback)//函数存在
              {
                _connection_map[sock]->_recver_callback(_connection_map[sock]);//fd对应的connetcion信息,其中调用对应的方法,参数为connection对象
              }
            }
            if((event & EPOLLOUT)&& isConnectionsafe(sock))
            {
              //写就绪
              if(_connection_map[sock]->_recver_callback)
              {
                _connection_map[sock]->_send_callback(_connection_map[sock]);
              }
            }
            if(event & EPOLLHUP)
            {
              //读关闭
                event |=(EPOLLIN & EPOLLOUT);
            }
            if(event &EPOLLERR)
            {
              //错误
              
              event |=(EVENT_IN & EVENT_OUT);
            }
          }

        }
        void Loop()
        {
            _quit=false;
            while (!_quit)
            {
             Disptcher(3000);//3s

            }
            _quit=true;
        }
        
        ~TcpServer(){}

    private:
        std::shared_ptr<Epoll> _epoll_server;  //epoll接口
        std::unordered_map<int,std::shared_ptr<Connection>> _connection_map;  //连接池
        std::shared_ptr<Sock> _listensocket; //套接字接口
        struct epoll_event events[num];
        uint32_t _port;
        bool _quit; //是否退出

        funct _OnMessage;//上层回调处理信息

}; 

剩余代码的所在地:reactor · 但成伟/编程学习 - 码云 - 开源中国 (gitee.com)

主要难点是对事件与链接的管理,事件的执行,读写非阻塞这几点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/603122.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

1W 3KVDC 隔离 单输出 DC/DC 电源模块 ——TPF 系列

TPF系列提供输出稳压&#xff0c;精度高&#xff0c;对于输出电压有要求的场合特别适合&#xff0c;工业级环境温度&#xff0c;用于PCB安装的国际标准结构。此系列产品小巧&#xff0c;效率高&#xff0c;低输出纹波及提供3000V以上的直流电压隔离&#xff0c;封装有SIP和DIP可…

实测幻方新出的超强AI大模型,中文能力对比GPT4.0不落下风

目前从网上的消息来看&#xff0c;DeepSeek中文综合能力&#xff08;AlignBench&#xff09;开源模型中最强&#xff0c;与GPT-4-Turbo&#xff0c;文心4.0等闭源模型在评测中处于同一梯队。 话不多说&#xff0c;我们开测&#xff01; 1.首先我们来让他直接来一段逻辑推理【并…

Jetpack Compose三:主题和基础控件的使用

设置主题 与Android View的主题定义方式不同&#xff0c;Jetpack Compose中的主题由许多较低级别的结构体和相关API组成&#xff0c;它们包括颜色、排版和形状属性。 Theme.kt控制工程的主题&#xff0c;它是一个可组合的Compose函数 最后主题函数ComposeStudyTheme的相关设置…

安装Nox夜神模拟器关闭了HyperV后Docker运行不了怎么办?

1.背景 为了模拟真机&#xff0c;尝试安装了Nox夜神模拟器&#xff0c; 安装过程要求关闭Hyper-V。当时只是在程序安装卸载中关闭了系统服务。以为到时勾选上就好了。操作路径&#xff1a;控制面板\所有控制面板项\程序和功能\启用或关闭Windows功能\Hyper-V。 后来卸载掉了夜神…

D盘被格式化了能找回吗 d盘格式化了数据可以找回来吗

D盘作为电脑中重要的磁盘之一&#xff0c;很多用户都会将一些重要的数据保存在D盘。但在磁盘空间不足的情况下&#xff0c;或许有些用户会将其进行格式化&#xff0c;D盘被格式化了如何恢复数据&#xff1f; 如果是比较重要的数据&#xff0c;建议用户立即进行数据恢复操作&am…

Java-异常处理-定义三角形类Triangle和异常三角形IllegalTriangleException类 (1/2)

任意一个三角形&#xff0c;其任意两边之和大于第三边。当三角形的三条边不满足前述条件时&#xff0c;就表示发生了异常&#xff0c;将这种异常情况定义为IllegalTriangleException类。 自定义异常类IllegalTriangleException&#xff1a; 当三角形的三条边不满足条件&#x…

数据丢失不慌张,手机数据恢复一键解决!

如今手机已经成为我们生活中不可或缺的一部分。无论是工作、学习还是娱乐&#xff0c;手机都扮演着重要的角色。随着使用时间的增加&#xff0c;手机数据丢失的问题也时常发生。那么手机数据恢复有哪些方法呢&#xff1f;面对这种情况&#xff0c;先不要慌张&#xff0c;本文将…

3dmax-vray6渲染器参数设置

适用于3dmax2018-2023版本 一、【公用】 小图输出大小:1500*1125&#xff0c;勾选大气、效果&#xff1b; 大图输出大小:3000*2250&#xff0c;勾选大气、效果、置换&#xff1b; 二、【vray】 小图抗锯齿类型:渐进式&#xff1b;最小细分:1&#xff0c;最大细分:100&#…

CRM(客户关系管理系统)

商机流程 为什么选择简道云CRM&#xff1f; 行业痛点 很多客户有复杂的订单成本计算方式&#xff0c;复杂多变的审批流程&#xff0c;个性化/流程化的数据结构&#xff0c;没有自定义能力就很难满足。 解决方案 在CRM套件的基础上自定义编辑/搭建了适合公司业务的CRMERP 两…

数据结构之单单单——链表

一.链表 1&#xff09;链表的概念 链表&#xff08;Linked List&#xff09;是一种物理存储结构上非连续&#xff0c;非顺序的储存结构&#xff0c;数据元素的逻辑顺序是通过链表中指针链接次序实现的。要注意&#xff0c;链表也是线性表----->但链表在物理结构上不是线性的…

安装helm

&#xff08;作者&#xff1a;陈玓玏&#xff09; 文档&#xff1a;https://helm.sh/zh/docs/intro/install/ 文档记载了几种安装方法&#xff0c;我用的是一步到位的那种&#xff0c;直接运行curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 …

python作业五

题目&#xff1a;注册登录 制作一个注册登录模块 注册&#xff1a;将用户填入的账户和密码保存到一个文件(users.bin) 登陆&#xff1a;将用户填入账户密码和users.bin中保存的账户密码进行比对,如果账户和密码完全相同 那 么登录成功&#xff0c;否则登录失败…

C语言(递归)

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸各位能阅读我的文章&#xff0c;诚请评论指点&#xff0c;关注收藏&#xff0c;欢迎欢迎~~ &#x1f4a5;个人主页&#xff1a;小羊在奋斗 &#x1f4a5;所属专栏&#xff1a;C语言 本系列文章为个人学习笔记&#x…

win10安装.NET Framework 3.5(包括.net2.0和3.0)

打开控制面板 选择”程序” 点击”启用或关闭Windows功能“ 把.NET Framework 3.5选项勾选即可&#xff0c;若没有下载的&#xff0c;下载即可。 PS:如果下载过程出错&#xff0c;按如下流程&#xff1a; 右击”此电脑”选择“管理”&#xff0c;找到“服务和应用程序”&#x…

JAVA(三)常用类和API

目录 常用类与基础API---String String的内存结构 构造器和常用方法 字符串构建 String与其他结构间的转换 String的常用API 系列1&#xff1a;常用方法 系列2&#xff1a;查找 系列3&#xff1a;字符串截取 系列4&#xff1a;和字符/字符数组相关 系列5&#xff1a;开头…

Mac 解决外接移动硬盘(NTFS格式)无法写入的问题

文章目录 1. 问题描述2. 解决步骤 1. 问题描述 MacOS 可以识别 NTFS 格式的磁盘&#xff0c;但是默认情况下是只读模式&#xff0c;即无法向 NTFS 格式的磁盘写入数据。这是因为 NTFS 是 Windows 系统默认的文件系统格式&#xff0c;而 MacOS 对 NTFS 的写入支持是有限的。 如…

python软件开发遇到的坑-相对路径文件读写异常,不稳定

1. os.chdir()会影响那些使用相对路径读写文件的程序&#xff0c;使其变得不稳定&#xff0c;默认情况下&#xff0c;当前工作目录是主程序所在目录&#xff0c;使用os.chdir会将当前工作目录修改到其他路径。 资料&#xff1a; python相对路径写对了却报错是什么原因呢&#…

什么情况下 MySQL 连查询都能被阻塞?

MySQL 的锁也是不少&#xff0c;在哪种情况下会连查询都能被阻塞&#xff1f;这是一个有意思的问题。 工作中&#xff0c;很多开发和 DBA 可能接触较多的锁也就行锁了。对于行锁&#xff0c;阻塞写能理解&#xff0c;阻塞读实在是想不到。能阻塞读的那肯定是颗粒度更大的锁了&…

用于视频大型多模态模型(Video-LMMs)的复杂视频推理和鲁棒性评估套件

1 引言 最近,大型语言模型(LLMs)在同时处理广泛的NLP任务的同时展示了令人印象深刻的推理和规划能力。因此,将它们与视觉模态集成,特别是用于视频理解任务,催生了视频大型多模态模型(Video-LMMs)。这些模型充当视觉聊天机器人,接受文本和视频作为输入,并处理各种任务,包括视频…

技术分享 | 京东商品API接口|京东零售数据可视化平台产品实践与思考

导读 本次分享题目为京东零售数据可视化平台产品实践与思考。 主要包括以下四个部分&#xff1a; 1.京东API接口介绍 2. 平台产品能力介绍 3. 业务赋能案例分享 01 京东API接口介绍 02 平台产品能力介绍 1. 产品矩阵 数据可视化产品是一种利用数据分析和可视化技术&…