Linux - 高级IO

目录

  • 理解五种IO模型
  • 非阻塞IO的设置
  • 多路转接之select
    • 实现一个简易的select服务器
    • select服务器的优缺点
  • 多路转接之poll
    • 实现一个简易的poll服务器
    • poll服务器的优缺点
  • 多路转接之epoll
    • epoll原理
    • epoll的优势
    • 用epoll实现一个简易的echo服务器
  • epoll的LT和ET工作模式
    • 什么是LT和ET
  • 实现一个简易的reactor服务器

五种IO模型

如何理解五种IO模型:
应用层调用read/recv和write/send的时候,本质就是数据在用户层和操作系统的缓冲区的拷贝交互。要进行拷贝,必须先判断条件成立,该条件就是读写事件就绪。如果条件不成立,这些函数就会被阻塞住,等待资源就绪。
所以输入输出IO可以分为两个步骤:等待 + 拷贝

给大家举个例子帮助大家理解五种IO模型:
钓鱼 = 等待 + 钓(动作)
河边的张三在钓鱼,张三的钓鱼动作:一直盯着鱼漂,直到鱼漂抖动就说明有鱼来了。
李四也在钓鱼,李四的钓鱼动作:一边做自己的事,一边时不时观察鱼漂。
王五有个高级鱼竿,鱼漂上带着铃铛,王五的钓鱼动作:一直做自己的事,铃铛鱼漂响了,就去钓鱼
赵六买了20个鱼竿钓鱼,赵六的钓鱼动作:一直检查每个鱼漂,任何一个鱼漂动了就钓鱼。
小王开车载田七去公司的时候,田七路过的时候看见别人钓鱼,突然自己想吃野生鱼了,就叫小王去钓鱼,钓到了就给田七送过去。

人:进程/线程。鱼:数据。河:内核空间。鱼漂:数据就绪的事件。鱼竿:文件描述符(都是通过文件描述符进行操作)。钓鱼:recv/read
张三对应的是阻塞式IO。李四对应的是非阻塞IO,王五对应的是信号阻塞式IO(其中王五提前是知道铃铛响了该怎么办,该钓鱼),赵六对应的是多路转接/多路复用IO。田七对应的是异步IO(小王(操作系统)有鱼了告诉就告诉田七,田七直接用即可)
在这几个人中,你会觉得哪个人的钓鱼效率是最高的(钓的鱼最多的)?很明显是赵六,因为河中的鱼对每个钩子的咬钩概率是相同的,而赵六的钩子占比是最大的,所以必定单位时间内钓鱼的数量是最多的。

阻塞式IO:在内核将数据准备好之前,系统调用会一直等待,所有的文件描述符,默认都是阻塞方式。阻塞IO是最常见的IO模型,因为简单易上手。
在这里插入图片描述

非阻塞IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。非阻塞IO往往需要程序员循环的方式反复尝试读写该文件描述符,这个过程称为轮询,这对CPU来说是较大的浪费,一般只有特定场景下才使用。
在这里插入图片描述

信号驱动IO:内核将数据准备好的时候,使用SIGIO信号通知应用程序进行IO操作
在这里插入图片描述

IO多路转接:虽然从流程上看起来和阻塞IO类似,实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态
在这里插入图片描述

异步IO:由内核在数据拷贝完成时,通知应用程序(而信号驱动是告诉进程何时可以开始拷贝数据,需要进程自己拷贝)
在这里插入图片描述

什么叫做高效IO呢?
任何IO过程,都包含两个步骤,等待 + 拷贝。而且在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间。让IO更高效,最核心的办法就是让等待的时间尽量少。在单位时间内,IO过程中,等待对于拷贝的比重越小,IO效率越高。

阻塞式IO和非阻塞IO有什么区别?
区别:等待时的状态不同。
阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在到得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

同步IO和异步IO有什么区别?
同步,就是在发出一个调用时,在没有得到该调用的结果之前,该调用就不会返回。但是一旦调用返回,就得到返回值了。换句话说,就是有调用者主动等待这个调用结果。
异步,调用在发出之后,这个调用就直接返回,所以没有返回结果。直到被动的通过状态/信号来通知来处理

非阻塞IO的设置

如何设置非阻塞IO?
在这里插入图片描述
以前我们在写网络编程的时候,recv的flag选项填的是0
如果我们想非阻塞等待,则可以设flags为MSG_DONTWAIT。也可以用open非阻塞的方式打开,方法有很多。
但提供一种更通用的做法,文件描述符本质就是下标,每一个下标指向的就是内核里的文件对象,文件对象中是有文件描述符标志的。
在这里插入图片描述
传入的cmd值不同,后面追加的参数也不相同
fcntl函数有5种功能:
复制一个现有的描述符(cmd = F_DUPFD)
获得/设置文件描述符标记(cmd = F_GETFD或F_SETFD)
获得/设置文件状态标记(cmd = F_GETFL或F_SETFL)
获得/设置异步I/O所有权(cmd = F_GETOWN或F_SETOWN)
获得/设置记录锁(cmd = F_GETLK, F_SETLK或F_SETLKW)

实现非阻塞轮询方式读取标准输入

#include <iostream>
#include <unistd.h>
#include <fcntl.h>

void SetNoBlock(int fd)
{
    int flag = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}

int main()
{
    SetNoBlock(0);
    char buffer[4096];
    while (1)
    {
        std::cout << "Enter#";
        fflush(stdout);
        int n = read(0, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << buffer << std::endl;
        }
        else if (n == 0)
        {
            std::cout << "read done" << std::endl;
            break;
        }
        else
        {
            std::cout << "read error: " << strerror(errno) <<std::endl;
            break;
        }
    }
    return 0;
}

在这里插入图片描述
为什么还没有输入就直接读出错了?错误信息是资源暂时还没有就绪。
结论:
设置成为非阻塞,如果底层fd数据没有就绪,recv/read/write/send,返回值会以出错的形式返回。这样会有两种情况:a、真的出错 b、底层资源没有就绪
我们怎么区分呢? – 通过errno区分
在这里插入图片描述

#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>

void SetNoBlock(int fd)
{
    int flag = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}

int main()
{
    SetNoBlock(0);
    char buffer[4096];
    while (1)
    {
        int n = read(0, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << buffer << std::endl;
        }
        else if (n == 0)
        {
            std::cout << "read done" << std::endl;
            break;
        }
        else
        {
            if (errno == EWOULDBLOCK)
            {
                //只是底层没有数据就绪,这种错误是可以容忍的
                //do_other_thing(); //也可以做其他事情,设置一些方法,就不演示了
                continue;
            }
            std::cout << "read error: " << strerror(errno) <<std::endl;
            break;
        }
    }
    return 0;
}

在这里插入图片描述

多路转接之select

select是干什么的?select负责多路转接IO中的等待,一次可以等待多个文件描述符,并不负责拷贝。
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的,程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态变化。
在这里插入图片描述
在这里插入图片描述
nfds:填写要监视的所有要监视的fd中的最大值 + 1。这与底层内核有关系,内核会从0一直遍历到nfds。
timeout:为输入输出型参数
如果设置为nullptr,则为阻塞式。
若设置timeout = {0, 0};为非阻塞,会一直函数返回。
若设置timeout = {5, 0};如果一直收不到消息:阻塞5s,每过5s就超时(函数返回)一次,这样反复循环。如果过2s收到了消息,则该参数会返回{3, 0},表示还有3s才超时。
return返回值:大于0,有几个fd就绪了。等于0,超时/非阻塞返回了。小于0,select调用失败了
readfds:为输入输出参数
做输入参数时:表示用户告诉内核,你要帮我关心的文件描述符的读,fd_set表示位图,可以通过位图设置多个文件描述符。
做输出参数时:表示内核告诉用户,你要关心的文件描述中有哪些已经就绪了。
writefds和exceptfds同readfds参数

该位图既然是一种类型,必然有大小,所以能够想关心的fd的个数一定是有上限的。
int main()
{
std::cout << sizeof(fd_set) << std::endl;
return 0;
}
在这里插入图片描述
128 * 8 = 1024bit,最多只能关心1024个fd。bit位的位置代表fd是多少,为0则代表没设置该fd,为1则代表设置了该fd。

实现一个简易的select服务器

为了节省篇幅,后面再贴完整代码

#include "Socket.hpp"

class SelectServer
{
public:
    SelectServer(uint16_t port)
    :_port(port)
    {}
    void Init()
    {
        _listensock.CreateSocket();
        _listensock.Setsockopt();	//设置端口复用
        _listensock.Bind(_port);
        _listensock.Listen();
    }
    void Start()
    {
        while (1)
        {
            //能否直接调用accept?不能直接accept,因为会阻塞!我们的目标就是写多路转接IO,所以不能出现阻塞
        }
    }
    ~SelectServer()
    {
        _listensock.Close();
    }
private:
    Socket _listensock;
    uint16_t _port;
};

更新代码

#include "Socket.hpp"


class SelectServer
{
public:
    SelectServer(uint16_t port)
    :_port(port)
    {}
    void Init()
    {
        _listensock.CreateSocket();
        _listensock.Setsockopt();//设置地址复用
        _listensock.Bind(_port);
        _listensock.Listen();
    }
    void HandleEvent()
    {}
    void Start()
    {
        while (1)
        {
            //能否直接调用accept?不能直接accept,因为会阻塞!
            fd_set readfd;
            FD_SET(_listensock.Fd(), &readfd);
            struct timeval timeout = {3, 0};
            int n = select(_listensock.Fd() + 1, &readfd, nullptr, nullptr, &timeout);//暂时这样设置
            if (n < 0)
            {
                std::cout << "select error: " << strerror(errno) << std::endl;
                break;
            }
            if (n == 0)
            {
                lg(Debug, "[%d: %d]", timeout.tv_sec, timeout.tv_usec);
            }
            else
            {
                std::cout << "Get a New Link" << std::endl;
                sleep(2);
                HandleEvent();
            }
        }
    }
    ~SelectServer()
    {
        _listensock.Close();
    }
private:
    Socket _listensock;
    uint16_t _port;
};

运行结果如下:
请添加图片描述
timeout设置为{3, 0}可以看到select每隔3s就超时一次,同理如果timeout设置为{0, 0},则会一直超时,如果设置为null,则会阻塞。
客户端连接上后,我们也看到了,他一直在打印Get a New Link,为什么一直打印?因为上层没有把底层数据拿上去处理,所以select就会一直提醒事件就绪了。
怎么处理?

更新代码

#include "Socket.hpp"


class SelectServer
{
public:
    SelectServer(uint16_t port)
    :_port(port)
    {}
    void Init()
    {
        _listensock.CreateSocket();
        _listensock.Setsockopt();
        _listensock.Bind(_port);
        _listensock.Listen();
    }
    void HandleEvent(fd_set* rfd)
    {
        //代码走到这里就说明我们的连接事件就绪了
        if (FD_ISSET(_listensock.Fd(), rfd) == true)
        {
            string clientIp; uint16_t clientPort;
            int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
            if (newSockfd < 0)
            {
                return;
            }
            lg(Info, "accept success: %s, %d\n", clientIp.c_str(), clientPort);
        }
    }
    void Start()
    {
        while (1)
        {
            //能否直接调用accept?不能直接accept,因为会阻塞!
            fd_set readfd;
            FD_SET(_listensock.Fd(), &readfd);
            struct timeval timeout = {3, 0};
            int n = select(_listensock.Fd() + 1, &readfd, nullptr, nullptr, /*&timeout*/0);
            if (n < 0)
            {
                std::cout << "select error: " << strerror(errno) << std::endl;
                break;
            }
            if (n == 0)
            {
                lg(Debug, "[%d: %d]", timeout.tv_sec, timeout.tv_usec);
            }
            else
            {
                std::cout << "Get a New Link" << std::endl;
                sleep(1);
                HandleEvent(&readfd);
            }
        }
    }
    ~SelectServer()
    {
        _listensock.Close();
    }
private:
    Socket _listensock;
    uint16_t _port;
};

