【IO多路转接】pollepoll

文章目录

  • 1 :peach:poll:peach:
    • 1.1 :apple:poll函数接口:apple:
    • 1.2 :apple:poll接口的使用:apple:
    • 1.3 :apple:poll的优缺点:apple:
  • 2 :peach:epoll:peach:
    • 2.1 :apple:epoll函数接口:apple:
      • 2.1.1 :lemon:epoll_create:lemon:
      • 2.1.2 :lemon:epoll_ctl:lemon:
      • 2.1.3 :lemon:epoll_wait:lemon:
    • 2.2 :apple:epoll工作原理:apple:
    • 2.3 :apple:epoll的优点:apple:
    • 2.4 :apple:epoll接口的使用:apple:
      • 2.4.1 :lemon:第一版本的epoll:lemon:
      • 2.4.2 :lemon:epoll工作方式:lemon:
      • 2.4.3 :lemon:对比LT和ET:lemon:
      • 2.4.4 :lemon:第二个版本的epoll:lemon:
      • 2.4.5 :lemon:Reactor:lemon:


1 🍑poll🍑

1.1 🍎poll函数接口🍎

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// pollfd结构
struct pollfd 
{
 int fd; /* file descriptor */
 short events; /* requested events */
 short revents; /* returned events */
};

参数说明:

  • fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合;
  • nfds表示fds数组的长度;
  • timeout表示poll函数的超时时间, 单位是毫秒(ms)

eventsrevents的取值:

事件描述是否可作为输入是否可作为输出
POLLIN普通数据和优先数据可读
POLLEDNORM普通数据可读
POLLEDBAND优先级带数据可读(Linux不支持)
POLLOUT普通数据和优先数据可写
POLLWRNORM普通数据可写
POLLWRBAND优先级带数据可写
POLLERR错误
POLLHUP挂起,比如管道的写端关闭后,读端描述符上将收到POLLHUP事件
POLLNVAL文件描述符没有打开

返回结果:

  • 返回值小于0, 表示出错;
  • 返回值等于0, 表示poll函数等待超时;
  • 返回值大于0, 表示poll由于监听的文件描述符就绪而返回;

1.2 🍎poll接口的使用🍎

通过对poll接口的介绍后大家不难发现,其实使用poll接口是比用select是更简单的,因为在之前我们写select服务器时我们需要自己来维护一个fd数组帮助我们将位图结构初始化,但是使用poll就不用了,我们只需要创建一个struct pollfd*结构的指针,动态开辟空间即可。

代码实例:

#include "Sock.hpp"
#include <memory>
#include <string>
#include<poll.h>
#include<cassert>

using namespace std;
const int N = 1024;
const int default_fd = -1;
const short default_event=0;
const uint16_t gport=8866;
class PollServer
{
public:
    PollServer(const uint16_t port=gport)
    :_port(port)
    ,_ppd(nullptr)
    {}
    ~PollServer()
    {
        _listensock.Close();
        delete []_ppd;
        _ppd=nullptr;
    }

    void init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        _ppd=new pollfd[N];
        for(int i=0; i<N; ++i)
        {
            _ppd[i].fd=default_fd;
            _ppd[i].events=default_event;
            _ppd[i].revents=default_event;
        }

    }

    void run()
    {
        _ppd[0].fd=_listensock.Fd();
        _ppd[0].events=POLLIN;
        while(true)
        {
            //int timeout=-1;
            int n=poll(_ppd, N, -1); 
            if (n > 0)
            {
                cout << "有一个就绪事件发生了" << endl;
                // 表示已经有n个连接到来了,此时我们能够直接accept吗?
                hand_event();
                printf_fd();
            }
            else if (n == 0)
            {
                cout << "time out" << endl;
            }
            else
            {
                cout << "select errno:" << errno << ":" << strerror(errno) << endl;
            }
        }
    }
