【Linux网络编程】IO多种转接之Reactor

Reactor

在这里插入图片描述

点赞👍👍收藏🌟🌟关注💖💖
你的支持是对我最大的鼓励,我们一起努力吧!😃😃

基于上一篇epoll的学习,现在我们也知道epoll的工作模式有两种,一种默认LT工作模式,另一种是ET模式。关于epoll的LT工作模式我们已经写过了。接下来我们写一份基于ET模式下的Reator,处理所有的IO。

Reactor = 如何正确的处理IO+协议定制+业务逻辑处理

下面我们写一个简洁版的Reactor,它是一个半同步半异步IO,具体它什么原理,怎么做的,有什么特征。我们在代码层面上解开它的面纱。代码写完总结就理解了。其实Reactor是在Liunx网络中,最常用,最频繁的一种网络IO设计模式!

我们是这样打算的,对错误码,日志函数,套接字,epoll做封装然后在写服务器的时候用到的时候调用即可。错误码,日志函数,套接字以前我们封装过今天直接用就行了。

错误码封装

#pragma once

enum
{
    USAGG_ERR = 1,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR,
    EPOLL_CREATE_ERR
};

日志函数封装

#pragma once

#include<iostream>
#include<string>
#include<stdio.h>
#include <cstdarg>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include<fstream>

#define DEBUG  0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4

#define LOG_NORMAL "log.txt"
#define LOG_ERR "log.error"

const char* level_to_string(int level)
{
    switch(level)
    {
        case DEBUG: return "DEBUG";
        case NORMAL: return "NORMAL";
        case WARNING: return "WARNING";
        case ERROR: return "ERROR";
        case FATAL: return "FATAL";
    }
}

//时间戳变成时间
char* timeChange()
{
    time_t now=time(nullptr);
    struct tm* local_time;
    local_time=localtime(&now);

    static char time_str[1024];

    snprintf(time_str,sizeof time_str,"%d-%d-%d %d-%d-%d",local_time->tm_year + 1900,\
                    local_time->tm_mon + 1, local_time->tm_mday,local_time->tm_hour, \
                    local_time->tm_min, local_time->tm_sec);

    return time_str;
}



void logMessage(int level,const char* format,...)
{
    //[日志等级] [时间戳/时间] [pid] [message]
    //[WARNING] [2024-3-21 10-46-03] [123] [创建sock失败]
#define NUM 1024
    //获取时间
    char* nowtime=timeChange();
    char logprefix[NUM];
    snprintf(logprefix,sizeof logprefix,"[%s][%s][pid: %d]",level_to_string(level),nowtime,getpid());

    //
    char logconten[NUM];
    va_list arg;
    va_start(arg,format);
    vsnprintf(logconten,sizeof logconten,format,arg);

    
    std::cout<<logprefix<<logconten<<std::endl;  
};

今天这里我们把套接字封装改一下,以前把它相关接口都写成静态的了,不需要对象直接调用了。今天呢,都改成非静态的,未来这个类提供的方法都要以对象调用访问。

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Err.hpp"
#include "Log.hpp"

using namespace std;

class Sock
{
    const static int backlog = 32;
    const static int defaultsock = -1;

public:
    Sock(int sock = defaultsock) : _listensock(sock)
    {}

    ~Sock()
    {
        if (_listensock != defaultsock)
            close(_listensock);
    }

public:
    int sock()
    {
        // 1. 创建socket文件套接字对象
        _listensock= socket(AF_INET, SOCK_STREAM, 0);
        if (_listensock < 0)
        {
            logMessage(FATAL, "create socket error");
            exit(SOCKET_ERR);
        }
        logMessage(NORMAL, "create socket success: %d", _listensock);


        int opt = 1;
        setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
    }

	//方便外面获取_listensock
    int Fd()
    {
        return _listensock;
    }

    void Bind(int port)
    {
        // 2. bind绑定自己的网络信息
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;
        if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            logMessage(FATAL, "bind socket error");
            exit(BIND_ERR);
        }
        logMessage(NORMAL, "bind socket success");
    }

    void Listen()
    {
        // 3. 设置socket 为监听状态
        if (listen(_listensock, backlog) < 0)
        {
            logMessage(FATAL, "listen socket error");
            exit(LISTEN_ERR);
        }
        logMessage(NORMAL, "listen socket success");
    }

    int Accept(string *clientip, uint16_t *clientport,int* err)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
        *err=errno;
        if (sock < 0){}
            //logMessage(ERROR, "accept error, next");
        else
        {
            //logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
            *clientip = inet_ntoa(peer.sin_addr);
            *clientport = ntohs(peer.sin_port);
        }
        return sock;
    }

private:
    int _listensock;
};

对于一个epoll来说,首先要先给epoll创建出来,然后epoll的接口都要用到epoll创建成功的返回值。所以直接在这个epoll类中定义个成员变量直接就把创建epoll成功的返回值拿到。然后服务器不需要直接拿着这个返回值,而直接调用这个类对象使用里面的提供的接口就行了。epoll这里先写一个大体框架,后面需要什么了再加

#pragma once

#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#include "Err.hpp"
#include "Log.hpp"

using namespace std;

const int defaultepfd = -1;
const int size = 128;

class Epoller
{

public:
    Epoller(int epfd = defaultepfd) : _epfd(epfd)
    {
    }

    ~Epoller()
    {
        if (_epfd != defaultepfd)
            close(_epfd);
    }

public:
   
private:
    int _epfd;//创建epoll返回值
};

目前调用逻辑,后面在加内容

#include "TcpServer.hpp"
#include "Err.hpp"
#include <memory>

static void usage(std::string proc)
{
    std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        usage(argv[0]);
        exit(USAGG_ERR);
    }

    uint16_t port=atoi(argv[1]);
    std::unique_ptr<TcpServer> uls(new TcpServer(port));
    uls->initServer();
    uls->Dispatch();

    return 0;
}

服务器大体框架

#pragma once

#include <iostream>
#include "Sock.hpp"
#include "Err.hpp"
#include "Log.hpp"
#include "Epoller.hpp"

const int defaultport = 8080;

class TcpServer
{

public:
    TcpServer(int port = defaultport) : _port(port)
    {
    }

    ~TcpServer()
    {
    }

    void initServer()
    {  
    }

    void Dispatch()//事件派发
    {
    }
public:


private:
    uint16_t _port;
    Sock _sock;
    Epoller _epoll;
};

接下来就是编写服务器了

初始化服务器

void initServer()
{  
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
}

创建套接字之后,我们要在创建一个epoll模型,因此我们在Epoller写一个创建epoll模型的接口,然后服务器直接调用就行了