代码运行结果:
请添加图片描述

更新HandleEvent函数的代码

void HandleEvent(fd_set* rfd)
{
    //代码走到这里就说明我们的连接事件就绪了
    if (FD_ISSET(_listensock.Fd(), rfd) == true)
    {
        string clientIp; uint16_t clientPort;
        int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
        if (newSockfd < 0)
        {
            return;
        }
        lg(Info, "accept success: %s, %d\n", clientIp.c_str(), clientPort);

        char buffer[4096];
        ssize_t n = read(newSockfd, buffer, sizeof(buffer) - 1);
        /*我们已经得到了新的sock,可以直接用read接口吗?不可以,因为我们这里是单线程/单进程,直接read可能会被阻塞住,以前我们的代码是直接托管给了
        多线程/多进程,他们的阻塞不会影响主进程。我们这个单进程应该将新到来的newSocketfd交给select,让select帮我们来管理*/
    }
}

所以此时,我们需要将HandleEvent函数里面的newSocketfd传递给Start函数。如何做呢?可以使用数组作为该类的成员 – 这个数组也叫做辅助数组,这也是select最大的特点之一:使用辅助数组,让文件描述符在函数之间互相传递。

#include "Socket.hpp"


class SelectServer
{
    static const int MaxFdNum = sizeof(fd_set) * 8; //因为位图fd_set有大小,所以最多只能管理1024个fd
    static const int DefaultNum = -1;
public:
    SelectServer(uint16_t port)
    :_port(port)
    {
        //初始化辅助数组
        for (int i = 0; i < MaxFdNum; ++i)
        {
            read_fd_array[i] = DefaultNum;
        }
    }
    void Init()
    {
        _listensock.CreateSocket();
        _listensock.Setsockopt();
        _listensock.Bind(_port);
        _listensock.Listen();
    }
    void HandleEvent(fd_set* rfd)
    {
        for (int i = 0; i < MaxFdNum; ++i)//遍历所有的read_fd_array中是否有满足条件FD_ISSET
        {
            int socket = read_fd_array[i];
            if (socket == DefaultNum)
                continue;
            if (FD_ISSET(socket, rfd) == true && _listensock.Fd() == socket)
            {
                //代码走到这里就说明我们的连接事件就绪了
                string clientIp; uint16_t clientPort;
                int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
                if (newSockfd < 0)
                {
                    return;
                }
                lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);

                //添加新的文件描述符到辅助数组
                for (int i = 0; i < MaxFdNum; ++i)
                {
                    if (read_fd_array[i] == DefaultNum)
                    {
                        if (_maxfd < newSockfd)
                            _maxfd = newSockfd;
                        read_fd_array[i] = newSockfd;
                        break;
                    }
                    if (i == MaxFdNum - 1)
                    {
                        lg(Warning, "server is full, close sock: %d", newSockfd);
                        close(newSockfd);
                    }
                }
            }
            else if (FD_ISSET(socket, rfd) == true)
            {
                //说明是其他的套接字读事件就绪了
                char buffer[4096];
                int n = read(socket, buffer, sizeof(buffer) - 1);
                if (n > 0)
                {
                	buffer[n] = 0;
                    lg(Info, "client say:%s", buffer);
                }
                else if (n == 0)
                {
                    //说明对端把连接关闭了
                    lg(Info, "client closed...");
                    //把该socket就可以移除关心状态了
                    read_fd_array[i] = DefaultNum;
                    close(socket);
                }
                else
                {
                    //读出错了
                    lg(Error, "read error");
                    //把该socket就可以移除关心状态了
                    read_fd_array[i] = DefaultNum;
                    close(socket);
                }
            }
        }
    }
    void Start()
    {
        //将lisntensockfd设置进辅助数组
        read_fd_array[0] = _listensock.Fd();
        _maxfd = read_fd_array[0];
        while (1)
        {
            //因为select的参数为输入输出参数,所以我们每次调用select的时候都需要重新设置一遍select的参数,所以需要将下面的语句写入循环里
            fd_set readfd;
            FD_ZERO(&readfd);   //一定要设置为0,否则可能会出现select失败的问题
            for (int i = 0; i < MaxFdNum; ++i)
            {
                if (read_fd_array[i] != DefaultNum)
                {
                    if (_maxfd < read_fd_array[i]) 
                        _maxfd = read_fd_array[i];
                    //将辅助数组中所要关心的文件描述符全部都设置进去
                    FD_SET(read_fd_array[i], &readfd);
                }
            }
            int n = select(_maxfd + 1, &readfd, nullptr, nullptr, 0);//为了易于观察,我们将timout参数设置为0
            if (n < 0)
            {
                std::cout << "select error: " << strerror(errno) << std::endl;
                break;
            }
            if (n == 0)
            {
                //超时返回(非阻塞返回)
                lg(Info, "timeout...");
            }
            else
            {
                HandleEvent(&readfd);
            }
        }
    }
    ~SelectServer()
    {
        _listensock.Close();
    }
private:
    int _maxfd; //最大的文件描述符是什么
    Socket _listensock;
    uint16_t _port;
    int read_fd_array[MaxFdNum]; //记录的是需要关心的读事件的文件描述符,-1表示不关心,非-1表示需要关心的文件描述符是什么
    // int write_fd_array[MaxFdNum];    //仅为了理解多路转接IO,不考虑这两种情况,情况简单化,只考虑读事件
    // int except_fd_array[MaxFdNum];
};

代码运行结果:
请添加图片描述
注意,我们的可是单进程哦。实现了并发处理多个请求。

整理代码,完整代码(允许结果和上面演示效果相同):

Log.hpp文件 – 往期文章实现过

#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

using namespace std;

#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4

#define Screen 1
#define Onefile 2
#define Classfile 3

#define fileName "log.txt"
//使用前需要创建log目录
class Log
{
public:
    Log()
    {
        printMethod = Screen;
        path = "./log/";
    }
    void Enable(int method)
    {
        printMethod = method;
    }
    void printOneFile(string logname, const string& logtxt)
    {
        logname = path + logname;
        int fd = open(logname.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);//open只会创建文件不会创建目录
        if (fd < 0)
        {
            perror("open failed");
            return;
        }
        write(fd, logtxt.c_str(), logtxt.size());
        close(fd);
    }
    void printClassFile(int level, const string& logtxt)
    {
        string filename = fileName;
        filename += ".";
        filename += leveltoString(level);
        printOneFile(filename, logtxt);
    }
    void printLog(int level, const string& logtxt)
    {
        if (printMethod == Screen)
        {
            cout << logtxt << endl;
            return;
        }
        else if (printMethod == Onefile)
        {
            printOneFile(fileName, logtxt);
            return;
        }
        else if (printMethod == Classfile)
        {
            printClassFile(level, logtxt);
            return;
        }
    }
    const char* leveltoString(int level)
    {
        if (level == Info) return "Info";
        else if (level == Debug) return "Debug";
        else if (level == Error) return "Error";
        else if (level == Fatal) return "Fatal";
        else return "default";
    }
    void operator()(int level, const char* format, ...)
    {
        time_t t = time(nullptr);
        struct tm* st = localtime(&t);

        char leftbuffer[4096];
        snprintf(leftbuffer, sizeof(leftbuffer), "year: %d, month: %d, day: %d, hour: %d, minute: %d, second: %d\n\
        [%s]:",
        st->tm_year + 1900, st->tm_mon + 1, st->tm_mday, st->tm_hour, st->tm_min, st->tm_sec, leveltoString(level));
        
        char rightbuffer[4096];
        va_list start;
        va_start(start, format);
        vsnprintf(rightbuffer, sizeof(rightbuffer), format, start);
        va_end(start);
        char logtxt[4096 * 2];
        snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
        printLog(level, logtxt);
    }
private:
    int printMethod;
    string path;//路径与文件名解耦,最后将路径和文件粘合起来,再用open打开即可
};

Sock.hpp文件 – 往期文章实现过

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include "Log.hpp"

Log lg;
class Socket
{
public:
    Socket()
    {}
    ~Socket()
    {}
    void CreateSocket()
    {
        _sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (_sockfd < 0)
        {
            lg(Fatal, "create socket failed:%s", strerror(errno));
            exit(1);
        }
    }
    void Bind(uint16_t serverPort)
    {
        struct sockaddr_in server;
        server.sin_family = AF_INET;
        server.sin_port = htons(serverPort);
        server.sin_addr.s_addr = INADDR_ANY;
        int n = bind(_sockfd, (struct sockaddr*)&server, sizeof(server));
        if (n < 0)
        {
            lg(Fatal, "bind socket failed:%s", strerror(errno));
            exit(2);
        }
    }
    void Listen()
    {
        int n = listen(_sockfd, 5);
        if (n < 0)
        {
            lg(Fatal, "listen socket failed:%s", strerror(errno));
            exit(3);
        }
    }
    int Accept(string* clientIp, uint16_t* clinetPort)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int newsocket = accept(_sockfd, (struct sockaddr*)&peer, &len);
        if (newsocket < 0)
        {
            lg(Error, "accept error:%s", strerror(errno));
            return -1;
        }
        *clientIp = inet_ntoa(peer.sin_addr);
        *clinetPort = ntohs(peer.sin_port);
        return newsocket;
    }
    int Connect(const string& serverIp, uint16_t serverPort)
    {
        struct sockaddr_in server;
        server.sin_family = AF_INET;
        server.sin_addr.s_addr = inet_addr(serverIp.c_str());
        server.sin_port = htons(serverPort);
        int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server));
        if (n < 0)
        {
            lg(Error, "connect error:%s", strerror(errno));
            return -1;
        }
        return 0;
    }
    void Setsockopt()
    {
        int opt = 1;
        if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt)) < 0)
            lg(Error, "%s\n", strerror(errno));
    }
    void Close()
    {
        close(_sockfd);
    }
    int Fd()
    {
        return _sockfd;
    }
private:
    int _sockfd;
};

SelectServer.hpp文件

#include "Socket.hpp"


class SelectServer
{
    static const int MaxFdNum = sizeof(fd_set) * 8; //因为位图fd_set有大小,所以最多只能管理1024个fd
    static const int DefaultNum = -1;
public:
    SelectServer(uint16_t port)
    :_port(port)
    {
        //初始化辅助数组
        for (int i = 0; i < MaxFdNum; ++i)
        {
            read_fd_array[i] = DefaultNum;
        }
    }
    void Init()
    {
        _listensock.CreateSocket();
        _listensock.Setsockopt();
        _listensock.Bind(_port);
        _listensock.Listen();
    }
    void Recver(int socket, int pos)
    {
        char buffer[4096];
        int n = read(socket, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
        	buffer[n] = 0;
            lg(Info, "client say:%s", buffer);
        }
        else if (n == 0)
        {
            //说明对端把连接关闭了
            lg(Info, "client closed...");
            //把该socket就可以移除关心状态了
            read_fd_array[pos] = DefaultNum;
            close(socket);
        }
        else
        {
            //读出错了
            lg(Error, "read error");
            //把该socket就可以移除关心状态了
            read_fd_array[pos] = DefaultNum;
            close(socket);
        }
    }
    void Acceptor()
    {
        string clientIp; uint16_t clientPort;
        int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
        if (newSockfd < 0)
        {
            return;
        }
        lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);