private:
    void accepter()
    {
        string clientip;
        uint16_t clientport;
        int sock = _listensock.Accept(&clientip, &clientport);
        cout << "[ip:port]:" << clientip << ":" << clientport << endl;
        int pos = 1;
        while (pos < N)
        {
            if (_ppd[pos].fd == default_fd)
            {
                _ppd[pos].fd = sock;
                _ppd[pos].events=POLLIN;
                _ppd[pos].revents=default_event;
                break;
            }
            ++pos;
        }
        if (pos > N)
        {
            cout << "_fdarr full" << endl;
            close(sock);
        }
    }

    void serverio(int fd, int i)
    {
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
        if (n < 0)
        {
            cout << "read fail" << endl;
            return;
        }
        else if (n == 0)
        {
            cout << "client close,me too" << endl;
            close(fd);
            _ppd[i].fd=default_fd;
            _ppd[i].events=default_event;
            _ppd[i].revents=default_event;
        }
        else
        {
            buffer[n - 1] = 0;
            cout << "client:" << buffer << endl;
            string echo = buffer;
            echo += " [poll server echo]";
            write(fd, echo.c_str(), echo.size());
        }
    }

    void hand_event()
    {
        for (int i = 0; i < N; ++i)
        {
            int fd=_ppd[i].fd;
            short revent=_ppd[i].revents;
            if(fd == default_fd)
                continue;
            if (revent & POLLIN)
            {
                if (fd == _listensock.Fd())
                {
                    accepter();
                }
                else
                {
                    serverio(fd, i);
                }
            }
            else if(revent & POLLOUT)
            {
                cout<<"POLLOUT"<<endl;
            }

        }
    }

    void printf_fd()
    {
        for (int i = 0; i < N; ++i)
        {
            if (_ppd[i].fd != default_fd)
                cout << _ppd[i].fd << " ";
        }
        cout<<endl;
    }
private:
    Sock _listensock;
    uint16_t _port;
    pollfd* _ppd;
};

注意:此时在进行serverio时也会有粘包问题以及write的fd没有交给poll处理(写实事件并不一定就绪)问题。

验证:
在这里插入图片描述

1.3 🍎poll的优缺点🍎

poll优点
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现:

  • pollfd结构包含了要监视的event和已经就绪的revent,不再使用select手动设置fd集合的方式,接口使用比select更方便;
  • poll并没有最大数量限制 (但是数量过大后性能也是会下降);

poll缺点

poll中监听的文件描述符数目增多时:

  • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符;
  • 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中;
  • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.

从本质上来说,poll知识解决了select文件描述符个数限制问题,但是select其他缺点poll并没有解决,那么还有更好的方式来解决吗?这时就引出来了一个更为厉害的转接方式:epoll


2 🍑epoll🍑

2.1 🍎epoll函数接口🍎

2.1.1 🍋epoll_create🍋

int epoll_create(int size);

创建一个epoll的句柄:

  • 自从linux2.6.8之后,size参数是被忽略的(只要随便设置一个>0的数字就行)
  • 用完之后, 必须调用close()关闭;

2.1.2 🍋epoll_ctl🍋

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数:

  • 它不同于select是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型;
  • 第一个参数是epoll_create()的返回值(epoll的句柄);
  • 第二个参数表示动作,用三个宏来表示;
  • 第三个参数是需要监听的fd;
  • 第四个参数是告诉内核需要监听什么事件;

op参数的取值:

  • EPOLL_CTL_ADD :注册新的fd到epfd中;
  • EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL :从epfd中删除一个fd;

struct epoll_event结构如下:
在这里插入图片描述

events可以是以下几个宏的集合:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
  • EPOLLOUT : 表示对应的文件描述符可以写;
  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;
  • EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的;
  • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里;

2.1.3 🍋epoll_wait🍋

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

收集在epoll监控的事件中已经就绪事件。

  • 参数events是分配好的epoll_event结构体数组;
  • epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存);
  • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size;
  • timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞);
  • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败;

2.2 🍎epoll工作原理🍎

  • 1️⃣当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:
struct eventpoll
{ 
 .... 
 /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ 
 struct rb_root rbr; 
 /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ 
 struct list_head rdlist; 
 .... 
};
  • 2️⃣每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)
    而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法;这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。在epoll中,对于每一个事件,都会建立一个epitem结构体。
struct epitem
{ 
 struct rb_node rbn;//红黑树节点 
 struct list_head rdllink;//双向链表节点 
 struct epoll_filefd ffd; //事件句柄信息 
 struct eventpoll *ep; //指向其所属的eventpoll对象 
 struct epoll_event event; //期待发生的事件类型 
}
  • 3️⃣当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可,如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1)