```cpp
void Create()
{
    _epfd = epoll_create(size);
    if (_epfd < 0)
    {
        logMessage(FATAL, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));
        exit(EPOLL_CREATE_ERR);
    }
    logMessage(NORMAL, "epoll create success, epfd: %d", _epfd);
}
void initServer()
{  
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();
    
	// 3.将目前唯一的一个sock,添加到epoll中
}

创建好套接字,epoll模型接下来我们先把_listensock套接字添加到epoll里,看看这样写有没有什么问题

不过先在Epoller类中补充一个用户告诉内核你要帮我关心对应fd的什么事件。

// user -> kernel
bool AddEvents(int sock, uint32_t event)
{
    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = sock;
    int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
    if (n < 0)
    {
        logMessage(ERROR, "sock join epoll fail");
        return false;
    }
    return true;
}

写好后我们直接调用就好了

void initServer()
{  
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3.将目前唯一的一个sock,添加到epoll中
    _epoll.AddEvents(_sock.Fd(),EPOLLIN | EPOLLET);
}

但是现在有小一个问题,前面说过的,如果未来你的套接字工作模式是ET模式,那么该套接字必须处于非阻塞,_listensock套接字创建处理默认就是阻塞的,因此我们需要将文件描述符设为非阻塞

我们在高级IO的非阻塞IO哪里就已经写过一个将一个fd变成非阻塞了,现在拿过来用就行了,不过我们也还是把它封装起来。然后调用。

#include <iostream>
#include <unistd.h>
#include <fcntl.h>

using namespace std;

class Util
{
public:
    static bool SetNonBlock(int fd)
    {
        int fl = fcntl(fd, F_GETFL);
        if (fl < 0)
            return false;
        fcntl(fd, F_SETFL, fl | O_NONBLOCK);
        return true;
    }
};

到目前为止好像貌似没问题了,但是真的吗?

void initServer()
{  
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
    Util::SetNonBlock(_sock.Fd());
    _epoll.AddEvents(_sock.Fd(),EPOLLIN | EPOLLET);
}

我们回过头看看之前写过的epoll服务器的代码,这里处理普通sock就绪事件的问题,你怎么保证你把本轮收到的数据都读完?即使把本轮数据都读完了,就一定能够读到一个完整的请求吗?即使未来在这里循环读取,可以反正我们非阻塞也都写过。那就能保证读到一个完整请求吗?
不一定!

那没读到完整的请求我们是不是只能把本轮读到的数据暂时保存到buffer里,可是暂时保存到buffer里,你保存了,那别人怎么办? 你这个sock和其他sock说你们别着急先别覆盖我的数据,buffer里面保存的是我的数据,你们先不要写。你想多了!

你读完之后整个这个代码区间就全部释放掉了,因为这是栈上面的空间。所以即便你没读完,或者你把这轮读完了。那么下轮在读,buffer早就释放了。你可能本轮读到后半部分但前半部分已经没有了,可能不到下次,下一个循环进来就给你清空了。

在这里插入图片描述
所以怎么样进行正确读写,光有一个套接字和定义一个栈上的缓存区远远不够!

所以我们需要给每一个文件描述符,都要把输入,输出用户层缓存区都要给它带上!

现在我们清楚知道历史代码的问题,我们要正确写整个服务,现在已经不够了。我们需要将每一个套接字进行封装,每一个套接字都要包含自己对应的输入,输出缓存区空间,只有每一个套接字都有自己对应的缓存空间,读取的时候把数据读取赞存在自己对应的缓存区中,没读完下次在读,这时候在添加到我自己的缓存区里,不就行了吗,这个时候谁和谁都不揉在一起。所以我们在封装一个类! 也就是说我认为未来每一个套接字都看类对象。

class Connection
{
public:
    Connection(int sock = defaultport) : sock_(sock)
    {
    }

    ~Connection()
    {
        if (sock_ == defaultsock)
            close(sock_);
    }

public:
    int sock_;//这个类对应的套接字是谁
    string inbuffer_;//输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了                                                                                                                     
    string outbuffer_;//输出缓存区 ,你并不能保证你的写事件就绪
    
}

针对于上面未来每一个Connection对象,因为每一个套接字未来面对的都有自己的读方法、写方法、异常方法,所以在Connection类中,对一个套接字来讲要提供三个回调方法,分别对应读方法、写方法,异常方法。

const int defaultport = 8080;
const int defaultsock = -1;

using func_t = function<void(Connection *)>;

class Connection
{
public:
    Connection(int sock = defaultport) : sock_(sock)
    {
    }

    ~Connection()
    {
        if (sock_ == defaultsock)
            close(sock_);
    }

public:
    int sock_;  //这个类对应的套接字是谁
    string inbuffer_;  //输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了                                                                                                                     
    string outbuffer_;  //输出缓存区 ,你并不能保证你的写事件就绪

    func_t recver_;  //从sock_读
    func_t sender_;  //向sock_写
    func_t excepter_;  //处理sock_,IO的时候上面的异常事件
};

所以在初始服务器写的,创建套接字,创建epoll模型,然后将_listensock套接字添加到epoll中,那未来是不是更多的套接字会被accept上来,我们把每一个套接字都看成Connection对象,所以未来整个服务器是不是存在非常多的Conncetion对象。同时_listensock 也是一个sock啊,也要看作成为一个 Connection对象,虽然_listensock不用所谓的输入输出缓存区,包括回调方法最多也是用一个 recver _,但是在我看来_listensock也是一个Connection。 所以刚才服初始服务器哪里写_listensock就有不合适了。并且未来每一个sock都是Connection对象,所以服务器要不要把这些多的Connection对象管理起来呢? 当然要了! 那服务器怎么管理?

先描述,在组织! ,我们管理起来了吗,管理起来了,Connection就是。怎么组织呢?既然每一个Connection对象都有一个对应的套接字,那我们就是哈希表管理。

class TcpServer
{

public:
	//...

private:
    uint16_t _port;

    Sock _sock;
    Epoller _epoll;
    unordered_map<int,Connection*> _connections;//所有链接集合
};

而且这里的代码仅仅是简单的将套接字添加到epoll中,未来我们不仅想将套接字添加到epoll中,并且还想将每一个Connectoin对象添加unordered_map里。在这里插入图片描述

为什么unordered_map我们使用的是套接字做key,因为将来在epoll中一旦有fd就绪了,我们知道是什么事件就绪,也知道是那个fd就绪。知道之后我们就可以在unordered_map根据文件描述符快速的找到对应套接字的Connection对象,然后输入,输出缓存区也就有了。

在这里插入图片描述

所以在写一个添加链接的函数,作为一个服务器收到一个套接字,它就需要添加一个链接。

void AddConnection(int sock, uint32_t events)
{
    // 1. 首先要为该sock创建Connection,并初始化,并添加到connections_


    // 2. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
    bool r = _epoll.AddEvents(sock, events);
    assert(r);
    (void)r;
    
}

剩下的等会再说,所以当你有了新的套接字的时候,不应该是把套接字直接添加到epoll里,而应该是进行AddConnection。_listensock也是如此

void initServer()
{
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
    // Util::SetNonBlock(_sock.Fd());
    // _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);
    AddConnection(_sock.Fd(),EPOLLIN|EPOLLET);
}

void AddConnection(int sock, uint32_t events)
{
   		// 1.设置非阻塞,ET模式fd要非阻塞
        if (events & EPOLLET)
            Util::SetNonBlock(sock);

        // 2. 该sock创建Connection,并初始化,并添加到connections_
        Connection *conn = new Connection(sock);

        // 3.

        // 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
        bool r = _epoll.AddEvents(sock, events);
        assert(r);
        (void)r;

        // 5. 将kv添加到connections_
        _connections.insert(make_pair(sock, conn));

        logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
}

这样写还没完,未来一旦有fd就绪了,然后就可以在unordered_map中根据fd找到对应的Connection,找到之后我们是不是要执行对应的读,写,异常方法,所以任何一个Connection内方法能够被调用,因此Connection类在提供一个给每一个Connection对象注册读,写,异常方法,

这样写的意思是,未来这个AddConnection接口不仅可以被用来注册_listensock套接字,也可以用来被注册一般文件描述符,一般文件描述符也可能关心读,也可以关心写,也可能关心异常。或者只关心读,只关心写等等情况,所以当我们每个套接字注册一个Connection对象,你怎么知道这个套接字未来要执行什么方法呢 ,所以我们需要Connection提供一个注册方法。
在这里插入图片描述

class Connection
{
public:
    Connection(int sock = defaultport) : sock_(sock)
    {
    }

    ~Connection()
    {
        if (sock_ == defaultsock)
            close(sock_);
    }

    void Rigster(func_t r, func_t s, func_t e)
    {
        recver_ = r;
        sender_ = s;
        excepter_ = e;
    }

public:
    int sock_;         // 这个类对应的套接字是谁
    string inbuffer_;  // 输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了
    string outbuffer_; // 输出缓存区 ,你并不能保证你的写事件就绪

    func_t recver_;   // 从sock_读
    func_t sender_;   // 向sock_写
    func_t excepter_; // 处理sock_,IO的时候上面的异常事件
};

所以当使用AddConnection的时候,注册文件描述符到epoll的时候,既要有要关心的fd,还要有关心fd上的什么事件,这两个字段是你要告诉epoll,当fd就绪时你想怎么处理这个fd,所以还要再给三个参数,告诉这个fd就绪时处理什么方法。

在这里插入图片描述

void AddConnection(int sock, uint32_t events,func_t recver, func_t sender, func_t excepter)
{
    // 1.设置非阻塞,ET模式fd要非阻塞
    if (events & EPOLLET)
        Util::SetNonBlock(sock);

    // 2. 该sock创建Connection,并初始化,并添加到connections_
    Connection *conn = new Connection(sock);

    // 3. 给对应的sock设置对应的回调方法
    conn->Rigster(recver, sender, excepter);

    // 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
    bool r = _epoll.AddEvents(sock, events);
    assert(r);
    (void)r;

    // 5. 将kv添加到connections_
    _connections.insert(make_pair(sock, conn));

    logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
}

所以当我们重新审视初始化服务器的代码,创建套接字,创建epoll模型,把_listensock套接字添加到epoll里,对于_listensock套接字我们也要设置对应的方法,它就绪时想怎么读,怎么写,怎么处理异常。不过对于_listensock套接字它只关心读,所以这里我们先给它一个未来要写的从_listensock获取链接的方法,不关心写和处理异常,直接设置为nullptr就行了。

不过这里还有一个细节,我们因为读、写、异常是我们使用的是包装器function,如何让它的对象调用一个类内的非静态成员函数会比较麻烦,因为类内的非静态成员函数内隐藏了一个this指针。这样的话function的参数还要添加一个,然后调用的话还需要再传一个这个类对象过去。但是我们在C++学过bing绑定,可以固定参数,这样以后调用这个成员函数直接就帮我自动传这个参数了,对这里不懂得可以看这里【C++】C++11中

void Accepter(Connection* conn)
{
}

void initServer()
{
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
    // Util::SetNonBlock(_sock.Fd());
    // _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);
    AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,
                  bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);
}

未来一旦设置到epoll里fd上有事件就绪了,然后就根据就绪的fd在unordered_map找到对应的Conncetion,然后调用Conncetion曾经注册的对应的方法来进行事件派发,所以接下来我们要写事件派发的接口,让epoll_wait帮我们获取就绪事件,这里有很多写法,不过我们还是不写在一块,多写一个Loop函数然后进行一次事件派发,Dispatch就是循环派发

// 事件派发器
void Dispatch()
{
    int timeout = -1000;
    while (true)
    {
        Loop(timeout);
    }
}


void Loop(int &timeout)
{
   _epoll.Wait()// 获取已经就绪的事件
}        

所以在Epoller再来一个接口,我们直接调用就行了,这里我们需要一个拷贝数组,数组大小,和等待的方式

// kernel -> user
int Wait(struct epoll_event *ets, int num, int timeout)
{
    int n = epoll_wait(_epfd, ets, num, timeout);
    switch (n)
    {
    case 0:
        logMessage(NORMAL, "timeout ...");
        break;
    case -1:
        logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));
        break;
    default:
        logMessage(NORMAL, "have event ready");
        break;
    }
    return n;
}

epoll_wait这里注定了我们一次可能会捞取许多就绪事件,所以我们要在服务器中在定义一个保存所有就绪事件的数组,和数组大小然后调用_epoll.Wait()的时候传过去

const int defaultnum = 128;

class TcpServer
{

public:
    TcpServer(int port = defaultport,int num = defaultnum) : _port(port),_revents(nullptr),_num(num)
    {
    }

	

private:
    int _port;

    Sock _sock;
    Epoller _epoll;
    unordered_map<int, Connection *> _con;
    struct epoll_event *_revents;
    int _num;
};

初始服务器的时候,我们就把数组申请好

void initServer()
{
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
    // Util::SetNonBlock(_sock.Fd());
    // _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);
    AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,
                  bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);

    // 4.拷贝数组
    _revents = new struct epoll_event[_num];
}

然后接下来就可以进行事件派发了

void Loop(int &timeout)
{
    int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

    for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
    {
        uint32_t event = _revents[i].events; // 就绪的事件
        int sock = _revents[i].data.fd;      // 就绪事件的fd

        // 事件派发
        if ((event & EPOLLIN) && sock == _sock.Fd()) // listensock读就绪
            _connections[sock]->recver_(_connections[sock]);
        if ((event & EPOLLIN) && sock != _sock.Fd()) // 普通sock读就绪
            _connections[sock]->recver_(_connections[sock]);
}

我们注意到不管是_listensock套接字还是普通sock套接字读就绪,它们的处理方法都是一样的,因此我们把它们写在一块。只要是读事件事件,我们都根据文件描述去执行对应的读方法,未来设怎么读都由自己曾经设置对应recver_来定

void Loop(int &timeout)
{
    int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

    for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
    {
        uint32_t event = _revents[i].events; // 就绪的事件
        int sock = _revents[i].data.fd;      // 就绪事件的fd

        // 事件派发

        // if ((event & EPOLLIN) && sock == _sock.Fd()) // listensock读就绪
        //     _connections[sock]->recver_(_connections[sock]);
        // if ((event & EPOLLIN) && sock != _sock.Fd()) // 普通fd读就绪
        //     _connections[sock]->recver_(_connections[sock]);

        if ((event & EPOLLIN))//读事件就绪
            _connections[sock]->recver_(_connections[sock]);
        if ((event & EPOLLOUT))//写事件就绪
            _connections[sock]->sender_(_connections[sock]);
}

这样写还有一些问题,首先我们还要判断一下这个套接字是否在unordered_map真的存在,其次在判断对应套接字的方法是否曾经被设置,不然也可能会存在调用的问题

bool ExitHashSock(int sock)
{
    auto pos = _con.find(sock);
    return pos != _con.end();
}

void Loop(int &timeout)
{
    int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

    for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
    {
        uint32_t event = _revents[i].events; // 就绪的事件
        int sock = _revents[i].data.fd;      // 就绪事件的fd

        // 事件派发


       // _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分
       if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))
           _connections[sock]->recver_(_connections[sock]);
       if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))
           _connections[sock]->sender_(_connections[sock]);
}

这里还有一丢丢小问题,对我们来说我们从来没说过异常的事件,但并不排除在通信的时候存在其他epoll对应的事件

  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;

我们可以这样写,如果当前就绪的事件出现异常了,我们设置进该就绪事件中的读和写去处理。这样的好处是将所有的异常问题,全部转化,成为读写问题 ,本来读写本来就要做异常处理,只要把读写异常处理了,这个异常也就被自动处理了!

void Loop(int &timeout)
{
    int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

    for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
    {
        uint32_t event = _revents[i].events; // 就绪的事件
        int sock = _revents[i].data.fd;      // 就绪事件的fd

		//将所有的异常问题,全部转化,成为读写问题
        if (event & EPOLLERR)
            event | EPOLLIN | EPOLLOUT;
        if (event & EPOLLHUP)
            event | EPOLLIN | EPOLLOUT;

       // _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分
       if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))
           _connections[sock]->recver_(_connections[sock]);
       if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))
           _connections[sock]->sender_(_connections[sock]);
}

因为我们目前就一个_listensock套接字,所以到目前为止有事件就绪就是_listensock套接字就绪,也就是这里只会执行_listensock套接字设置的读事件也就是我们曾经给_listensock套接字绑定的Accepter函数来获取新链接。

void Accepter(Connection *conn)
{
       string clientip;
       uint16_t clientport;
       int err = 0;//获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了
       int sock = _sock.Accept(&clientip, &clientport, &err);
       if (sock > 0)
       {
           // 新的sock套接字添加到AddConnetions
           AddConnetions(sock, EPOLLIN | EPOLLET,);
       }
}

对于未来获取到的普通文件描述符,它还要有对应的事件就绪时我们需要怎么处理这个套接字,所以我们需要设置一个对普通文件描述符就绪时它的处理读,写,异常的方法

void recver(Connection *conn)
{
}

void sender(Connection *conn)
{
}

void excepter(Connection *conn)
{
}

void Accepter(Connection *conn)
{
       string clientip;
       uint16_t clientport;
       int err = 0;//获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了
       int sock = _sock.Accept(&clientip, &clientport, &err);
       if (sock > 0)
       {
           // 新的sock套接字添加到AddConnetions
           AddConnetions(sock, EPOLLIN | EPOLLET,
                              bind(&TcpServer::recver, this, placeholders::_1),
                              bind(&TcpServer::sender, this, placeholders::_1),
                              bind(&TcpServer::excepter, this, placeholders::_1));
       }
}

现在有一个问题时,当时_listesock套接字注册的时候也是ET模式的,并且也设置成非阻塞的了,所以通知的时候只会通知_listesock套接字读就绪一次,Accepter调用也就一次。

在这里插入图片描述

也就是说你能保证底层到来的链接就一个吗?不能!你并不能保证此时正在处理_listesock套接字获取新连接时有几个到来了,有可能同时到来了非常多的连接,所以倒逼程序员必须把所有到达的连接全部都读上来,所有上面Accepet的写法是不对的,必须要循环读取全部都读上来!而且我们早把_listesock设置为非阻塞了,底层没有连接了也不会被阻塞,虽然返回值是-1,但是错误码被设置!可能是底层没错误了错误码被设为EAGAIN 或EWOULDBLOCK,或者读取时被信号中断了错误码被设为EINTR,或者真错误了!

void Accepter(Connection *conn)
{
    while (true)
    {
        string clientip;
        uint16_t clientport;
        int err = 0; // 获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了
        int sock = _sock.Accept(&clientip, &clientport, &err);
        if (sock > 0)
        {
            // 新的sock套接字添加到AddConnetions
            AddConnection(sock, EPOLLIN | EPOLLET,
                          bind(&TcpServer::recver, this, placeholders::_1),
                          bind(&TcpServer::sender, this, placeholders::_1),
                          bind(&TcpServer::excepter, this, placeholders::_1));

            logMessage(DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);
        }
        else
        {
            if (err == EAGAIN || err == EWOULDBLOCK)
                break;
            else if (err == EINTR)
                continue;
            else
                break;
        }
    }
}

上面处理好了,接下来就是处理普通套接字就绪事件了,假设现在读就绪了,我们先处理读事件。怎么读?注定了也要循环读取!

其次你即使把本来数据全部读完就能保证是一个完整报文吗?不能!不是就没有办法向上层交,也就意味着没有办法对它进行处理,此时我们就先把读到的数据放到对应fd的Connection对象的输入缓存区里

void recver(Connection *conn)
{
	   while (true)
	   {
	       char buffer[1024];
	       ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);
	       if (s > 0)//读取成功
	       {
	           buffer[s] = 0;
	           conn->inbuffer_ += buffer;
	       }
	       
	    }
}

如果s==0,就说明对方把连接关掉了,那我们是不是应该把对应的文件描述符从epoll中删除,然后关闭文件描述符,然后从unordered_map中移除等等工作。如果写出问题呢?我们的代码是不是会充满大量的异常调用,所以只要我们设置了对异常的处理,然后有问题的执行一次异常方法就行了。如果s == -1,我们内部再判断是非阻塞返回、是被信号中断、还是着出错误了。

void recver(Connection *conn)
{
	while (true)
	{
	   char buffer[1024];
	   ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);
	   if (s > 0)//读取成功
	   {
	       buffer[s] = 0;
	       conn->inbuffer_ += buffer;
	   }           
	   else if (s == 0)
	      {
	          if (conn->excepter_)
	          {
	              conn->excepter_(conn);
	              return;
	          }
	      }
	      else
	      {
	          if (errno == EAGAIN || errno == EWOULDBLOCK)//只是没数据了
	              break;
	          else if (errno == EIDRM)//读取时被信号中断,继续读
	              continue;
	          else//真错误了
	          {
	              if (conn->excepter_)
	              {
	                  conn->excepter_(conn);
	                  return;
	              }
	          }
	      }
	      	       
	}
}

接下来我们处理一下读取到的报文,如果是完整报文就向上交付,不是就继续读取直到是一个完整报文。

那问题来了你怎么知道是一个完整的报文?
还记得以前在写网络版本计数器是怎么处理的吗?没错定制一个协议!!!
我们以前学过,今天拿过来用就行了 协议封装

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <jsoncpp/json/json.h>

#define SEP " "
#define SEP_LEN strlen(SEP)
#define LINE_SEP "\r\n"
#define LINE_SEP_LEN strlen(LINE_SEP)

using namespace std;

// "x op y"  -> "content_len"\r\n"x op y"\r\n
string Enlenth(const string &text)//加报头
{
    string send_string = to_string(text.size());
    send_string += LINE_SEP;
    send_string += text;
    send_string += LINE_SEP;

    return send_string;
}

//"content_len"\r\n"x op y"\r\n  -> "x op y"
bool Delenth(const string &packge, string *text)//删除报头
{
    auto pos = packge.find(LINE_SEP);
    if (pos == string::npos)
        return false;
    string text_len_string = packge.substr(0, pos);
    int text_len = stoi(text_len_string);
    *text = packge.substr(pos + LINE_SEP_LEN, text_len);
    return true;
}

class Request//请求
{
public:
    Request() : _x(0), _y(0), _op(0)
    {
    }

    Request(int x, int y, char op) : _x(x), _y(y), _op(op)
    {
    }

    bool serialize(string *out)//序列化
    {
#ifdef MYSELF
        // 结构化 -> "x op y"
        *out = "";
        string x_string = to_string(_x);
        string y_string = to_string(_y);

        *out += x_string;
        *out += SEP;
        *out += _op;
        *out += SEP;
        *out += y_string;
#else
        Json::Value root;
        root["first"] = _x;
        root["second"] = _y;
        root["oper"] = _op;

        Json::FastWriter write;
        *out = write.write(root);
#endif
        return true;
    }

    bool deserialize(const string &in)//反序列
    {
#ifdef MYSELF
        // "x op y" -> 结构化
        auto left = in.find(SEP);
        auto right = in.rfind(SEP);
        if (left == string::npos || right == string::npos)
            return false;
        if (left == right)
            return false;
        if (right - (left + SEP_LEN) != 1)
            return false;

        string x_string = in.substr(0, left); // [0, 2) [start, end) , start, end - start
        string y_string = in.substr(right + SEP_LEN);

        if (x_string.empty())
            return false;
        if (y_string.empty())
            return false;
        _x = stoi(x_string);
        _y = stoi(y_string);
        _op = in[left + SEP_LEN];
#else
        Json::Value root;
        Json::Reader reader;
        reader.parse(in, root);

        _x = root["first"].asInt();
        _y = root["second"].asInt();
        _op = root["oper"].asInt();

#endif

        return true;
    }

public:
    int _x;
    int _y;
    char _op;
};

class Response//响应
{
public:
    Response() : _exitcode(0), _result(0)
    {
    }

    Response(int exitcode, int result) : _exitcode(exitcode), _result(result)
    {
    }

    bool serialize(string *out)//序列化
    {
#ifdef MYSELF
        // 结构化 -> "_exitcode  _result"
        *out = "";
        *out = to_string(_exitcode);
        *out += SEP;
        *out += to_string(_result);
#else
        Json::Value root;
        root["first"] = _exitcode;
        root["second"] = _result;

        Json::FastWriter write;
        *out = write.write(root);
#endif
        return true;
    }

    bool deserialize(const string &in)//反序列化
    {
#ifdef MYSELF
        //"_exitcode  _result" ->结构化
        auto pos = in.find(SEP);
        if (pos == string::npos)
            return false;

        string ec_string = in.substr(0, pos);
        string res_string = in.substr(pos + SEP_LEN);

        if (ec_string.empty())
            return false;
        if (res_string.empty())
            return false;

        _exitcode = stoi(ec_string);
        _result = stoi(res_string);
#else
        Json::Value root;
        Json::Reader reader;
        reader.parse(in, root);
        _exitcode = root["first"].asInt();
        _result = root["second"].asInt();
#endif
        return true;
    }

public:
    int _exitcode; // 0:计算成功,!0表示计算失败,具体是多少,定好标准
    int _result;   // 计算结果
};

//这里是读取一个完整报文!读到后放到text
bool PartOnepackge(string &inbuffer, string *text)
{
    //"content_len"/r/n"x op y"/r/n

    auto pos = inbuffer.find(LINE_SEP);
    if (pos == string::npos) // 没读到一个完整报文
        return false;
    //"content_len"/r/n"x op y"/r/n"content_len" >= 一个完整报文长度
    string text_len_string = inbuffer.substr(0, pos);
    int text_len = stoi(text_len_string);
    int total_len = text_len_string.size() + 2 * LINE_SEP_LEN + text_len;

    cout << "处理前#inbuffer: \n"
         << inbuffer << std::endl;

    if (inbuffer.size() < total_len) // 也没有读到一个完整报文
    {
        cout << "你输入的消息,没有严格遵守我们的协议,正在等待后续的内容, continue" << endl;
        return false;
    }

    // 至少有一个完整的报文
    *text = inbuffer.substr(0, total_len); // 读到一个完整报文
    inbuffer.erase(0, total_len);          // inbuffer内部减去这次读到的一个完整的报文

    cout << "处理后#inbuffer:\n " << inbuffer << endl;
    return true;

}

现在协议定义好了,我们也可以开始对读到的数据进行处理了!这里我们只想让服务器就处理IO事件,因此设置一个回调函数专门处理里面是否读取到一个完整报文,读到一个完整报文就序列化反序列业务处理等等。

using func_t = function<void(Connection *)>;

class TcpServer
{

public:
    TcpServer(func_t f,int port = defaultport, int num = defaultnum) : _service(f),_port(port), _revents(nullptr), _num(num)
    {
    }
	//。。。

private:
    uint16_t _port;

    Sock _sock;
    Epoller _epoll;
    unordered_map<int, Connection *> _connections; // 所有链接集合
    struct epoll_event *_revents;
    int _num;

    func_t _service;//业务处理
};

然后读取一次就交给上层一次,让上层进行处理,而recver专心读取数据就行了。

void recver(Connection *conn)
{
    while (true)
    {
        char buffer[1024];
        ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            buffer[s] = 0;
            conn->inbuffer_ += buffer;
            _service(conn);//处理业务
        }
        else if (s == 0)
        {
            if (conn->excepter_)
            {
                conn->excepter_(conn);
                return;
            }
        }
        else
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else if (errno == EIDRM)
                continue;
            else
            {
                if (conn->excepter_)
                {
                    conn->excepter_(conn);
                    return;
                }
            }
        }
    }
}

如果读取到一个完整报文后就对请求就行处理,没有就返回继续读取,直到读取到完整的报文

void calculate(Connection *conn)
{
    string onePackage;
    while (PartOnepackge(conn->inbuffer_, &onePackage))
    {
        string req_str;
        // 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\n
        if (!Delenth(onePackage, &req_str))
            return;

        cout << "去掉报头的正文:\n"
             << req_str << endl;

        // 2. 对请求Request,反序列化
        // 2.1 得到一个结构化的请求对象
        Request req;
        if (!req.deserialize(req_str))
            return;


        // 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑
        // 3.1 得到一个结构化的响应
        Response resp;
        cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!

}

业务处理我们也写过直接用

void cal(const Request &req, Response &resp)
{
     req已经有结构化完成的数据啦,你可以直接使用
    resp._exitcode = OK;
    resp._result = OK;

    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if (req._y == 0)
            resp._exitcode = DIV_ERR;
        else
            resp._result = req._x / req._y;
    }
    break;
    case '%':
    {
        if (req._y == 0)
            resp._exitcode = MOD_ERR;
        else
            resp._result = req._x % req._y;
    }
    break;
    default:
        resp._exitcode = OPER_ERR;
        break;
    }
}

计算处理好了我们对响应序列化,得到一个响应的报文,接下来就是给对方发回去

void calculate(Connection *conn)
{
    string onePackage;
    while (PartOnepackge(conn->inbuffer_, &onePackage))
    {
        string req_str;
        // 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\n
        if (!Delenth(onePackage, &req_str))
            return;

        cout << "去掉报头的正文:\n"
             << req_str << endl;

        // 2. 对请求Request,反序列化
        // 2.1 得到一个结构化的请求对象
        Request req;
        if (!req.deserialize(req_str))
            return;


        // 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑
        // 3.1 得到一个结构化的响应
        Response resp;
        cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!


        // 4.对响应Response,进行序列化
        // 4.1 得到了一个"字符串"
        string resp_str;
        if (!resp.serialize(&resp_str))
            return;

        // 5 构建成为一个完整的报文
       string send_string= Enlenth(resp_str);
       cout << "--------------result: " << send_string << endl;
		
	   //发回去
    }

}

现在问题是,在epoll中能不能直接发送呢??
不能!用户无法保证发送条件是就绪的!谁最清楚??epoll!!
也就是说不光读事件要添加到epoll中,写事件是否就绪也要添加到epoll里。
什么叫做写发送事件就绪呢?
发送缓存区有空间!

服务器刚开始启动,或者很多情况下,发送事件是一直就绪的!可以直接发送!
只不过,如果我们没有发完怎么办?那就需要下一次发送,这里也就要求每一个sock都要有自己的发送缓存区!

所以根据上面的分析,大部分情况发送缓存区都是有空间的,所以其实可以直接发,有人说直接发那我就循环发,是的可以循环发,如果没发完非阻塞就直接返回错了,下次发就行了,可是下次从哪里发呢,所以就需要sock有自己的发送缓存区。

因为大部分情况下发送缓存区都是就绪的,所以直接发,没发完没关系下次在发,可是下次是什么时候,这次没发完不就是用户从缓存区数据量太大把发送缓冲区打满了,那下次发是不是就要等底层发送缓存区事件就绪我再发。所以注定你要将sock的EPOLLOUT事件也要有时候注册进epoll ,让epoll通知我,如果底层发事件就绪了我再发。现在问题是是什么时候注册?在新sock注册到epoll里的时候就开始关心写了可不可以?

在这里插入图片描述

未来事件派发一旦写事件就绪了自动调用sender方法

在这里插入图片描述

看起来很完美,但是一般读事件对于epoll我们要常设,写事件的关心对于epoll我们要按需设置!

也就是说当你需要发的时候,确实发送没发完,你在想办法在epoll中设置一下,让epoll关心对于fd的写事件! 刚开始设置sock如果你真的需要关心写事件你才需要设置写事件,不要把写事件常设到epoll中,因为你把它常设了,epoll就会一直在返回,因为每一个文件描述符的发送缓存区大概率都是就绪的,就一直的返回,你的处理逻辑就一直在消耗cpu资源,这样没必要,所以写事件我们按序设置。

所以我们的结论就是,因为是ET模式可以直接发送,发完就ok,没发完就让epoll关心一下对应的写事件的关心,所以我们需要常设EPOLLIN的关心,对EPOLLOUT按需设置!

然后这里呢,可能一次发不完,为了下次继续发送,所以我们把response序列化的之后的结果放到该fd的输出缓存区,以供下次发送。然后调用对应fd的读方法把数据发出去。

void calculate(Connection *conn)
{
    string onePackage;
    while (PartOnepackge(conn->inbuffer_, &onePackage))
    {
        string req_str;
        // 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\n
        if (!Delenth(onePackage, &req_str))
            return;

        cout << "去掉报头的正文:\n"
             << req_str << endl;

        // 2. 对请求Request,反序列化
        // 2.1 得到一个结构化的请求对象
        Request req;
        if (!req.deserialize(req_str))
            return;


        // 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑
        // 3.1 得到一个结构化的响应
        Response resp;
        cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!


        // 4.对响应Response,进行序列化
        // 4.1 得到了一个"字符串"
        string resp_str;
        if (!resp.serialize(&resp_str))
            return;

        // 5 构建成为一个完整的报文
        conn->outbuffer_ += Enlenth(resp_str);

        std::cout << "--------------result: " << conn->outbuffer_ << std::endl;
        
		// 直接发
	    if (conn->sender_)
	        conn->sender_(conn);
    }

}

调用sender并不能保证数据一次就发完了,因此如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!

void sender(Connection *conn)
{
    while (true)
    {
        ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
        if (s > 0)
        {
            conn->outbuffer_.erase(0, s);
            if (conn->outbuffer_.empty())//发完就退出
                break;
        }
        else
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)//没发完非阻塞退出
                break;
            else if (errno == EINTR)
                continue;
            else
            {
                if (conn->excepter_)
                {
                    conn->excepter_(conn);
                    return;
                }
            }
        }
    }
    
    // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
}

这里我们在设计一个接口,可以打开对于一个文件描述符,是否对它写事件关心,是否对它读事件关心的情况

bool EnableReadWrite(int sock, bool reader, bool writer)
{
    uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;
    //_epoll.Control();
}

Epoller类在设计一个函数,在epoll对对应的fd关心事件进行修改

bool Control(int sock, uint32_t event, int action)
{
    int n = 0;
    if (action == EPOLL_CTL_MOD)
    {
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = sock;
        n = epoll_ctl(_epfd, action, sock, &ev);
    }
    else if (action == EPOLL_CTL_DEL)
    {
        n = epoll_ctl(_epfd, action, sock, nullptr);//删除就不关心任何事件了
    }
    else
        n = -1;
    return n == 0;
}
bool EnableReadWrite(int sock, bool reader, bool writer)
{
    uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;
    _epoll.Control(sock, event, EPOLL_CTL_MOD);
}
void sender(Connection *conn)
{
    while (true)
    {
        ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
        if (s > 0)
        {
            conn->outbuffer_.erase(0, s);
            if (conn->outbuffer_.empty())
                break;
        }
        else
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else if (errno == EINTR)
                continue;
            else
            {
                if (conn->excepter_)
                {
                    conn->excepter_(conn);
                    return;
                }
            }
        }
    }
    // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
    if (!conn->outbuffer_.empty()) // 设置关心该fd读
        EnableReadWrite(conn->sock_, true, true);
    else
        EnableReadWrite(conn->sock_, true, false);
}

只要我开启了对写事件的关心,epoll就会帮我关心对应fd的写事件,如果就绪了,就会调用对应的fd的sender,以上就是发送的逻辑。

在这里插入图片描述

如果对方连接出异常了怎么办呢?不管是读出现异常,还是写出现异常!并且在前面我们是所有异常情况全部转化成读写问题,然后所有读写异常我们都是调用excepter进行处理,换句话说所有异常处理都在这里。

我们处理异常逻辑很简单,我们也关闭对应文件描述符。

void excepter(Connection *conn)
{
    logMessage(DEBUG, "Excepter begin");
    //1,先在epoll中删除
    _epoll.Control(conn->sock_, 0, EPOLL_CTL_DEL);
    //2. 然后在unordered_map中删除
     _connections.erase(conn->sock_);
     //3.关闭fd
    close(conn->sock_);

    logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->sock_);

    delete conn;
}

到目前为止所有代码细节我们都写完了,对IO的处理+协议定制+业务逻辑都有,下面验证一下,我们看到能够正常通信,并且还有对异常的处理

在这里插入图片描述
Epoller封装完整代码

#pragma once

#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#include "Err.hpp"
#include "Log.hpp"

using namespace std;

const int defaultepfd = -1;
const int size = 128;

class Epoller
{

public:
    Epoller(int epfd = defaultepfd) : _epfd(epfd)
    {
    }

    ~Epoller()
    {
        if (_epfd != defaultepfd)
            close(_epfd);
    }

public:
    void Create()
    {
        _epfd = epoll_create(size);
        if (_epfd < 0)
        {
            logMessage(FATAL, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));
            exit(EPOLL_CREATE_ERR);
        }
        logMessage(NORMAL, "epoll create success, epfd: %d", _epfd);
    }

    // user -> kernel
    bool AddEvents(int sock, uint32_t event)
    {
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = sock;
        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
        if (n < 0)
        {
            logMessage(ERROR, "sock join epoll fail");
            return false;
        }
        // logMessage(NORMAL, "sock join epoll success");
        return true;
    }

    // kernel -> user
    int Wait(struct epoll_event *ets, int num, int timeout)
    {
        int n = epoll_wait(_epfd, ets, num, timeout);
        switch (n)
        {
        case 0:
            logMessage(NORMAL, "timeout ...");
            break;
        case -1:
            logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));
            break;
        default:
            logMessage(NORMAL, "have event ready");
            break;
        }
        return n;
    }

    bool Control(int sock, uint32_t event, int action)
    {
        int n = 0;
        if (action == EPOLL_CTL_MOD)
        {
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock;
            n = epoll_ctl(_epfd, action, sock, &ev);
        }
        else if (action == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epfd, action, sock, nullptr);
        }
        else
            n = -1;
        return n == 0;
    }

private:
    int _epfd;
};

服务器完整代码

#pragma once

#include <iostream>
#include <unordered_map>
#include <functional>
#include <cassert>
#include "Sock.hpp"
#include "Err.hpp"
#include "Log.hpp"
#include "Epoller.hpp"
#include "Util.hpp"

const int defaultport = 8080;
const int defaultsock = -1;
const int defaultnum = 128;

class Connection;

using func_t = function<void(Connection *)>;

class Connection
{
public:
    Connection(int sock = defaultport) : sock_(sock)
    {
    }

    ~Connection()
    {
        if (sock_ == defaultsock)
            close(sock_);
    }

    void Rigster(func_t r, func_t s, func_t e)
    {
        recver_ = r;
        sender_ = s;
        excepter_ = e;
    }

public:
    int sock_;         // 这个类对应的套接字是谁
    string inbuffer_;  // 输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了
    string outbuffer_; // 输出缓存区 ,你并不能保证你的写事件就绪

    func_t recver_;   // 从sock_读
    func_t sender_;   // 向sock_写
    func_t excepter_; // 处理sock_,IO的时候上面的异常事件
};

class TcpServer
{

public:
    TcpServer(func_t f, int port = defaultport, int num = defaultnum) : _service(f), _port(port), _revents(nullptr), _num(num)
    {
    }

    ~TcpServer()
    {
        if (_revents)
            delete[] _revents;
    }

    void initServer()
    {
        // 1.创建套接字
        _sock.sock();
        _sock.Bind(_port);
        _sock.Listen();

        // 2.创建epoll模型
        _epoll.Create();

        // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
        // Util::SetNonBlock(_sock.Fd());
        // _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);
        AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,
                      bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);

        // 4.拷贝数组
        _revents = new struct epoll_event[_num];
    }

    // 事件派发
    void Dispatch()
    {
        int timeout = -1000;
        while (true)
        {
            Loop(timeout);
        }
    }

private:
    bool IsConnectionExists(int sock)
    {
        auto pos = _connections.find(sock);
        return pos != _connections.end();
    }

    void Loop(int &timeout)
    {
        int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

        for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
        {
            uint32_t event = _revents[i].events; // 就绪的事件
            int sock = _revents[i].data.fd;      // 就绪事件的fd

            // 将所有的异常问题,全部转化,成为读写问题
            if (event & EPOLLERR)
                event | EPOLLIN | EPOLLOUT;
            if (event & EPOLLHUP)
                event | EPOLLIN | EPOLLOUT;

            // _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分
            if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))
                _connections[sock]->recver_(_connections[sock]);
            if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))
                _connections[sock]->sender_(_connections[sock]);
        }
    }

    void recver(Connection *conn)
    {
        while (true)
        {
            char buffer[1024];
            ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);
            if (s > 0)
            {
                buffer[s] = 0;
                conn->inbuffer_ += buffer;
                _service(conn);
            }
            else if (s == 0)
            {
                if (conn->excepter_)
                {
                    conn->excepter_(conn);
                    return;
                }
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if (errno == EIDRM)
                    continue;
                else
                {
                    if (conn->excepter_)
                    {
                        conn->excepter_(conn);
                        return;
                    }
                }
            }
        }
    }

    bool EnableReadWrite(int sock, bool reader, bool writer)
    {
        uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;
        _epoll.Control(sock, event, EPOLL_CTL_MOD);
    }

    void sender(Connection *conn)
    {
        while (true)
        {
            ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
            if (s > 0)
            {
                conn->outbuffer_.erase(0, s);
                if (conn->outbuffer_.empty())
                    break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                {
                    if (conn->excepter_)
                    {
                        conn->excepter_(conn);
                        return;
                    }
                }
            }
        }
        // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
        if (!conn->outbuffer_.empty()) // 设置关心该fd读
            EnableReadWrite(conn->sock_, true, true);
        else
            EnableReadWrite(conn->sock_, true, false);
    }

    void excepter(Connection *conn)
    {
        logMessage(DEBUG, "Excepter begin");
        _epoll.Control(conn->sock_, 0, EPOLL_CTL_DEL);
        _connections.erase(conn->sock_);
        close(conn->sock_);

        logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->sock_);

        delete conn;
    }
    void Accepter(Connection *conn)
    {
        while (true)
        {
            string clientip;
            uint16_t clientport;
            int err = 0; // 获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了
            int sock = _sock.Accept(&clientip, &clientport, &err);
            if (sock > 0)
            {
                // 新的sock套接字添加到AddConnetions
                AddConnection(sock, EPOLLIN | EPOLLET,
                              bind(&TcpServer::recver, this, placeholders::_1),
                              bind(&TcpServer::sender, this, placeholders::_1),
                              bind(&TcpServer::excepter, this, placeholders::_1));

                logMessage(DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);
            }
            else
            {
                if (err == EAGAIN || err == EWOULDBLOCK)
                    break;
                else if (err == EINTR)
                    continue;
                else
                    break;
            }
        }
    }

    void AddConnection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter)
    {
        // 1.设置非阻塞,ET模式fd要非阻塞
        if (events & EPOLLET)
            Util::SetNonBlock(sock);

        // 2. 该sock创建Connection,并初始化,并添加到connections_
        Connection *conn = new Connection(sock);

        // 3. 给对应的sock设置对应的回调方法
        conn->Rigster(recver, sender, excepter);

        // 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
        bool r = _epoll.AddEvents(sock, events);
        assert(r);
        (void)r;

        // 5. 将kv添加到connections_
        _connections.insert(make_pair(sock, conn));

        logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
    }

public:
private:
    uint16_t _port;

    Sock _sock;
    Epoller _epoll;
    unordered_map<int, Connection *> _connections; // 所有链接集合
    struct epoll_event *_revents;
    int _num;

    func_t _service;
};

调用逻辑完整代码

#include "TcpServer.hpp"
#include "Err.hpp"
#include <memory>
#include "Protocol.hpp"

enum
{
    OK,
    DIV_ERR,
    MOD_ERR,
    OPER_ERR
};

void cal(const Request &req, Response &resp)
{
     req已经有结构化完成的数据啦,你可以直接使用
    resp._exitcode = OK;
    resp._result = OK;

    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if (req._y == 0)
            resp._exitcode = DIV_ERR;
        else
            resp._result = req._x / req._y;
    }
    break;
    case '%':
    {
        if (req._y == 0)
            resp._exitcode = MOD_ERR;
        else
            resp._result = req._x % req._y;
    }
    break;
    default:
        resp._exitcode = OPER_ERR;
        break;
    }
}

void calculate(Connection *conn)
{
    string onePackage;
    while (PartOnepackge(conn->inbuffer_, &onePackage))
    {
        string req_str;
        // 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\n
        if (!Delenth(onePackage, &req_str))
            return;

        cout << "去掉报头的正文:\n"
             << req_str << endl;

        // 2. 对请求Request,反序列化
        // 2.1 得到一个结构化的请求对象
        Request req;
        if (!req.deserialize(req_str))
            return;


        // 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑
        // 3.1 得到一个结构化的响应
        Response resp;
        cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!


        // 4.对响应Response,进行序列化
        // 4.1 得到了一个"字符串"
        string resp_str;
        if (!resp.serialize(&resp_str))
            return;

        // 5 构建成为一个完整的报文
        conn->outbuffer_ += Enlenth(resp_str);

        std::cout << "--------------result: " << conn->outbuffer_ << std::endl;
    }
    // 直接发
    if (conn->sender_)
        conn->sender_(conn);
}

static void usage(std::string proc)
{
    std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        usage(argv[0]);
        exit(USAGG_ERR);
    }

    uint16_t port=atoi(argv[1]);
    std::unique_ptr<TcpServer> uls(new TcpServer(calculate,port));
    uls->initServer();
    uls->Dispatch();

    return 0;
}

总结一下: 这个TcpServer服务器就是传说中的Reactor,对应上服务器上面有一个一个的Connection,未来有哪一个fd就绪了,它就把某一个Connection激活告诉我们事件,Reactor就会进行事件派发。然后去执行对应Connection的读,写,异常方法。这种模式就是Reactor反应堆模式

在这里插入图片描述
Reactor,很明显当我们进行事件就绪了它会回调曾经注册的绑定的方法,今天我们的服务器既保证了就绪事件通知,还负责了IO,其实还负责了业务处理。如果今天我们把一个报文构建成一个任务,扔到后端任务队列,让后端线程池帮我们处理后面的任务。此时对应的Reactor只负责读事件就绪+负责IO,不负责业务处理。

只负责读事件就绪+负责IO 这就是就是半同步
只负责业务处理 这就半异步

还有一种服务器少用的Proactor前摄器模式,可以自己了解了解。

如果想把这个服务器改成多多进程多线程,其实可以直接创建多进程,每个进程里都搞一个TcpServer,或者说创建多线程,每一个线程里都搞一个Tcpserver,把_listensock套接字添加到其中一个线程的Reactor里,一旦有连接就绪的时候,不是要执行Accepter吗,执行Accepter的时候就不仅仅是 AddConnection了,而是尝试把这个连接添加到哪一个线程的epoll中, 然后就在这个线程里把这个文件描述符处理完。

最后说一点,我们还可以在Connectoin中设置一个lasttime记录最近访问时间,每一次读或写的时候我们都更新一个对应时间戳,所以只要读或写就绪了就可以更新一下对应Connection最近时间,换句话说此时我们就可以在派发事件后当所有事件处理完了,就可以在unordered_map遍历所有的连接,计算每一个连接已经有多长时间没有动了,因为每一个连接都有自己的最近访问时间,每一次访问都会更新,不更新就是最开始的,所以我们可以获取当前时间在减去Connectoin里保存的历史最近访问时间,计算出时间差,然后就可以所以连接进行连接管理。时间超过5分钟都没有访问过的,服务器就直接把你关掉。

在这里插入图片描述

如上就是Reactor全部内容。如上也是Linux的全部内容。总结了40多篇Linux从系统编程到网络编程的文章,内容很丰富!!想说什么也不知道该怎么说,就这样把!
大家下篇文章再见 🙌 🙌 🙌 !

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/638304.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript Window对象

一、BOM&#xff08;浏览器对象模型&#xff09; window对象是一个全局对象&#xff0c;也可以说是JavaScript中的顶级对象。 像document、alert()、console.log()这些都是window的属性&#xff0c;基本BOM的属性和方法都是window的。 所有通过var定义在全局作用域中的变量、…

JAVASE之类和对象(1)

路虽远&#xff0c;行则将至&#xff1b;事虽难&#xff0c;做则必成。 主页&#xff1a;趋早——Step 专栏&#xff1a;JAVASE gitte&#xff1a;https://gitee.com/good-thg 引言&#xff1a; 这篇文章我们只介绍前半部分&#xff0c;下一篇文章会介绍剩下的部分。 目录 一、…

电路仿真软件:点亮教学新篇章,十大便利助力高效学习

在信息化时代的浪潮中&#xff0c;电路仿真软件以其独特的优势&#xff0c;逐渐在教学领域崭露头角。它不仅能够帮助学生更好地理解电路知识&#xff0c;还能提升教师的教学效果。接下来&#xff0c;让我们一起探讨电路仿真软件对教学带来的十大便利。 一、直观展示电路原理 电…

自用网站合集

总览 线上工具-图片压缩 TinyPNG线上工具-url参数解析 线上工具-MOV转GIF UI-Vant微信小程序版本其他-敏捷开发工具 Leangoo领歌 工具 线上工具-图片压缩 TinyPNG 不能超过5m&#xff0c;别的没啥缺点 线上工具-url参数解析 我基本上只用url参数解析一些常用的操作在线…

MSI U盘重装系统

MSI U盘重装系统 1. 准备一块U盘 首先需要将U盘格式化&#xff0c;这个格式化并不是在文件管理中将U盘里面的所有东西都删干净就可以了&#xff0c;需要在磁盘管理中&#xff0c;将这块U盘格式化&#xff0c;如果这块U盘有分区的话&#xff0c;那将所有的分区都格式化并且删除…

非阻塞sokcet和epoll

在Muduo网络库中同时使用了非阻塞socket与epoll&#xff0c;在此简单梳理下。 非阻塞sokcet和epoll共同工作的过程主要涉及网络编程中的非阻塞I/O和事件驱动机制。下面将详细解释这两者如何协同工作&#xff1a; 非阻塞socket简介 在传统的阻塞socket编程中&#xff0c;当调用…

springboot-阿里羚羊 服务端埋点

官方文档 集成Java SDK 手动引入jar包「quickaplus-log-collector-java-sdk-1.0.1-SNAPSHOT.jar」 <dependency><groupId>com.alibaba.lingyang</groupId><artifactId>quickaplus-log-collector-java-sdk</artifactId><version>1.0.1&l…

老Java学 Go 笔录(二) 从 go 的编译开始学起

目录 一.版本选择二.环境准备三.工具的选择四.第一个 hello go4.1 开发4.2 编译4.3 编译运行4.4 直接安装 五.用 go 快速搭建 webserver六.调用外部三方方法七.go vs java 的执行 前言 专栏旨在利用现有的 java 体系内容去完成 go 语言的学习. 本次行文是在 https://go.dev/doc…

华为编程题目(实时更新)

1.大小端整数 计算机中对整型数据的表示有两种方式&#xff1a;大端序和小端序&#xff0c;大端序的高位字节在低地址&#xff0c;小端序的高位字节在高地址。例如&#xff1a;对数字 65538&#xff0c;其4字节表示的大端序内容为00 01 00 02&#xff0c;小端序内容为02 00 01…

JavaScript面试 题

1.延时加载JS有哪些方式 延时加载 :async defer 例如:<script defer type"type/javascript" srcscript.js></ script> defer:等html全部解析完成,才会执行js代码,顺次执行的 async: js和html解析是同步的,不是顺次执行js脚本(谁先加载完先执行谁)2.JS数…

AI视频教程下载:零代码创建10个GPTs 和Ai Agents

你将学到什么&#xff1a; 理解GPTs的基础知识&#xff1a;掌握GPTs Agent的基础知识及其在现代AI应用中的作用。 创建自定义ChatGPT Agent&#xff1a;学习构建针对内容创作、沟通和社交媒体管理等多种任务量身定制的ChatGPT Agent。 在商业和个人项目中的实用应用&#xf…

“闻起来有股答辩的味道”,答辩到底是什么味?

“闻起来有股答辩的味道”&#xff0c;答辩到底是什么味&#xff1f; 一位名叫“小鸡全家桶”的作者虚构了这样一个学校故事&#xff0c;故事说&#xff0c;由于学生的考试试卷印刷得特别模糊&#xff0c;导致里面的插图根本看不清&#xff0c;学生感到懵逼&#xff0c;监考老…

Python语法学习之 - 生成器表达式(Generator Expression)

第一次见这样的语法 本人之前一直是Java工程师&#xff0c;最近接触了一个Python项目&#xff0c;第一次看到如下的代码&#xff1a; i sum(letter in target_arr for letter in source_arr)这条语句是计算source 与 target 数组中有几个单词是相同的。 当我第一眼看到这样…

23种设计模式(持续输出中)

一.设计模式的作用 设计模式是软件从业人员长期总结出来用于解决特定问题的通用性框架&#xff0c;它提高了代码的可维护性、可扩展性、可读性以及复用性。 二.设计模式 1.工厂模式 工厂模式提供了创建对象的接口&#xff0c;而无需制定创建对象的具体类&#xff0c;工厂类…

防抖和节流的区别和举例(简要说明、形象比喻、容易理解)

1、含义&#xff1a; 在前端开发技术中&#xff0c;防抖&#xff08;Debounce&#xff09;和节流&#xff08;Throttle&#xff09;是两种常用的性能优化技术&#xff0c;主要用于处理高频事件触发&#xff0c;如用户的点击、滚动、输入等操作。 防抖&#xff08;Debounce&am…

【Linux系统】文件与基础IO

本篇博客整理了文件与文件系统、文件与IO的相关知识&#xff0c;借由库函数、系统调用、硬件之间的交互、操作系统管理文件的手段等&#xff0c;旨在让读者更深刻地理解“Linux下一切皆文件”。 【Tips】文件的基本认识 文件 内容 属性。文件在创建时就有基本属性&#xff0…

Ribbon负载均衡(自己总结的)

文章目录 Ribbon负载均衡负载均衡解决的问题不要把Ribbon负载均衡和Eureka-Server服务器集群搞混了Ribbon负载均衡代码怎么写ribbon负载均衡依赖是怎么引入的&#xff1f; Ribbon负载均衡 负载均衡解决的问题 首先Ribbon负载均衡配合Eureka注册中心一块使用。 在SpringCloud…

Packet Tracer-HSRP+DHCPv4+VLAN间路由+以太通道综合实验

实验拓扑&#xff1a; 实验内容&#xff1a; VLAN及VLAN间路由的配置&#xff0c;以太通道的配置&#xff0c;STP的根调整&#xff0c;DHCPv4的配置&#xff0c;首跳冗余HSRP的配置。 实验最终结果&#xff1a; PC可以自动获取到DHCP-Server分配的IP地址&#xff0c;实现首跳…

【MySQL精通之路】InnoDB(5)-内存结构

总目录&#xff1a; 【MySQL精通之路】InnoDB存储引擎-CSDN博客 上一篇&#xff1a; 【MySQL精通之路】InnoDB(4)-架构图-CSDN博客 目录 ​编辑 1 缓存池&#xff08;Buffer Pool&#xff09; 1.1 缓存池LRU算法 1.2 缓存区配置 1.3 使用InnoDB标准监视器监视缓存池 …

Kettle简介

一、Kettle简介 Kettle是一个开源的ETL&#xff08;Extract-Transform-Load的缩写&#xff0c;即数据抽取、转换、装载的过程&#xff09;项目。 项目名很有意思&#xff0c;水壶。按项目负责人Matt的说法&#xff1a;把各种数据放到一个壶里&#xff0c;然后呢&#xff0c;以…