        //添加新的文件描述符到辅助数组
        for (int i = 0; i < MaxFdNum; ++i)
        {
            if (read_fd_array[i] == DefaultNum)
            {
                if (_maxfd < newSockfd)
                    _maxfd = newSockfd;
                read_fd_array[i] = newSockfd;
                break;
            }
            if (i == MaxFdNum - 1)
            {
                lg(Warning, "server is full, close sock: %d", newSockfd);
                close(newSockfd);
            }
        }
    }
    void Dispatcher(fd_set* rfd)    //事件派发器
    {
        for (int i = 0; i < MaxFdNum; ++i)//遍历所有的read_fd_array中是否有满足条件FD_ISSET
        {
            int socket = read_fd_array[i];
            if (socket == DefaultNum)
                continue;
            if (FD_ISSET(socket, rfd) == true && _listensock.Fd() == socket)
            {
                //代码走到这里就说明我们的连接事件就绪了
                Acceptor();
            }
            else if (FD_ISSET(socket, rfd) == true)
            {
                //说明是其他的套接字读事件就绪了
                Recver(socket, i);
            }
        }
    }
    void EventLoop()    //事件循环
    {
        //将lisntensockfd设置进辅助数组
        read_fd_array[0] = _listensock.Fd();
        _maxfd = read_fd_array[0];
        while (1)
        {
            //因为select的参数为输入输出参数,所以我们每次调用select的时候都需要重新设置一遍select的参数,所以需要将下面的语句写入循环里
            fd_set readfd;
            FD_ZERO(&readfd);   //一定要设置为0,否则可能会出现select失败的问题
            for (int i = 0; i < MaxFdNum; ++i)
            {
                if (read_fd_array[i] != DefaultNum)
                {
                    if (_maxfd < read_fd_array[i]) 
                        _maxfd = read_fd_array[i];
                    //将辅助数组中所要关心的文件描述符全部都设置进去
                    FD_SET(read_fd_array[i], &readfd);
                }
            }
            int n = select(_maxfd + 1, &readfd, nullptr, nullptr, 0);//为了易于观察,我们将timout参数设置为0
            if (n < 0)
            {
                std::cout << "select error: " << strerror(errno) << std::endl;
                break;
            }
            if (n == 0)
            {
                //超时返回(非阻塞返回)
                lg(Info, "timeout...");
            }
            else
            {
                Dispatcher(&readfd);    //事件派发器
            }
        }
    }
    ~SelectServer()
    {
        _listensock.Close();
    }
private:
    int _maxfd; //最大的文件描述符是什么
    Socket _listensock;
    uint16_t _port;
    int read_fd_array[MaxFdNum]; //记录的是需要关心的读事件的文件描述符,-1表示不关心,非-1表示需要关心的文件描述符是什么
    // int write_fd_array[MaxFdNum];    //仅为了理解多路转接IO,不考虑这两种情况,情况简单化,只考虑读事件
    // int except_fd_array[MaxFdNum];
};

main.cc文件

#include "SelectServer.hpp"


int main()
{
    SelectServer svr(8080);
    svr.Init();
    svr.EventLoop();
    return 0;
}

select服务器的优缺点

优点:能实现成多路转接的服务器,即单进程也能处理多用户的请求
缺点:
1.同时等待的fd是有上限的,因为位图fd_set是有大小的。
2.用户必须借助第三方数组来维护合法的fd
3.使用select接口设置参数时,每次都要对关心的fd进行事件重置。
4.数据拷贝的频率比较高,select函数需要每次都需要重新设置参数,每传一次位图fd_set内核就需要拷贝一次。
5.如果只有一个fd就绪,用户层也需要全部遍历(虽然可以改进但没必要)
6.内核中检测位图fd_set的事件就绪也要遍历(这也就是为什么select的第一个参数传的是文件描述符的最大值,因为内核要以这个范围来遍历)

多路转接之poll

为了解决上面一些的问题,poll就出现了

在这里插入图片描述
fds:struct pollfd数组的首地址
nfds:数组元素的个数
timeout:设置超时时间,单位是ms
填0:非阻塞等待
小于0:表示阻塞等待
大于0:每过timeout就超时一次。(理解同select)
在这里插入图片描述
fd:你要让内核关心的某一个fd
events:输入参数,用户让内核关心fd的事件
revents:输出参数,内核告诉用户,你要关心的fd哪些事件就绪了
从这里看出来了,poll将输入和输出参数分离,所以poll不需要像select一样每次都需要将参数重新设定。
short是一个16bit的位图,events和revents的取值如下:

事件描 述是否可作为输入是否可作为输出
POLLIN数据(包括普通数据和优先数据)可读
POLLRDNORM普通数据可读
POLLRDBAND优先级带数据可读(Linux不支持)
POLLPRI高优先级数据可读,比如TCP带外数据
POLLOUT数据(包括普通数据和优先数据)可写
POLLWRNORM普通数据可写
POLLWRBAND优先级带数据可写
POLLRDHUPPOLLRDHUPTCP连接被对方关闭,或者对方关闭了写操作。它GNU 引入
POLLERR错误
POLLHUP挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件
POLLNVAL文件描述符没有打开

select只分为了读、写、异常事件,而poll将fd的事件分的更细了。

实现一个简易的poll服务器

参考代码:
Socket.hpp、Log.hpp文件同上

PollerServer.hpp文件

#include "Socket.hpp"
#include <poll.h>
#include <vector>


class PollServer
{
    static const int DefaultNum = -1;
public:
    PollServer(uint16_t port)
    :_port(port)
    {}
    void Init()
    {
        _listensock.CreateSocket();
        _listensock.Setsockopt();
        _listensock.Bind(_port);
        _listensock.Listen();
    }
    void Recver(int socket, int pos)
    {
        char buffer[4096];
        int n = read(socket, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            lg(Info, "client say:%s", buffer);
        }
        else if (n == 0)
        {
            //说明对端把连接关闭了
            lg(Info, "client closed...");
            //把该socket就可以移除关心状态了
            _event_fds.erase(_event_fds.begin() + pos);
            close(socket);
        }
        else
        {
            //读出错了
            lg(Error, "read error");
            //把该socket就可以移除关心状态了
            _event_fds.erase(_event_fds.begin() + pos);
            close(socket);
        }
    }
    void Acceptor()
    {
        string clientIp; uint16_t clientPort;
        int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
        if (newSockfd < 0)
        {
            return;
        }
        lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);

        //添加新的文件描述符到辅助数组
        struct pollfd event;
        event.fd = newSockfd;
        event.events |= POLLIN;
        _event_fds.push_back(move(event));
    }
    void Dispatcher()    //事件派发器
    {
        for (int i = 0; i < _event_fds.size(); ++i)//遍历所有的_event_fds中是否有revents事件就绪的
        {
            int socket = _event_fds[i].fd;
            if ((_event_fds[i].revents & POLLIN) == true)
            {
                if (_listensock.Fd() == socket)
                {
                    //代码走到这里就说明我们的连接事件就绪了
                    Acceptor();
                }
                else
                {
                    //说明是其他的套接字读事件就绪了
                    Recver(socket, i);
                }
            }
            else if ((_event_fds[i].revents & POLLOUT) == true)
            {
                //写事件就绪
            }
            else
            {
                //...
            }
        }
    }
    void EventLoop()    //事件循环
    {
        //将lisntensockfd设置进辅助数组
        struct pollfd event;
        event.fd = _listensock.Fd();
        event.events |= POLLIN;
        _event_fds.push_back(move(event));  //将listensock添加到_event_fds数组里
        while (1)
        {
            int n = poll(_event_fds.data(), _event_fds.size(), -1);  //易于观察就设置为-1
            if (n < 0)
            {
                std::cout << "poll error: " << strerror(errno) << std::endl;
                break;
            }
            if (n == 0)
            {
                //超时返回(非阻塞返回)
                lg(Info, "timeout...");
            }
            else
            {
                Dispatcher();    //事件派发器
            }
        }
    }
    ~PollServer()
    {
        _listensock.Close();
    }
private:
    Socket _listensock;
    uint16_t _port;
    std::vector<struct pollfd> _event_fds;  
};

main.cc文件

#include "PollServer.hpp"


int main()
{
    PollServer svr(8080);
    svr.Init();
    svr.EventLoop();
    return 0;
}

运行结果
请添加图片描述

poll服务器的优缺点

优点:
pollfd结构包含了要监视的event事件和就绪的revent事件,输入与输出分离了,接口使用比select方便。
poll解除了fd有上限的问题,数组为vector了,vector最大能有多大取决于操作系统了,不关poll函数的事了。
缺点:
用户层和内核层都需要遍历该数组,都是o(n)的效率,如果用户要关心的fd有上万个,频繁的遍历会影响效率问题。
每次调用poll,都需要把大量的pollfd结果从用户态拷贝到内核中。
有大量fd,若只有少量的fd处于就绪状态,也需要全部线性遍历一遍,随着监视描述符数量的增长,其效率也会线性下降。
总结:解决了select的1、3问题

多路转接之epoll

按照man手册的说法:是为处理大批量的socketfd而作了改进的poll。它是在2.5.44内核中被引进的,linux2.6下,它几乎具备了之前所说的一切优点,被公认为性能最好的多路转接IO。现在只要涉及服务器组件的,都用的是epoll。

在这里插入图片描述
size:该参数已经被废弃,但是要填大于0的值
该函数会创建一个epoll模型,返回epoll模型的文件描述符,什么是epoll模型,后面会讲。
在这里插入图片描述
epfd:epoll模型的描述符
events:输出型参数,struct epoll_event数组的首地址
maxevents:该数组的个数
timeout:意义同epoll参数的timeout
返回值和select、poll一样
在这里插入图片描述
fd:当事件就绪时,能知道该事件是哪个fd就绪了

events可以是以下几个宏的集合

事件描 述
EPOLLIN表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT表示对应的文件描述符可以写
POLLPRI表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
POLLERR表示示对应的文件描述符发生错误
POLLPRI表示对应的文件描述符被挂断
POLLPRI将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的
EPOLLONESHOT只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里

在这里插入图片描述
epfd:epoll模型
op、fd、event:用户告诉内核,要对fd的event事件进行op处理
op:
EPOLL_CTL_ADD添加事件
EPOLL_CTL_MOD修改事件
EPOLL_CTL_DEL删除事件

epoll解决了select的所有缺点,epoll是怎样设计的呢?它的原理是什么?

epoll原理

在这里插入图片描述
如图所示,内核中的三个部分(红黑树,回调函数,就绪队列)组成了epoll模型。epoll模型并且由struct_file结构体所指向,想管理epoll模型,就能通过fd找到epoll模型。

红黑树管理已注册的文件描述符:epoll_ctl对红黑树的增删改操作时间复杂度为O(logN)

就绪事件的通知机制:每当有fd就绪了,网卡驱动就会通知(调用回调函数)epoll,并执行内核曾经注册好的回调函数。有了这个就绪事件通知机制,内核就不用再O(N)的遍历所有的文件描述符是否已经就绪
就绪列表的管理:epoll_wait所捞取的都是已经就绪的

epoll的优势