在这里插入图片描述总结一下, epoll的使用过程就是三部曲:

  • 调用epoll_create创建一个epoll句柄;
  • 调用epoll_ctl, 将要监控的文件描述符进行注册;
  • 调用epoll_wait, 等待文件描述符就绪;

2.3 🍎epoll的优点🍎

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开;
  • 数据拷贝轻量: 只在合适的时候调用 epoll_ctl() 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝);
  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响;
  • 没有数量限制: 文件描述符数目无上限;(IO效率不随fd数目增加而线性下降)

网上有些博客说, epoll中使用了内存映射机制(内核直接将就绪队列通过mmap的方式映射到用户态,避免了拷贝内存这样的额外性能开销);这种说法是不准确的,我们定义的struct epoll_event是我们在用户空间中分配好的内存,势必还是需要将内核的数据拷贝到这个用户空间的内存中的。

2.4 🍎epoll接口的使用🍎

2.4.1 🍋第一版本的epoll🍋

为了更加方便使用 epoll接口,便封装了一个类专门处理epoll接口:

#include "Sock.hpp"
#include <memory>
#include <string>
#include<sys/epoll.h>
#include<cassert>
using namespace std;


const int default_sz=132;
const int default_epfd=-1;
class Epoller
{
public:
    Epoller()
    :_epfd(default_epfd)
    {}
    ~Epoller()
    {
        if(_epfd != default_epfd)
            close(_epfd);
    }
    void create()
    {
        _epfd=epoll_create(default_sz);
        if(_epfd < 0)
        {
            cout<<"epoll_create fail"<<endl;
        }
    }

    bool add_event(int fd, uint32_t events)
    {
        epoll_event ev;
        ev.data.fd=fd;
        ev.events=events;
        int n=epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
        if(n < 0)
        {
            cout<<"add_event fail"<<endl;
            return false;
        }
        return true;
    }
    bool mod_event(int fd, uint32_t events)
    {
        epoll_event ev;
        ev.data.fd=fd;
        ev.events=events;
        int n=epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev);
        if(n < 0)
        {
            cout<<"mod_event fail"<<endl;
            return false;
        }
        return true;
    }
    bool del_event(int fd)
    {
        int n=epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
        if(n < 0)
        {
            cout<<"del_event fail"<<endl;
            return false;
        }
        return true;
    }

    int ep_wait(epoll_event* revents, int max_events, int timeout)
    {
        return epoll_wait(_epfd, revents, max_events, timeout);
    }

    int get_epfd()
    {
        return _epfd;
    }
private:
    int _epfd;
};

epoll服务器的编写:

const uint16_t g_port=8899;
const int max_epollrevent_sz=64;
class EpollServer
{
public:
    EpollServer(uint16_t port=g_port)
    :_port(port)
    {}
    ~EpollServer()
    {
        _listensock.Close();
    }

    void init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        _epoller.create();
    }
    void run()
    {
        _epoller.add_event(_listensock.Fd(), EPOLLIN);
        int timeout=-1;//以阻塞方式进行等待
        while(true)
        {
            int n=_epoller.ep_wait(_rvents, max_epollrevent_sz, timeout);
            if(n < 0)
            {
                cout<<"ep_wait fail"<<endl;
            }
            else if(n == 0)
            {
                cout<<"timeout"<<endl;
            }
            else
            {
                cout<<"当前已经有"<<n<<"个事件就绪"<<endl;
                hander_event(n);
            }
        }
    }
private:
    void hander_event(int n)//由于只有n个事件就绪,所以我们只需要遍历0~n即可
    {
        for(int i=0; i<n; ++i)
        {
            int fd=_rvents[i].data.fd;
            uint32_t revent=_rvents->events;
            if (revent & EPOLLIN)//读事件就绪时
            {
                if (fd == _listensock.Fd())
                {
                    accepter(); // 进行accept获取新连接
                }
                else
                {
                    serverio(fd); // 用于进行数据io
                }
            }
        }
    }
    void accepter()
    {
        string clientip;
        uint16_t clientport;
        int sock=_listensock.Accept(&clientip, &clientport);
        cout<<"【"<<clientip<<","<<clientport<<"】事件已经就绪,fd:"<<sock<<endl;
        //将新连接添加到_epoller
        _epoller.add_event(sock, EPOLLIN);
    }
    void serverio(int fd)
    {
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
        if (n < 0)
        {
            cout << "read fail" << endl;
        }
        else if (n == 0)
        {
            cout << "client close,me too" << endl;
            close(fd);
            _epoller.del_event(fd);
        }
        else
        {
            buffer[n - 1] = 0;
            cout << "client:" << buffer << endl;
            string echo = buffer;
            echo += " [epoll server echo]";
            send(fd, echo.c_str(), echo.size(), 0);
        }
    }

private:
    uint16_t _port;
    Sock _listensock;
    Epoller _epoller;
    epoll_event _rvents[max_epollrevent_sz];
};

代码中需要注意的地方:

  • 1️⃣此时在进行serverio时也会有粘包问题以及写事件并不一定就绪的问题(这个我们在第二版本会讲解处理方式)。
  • 2️⃣在select/poll编程中,在读取消息时当对端已经把连接关闭时都会修改数组(select中是数组,而poll中是指针),目的都是让内核不要在关心该事件了,epoll也是同理,不同的是调用del_event将不在关心的事件删除而已。

2.4.2 🍋epoll工作方式🍋

epoll有2种工作方式:水平触发(LT)和边缘触发(ET)
我们来举一个生活中小栗子来帮助更好的理解这两种方式:

比如你正在打游戏,你的妈妈喊你吃饭,这时她通知你的方式可能有下面两种:

  1. 每隔一段时间通知你一次,直到你来吃饭为止;(LT)
  2. 只通知你一次,后面就不管你了;(ET)

看一个实际例子:

  • 我们已经把一个tcp socket添加到epoll描述符 ;
  • 这个时候socket的另一端被写入了2KB的数据;
  • 调用epoll_wait,并且它会返回,说明它已经准备好读取操作;
  • 然后调用read, 只读取了1KB的数据;
  • 继续调用epoll_wait…

水平触发Level Triggered 工作模式

  • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理,或者只处理一部分;
  • 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪;
  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回;
  • 支持阻塞读写和非阻塞读写;

注意:epoll默认状态下就是LT工作模式.

边缘触发Edge Triggered工作模式

如果我们将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式。

  • 当epoll检测到socket上事件就绪时, 必须立刻处理;
  • 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了;
  • 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会;
  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多),Nginx默认采用ET模式使用epoll;
  • 只支持非阻塞的读写;

2.4.3 🍋对比LT和ET🍋

LT是 epoll 的默认行为,使用 ET 能够减少 epoll 触发的次数,但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完。这也就表明ET的代码复杂程度更高。

相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些,但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是差不多的。

2.4.4 🍋第二个版本的epoll🍋

在开始写代码前,我们增加一个Connection的结构体,它的主要成员如下:
在这里插入图片描述为什么要设计这么一个结构体呢?我们知道,使用ET模式就要求程序员将缓冲区的数据一次性全部取走,所以为了简便就使用了_inbuffer_outbuffer这两个缓冲区来处理读到的数据以及要发送的数据。另外该成员中还存在3个回调函数,这样我们只需要不断的监视就绪的事件中_events,就能够回调不同的处理方式。
除此之外,我们还得在EpollServer类中再增加一个成员变量:
在这里插入图片描述
使用fdConnection建立唯一映射关系,当我们监听到一个新连接到来时,就将新连接添加到_conns中管理,所以我们接下来便可以完善代码了:

const uint16_t g_port=8899;
const int max_epollrevent_sz=64;
const int g_num=8848;


class Connection;
class EpollServer;
using func_t=std::function<void (Connection*, const protocol_ns::Request&)>;
using callbact_t=std::function<void(Connection*)>;

class Connection
{
public:
    Connection(int fd, string ip, uint16_t port)
    :_fd(fd)
    ,_ip(ip)
    ,_port(port)
    {}
    void resgister(callbact_t calb_read, callbact_t calb_write, callbact_t calb_excep)
    {
        _calb_read=calb_read;
        _calb_write=calb_write;
        _calb_excep=calb_excep;
    }
    //用于进行数据IO
    int _fd;
    string _inbuffer;
    string _outbuffer;
    //用户信息
    string _ip;
    uint16_t _port;
    //用户关心的事件
    uint32_t _events;
    //IO处理函数
    callbact_t _calb_read;
    callbact_t _calb_write;
    callbact_t _calb_excep;
};
class EpollServer
{
public:
    EpollServer(func_t func, uint16_t port=g_port)
    :_func(func)
    ,_port(port)
    {}
    ~EpollServer()
    {
        _listensock.Close();
    }