1.用户层不再需要每次调用接口时都将关心fd拷贝给内核了
2.管理fd个数没有上限
3.不再需要辅助数组了,因为内核中的红黑树就代替了曾经用户要管理的数组
在这里插入图片描述
4.内核中通过事件通知机制O(1)就能将就绪fd放入就绪队列,不再需要遍历所有的fd,来判断是否就绪
5.对事件的管理更方便,只需要对epoll_ctl进行操作
select和poll的底层如图所示

用epoll实现一个简易的echo服务器

参考代码:
Log.hpp文件 和 Socket.hpp文件和之前一样

Epoller.hpp文件

#include <sys/epoll.h>
#include "Log.hpp"
#include <cstring>

class Epoller
{
public:
    Epoller()
    {}
    void EpollerCreate()
    {
        _epollfd = epoll_create(64);
        if (_epollfd < 0)
        {
           lg(Fatal, "Create epoll failed: %s", strerror(errno));
           abort();
        }
    }
    void EpollerUpate(int oper, int fd, uint32_t event)
    {
        int n = 0;
        if (oper == EPOLL_CTL_ADD || oper == EPOLL_CTL_MOD)
        {
            struct epoll_event epEvent;
            epEvent.data.fd = fd;
            epEvent.events = 0;
            epEvent.events |= event;
            n = epoll_ctl(_epollfd, oper, fd, &epEvent);
            if (n < 0)
            {
                lg(Error, "epoll ctl failed: %s", strerror(errno));
            }
        }
        else if (oper == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epollfd, oper, fd, 0);
            if (n < 0)
            {
                lg(Error, "epoll ctl failed: %s", strerror(errno));
            }
        }
    }
    int Wait(struct epoll_event* events, int n, int timeout)
    {
        int m = epoll_wait(_epollfd, events, n, timeout);
        if (m < 0)
        {
            lg(Error, "epoll wait failed: %s", strerror(errno));
        }
        else if (m == 0)
        {
            //超时返回(非阻塞返回)
            lg(Info, "timeout...");
        }
        return m;
    }
    int Fd()
    {
        return _epollfd;
    }
private:
    int _epollfd;
};

nocpy.hpp文件


//编程技巧:写防拷贝的类的时候直接继承,就不用自己再手动写了
class nocpy
{
public:
    nocpy(){}
    ~nocpy(){}
private:
    nocpy(const nocpy&) = delete;
    nocpy& operator=(const nocpy&) = delete;
};

EpollServer.hpp文件

#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>


class EpollServer : public nocpy
{
    static const int num = 64;      //一次性最多捞取上来多少个就绪fd, 一次捞取不完下一次可以继续捞取
public:
    EpollServer(uint16_t port)
    :_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller)
    {}
    void Init()
    {
        _listensock_ptr->CreateSocket();
        _listensock_ptr->Setsockopt();
        _listensock_ptr->Bind(_port);
        _listensock_ptr->Listen();

        lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
        _epoller_ptr->EpollerCreate();
        lg(Info, "create epoller success");
    }
    void Recver(int socket, int pos)
    {
        char buffer[4096];
        int n = read(socket, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            lg(Info, "client say:%s", buffer);
            string s = "server echo:";
            s += buffer;
            int m = write(socket, s.c_str(), s.size());
            if (m < 0)
            {
                lg(Error, "write failed: %s", strerror(errno));
            }
        }
        else if (n == 0)
        {
            //说明对端把连接关闭了
            lg(Info, "client closed...");
            //把该socket就可以移除关心状态了
            _epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, socket, 0);
            close(socket);
        }
        else
        {
            //读出错了
            lg(Error, "read error");
            //把该socket就可以移除关心状态了
            _epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, socket, 0);
            close(socket);
        }
    }
    void Acceptor()
    {
        string clientIp; uint16_t clientPort;
        int newSockfd = _listensock_ptr->Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
        if (newSockfd < 0)
        {
            return;
        }
        lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);

        //添加新的文件描述符到Epoller里面
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, newSockfd, EPOLLIN);
    }
    void Dispatcher(struct epoll_event* recvs, int n)    //事件派发器
    {
        for (int i = 0; i < n; ++i)//recvs数组里面全部都是就绪的fd
        {
            int socket = recvs[i].data.fd;
            if ((recvs[i].events & EPOLLIN) == true)
            {
                if (_listensock_ptr->Fd() == socket)
                {
                    //代码走到这里就说明我们的连接事件就绪了
                    Acceptor();
                }
                else
                {
                    //说明是其他的套接字读事件就绪了
                    Recver(socket, i);
                }
            }
            else if ((recvs[i].events & EPOLLOUT) == true)
            {
                //写事件就绪
            }
            else
            {
                //...
            }
        }
    }
    void EventLoop()    //事件循环
    {
        //添加_listensock到Epoller里面
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), EPOLLIN);
        struct epoll_event recvs[num];
        while (1)
        {
            int n = _epoller_ptr->Wait(recvs, num, -1);  //易于观察就设置为-1
            Dispatcher(recvs, n);   //捞取上来了n个fd
        }
    }
    ~EpollServer()
    {
        _listensock_ptr->Close();
    }
private:
    std::shared_ptr<Socket> _listensock_ptr;    //智能指针优势:1.可共享拷贝资源 - 不怕浅拷贝 2.灵活的生命周期,可随时释放 3.可延迟初始化 4.异常安全
    std::shared_ptr<Epoller> _epoller_ptr;
    uint16_t _port;
};

main.cc文件

#include "EpollServer.hpp"

int main()
{
    std::unique_ptr<EpollServer> svr(new EpollServer(8080 ));
    svr->Init();
    svr->EventLoop();
    return 0;
}

代码运行结果:
请添加图片描述

epoll的LT和ET工作模式

什么是LT和ET

一个现象:
我们将这行代码注释掉,上层不处理新连接到来的这个事件
在这里插入图片描述
代码运行结果:
请添加图片描述

发现底层会一直通知上层,有新事件到来,请上层处理。
一旦有新的连接到来或者有新的数据到来,上层如果你不取走,底层会一直通知你去取走哦,这种模式就叫做LT

LT:水平触发Level Triggered ,ET:边缘触发Edge Triggered
LT、ET,和示波器很像
在这里插入图片描述
LT一直处于高电平,表示为真。ET只有从低点到高点变化的时候,才为真
epoll默认模式:LT模式。事件到来,但是上层不处理,高电平,一直有效
ET:底层数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。

给大家举一个例子
张三是个快递员,是一个比较负责的快递员。假如你买的快递到了,有6个快递,下楼取一下,你可能在打游戏没时间取,过一会儿,张三又给你打电话:你有6个快递,你下来取一下。你游戏没打完就是不取,但是呢,张三期间也一直打电话。张三的同事李四,李四也有你的快递,路过,张三就给李四说:我帮你派发吧。张三现在一共有十个快递,让你下来取,你不下来取张三就一直会给你打电话,张三送快递的模式就叫做LT模式。
过了一周,又要派发你的快递。但是是李四来派发你的快递,李四给你打电话说:你不来取,我就不给你打电话了,我只通知你一次。你一听:那我还是下去取吧,要是走了我快递里就找不到了,我的快递在一堆快递里,除了快递员不知道哪个是我的快递。你取的时候,发现你只能拿走6个当中的4个,一次拿不完,你把快递拿上去之后剩下的快递你不知道在哪了,索性就不要了,你又去打你的游戏了。李四看到张三也有你的快递,还人情就帮张三派发,又给你打电话:你有新的快递到了,你不下来取,我就走了。你一听,这人只通知一次,你就又下去取了,这次就把上次没取完的一并给取走了。

总结:
张三是有快递就一直给你打电话。
李四是从无到有,从有到多,只给你打一次电话,你没取干净,我也不再通知你。
你认为两个快递员谁的工作效率更高呢?
ET,因为这个快递员在单位时间内通知的人数是更多的。ET一小时内可以通知50人,而LT可能只通知了10人。主要是ET的通知效率高。

ET:因为只会通知一次,所以会倒逼程序员使用ET工作模式的时候,每次通知,都必须要把本轮数据全部取走。你怎么知道你把本次就绪底层的数据读取完毕了呢?循环读取,知道读取不到数据了。一般的fd,是阻塞式的fd,如果没有数据了会阻塞,所以在ET模式下,我们的fd必须是非阻塞的。
ET的通知效率高,不仅仅如此,ET的IO效率也更高,原因在于,每通知一次就要求程序员把本轮数据全部取走,这意味着tcp会向对方通告一个更大的窗口,从而概率上让对方一次能给我发送更多的数据,提高了网络的吞吐量,则提高IO效率。

ET的效率也不是一定比LT高,LT也可以将所有的fd设置成为非阻塞,然后循环读取,通知第一次的时候就全部取走,不就和ET一样了嘛。LT是epoll的默认行为,使用ET能够减少epoll触发的次数,但是代价就是强逼着程序员一次就绪响应就把所有的数据都处理完。所以说,如果LT情况下如果也能做到每次就绪的文件描述符都立即处理,不让这个就绪被重复提示的话,其实性能也是一样的。那为什么LT不代替ET呢?因为程序员不能保证完全的替代,会可能写出bug。

实现一个简易的reactor服务器

我们之前read函数只读一次,因为tcp是面向字节流的,所以不能保证读上来的是一个完整的报文。
在这里插入图片描述
如果读一次没有读完,那需要读第二次,也就是循环读,所以读到的数据需要存到一个缓冲区里。

下面我们就用epoll来实现一个ET模式的reactor服务器

TcpServer.hpp文件

#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>

const int num = 64; //一次性最多捞取上来多少个fd

class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
    Connection(int fd, TcpServer* tcp_server_ptr)
    :_sockfd(fd), _tcp_server_ptr(tcp_server_ptr)
    {}
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }
private:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;

    func_t _recv_cb;        //读回调函数
    func_t _send_cb;        //写回调函数
    func_t _except_cb;      //异常回调函数

    std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};

class TcpServer : public nocpy
{
public:
    TcpServer(uint16_t port)
    :_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller)
    {}
    void Init()
    {
        _listensock_ptr->CreateSocket();
        _listensock_ptr->Setsockopt();
        _listensock_ptr->Bind(_port);
        _listensock_ptr->SetNonBlock(); //设置非阻塞
        _listensock_ptr->Listen();
        _epoller_ptr->EpollerCreate();
        lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
    }
    void Dispatcher(int n)
    {
        for (int i = 0; i < n; ++i)
        {

        }
    }
    void Start()
    {
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), EVENT_IN);
        while (1)
        {
            int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
            if (n < 0)
            {
                lg(Error, "epoll wait failed: %s", strerror(errno));
                break;
            }
            else if (n == 0)
            {
                lg(Info, "timeout...");
            }
            else 
            {
                Dispatcher(n);
            }
        }
    }
    ~TcpServer()
    {}
private:
    std::shared_ptr<Socket> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    struct epoll_event recvs[num];  //捞取就绪事件的数组
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;  //管理所有的连接
    int _port;
};

在这里插入图片描述

更新代码

TcpServer.hpp文件

#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>

const int num = 64; //一次性最多捞取上来多少个fd

class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
    Connection(int fd, TcpServer* tcp_server_ptr)
    :_sockfd(fd), _tcp_server_ptr(tcp_server_ptr)
    {}
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }

public:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;

    func_t _recv_cb;        //读回调函数
    func_t _send_cb;        //写回调函数
    func_t _except_cb;      //异常回调函数

    std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};