    void init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        _epoller.create();
        add_connection(_listensock.Fd(), EPOLLIN | EPOLLET);//将_listensock添加到_conns中
        cout<<"EpollServer init success"<<endl;
    }


    void loop_once(int timeout)
    {
        int n=_epoller.ep_wait(_rvents, max_epollrevent_sz, timeout);
        for(int i=0; i<n; ++i)
        {
            int fd=_rvents[i].data.fd;
            uint32_t events=_rvents[i].events;
            cout<<n<<":"<<i<<endl;
            cout<<"正在处理"<<fd<<"事件上的"<<((events & EPOLLIN) ? "EPOLLIN" : "OTHER")<<endl;

            //将所有的异常情况,最后全部转化成为read,write的异常
            if ((events & EPOLLERR) || (events & EPOLLHUP))
                events |= (EPOLLIN | EPOLLOUT);

            if((events & EPOLLIN) && conn_isexist(fd))
                _conns[fd]->_calb_read(_conns[fd]);//如果是读事件就执行读事件的回调
            else if((events & EPOLLOUT) && conn_isexist(fd))
                _conns[fd]->_calb_write(_conns[fd]);//如果是写事件就执行写事件的回调
        }
    }
    void run()
    {
       int timeout=-1;
       while(true)
       {
            loop_once(timeout);
       }
    }

private:
    void add_connection(int fd, uint32_t events, string ip="127.0.0.1", uint16_t port=g_port)
    {
        //将fd设置为非阻塞,保证ET模式下不会一直卡在wait
        Util::SetNonBlock(fd);
        //构建Connection对象,交给_conns管理
        Connection* con=new Connection(fd, ip, port);
        if(fd == _listensock.Fd())
            con->resgister(std::bind(&EpollServer::accepter, this, std::placeholders::_1), nullptr, nullptr);
        else
            con->resgister(std::bind(&EpollServer::reader, this, std::placeholders::_1),
            std::bind(&EpollServer::writer, this, std::placeholders::_1),
            std::bind(&EpollServer::excepter, this, std::placeholders::_1));
        con->_events=events;
        _conns.insert({fd, con});
        //将事件写到内核中
        bool r=_epoller.add_event(fd, events);
        cout<<"_conns insert success,fd:"<<fd<<",ip:"<<ip<<",port"<<port<<endl;
    }

    void accepter(Connection* conn)
    {
    }
    void reader(Connection* conn)
    {
    }
    void writer(Connection* conn)
    {
    }
    void excepter(Connection* conn)
    {
    }
    bool conn_isexist(int fd)
    {
        return _conns.find(fd) != _conns.end();
    }

private:
    uint16_t _port;
    Sock _listensock;
    Epoller _epoller;
    epoll_event _rvents[max_epollrevent_sz];//就绪的响应事件
    unordered_map<int, Connection*> _conns;//使用fd与Connection*建立映射关系
    func_t _func;//用于执行上层传入的回调
};

代码中需要注意的地方:

  • 1️⃣:在添加连接时,首先将fd设置为非阻塞,保证了read/write一定能够把数据读取完毕/发送完毕;
  • 2️⃣:在添加连接的时候,由于参数不匹配,所以使用了bind来调整参数个数:
    在这里插入图片描述
  • 3️⃣:在进行数据IO时,我们分成了readerwriter

现在的重点是如何实现accepter/reader/writer:
在实现前,我们就必须考虑协议定制的问题了,要读取或者发送一个完整的报文,我们之前实现网络版本计算器时已经实现过一次协议定制,所以此时直接拿来用即可。
先来实现acceper:

    void accepter(Connection* conn)
    {
        do
        {
            string clientip;
            uint16_t clientport;
            int err=0;
            int sock = _listensock.Accept(&clientip, &clientport, err);
            if (sock > 0)
            {
                cout << "【" << clientip << "," << clientport << "】事件已经就绪,fd:" << sock << endl;
                // 将新连接添加到_conns中管理
                add_connection(sock, EPOLLIN | EPOLLET, clientip, clientport);
            }
            else
            {
                if(err == EAGAIN || err == EWOULDBLOCK)
                    break;
                else if(err == EINTR)
                    continue;
                else
                    cout<<"accept fail"<<endl;
            }
        } while (conn->_events & EPOLLET);
    }