class TcpServer : public nocpy
{
public:
    TcpServer(uint16_t port)
    :_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller)
    {}
    void Init()
    {
        _listensock_ptr->CreateSocket();
        _listensock_ptr->Setsockopt();
        _listensock_ptr->Bind(_port);
        _listensock_ptr->SetNonBlock(); //设置非阻塞
        _listensock_ptr->Listen();
        _epoller_ptr->EpollerCreate();
        lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());

        AddConnection(_listensock_ptr->Fd(), EVENT_IN, nullptr, nullptr, nullptr);
    }
    void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        //建立连接的同时,挂到epoller中
        //1.将listensock添加到Connection中,同时,将listen和Connection放入Connections中
        std::shared_ptr<Connection> conn(new Connection(_listensock_ptr->Fd(), this));
        conn->SetHandler(recv_cb, send_cb, except_cb);
        _connections.insert(make_pair(_listensock_ptr->Fd(), conn));
        //2.添加对应的事件,放入到epoller中
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
    }
    void Dispatcher(int n)
    {
        //对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
        for (int i = 0; i < n; ++i)
        {
            uint32_t event = recvs[i].events;
            int fd = recvs[i].data.fd;
            auto pos = _connections.find(fd);
            assert(pos != _connections.end());
            //统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
            if (event & EPOLLERR)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLHUP)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLIN)
            {
                if (pos->second->_recv_cb)
                    pos->second->_recv_cb(pos->second);
            }
            if (event & EPOLLOUT)
            {
                if (pos->second->_send_cb)
                    pos->second->_send_cb(pos->second);
            }
        }
    }
    void Start()
    {
        while (1)
        {
            int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
            if (n < 0)
            {
                lg(Error, "epoll wait failed: %s", strerror(errno));
                break;
            }
            else if (n == 0)
            {
                lg(Info, "timeout...");
            }
            else 
            {
                Dispatcher(n);
            }
        }
    }
    ~TcpServer()
    {}
private:
    std::shared_ptr<Socket> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    struct epoll_event recvs[num];  //捞取就绪事件的数组
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;  //管理所有的连接
    int _port;
};

更新代码

TcpServer.hpp文件

#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>

const int num = 64; //一次性最多捞取上来多少个fd

class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
    Connection(int fd, TcpServer* tcp_server_ptr)
    :_sockfd(fd), _tcp_server_ptr(tcp_server_ptr)
    {}
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }

public:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;

    func_t _recv_cb;        //读回调函数
    func_t _send_cb;        //写回调函数
    func_t _except_cb;      //异常回调函数

    std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};

class TcpServer : public nocpy
{
public:
    TcpServer(uint16_t port)
    :_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller)
    {}
    void Init()
    {
        _listensock_ptr->CreateSocket();
        _listensock_ptr->Setsockopt();
        _listensock_ptr->Bind(_port);
        _listensock_ptr->SetNonBlock(); //设置非阻塞
        _listensock_ptr->Listen();
        _epoller_ptr->EpollerCreate();
        lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());

        AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
    }
    void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        //建立连接的同时,挂到epoller中
        //1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
        std::shared_ptr<Connection> conn(new Connection(fd, this));
        conn->SetHandler(recv_cb, send_cb, except_cb);
        _connections.insert(make_pair(fd, conn));
        //2.添加对应的事件,放入到epoller中
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
    }
    void Acceptor(std::shared_ptr<Connection> conn)
    {
        while (1)
        {
            std::string clientIp;
            uint16_t clientPort;
            Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
            if (newSocket.Fd() < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "listensock accept failed: %s", strerror(errno));
                break;
            }
            lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
            newSocket.SetNonBlock();
            AddConnection(newSocket.Fd(), EVENT_IN, nullptr, nullptr, nullptr);
        }
    }
    void Dispatcher(int n)
    {
        //对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
        for (int i = 0; i < n; ++i)
        {
            uint32_t event = recvs[i].events;
            int fd = recvs[i].data.fd;
            auto pos = _connections.find(fd);
            assert(pos != _connections.end());
            //统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
            if (event & EPOLLERR)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLHUP)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLIN)
            {
                if (pos->second->_recv_cb)
                    pos->second->_recv_cb(pos->second);
            }
            if (event & EPOLLOUT)
            {
                if (pos->second->_send_cb)
                    pos->second->_send_cb(pos->second);
            }
        }
    }
    void Start()
    {
        while (1)
        {
            int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
            if (n < 0)
            {
                lg(Error, "epoll wait failed: %s", strerror(errno));
                break;
            }
            else if (n == 0)
            {
                lg(Info, "timeout...");
            }
            else 
            {
                Dispatcher(n);
            }
        }
    }
    ~TcpServer()
    {}
private:
    std::shared_ptr<Socket> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    struct epoll_event recvs[num];  //捞取就绪事件的数组
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;  //管理所有的连接
    int _port;
};

用三个客户端连接,代码运行结果
在这里插入图片描述

更新代码

TcpServer.hpp文件

#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>

const int num = 64; //一次性最多捞取上来多少个fd

class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
    Connection(int fd, TcpServer* tcp_server_ptr)
    :_sockfd(fd), _tcp_server_ptr(tcp_server_ptr)
    {}
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }
public:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;

    func_t _recv_cb;        //读回调函数
    func_t _send_cb;        //写回调函数
    func_t _except_cb;      //异常回调函数

    std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针

    std::string _ip;
    uint16_t _port;
};

class TcpServer : public nocpy
{
public:
    TcpServer(uint16_t port, func_t OnMessage)
    :_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage)
    {}
    void Init()
    {
        _listensock_ptr->CreateSocket();
        _listensock_ptr->Setsockopt();
        _listensock_ptr->Bind(_port);
        _listensock_ptr->SetNonBlock(); //设置非阻塞
        _listensock_ptr->Listen();
        _epoller_ptr->EpollerCreate();
        lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());

        AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
    }
    void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        //建立连接的同时,挂到epoller中
        //1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
        std::shared_ptr<Connection> conn(new Connection(fd, this));
        conn->SetHandler(recv_cb, send_cb, except_cb);
        _connections.insert(make_pair(fd, conn));
        //2.添加对应的事件,放入到epoller中
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
    }
    void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可
    {
        lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);
        auto it = _connections.find(conn->_sockfd);
        assert (it != _connections.end());
        _connections.erase(it);     //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放
        _epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);
        close(conn->_sockfd);
    }
    void Recver(std::shared_ptr<Connection> conn)
    {
        int fd = conn->_sockfd;
        char buffer[4096];
        while (1)
        {
            int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取
            if (n < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "read failed: %s", strerror(errno));
                conn->_except_cb(conn);
                return;
            }
            else if (n == 0)
            {
                lg(Info, "server closed...");
                conn->_except_cb(conn);
                return;
            }
            buffer[n] = 0;
            conn->_inbuffer += buffer;  //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理
        }
        _OnMessage(conn);   //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理
    }
    void Acceptor(std::shared_ptr<Connection> conn)
    {
        while (1)
        {
            std::string clientIp;
            uint16_t clientPort;
            Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
            if (newSocket.Fd() < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "listensock accept failed: %s", strerror(errno));
                break;
            }
            lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
            newSocket.SetNonBlock();
            AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
             nullptr, std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
        }
    }
    void Dispatcher(int n)
    {
        //对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
        for (int i = 0; i < n; ++i)
        {
            uint32_t event = recvs[i].events;
            int fd = recvs[i].data.fd;
            auto pos = _connections.find(fd);
            assert(pos != _connections.end());
            //统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
            if (event & EPOLLERR)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLHUP)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLIN)
            {
                if (pos->second->_recv_cb)
                    pos->second->_recv_cb(pos->second);
            }
            if (event & EPOLLOUT)
            {
                if (pos->second->_send_cb)
                    pos->second->_send_cb(pos->second);
            }
        }
    }
    void Start()
    {
        while (1)
        {
            int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
            if (n < 0)
            {
                lg(Error, "epoll wait failed: %s", strerror(errno));
                break;
            }
            else if (n == 0)
            {
                lg(Info, "timeout...");
            }
            else 
            {
                Dispatcher(n);
            }
        }
    }
    ~TcpServer()
    {}
private:
    std::shared_ptr<Socket> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    struct epoll_event recvs[num];  //捞取就绪事件的数组
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;  //管理所有的连接
    int _port;

    func_t _OnMessage;  //让上层处理信息
};

main.cc文件

#include "TcpServer.hpp"


void DefaultOnMessage(std::shared_ptr<Connection> conn)
{
    std::cout << "sock: " << conn->_sockfd <<"的缓冲区里的数据为:" << conn->_inbuffer << std::endl;
}

int main()
{
    std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));
    svr->Init();
    svr->Start();
    return 0;
}

代码运行结果如下
请添加图片描述
但是发现了一个问题:类似于循环引用,异常事件关闭连接时,导致程序出错。
在这里插入图片描述

更新代码

#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>

const int num = 64; //一次性最多捞取上来多少个fd

class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
    Connection(int fd)
    :_sockfd(fd)
    {}
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }
public:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;

    func_t _recv_cb;        //读回调函数
    func_t _send_cb;        //写回调函数
    func_t _except_cb;      //异常回调函数

    std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针

    std::string _ip;
    uint16_t _port;
};

class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer>  //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:
    TcpServer(uint16_t port, func_t OnMessage)
    :_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage)
    {}
    void Init()
    {
        _listensock_ptr->CreateSocket();
        _listensock_ptr->Setsockopt();
        _listensock_ptr->Bind(_port);
        _listensock_ptr->SetNonBlock(); //设置非阻塞
        _listensock_ptr->Listen();
        _epoller_ptr->EpollerCreate();
        lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());

        AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
    }
    void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        //建立连接的同时,挂到epoller中
        //1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
        std::shared_ptr<Connection> conn(new Connection(fd));
        conn->SetHandler(recv_cb, send_cb, except_cb);
        _connections.insert(make_pair(fd, conn));
        if (fd == _listensock_ptr->Fd())
            conn->_tcp_server_ptr = shared_ptr<TcpServer>(this);    //如果是listensock连接,就构造智能指针来管理TcpServer
        else
            conn->_tcp_server_ptr = shared_from_this();     //如果是其他连接,就共享该资源来管理
        //2.添加对应的事件,放入到epoller中
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
    }
    void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可
    {
        lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);
        auto it = _connections.find(conn->_sockfd);
        assert (it != _connections.end());
        _connections.erase(it);     //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放
        _epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);
        close(conn->_sockfd);
    }
    void Recver(std::shared_ptr<Connection> conn)
    {
        int fd = conn->_sockfd;
        char buffer[4096];
        while (1)
        {
            int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取
            if (n < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "read failed: %s", strerror(errno));
                conn->_except_cb(conn);
                return;
            }
            else if (n == 0)
            {
                lg(Info, "server closed...");
                conn->_except_cb(conn);
                return;
            }
            buffer[n] = 0;
            conn->_inbuffer += buffer;  //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理
        }
        _OnMessage(conn);   //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理
    }
    void Acceptor(std::shared_ptr<Connection> conn)
    {
        while (1)
        {
            std::string clientIp;
            uint16_t clientPort;
            Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
            if (newSocket.Fd() < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "listensock accept failed: %s", strerror(errno));
                break;
            }
            lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
            newSocket.SetNonBlock();
            AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
             nullptr, std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
        }
    }
    void Dispatcher(int n)
    {
        //对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
        for (int i = 0; i < n; ++i)
        {
            uint32_t event = recvs[i].events;
            int fd = recvs[i].data.fd;
            auto pos = _connections.find(fd);
            assert(pos != _connections.end());
            //统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
            if (event & EPOLLERR)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLHUP)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLIN)
            {
                if (pos->second->_recv_cb)
                    pos->second->_recv_cb(pos->second);
            }
            if (event & EPOLLOUT)
            {
                if (pos->second->_send_cb)
                    pos->second->_send_cb(pos->second);
            }
        }
    }
    void Start()
    {
        while (1)
        {
            int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
            if (n < 0)
            {
                lg(Error, "epoll wait failed: %s", strerror(errno));
                break;
            }
            else if (n == 0)
            {
                lg(Info, "timeout...");
            }
            else 
            {
                Dispatcher(n);
            }
        }
    }
    ~TcpServer()
    {}
private:
    std::shared_ptr<Socket> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    struct epoll_event recvs[num];  //捞取就绪事件的数组
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;  //管理所有的连接
    int _port;

    func_t _OnMessage;  //让上层处理信息
};

运行结果如下:
请添加图片描述

每一个socket都配套有一个读写缓冲区,为什么要有读缓冲区?因为你不能保证一次read上来的数据就是一个完整的数据。为什么要有写缓冲区?因为你不能保证内核缓冲区是有空间的,也就是你无法保证发送条件是否就绪。
异常和读事件我们都处理了,如何处理写事件呢?
通常情况下,发送缓冲区一般都是有空间的,写事件一般都是就绪的,如果我们设置对EPOLLOUT关心,那EPOLLOUT几乎每次都是就绪,会导致epoll_wait经常返回,浪费CPU资源。
结论:对于读事件,设置常关心,对于写事件,按需设置。什么是按需设置?直接写入,如果写入完成就结束。如果没有将这一轮的数据写完,outbuffer里还有数据,我们就需要对写事件进行关心了,如果写完了,就去掉对写事件的关心。

更新代码

TcpServer.hpp文件

#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>

const int num = 64; //一次性最多捞取上来多少个fd

class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
    Connection(int fd)
    :_sockfd(fd)
    {}
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }
public:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;

    func_t _recv_cb;        //读回调函数
    func_t _send_cb;        //写回调函数
    func_t _except_cb;      //异常回调函数

    std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针

    std::string _ip;
    uint16_t _port;
};

class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer>  //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:
    TcpServer(uint16_t port, func_t OnMessage)
    :_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage)
    {}
    void Init()
    {
        _listensock_ptr->CreateSocket();
        _listensock_ptr->Setsockopt();
        _listensock_ptr->Bind(_port);
        _listensock_ptr->SetNonBlock(); //设置非阻塞
        _listensock_ptr->Listen();
        _epoller_ptr->EpollerCreate();
        lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());

        AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
    }
    void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        //建立连接的同时,挂到epoller中
        //1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
        std::shared_ptr<Connection> conn(new Connection(fd));
        conn->SetHandler(recv_cb, send_cb, except_cb);
        _connections.insert(make_pair(fd, conn));
        if (fd == _listensock_ptr->Fd())    //如果是listensock连接,就构造智能指针来管理TcpServer
            conn->_tcp_server_ptr = shared_ptr<TcpServer>(this);
        else
            conn->_tcp_server_ptr = shared_from_this();     //如果是其他连接,就共享该资源来管理
        //2.添加对应的事件,放入到epoller中
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
    }
    void Sender(std::shared_ptr<Connection> conn)
    {
        while (1)
        {
            int n = write(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size());
            if (n > 0)
            {
                conn->_outbuffer.erase(0, n);
                if (conn->_outbuffer.empty() == true)
                {
                    return;
                }
            }
            else if (n == 0)
            {
                return;
            }
            else
            {
                if (errno == EWOULDBLOCK || errno == EAGAIN)
                {
                    //说明底层数据不就绪,即内核写缓冲区没有空间了
                    break;
                }
                if (errno == EINTR)
                {
                    continue;
                }
                lg(Error, "write failed, socket: %d", conn->_sockfd);
                conn->_except_cb(conn);
                return;
            }
        }
        if (!conn->_outbuffer.empty())
        {
            //说明没有将数据发完,则需要设置写事件关心了
            uint32_t event = EVENT_OUT | EVENT_IN;
            _epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);
        }
        else 
        {
            //说明将数据发完了,则取消对写事件关心了
            uint32_t event = EVENT_IN;
            _epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);
        }
    }
    void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可
    {
        lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);
        auto it = _connections.find(conn->_sockfd);
        assert (it != _connections.end());
        _connections.erase(it);     //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放
        _epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);
        close(conn->_sockfd);
    }
    void Recver(std::shared_ptr<Connection> conn)
    {
        int fd = conn->_sockfd;
        char buffer[4096];
        while (1)
        {
            int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取
            if (n < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "read failed: %s", strerror(errno));
                conn->_except_cb(conn);
                return;
            }
            else if (n == 0)
            {
                lg(Info, "server closed...");
                conn->_except_cb(conn);
                return;
            }
            buffer[n] = 0;
            conn->_inbuffer += buffer;  //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理
        }
        _OnMessage(conn);   //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理
    }
    void Acceptor(std::shared_ptr<Connection> conn)
    {
        while (1)
        {
            std::string clientIp;
            uint16_t clientPort;
            Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
            if (newSocket.Fd() < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "listensock accept failed: %s", strerror(errno));
                break;
            }
            lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
            newSocket.SetNonBlock();
            AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
              std::bind(&TcpServer::Sender, this, std::placeholders::_1),\
              std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
        }
    }
    void Dispatcher(int n)
    {
        //对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
        for (int i = 0; i < n; ++i)
        {
            uint32_t event = recvs[i].events;
            int fd = recvs[i].data.fd;
            auto pos = _connections.find(fd);
            assert(pos != _connections.end());
            //统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
            if (event & EPOLLERR)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLHUP)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLIN)
            {
                if (pos->second->_recv_cb)
                    pos->second->_recv_cb(pos->second);
            }
            if (event & EPOLLOUT)
            {
                if (pos->second->_send_cb)
                    pos->second->_send_cb(pos->second);
            }
        }
    }
    void Start()
    {
        while (1)
        {
            int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
            if (n < 0)
            {
                lg(Error, "epoll wait failed: %s", strerror(errno));
                break;
            }
            else if (n == 0)
            {
                lg(Info, "timeout...");
            }
            else 
            {
                Dispatcher(n);
            }
        }
    }
    ~TcpServer()
    {}
private:
    std::shared_ptr<Socket> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    struct epoll_event recvs[num];  //捞取就绪事件的数组
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;  //管理所有的连接
    int _port;

    func_t _OnMessage;  //让上层处理信息
};

main.cc文件

#include "TcpServer.hpp"


std::string Handle(std::string& inbuffer)   //业务处理
{
    std::string tmp = inbuffer;
    inbuffer.clear();
    return tmp;
}
void DefaultOnMessage(std::shared_ptr<Connection> conn)
{
    string response_str = Handle(conn->_inbuffer);
    if (response_str.empty()) return;
    conn->_outbuffer += response_str;
    if (conn->_send_cb)
        conn->_send_cb(conn);
}

int main()
{
    std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));
    svr->Init();
    svr->Start();
    return 0;
}

代码运行结果如下:
请添加图片描述

套用之前写的网络版本计算器业务逻辑 - 文末附完整代码

代码运行结果
请添加图片描述

reactor模型是半同步半异步的模型
半同步体现的是等:由epoll来做的:保证了就绪事件的通知和进行IO
半异步的体现:负责了业务处理,如果把业务处理放到线程池中去处理,就是半异步了。因为我们的业务处理并不耗时,所以就没有开多线程

还有一种模式叫做Proactor,纯异步的编写方式,在linux服务器设计里面没有,我们就不涉及了。
reactor也叫做反应堆,是什么意思呢?如同打地鼠的游戏
在这里插入图片描述
玩游戏的人就相当于是一个多路转接,我们要检测每一个洞口有没有地鼠出来,虽然没出来,但是我们知道一旦出来了我就要执行我的回调方法(来砸它) – 回调函数是提前设置好的。
游戏的面板就相当于我们的reactor,每一个洞就相当于Connection,老鼠上来了就叫做事件就绪,执行砸方法就是执行回调函数。这种就叫做反应堆

redis底层用的就是单reactor,处理用的是reactor的LT模式

完整版代码如下:
文件目录
在这里插入图片描述

Epoller.hpp文件

#include <sys/epoll.h>
#include "Log.hpp"
#include <cstring>

class Epoller
{
public:
    Epoller()
    {}
    void EpollerCreate()
    {
        _epollfd = epoll_create(64);
        if (_epollfd < 0)
        {
           lg(Fatal, "Create epoll failed: %s", strerror(errno));
           abort();
        }
    }
    void EpollerUpate(int oper, int fd, uint32_t event)
    {
        int n = 0;
        if (oper == EPOLL_CTL_ADD || oper == EPOLL_CTL_MOD)
        {
            struct epoll_event epEvent;
            epEvent.data.fd = fd;
            epEvent.events = 0;
            epEvent.events |= event;
            n = epoll_ctl(_epollfd, oper, fd, &epEvent);
            if (n < 0)
            {
                lg(Error, "epoll ctl failed: %s", strerror(errno));
            }
        }
        else if (oper == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epollfd, oper, fd, 0);
            if (n < 0)
            {
                lg(Error, "epoll ctl failed: %s", strerror(errno));
            }
        }
    }
    int Wait(struct epoll_event* events, int n, int timeout)
    {
        int m = epoll_wait(_epollfd, events, n, timeout);
        if (m < 0)
        {
            lg(Error, "epoll wait failed: %s", strerror(errno));
        }
        else if (m == 0)
        {
            //超时返回(非阻塞返回)
            lg(Info, "timeout...");
        }
        return m;
    }
    int Fd()
    {
        return _epollfd;
    }
private:
    int _epollfd;
};

Log.hpp文件

#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

using namespace std;

#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4

#define Screen 1
#define Onefile 2
#define Classfile 3

#define fileName "log.txt"
//使用前需要创建log目录
class Log
{
public:
    Log()
    {
        printMethod = Screen;
        path = "./log/";
    }
    void Enable(int method)
    {
        printMethod = method;
    }
    void printOneFile(string logname, const string& logtxt)
    {
        logname = path + logname;
        int fd = open(logname.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);//open只会创建文件不会创建目录
        if (fd < 0)
        {
            perror("open failed");
            return;
        }
        write(fd, logtxt.c_str(), logtxt.size());
        close(fd);
    }
    void printClassFile(int level, const string& logtxt)
    {
        string filename = fileName;
        filename += ".";
        filename += leveltoString(level);
        printOneFile(filename, logtxt);
    }
    void printLog(int level, const string& logtxt)
    {
        if (printMethod == Screen)
        {
            cout << logtxt << endl;
            return;
        }
        else if (printMethod == Onefile)
        {
            printOneFile(fileName, logtxt);
            return;
        }
        else if (printMethod == Classfile)
        {
            printClassFile(level, logtxt);
            return;
        }
    }
    const char* leveltoString(int level)
    {
        if (level == Info) return "Info";
        else if (level == Debug) return "Debug";
        else if (level == Error) return "Error";
        else if (level == Fatal) return "Fatal";
        else return "default";
    }
    void operator()(int level, const char* format, ...)
    {
        time_t t = time(nullptr);
        struct tm* st = localtime(&t);

        char leftbuffer[4096];
        snprintf(leftbuffer, sizeof(leftbuffer), "year: %d, month: %d, day: %d, hour: %d, minute: %d, second: %d\n\
        [%s]:",
        st->tm_year + 1900, st->tm_mon + 1, st->tm_mday, st->tm_hour, st->tm_min, st->tm_sec, leveltoString(level));
        
        char rightbuffer[4096];
        va_list start;
        va_start(start, format);
        vsnprintf(rightbuffer, sizeof(rightbuffer), format, start);
        va_end(start);
        char logtxt[4096 * 2];
        snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
        printLog(level, logtxt);
    }
private:
    int printMethod;
    string path;//路径与文件名解耦,最后将路径和文件粘合起来,再用open打开即可
};