代码中需要循环获取新的连接,当新连接到来时就将新连接添加到_conns中管理即可。

再来实现reader:

    void hander_requset(Connection* conn)
    {
        bool quit = false;
        while (!quit)
        {
            string requestStr;
            // ParsePackage函数作用是将_inbuffer中取出一个完整报文并将其写入到requestStr中
            // 返回值==0表示没有一个完整报文;>0表示一个完整报文的长度
            int t = protocol_ns::ParsePackage(conn->_inbuffer, &requestStr);
            if (t > 0)
            {
                // 去除报头
                requestStr = protocol_ns::RemoveHeader(requestStr, t);
                // 进行反序列化
                protocol_ns::Request req;
                req.Deserialize(requestStr);
                // 执行回调进行处理
                protocol_ns::Response resp=_func(req);
                //序列化
                string responseStr;
                resp.Serialize(&responseStr);
                //添加报头
                responseStr=protocol_ns::AddHeader(responseStr);
                //将数据写到发送缓冲区中
                conn->_outbuffer+=responseStr;
            }
            else
                quit=true;
        }
    }
    bool reader_hander(Connection* conn)
    {
        bool res=true;
        do
        {
            char buffer[g_num]={0};
            int n=read(conn->_fd, buffer, sizeof(buffer)-1);
            if(n > 0)
            {
                buffer[n]=0;
                conn->_inbuffer+=buffer;
            }
            else if (n == 0)
            {
                conn->_calb_excep(conn);
                break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;//表示已经将数据全部读取完毕,跳出循环
                else if (errno == EINTR)
                    continue;
                else
                {
                    conn->_calb_excep(conn);
                    res=false;
                    break;
                }
            }
        }while(conn->_events & EPOLLET);
        return res;
    }
    void reader(Connection* conn)
    {
        if(!reader_hander(conn))
            return;
        //处理request,返回response
        hander_requset(conn);
        //一般我们在面对写入的时候,直接写入,没写完才交给epoll
        if(!conn->_outbuffer.empty())
            conn->_calb_write(conn);//如果发送缓冲区不为空,就进行一次writer
    }

上面代码中比较重要的地方都标有注释。
处理reader的方式是先将所有的数据全部读到_inbuffer中,然后再根据协议进行处理,将处理好的数据放进_outbuffer中,最后判断一下_outbuffer是否为空,不为空的话就手动调用写事件的回调函数进行处理。

在进行写事件处理时我们要明白一件事:读事件需要一直关心,因为你需要一直监听是否有新的socket到来,但是写事件其实就不需要了,一直关心反而占用CPU资源,只有当_outbuffer不为空时我们才去关心。

writer的编写:

    bool enable_read_write(Connection *conn, bool read, bool write)
    {
        conn->_events = ((read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET);
        return _epoller.mod_event(conn->_fd, conn->_events);
    }
    void writer(Connection *conn)
    {
        {
            bool safe = true;
            do
            {
                ssize_t n = write(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size());
                if (n > 0)
                {
                    conn->_outbuffer.erase(0, n);
                    if (conn->_outbuffer.empty())
                        break;
                }
                else
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                    {
                        break;
                    }
                    else if (errno == EINTR)
                        continue;
                    else
                    {
                        safe = false;
                        conn->_calb_excep(conn);
                        break;
                    }
                }
            } while (conn->_events & EPOLLET);
            if (!safe)
                return;
            if (!conn->_outbuffer.empty())
                enable_read_write(conn, true, true);
            else
                enable_read_write(conn, true, false);
        }
    }

最后excepter的编写就很简单了:

    void excepter(Connection *conn)
    {
        // 1. 先从epoll移除fd
        _epoller.del_event(conn->_fd);

        // 2. 从_conns中移除fd
        _conns.erase(conn->_fd);

        // 3. 关闭fd
        close(conn->_fd);

        // 5. 释放conn对象
        delete conn;
    }

2.4.5 🍋Reactor🍋

首先来回答什么是Reactor?
Reactor是基于多路转接包含事件派发器,连接管理器的半同步半异步的IO服务器。
其实我们实现的第二个版本的epoll就是一个简易版本的Reactor(但是没有加上异步处理)。异步就是使用多进程/多线程的方式让事件处理交给另外一个进程/线程处理,防止当前业务进程/业务线程阻塞而导致整个业务无法处理。

如果使用多线程的方式我们可以使用下面这种方式将之前的代码进行优化:
在这里插入图片描述
这样一个sock就对应这一个线程来处理。

多进程方式:

  • 我们可以使用管道来处理,父进程负责获得listensock,子进程将管道的读端添加进epoll中,由于子进程继承了父进程的listensock(不将listensock添加进Reactor),所以让子进程自己accept获得sock然后添加进Reactor,父进程可以使用轮询的方式随机挑选子进程向管道写入数据;
  • 除了使用管道外,还可以多进程加锁竞争的方式来进行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/120102.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

pcie对phy的skew要求

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球入口 pcie的设计中有这样一条要求&#xff0c;所有但phy/tx*_clk pin的clock skew要小于skew要求。 这里提供一下实现方法&#xff0c;如果你有更好的办法可以在评论区留言或者私信…

利用maven的dependency插件将项目依赖从maven仓库中拷贝到一个指定的位置

https://maven.apache.org/plugins/maven-dependency-plugin/copy-dependencies-mojo.html 利用dependency:copy-dependencies可以将项目的依赖从maven仓库中拷贝到一个指定的位置。 使用默认配置拷贝依赖 如果直接执行mvn dependency:copy-dependencies&#xff0c;是将项目…

IP地址与MAC地址(硬件地址)的区别

IP地址和硬件地址都是用于标识网络设备的地址&#xff0c;但它们的作用和使用方式不同。IP地址是用于在网络中唯一标识一个设备的逻辑地址它是由网络协议栈分配的&#xff0c;可以动态地分配和改变。而硬件地址是设备的物理地址&#xff0c;也称为MAC地址&#xff0c;是由设备制…

TCP/IP的基础知识

文章目录 TCP/IP的基础知识硬件&#xff08;物理层&#xff09;网络接口层&#xff08;数据链路层&#xff09;互联网层&#xff08;网络层&#xff09;TCP/IP的具体含义传输层应用层&#xff08;会话层以上的分层&#xff09;TCP/IP分层模型与通信示例发送数据包的一个例子接收…

什么是微服务?与分布式又有什么区别?

什么是微服务&#xff0c;我们先从传统的单体结构进行了解&#xff0c;对两者进行对比。 单体结构 单体结构是一种传统的软件架构模式&#xff0c;它将应用程序划分为一组相互依赖的模块和组件。这些模块和组件通常都是构建在同一个平台上的&#xff0c;并且紧密耦合在一起。…

一种可以实现安全便捷文件摆渡的跨网文件安全交换软件

为了保护数据的安全性和完整性&#xff0c;很多企业都采用了内外网物理隔离的方式&#xff0c;防止核心数据泄露或被恶意篡改。然而&#xff0c;这也给企业内部或与外部合作伙伴之间的文件交换带来了很多不便和挑战。如何在保证数据安全的前提下&#xff0c;实现跨网文件的快速…

【h5 uniapp】 滚动 滚动条,数据跟着变化

uniapp项目 需求&#xff1a; 向下滑动时&#xff0c;数据增加&#xff0c;上方的日历标题日期也跟着变化 向上滑动时&#xff0c;上方的日历标题日期跟着变化 实现思路&#xff1a; 初次加载目前月份的数据 以及下个月的数据 this.getdate()触底加载 下个月份的数据 onReach…

缓冲流详解

缓冲流概述 缓冲流也称为高效流、或者高级流。之前学习的字节流可以称为原始流。 作用&#xff1a;缓冲流自带缓冲区、可以提高原始字节流、字符流读写数据的性能。 字节缓冲流 字节缓冲流性能优化原理&#xff1a; 字节缓冲输入流自带了8KB缓冲池&#xff0c;以后我们直接…

计算机找不到MSVCR120.dll,MSVCR120.dll丢失的三种解决方法

在计算机使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“MSVCR120.dll丢失”。这个错误通常出现在运行某些程序时&#xff0c;导致程序无法正常启动。那么&#xff0c;如何解决MSVCR120.dll丢失的问题呢&#xff1f;小编将详细介绍解决方法&#…

第二证券:北交所30%的涨跌幅限制?