nocpy.hpp文件

#pragma once
//编程技巧:写防拷贝的类的时候直接继承,就不用自己再手动写了
class nocpy
{
public:
    nocpy(){}
    ~nocpy(){}
private:
    nocpy(const nocpy&) = delete;
    nocpy& operator=(const nocpy&) = delete;
};

Socket.hpp文件

#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include "Log.hpp"

Log lg;
class Socket
{
public:
    Socket(int fd = -1)
    :_sockfd(fd)
    {}
    ~Socket()
    {}
    void CreateSocket()
    {
        _sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (_sockfd < 0)
        {
            lg(Fatal, "create socket failed:%s", strerror(errno));
            exit(1);
        }
    }
    void Bind(uint16_t serverPort)
    {
        struct sockaddr_in server;
        server.sin_family = AF_INET;
        server.sin_port = htons(serverPort);
        server.sin_addr.s_addr = INADDR_ANY;
        int n = bind(_sockfd, (struct sockaddr*)&server, sizeof(server));
        if (n < 0)
        {
            lg(Fatal, "bind socket failed:%s", strerror(errno));
            exit(2);
        }
    }
    void Listen()
    {
        int n = listen(_sockfd, 5);
        if (n < 0)
        {
            lg(Fatal, "listen socket failed:%s", strerror(errno));
            exit(3);
        }
    }
    int Accept(string* clientIp, uint16_t* clinetPort)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int newsocket = accept(_sockfd, (struct sockaddr*)&peer, &len);
        if (newsocket < 0)
        {
            return -1;
        }
        *clientIp = inet_ntoa(peer.sin_addr);
        *clinetPort = ntohs(peer.sin_port);
        return newsocket;
    }
    int Connect(const string& serverIp, uint16_t serverPort)
    {
        struct sockaddr_in server;
        server.sin_family = AF_INET;
        server.sin_addr.s_addr = inet_addr(serverIp.c_str());
        server.sin_port = htons(serverPort);
        int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server));
        if (n < 0)
        {
            return -1;
        }
        return 0;
    }
    void Setsockopt()
    {
        int opt = 1;
        if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt)) < 0)
            lg(Error, "%s\n", strerror(errno));
    }
    void SetNonBlock()
    {
        int flag = fcntl(_sockfd, F_GETFL);
        fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
    }
    void Close()
    {
        close(_sockfd);
    }
    int Fd()
    {
        return _sockfd;
    }
private:
    int _sockfd;
};

TcpServer.hpp文件如下

#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>


const int num = 64; //一次性最多捞取上来多少个fd

class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
    Connection(int fd)
    :_sockfd(fd)
    {}
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }
public:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;

    func_t _recv_cb;        //读回调函数
    func_t _send_cb;        //写回调函数
    func_t _except_cb;      //异常回调函数

    std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针

    std::string _ip;
    uint16_t _port;
};

class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer>  //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:
    TcpServer(uint16_t port, func_t OnMessage)
    :_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage)
    {}
    void Init()
    {
        _listensock_ptr->CreateSocket();
        _listensock_ptr->Setsockopt();
        _listensock_ptr->Bind(_port);
        _listensock_ptr->SetNonBlock(); //设置非阻塞
        _listensock_ptr->Listen();
        _epoller_ptr->EpollerCreate();
        lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());

        AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
    }
    void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        //建立连接的同时,挂到epoller中
        //1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
        std::shared_ptr<Connection> conn(new Connection(fd));
        conn->SetHandler(recv_cb, send_cb, except_cb);
        _connections.insert(make_pair(fd, conn));
        if (fd == _listensock_ptr->Fd())    //如果是listensock连接,就构造智能指针来管理TcpServer
            conn->_tcp_server_ptr = shared_ptr<TcpServer>(this);
        else
            conn->_tcp_server_ptr = shared_from_this();     //如果是其他连接,就共享该资源来管理
        //2.添加对应的事件,放入到epoller中
        _epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
    }
    void Sender(std::shared_ptr<Connection> conn)
    {
        while (1)
        {
            int n = write(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size());
            if (n > 0)
            {
                conn->_outbuffer.erase(0, n);
                if (conn->_outbuffer.empty() == true)
                {
                    return;
                }
            }
            else if (n == 0)
            {
                return;
            }
            else
            {
                if (errno == EWOULDBLOCK || errno == EAGAIN)
                {
                    //说明底层数据不就绪,即内核写缓冲区没有空间了
                    break;
                }
                if (errno == EINTR)
                {
                    continue;
                }
                lg(Error, "write failed, socket: %d", conn->_sockfd);
                conn->_except_cb(conn);
                return;
            }
        }
        if (!conn->_outbuffer.empty())
        {
            //说明没有将数据发完,则需要设置写事件关心了
            uint32_t event = EVENT_OUT | EVENT_IN;
            _epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);
        }
        else 
        {
            //说明将数据发完了,则取消对写事件关心了
            uint32_t event = EVENT_IN;
            _epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);
        }
    }
    void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可
    {
        lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);
        auto it = _connections.find(conn->_sockfd);
        assert (it != _connections.end());
        _connections.erase(it);     //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放
        _epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);
        close(conn->_sockfd);
    }
    void Recver(std::shared_ptr<Connection> conn)
    {
        int fd = conn->_sockfd;
        char buffer[4096];
        while (1)
        {
            int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取
            if (n < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "read failed: %s", strerror(errno));
                conn->_except_cb(conn);
                return;
            }
            else if (n == 0)
            {
                lg(Info, "server closed...");
                conn->_except_cb(conn);
                return;
            }
            buffer[n] = 0;
            conn->_inbuffer += buffer;  //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理
        }
        _OnMessage(conn);   //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理
    }
    void Acceptor(std::shared_ptr<Connection> conn)
    {
        while (1)
        {
            std::string clientIp;
            uint16_t clientPort;
            Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
            if (newSocket.Fd() < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    //如果底层没有数据了
                    break;
                }
                else if (errno == EINTR)
                {
                    //被信号打断了
                    continue;
                }
                lg(Error, "listensock accept failed: %s", strerror(errno));
                break;
            }
            lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
            newSocket.SetNonBlock();
            AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
              std::bind(&TcpServer::Sender, this, std::placeholders::_1),\
              std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
        }
    }
    void Dispatcher(int n)
    {
        //对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
        for (int i = 0; i < n; ++i)
        {
            uint32_t event = recvs[i].events;
            int fd = recvs[i].data.fd;
            auto pos = _connections.find(fd);
            assert(pos != _connections.end());
            //统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
            if (event & EPOLLERR)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLHUP)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLIN)
            {
                if (pos->second->_recv_cb)
                    pos->second->_recv_cb(pos->second);
            }
            if (event & EPOLLOUT)
            {
                if (pos->second->_send_cb)
                    pos->second->_send_cb(pos->second);
            }
        }
    }
    void Start()
    {
        while (1)
        {
            int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
            if (n < 0)
            {
                lg(Error, "epoll wait failed: %s", strerror(errno));
                break;
            }
            else if (n == 0)
            {
                lg(Info, "timeout...");
            }
            else 
            {
                Dispatcher(n);
            }
        }
    }
    ~TcpServer()
    {}
private:
    std::shared_ptr<Socket> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    struct epoll_event recvs[num];  //捞取就绪事件的数组
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;  //管理所有的连接
    int _port;

    func_t _OnMessage;  //让上层处理信息
};

main.cc文件

#include "TcpServer.hpp"
#include "ServerCal.hpp"


void DefaultOnMessage(std::shared_ptr<Connection> conn)
{
    ServerCal calculator;
    std::string response_str = calculator.Calculator(conn->_inbuffer);
    if (response_str.empty()) return;
    conn->_outbuffer += response_str;
    if (conn->_send_cb)
        conn->_send_cb(conn);
}

int main()
{
    std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));
    svr->Init();
    svr->Start();
    return 0;
}

Protocol.hpp文件

#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>

using namespace std;

const string blank_space_sep = " ";
const string protocol_sep = "\n";
//添加报头:"内容" 转为 "长度\n内容\n"
bool Encode(string *package, const string& content)
{
    package->clear();
    *package += to_string(content.size());
    *package += protocol_sep;
    *package += content;
    *package += protocol_sep;
    return true;
}
//去除报头:"长度\n内容\n" 转为 "内容"  -- 可能存在本来是5\n1 + 1\n  但接受到的是5\n1 +
bool Decode(string &package, string *content)
{
    content->clear();
    int pos = package.find(protocol_sep);
    if (pos == string::npos) return false;
    int len = stoi(package.substr(0, pos));
    int totalLen = pos + len + 2;
    if (package.size() < totalLen) return false;//如果报文不完整,说明
    *content = package.substr(pos + protocol_sep.size(), len);

    package.erase(0, totalLen);

    return true;
}

//提取字符串
bool Extract(const string& in, int& x, int& y, char& oper)
{
    int pos = in.find(blank_space_sep);
    if (pos == string::npos) return false;

    x = stoi(in.substr(0, pos));

    oper = *in.substr(pos + blank_space_sep.size(), 1).c_str();

    int pos2 = in.rfind(blank_space_sep);
    if (pos2 == string::npos) return false;

    y = stoi(in.substr(pos2 + blank_space_sep.size()));
    return true;
}

//请求协议
class Request
{
public:
    Request(int x = 0, int y = 0, char oper = '+')
    :_x(x), _y(y), _oper(oper)
    {}
    //序列化 -- 将结构体转为"_x + _y"
    bool Serialize(string* out)
    {
#ifdef MySelf
        out->clear();
        *out += to_string(_x);
        *out += blank_space_sep;
        *out += _oper;
        *out += blank_space_sep;
        *out += to_string(_y);
        return true;
#else
        Json::Value root;
        root["x"] = _x;
        root["y"] = _y;
        root["oper"] = _oper;
        Json::StyledWriter w;
        *out = w.write(root);
        return true;
#endif
    }
    //反序列化 -- 将"_x + _y"转为结构体
    bool Deserialize(const string& in)
    {
#ifdef MySelf
        return Extract(in, _x, _y, _oper);
#else
        Json::Reader r;
        Json::Value root;
        r.parse(in, root);
        _x = root["x"].asInt();
        _y = root["y"].asInt();
        _oper = root["oper"].asInt();
        return true;
#endif
    }
public:
    int _x;
    int _y;
    char _oper;
};

//响应协议
class Response
{
    const string blank_space_sep = " ";
public:
    Response(int result = 0, int code = 0)
    :_result(), _code(code)
    {}
    //序列化 -- 将结构体转为"_result _code"
    bool Serialize(string* out)
    {
#ifdef MySelf
        *out = to_string(_result) + blank_space_sep + to_string(_code);
        return true;
#else 
        Json::Value root;
        root["code"] = _code;
        root["result"] = _result;
        Json::StyledWriter w;
        *out = w.write(root);
        return true;
#endif
    }
    //反序列化 -- 将"_result _code"转为结构体
    bool Deserialize(const string& in)
    {
#ifdef MySelf
        int pos = in.find(blank_space_sep);
        if (pos == string::npos) return false;
        _result = stoi(in.substr(0, pos));
        _code = stoi(in.substr(pos + blank_space_sep.size()));
        return true;
#else
        Json::Reader r;
        Json::Value root;
        r.parse(in, root);
        _result = root["result"].asInt();
        _code = root["code"].asInt();
        return true;
#endif
    }
public:
    int _result;
    int _code; // 0,可信,否则!0具体是几,表明对应的错误原因
};