随着我国股市的不断发展&#xff0c;股市生意的涨跌幅束缚也成为了一个备受注重的论题。在北交所&#xff0c;股票的涨跌幅束缚为30%&#xff0c;这一束缚是否合理呢&#xff1f;本文将从多个角度进行剖析。 首先&#xff0c;涨跌幅束缚对于股市的安稳起着重要的效果。股票价格…

JavaScript使用对象

对象(object)是最基本、最通用的类型&#xff0c;具有复合性结构&#xff0c;属于引用型数据&#xff0c;对象的结构具有弹性&#xff0c;内部的数据是无序的&#xff0c;每个成员被称为属性。在JavaScript中&#xff0c;对象是一个泛化的概念&#xff0c;任何值都可以转换为对…

任正非说:流程的作用就三个:一是正确及时交付,二是赚到钱,三是没有腐败。

你好&#xff01;这是华研荟【任正非说】系列的第32篇文章&#xff0c;让我们聆听任正非先生的真知灼见&#xff0c;学习华为的管理思想和管理理念。 一、流程的作用就三个&#xff1a;一是正确及时交付&#xff0c;二是赚到钱&#xff0c;三是没有腐败。如果这三个目的都实现了…

个性化联邦学习-综述

介绍阅读的三篇个性化联邦学习的经典综述文章 Three Approaches for Personalization with Applications to Federated Learning 论文地址 文章的主要内容 介绍了用户聚类&#xff0c;数据插值&#xff0c;模型插值三种个性化联邦学习的方法。 用户聚类&#xff1a; 目的&a…

接口开发之使用C#插件Quartz.Net定时执行CMD任务工具

C#制作定时任务工具执行CMD命令 概要准备知识点实现原理thinkphp配置winform执行CMD命令读取ini配置文件定时任务Quartz.Net 完整代码Job.csIniFunc.csForm1.csconfig.ini简易定时任务工具雏形 概要 很多时候写接口上线后还会遇到很多修改&#xff0c;类似JAVA,C#,delphi制作的…

为什么说数据安全运维难?有好用的数据安全运维平台吗?

随着息技术的快速发展&#xff0c;不少企业在实行数字化转型&#xff0c;同时也面临着越来越多的数据安全运维挑战。不少企业都觉得数据安全运维难&#xff0c;都在找好用的数据安全运维平台。今天我们就来聊聊为什么说数据安全运维难&#xff1f;以及是否有好用的数据安全运维…

[MICROSAR Adaptive] --- Hello Adaptive World

Automotive E/E Architecture and AUTOSAR Adaptive Platform Vector Solution: MICROSAR Adaptive First project: Hello Adaptive World Summary 1 引言 1.1 AP诞生的历史背景 新一代电子电器架构通常将车内的节点分为三类。计算平台,预控制器和传感器执行器相关的节点,…

家用电脑做服务器,本地服务器搭建,公网IP申请,路由器改桥接模式,拨号上网

先浇一盆冷水&#xff01; 我不知道其他运营商是什么情况。联通的运营商公网IP端口 80、8080、443 都会被屏蔽掉&#xff0c;想要开放必须企业备案&#xff08;个人不行&#xff09;才可以。也就是说&#xff0c;只能通过其他端口进行showtime了。 需要哪些东西&#xff1f; 申…

Spring Boot中使用Spring Data JPA访问MySQL

Spring Data JPA是Spring框架提供的用于简化JPA&#xff08;Java Persistence API&#xff09;开发的数据访问层框架。它通过提供一组便捷的API和工具&#xff0c;简化了对JPA数据访问的操作&#xff0c;同时也提供了一些额外的功能&#xff0c;比如动态查询、分页、排序等。 …

数据分析:职场不可或缺的技能

前言 在当今数字化时代&#xff0c;数据分析已经变得越来越不可或缺。不论你从事哪个行业&#xff0c;不论你在职场的哪个阶段&#xff0c;数据分析技能都将成为你在工作中脱颖而出的秘密武器。本文将阐明数据分析的重要性&#xff0c;以及如何学习数据分析&#xff0c;以及如…

MobaXterm配置SSHTunnel

本地与远程服务器之间存在防火墙&#xff0c;防火墙只允许SSH端口通过&#xff0c;为访问远程服务器&#xff0c;我们可以借助MobaXterm来与SSH服务器建立隧道&#xff0c;使得防火墙外的用户能够访问远程服务器 配置 打开SSHTunnel 新建SSH tunnel 点击开启就生效了&…