ServerCal.hpp文件

#pragma once
#include "Protocol.hpp"

//服务器的计算服务
class ServerCal
{
public:
    ServerCal()
    {}
    Response CalculatorHelper(const Request &req)
    {
        int x = req._x;
        char oper = req._oper;
        int y = req._y;
        Response rsp(0, 0);
        switch (oper)
        {
        case '+':
        {
            rsp._result = x + y;
            rsp._code = 0;
            break;
        }
        case '-':
        {
            rsp._result = x - y;
            rsp._code = 0;
            break;
        }
        case '*':
        {
            rsp._result = x * y;
            rsp._code = 0;
            break;
        }
        case '/':
        {
            if (y == 0)
            {
                rsp._result = 0;
                rsp._code = -1;
                break;
            }
            rsp._result = x / y;
            rsp._code = 0;
            break;
        }
        case '%':
        {
            if (y == 0)
            {
                rsp._result = 0;
                rsp._code = -1;
                break;
            }
            rsp._result = x % y;
            rsp._code = 0;
            break;
        }
        default:
            break;
        }
        return rsp;
    }
    string Calculator(string& s)
    {
        string content;
        if(!Decode(s, &content))//将"长度/n内容/n" -> "内容"
            return "";
        Request rq;
        rq.Deserialize(content);//将"内容"反序列化
        Response rsp = CalculatorHelper(rq);//得到答案内容
        content.clear();
        rsp.Serialize(&content);//序列化答案内容
        string ret;
        Encode(&ret, content);//将"内容" -> "长度/n内容/n"
        return ret;
    }
};

TcpClient.cc文件

#include "Protocol.hpp"
#include "Socket.hpp"

static void Usage(const std::string &proc)
{
    std::cout << "\nUsage: " << proc << "\tserverIp\tport\n" << std::endl; 
}
int main(int argc, char* argv[])
{
    if (argc != 3)
    {
        Usage(argv[0]);
        exit(0);
    }
    string serverIp = argv[1];
    uint16_t serverPort = atoi(argv[2]);
    Socket skt;
    skt.CreateSocket();
    skt.Connect(serverIp, serverPort);

    string streamBuffer;
    while (1)
    {
        cout << "Enter#";
        fflush(stdout);
        string content;
        getline(cin, content);
        //提取字符串,得到”请求反序列化“
        Request rq;
        Extract(content, rq._x, rq._y, rq._oper);
        string s;
        rq.Serialize(&s);
        string ret;
        Encode(&ret, s);
        write(skt.Fd(), ret.c_str(), ret.size());

        char readBuffer[4096];
        int n = read(skt.Fd(), readBuffer, sizeof(readBuffer) - 1);
        if (n > 0)
        {
            readBuffer[n] = 0;
            streamBuffer += readBuffer;
            string ret;
            Decode(streamBuffer, &ret);
            Response rsp;
            rsp.Deserialize(ret);
            cout << "code: " << rsp._code << "    result: " << rsp._result << endl;
        }
        else if (n == 0)
        {
            cerr << "Server closed..." << endl;
            break;
        }
        else 
        {
            cerr << "read failed:" << strerror(errno) << endl;
            exit(3);
        }
    }
    return 0;
}

Makefile文件

1=main
.PHONY:all
all:$1 TcpClient
$1:$1.cc
	g++ -o $@ $^ -std=c++11 -ljsoncpp
TcpClient:TcpClient.cc
	g++ -o $@ $^ -std=c++11 -ljsoncpp
.PHONY:clean
clean:
	rm -rf $1 TcpClient

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/677493.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【mysql】数据报错: incorrect datetime value ‘0000-00-00 00:00:00‘ for column

一、问题原因 时间字段在导入值0000-00-00 00:00:00或者添加 NOT NULL的时间字段时&#xff0c;会往mysql添加0值&#xff0c;此时可能出现此报错。 这是因为当前的MySQL不支持datetime为0&#xff0c;在MySQL5.7版本以上&#xff0c;默认设置sql_mode模式包含NO_ZERO_DATE, N…

实验名称:组合数据类型

大家好&#xff01;欢迎收听你的月亮我的心&#xff01;我是肖老师&#xff01;好久不见&#xff01; 目录 一、实验目的&#xff1a; 二、实验环境&#xff1a; 三、实验步骤&#xff1a; 四、实验结果&#xff1a; 1.已知列表li_one[1,2,1,2,3,5,4,3,5,7,4,7,8],删除列…

解决使用Python检查本地网络中运行的Web服务器的问题

如果我们要检查本地网络中运行的 Web 服务器&#xff0c;可以使用 Python 的 socket 模块来进行网络连接测试。以下是一个简单的示例代码&#xff0c;演示如何检查本地网络中运行的 Web 服务器&#xff1a; 1、问题背景 在学习如何使用 Python 时&#xff0c;一位用户希望编写…

C#的web项目ASP.NET

添加实体类和控制器类 using System; using System.Collections.Generic; using System.Linq; using System.Web;namespace WebApplication1.Models {public class Company{public string companyCode { get; set; }public string companyName { get; set; }public string com…

开始报名!龙蜥社区系统安全 Meetup 硬核议程发布

在数字化时代&#xff0c;随着云计算、大数据和人工智能等技术的广泛应用&#xff0c;操作系统扮演着关键的角色&#xff0c;成为支撑关键业务和数据的核心基础设施。在这一背景下&#xff0c;操作系统的安全性显得尤为重要&#xff0c;它直接影响着信息系统的稳定运行和持续发…

RocketMQ---Day1

RocketMQ---Day1 1.认识MQ 火车案例&#xff1a; 人就相当于消息 进站口将消息分发给不同的候车厅&#xff08;主题&#xff09; 火车将不同候车厅的人拉走&#xff08;消费不同主题里面的数据&#xff09; MQ是一种消息中间件。 2.微服务的远程调用 1.同步调用 RPC&am…

(Arkts界面示例)ets pages Demo

Index.ets 文件 import router from ohos.routerEntry//表示该自定义组件为入口组件 Component //表示自定义组件 struct Index {//表示组件中的状态变量&#xff0c;状态变量变化会触发UI刷新State changeValue: string State submitValue: string controller: SearchContr…

04.docker的主要组成部分

docker体验 docker是传统的CS架构分为docker client和docker server,跟mysql一样 查看版本命令&#xff1a;docker version 查看docker下载的是社区版,ce代表社区 rpm -qa |grep docker 查看docker系统命令 docker system docker info&#xff08;如果要做监控&#xff…

机器视觉开启航空安全新篇章:飞机复合材料检测研究进展,军工材料、智能装备和通信技术全产业链博览会

"精准把控行业标准&#xff1a;机器视觉在飞机垂尾复合材料检测中的应用" 随着航空航天技术的快速发展&#xff0c;飞机制造的质量控制要求越来越高&#xff0c;尤其是对于关键部件如垂尾复合材料零件的缺陷检测。基于机器视觉的检测技术因其非接触、高效率和高精度…

使用 Django Channels 构建实时聊天应用(包含用户认证和消息持久化)

文章目录 准备工作创建 Django 项目创建应用程序配置项目编写 Consumer编写路由创建 URL 路由运行应用用户认证消息持久化显示历史消息结论 Django Channels 是 Django 的一个扩展&#xff0c;允许在 Web 应用中添加实时功能&#xff0c;例如 Websockets、HTTP2 和其他协议。本…

虚拟机安装 RockyLinux为例

目录 一、VMWare、Xshell、Xftp、LinuxISO资料下载 二、VMWare安装 三、创建虚拟机 四、虚拟机安装过程的问题 一、VMWare、Xshell、Xftp、LinuxISO资料下载 链接&#xff1a;百度网盘 请输入提取码 提取码&#xff1a;6666 二、VMWare安装 三、创建虚拟机 四、虚拟机安装…

企业如何利用智能防止截屏保护商业机密

在数字化时代&#xff0c;企业商业机密的保护变得尤为重要。智能防止截屏技术作为一种先进的数据安全手段&#xff0c;能够帮助企业有效防止商业机密的泄露。本文将探讨企业如何利用这一技术保护其宝贵的商业信息。 一、商业机密面临的威胁 商业机密包括但不限于产品设计方案…

微信小程序毕业设计-电影院订票选座系统项目开发实战(附源码+论文)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;微信小程序毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计…

基于单片机的八路抢答器设计论文

绪 论1.1 课题研究的相关背景 抢答器是一种应用非常广泛的设备,在各种竞赛、抢答场合中,它能迅速、客观地分辨出最先获得发言权的选手。早期的抢答器只由几个三极管、可控硅、发光管等组成,能通过发光管的指示辩认出选手号码。现在大多数抢答器均使用单片机(如MCS-5…

中间件解析漏洞及Apache解析漏洞原理和复现

Apache漏洞 Apache HTTPD 多后缀解析漏洞 httpd是Apache超文本传输协议(HTTP)服务器的主程序。被设计为一个独立运行的后台进程&#xff0c;它会建立 一个处理请求的子进程或线程的池。 漏洞原理 apache httpd支持一个文件有多个后缀&#xff0c;如&#xff1a;shell.php.d…

【ArcGIS微课1000例】0114:基于DEM地形数据整体抬升或下降高程

相关阅读:【GlobalMapper精品教程】083:基于DEM整体抬升或下降地形高程的两种方式 文章目录 一、任务分析二、栅格计算器简介三、地形整体修改四、注意事项一、任务分析 打开软件,加载配套实验数据中的0112.rar中的dem数据,如下所示,dem的高程范围为256.75~342.37米,现在…

【自然语言处理】文本情感分析

文本情感分析 1 任务目标 1.1 案例简介 情感分析旨在挖掘文本中的主观信息&#xff0c;它是自然语言处理中的经典任务。在本次任务中&#xff0c;我们将在影评文本数据集&#xff08;Rotten Tomato&#xff09;上进行情感分析&#xff0c;通过实现课堂讲授的模型方法&#x…

企业数字化转型顶层设计与企业架构TOGAF9.2认证【鉴定级】

什么是TOGAF TOGAF由国际标准权威组织 The Open Group制定。The Open Group于1993年开始应客户要求制定系统架构的标准&#xff0c;在1995年发表The Open Group Architecture Framework (TOGAF) 架构框架。2022年4月25日发布了TOGAF的最新版本10&#xff0c;目前&#xff0c;T…

如何用TCC方案轻松实现分布式事务一致性

本文作者:小米,一个热爱技术分享的29岁程序员。如果你喜欢我的文章,欢迎关注我的微信公众号“软件求生”,获取更多技术干货! 哈喽,大家好!我是小米,一个热爱技术的活力小青年,今天要和大家分享的是一种在分布式系统中实现事务的一种经典方案——TCC(Try Confirm Canc…

【机器学习】朴素贝叶斯算法及其应用探索

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 朴素贝叶斯算法及其应用探索引言1. 朴素贝叶斯基本概念1.1 贝叶斯定理回顾1.2 朴…