reactor (百万并发服务器) -- 1

        为了从点滴开始,文章会先从一些基础socket去补充一些经常发生但是没有很深入去思考的细节。然后我们再开始去设计reactor的设计,可以选择跳过起过前面部分。

        为了能从0开始去设计,测试,优化...整个过程会分为2-3篇文章输出,喜欢的可以点歌关注哦。

socket的API理解

        这部分不过多详细的去解释,只是对使用过程中容易忽略的地方进行补充。很基础的部分不是很了解的部分,可以看我的另一篇文章。

C++项目实战-socket编程_c++ socket-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/weixin_46120107/article/details/126528923

    //-- 创建(分配)一个管理者sockfd

    // 参数1:表示创建套接字使用的协议族   -- AF_INET(IPv4地址) AF_INET6(IPv6地址) AF_UNIX(本地通信)
    // 参数2:表示创建套接字使用的协议类型     -- SOCK_STREAM(字节流套接字) SOCK_DGRAM(数据报套接字)
    // 参数3: 表示创建套接字使用的协议 -- 0(由操作系统自动选择适当的协议类型) IPPROTO_TCP(TCP) TPPROTO_UDP(UDP)
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

        为了更好的理解,我们可以把socket理解为一家门店招聘一个管理者或者一个小饭店的老板。那么现在我们有管理者了,现在需要把这个管理者安排到指定的门店去工作。

-- 选择门店(设置这个门店接待的客人类型)
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));

-- 表示可以进门的顾客是 AF_INET(IPv4协议族的)
serveraddr.sin_family = AF_INET;
// INADDR_ANY(0.0.0.0) 表示可以接待来着任何地方的顾客 
// 127.0.0.1/localhost: 只可以让本机访问,其他计算机是无法访问的(只接待内部人员)
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

-- 表示顾客可以从哪个门口进入
serveraddr.sin_port = htons(2048);


-- 将管理者安排到指定的门店
bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr));

        现在我们已经安排好我们的门店管理者(店长)了,我们还需要一个记录顾客进店的登记表,或者管理进店的秩序方式,防止发生踩踏事件,也为了公平起见,先来的客人,我们先进行服务。在计算机中采用队列的先进先出性质能很好的解决。

// --记录所有进门的顾客信息(连接)
// 创建等待队列,将所有经过端口连接的请求放到等待队列
// 如果队列已满,则新的请求就会被拒绝,这个数字并不是一个硬性要求,实际上系统会设定一个合适的值
listen(sockfd, 10);

        现在我们已经把管理方案和管理人员都安排好了,我们就可以开业了吗?等等,我们还需要记录顾客的信息呢?我们需要知道顾客从哪里来,这样我们才能更地道的为顾客提供服务。所以我们需要先准备一张表格,登记顾客的信息。

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);

        到现在为止,我们就完事具备了,等待顾客的光临,然后为每一个顾客专门安排一个一对一个的服务员,我们主打一个服务。

// -- 等待顾客到来,当顾客当来,填充clientaddr表,然后由店长叫一个专门的服务员来为顾客提供服务
// 这个函数是阻塞的,可以设置成非阻塞的方式(使用fcntl将套接字socket设置成非阻塞的即可)
int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);

        接下来就是服务员与顾客之间的对话了。顾客提出需求,服务员进行解答和提供服务。

// 顾客提出要求,服务员通过recv的方式接收到 
recv(clientfd, buffer, 128, 0);

// 业务处理 --> 针对不同的需求,服务员会做出相应的处理方式
TODO

// 给顾客提供反馈,让顾客收获到快乐
send(clientfd, buffer, 128, 0);

        顾客总有离开的时候,这个时候我们需要把安排的服务员收回,下次让他为其他顾客提供服务。

close(clientfd);

        夜深了,门店需要打样了。我们要把管理者也收回。

 close(sockfd);

-------------------------------------------------------------------------------------------------------------------------

现在我们来实操一下(这里我们只进行TCP进行表述):

客户端我们就不写了,使用sockTools进行测试。

https://pan.baidu.com/share/init?surl=GqaKzEZWNvhXivm0FAnZngicon-default.png?t=N7T8https://pan.baidu.com/share/init?surl=GqaKzEZWNvhXivm0FAnZng提取码:s5wy

我们先思考2个问题:

  • 系统中出现大量TIME_WAIT状态的原因?
  • 系统中出现大量的CLOSE_WAIT状态的原因?

我们来复现第一种情况:系统中出现大量TIME_WAIT状态的原因?

#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>


int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket():");
        return -1;
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htons(INADDR_ANY);
    serveraddr.sin_port = htons(2048);   // 这里补充一个细节:端口1024以前是系统的,如果要用需要使用root权限

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
    if(-1 == clientfd)
    {
        perror("clientfd");
        return -1;
    }

    // char buffer[128] = {0}; 
    // while(1)
    // {
    //     int count = recv(clientfd, buffer, 128, 0);
    //     if(count == 0)
    //     {
    //         printf("断开\n");
    //         close(clientfd);
    //     }
    //     else 
    //     {
    //         send(clientfd, buffer, 128, 0);
    //     }
    // }

    close(sockfd);

    return 0;
}

我们来看下上面的代码,当我们连接之后,立马就退出了。然后就进入了TIME_WAIT状态,接下来我们怎么分析呢?

在TCP连接中,主动关闭的一方会进入TIME_WAIT状态,这样是不正常的。说明服务器会总是断开连接导致系统中出现大量的TIME_CLOSE状态。导致原因可能是进程异常退出(崩溃)...

我们来复现第二种情况:系统中出现大量的CLOSE_WAIT状态的原因?

#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>


int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket():");
        return -1;
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htons(INADDR_ANY);
    serveraddr.sin_port = htons(2048);   // 这里补充一个细节:端口1024以前是系统的,如果要用需要使用root权限

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
    if(-1 == clientfd)
    {
        perror("clientfd");
        return -1;
    }

    char buffer[128] = {0}; 
    while(1)
    {
        int count = recv(clientfd, buffer, 128, 0);
        if(count == 0)
        {
            printf("断开\n");
            // close(clientfd);
        }
        else 
        {
            send(clientfd, buffer, 128, 0);
        }
    }

    close(sockfd);

    return 0;
}

在TCP连接中,对端关闭连接后,收到FIN后发送ACK确认收到了要关闭的信息,随后进入CLOSE_WAIT状态,关闭后进入LAST_ACK状态。按这个道理分析,我们很容易得出当服务器未调用close出现在TCP状态中,对端调用close关闭连接后,服务器回送ACK,表明收到了消息后进入半连接状态,当服务器调用close后,退出CLOSE_WAIT状态。其实从字面就能猜出来,关闭等待嘛,合适关闭呢,调用close呗。说明连接没关闭,这样是很危险的,很容易造成描述符没有释放而程序崩溃。


这个话题到这里就结束了,后面遇到一些情况,会进行补充...

多线程/多进程服务器

        在上面的代码中我们很容易看出来,这个服务器最大的缺陷是什么?

        只能处理一个连接。我们来测试一下:

        连接1的情况:

        连接2的情况:

我们回到代码:

    int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
    if(-1 == clientfd)
    {
        perror("clientfd");
        return -1;
    }

    char buffer[128] = {0}; 

    // 循环
    while(1)
    {
        int count = recv(clientfd, buffer, 128, 0);
        if(count == 0)
        {
            printf("断开\n");
            close(clientfd);
        }
        else 
        {
            send(clientfd, buffer, 128, 0);
        }
    }

    close(sockfd);

   我们很容易看出来,我们没有为第二个连接提供专门服务员进行服务。那有什么办法吗?

   很容易想到,开线程/开进程,接下来我们就对我们的代码进行改进吧!!!

        哈哈,似乎问题得到的合理的解决,那么这样真的满足所有的需求吗?这里我才两个连接呢,如果我们有1w个连接同时在线呢? 很读朋友很快想到,那用线程池呗,其实不然。用线程池不是解决这个问题的核心,如果连接池的连接数拿到设置成1w吗?那如果有1w+1的连接呢,还不是要去创建往线程池中添加一个线程。

        这里插一个故事:apache的C10K问题。C10K问题是只支持一万个并发连接问题,在Apache中,每一个客户端请求的会分配一个独立的线程或者进程来处理。当并发请求增加时,系统将消耗更多的线程或者进程资源,这将导致内存和CPU资源的过度使用,从而影响服务器的性能。

        如何解决呢?怎么办呢?害,好像这个问题无解了。

        我们需要寻找在一种在一个线程中对应多个连接的方法 --> IO多路复用出现了

IO多路复用 

        在Linux中,IO多路复用主要包括了select/poll/epoll技术。接下来我们将对这个几个技术做一个的中的疑问提出一些看法,很基础的部分请跳到开头的提供的文章中。


        select

  • 为什么说select会受到1024的限制呢?
  • select的性能权限是什么呢?
  • select的使用上的缺点在哪呢?

我们先来使用和测试下select,验证下它是否能在一个线程中处理多个连接的问题。(传参的说明在代码中有注释,这里就不过多的重复了)

#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/select.h>


int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2048);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr)))
    {
        perror("bind()");
        return -1;
    } 

    listen(sockfd, 10);

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    // 定义监视IO集合
    fd_set rfds,rset;
    // 初始化时全部置为0
    FD_ZERO(&rfds);
    // 将需要监视的IO置1
    FD_SET(sockfd, &rfds);
    // 因为select内部的循环是 for(int i = 0; i < __nfds; ++i)
    // 为了减少循环,我们提前记录下来
    int maxFd = sockfd;

    int fds[1024] = {0};
    fds[maxFd] = 1;

    while(1)
    {
        
        rset = rfds;

        // 这里的+1很重要哦
        // 参数1: 表示需要监视的文件描述符数量+1
        // 参数2:表示需要监视的读事件集合
        // 参数3: 表示需要监视的写事件集合
        // 参数4:表示需要坚实的错误事件集合
        // 参数5: 超时时间,NULL表示阻塞方式
        // 返回: 就绪描述符个数
        int ready = select(maxFd + 1, &rset, NULL, NULL, NULL);

        // 对连接事件的处理
        if(FD_ISSET(sockfd, &rset))
        {
            struct sockaddr_in clientaddr;
            socklen_t len = sizeof(clientaddr);

            int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);

            // 将新的需要监视的IO加入到集合中
            FD_SET(clientfd, &rfds);

            maxFd = clientfd;

            if(clientfd > maxFd)
            {
                maxFd = clientfd;
            }

            fds[clientfd] = 1;
        }

        for(int i = sockfd + 1; i <= maxFd; ++i)
        {
            if(FD_ISSET(i, &rset))
            {
                char buffer[128] = {0};
                int count = recv(i, buffer, 128, 0);
                if(count == 0)
                {
                    printf("断开 %d\n", i);
                    fds[i] = 0;
                    FD_CLR(i, &rfds);
                    close(i);

                    // 如果当前最大的被关闭了,则需要更新(减小无用循环)
                    if(i == maxFd)
                    {
                        for(int k = maxFd; k > sockfd + 1; --k)
                        {
                            if(fds[i] == 1)
                            {
                                maxFd = k;
                            }
                        }
                    }

                    continue;
                }
                send(i, buffer, count, 0);
            }
        }
    }
   
    close(sockfd);  

    return 0;
}

我们来复现第一种情况:为什么说select会受到1024的限制呢?

//-------------------------------------------//
#define __FD_SETSIZE		1024
//-------------------------------------------//

//-------------------------------------------//
#define __NFDBITS	(8 * (int) sizeof (__fd_mask))
//-------------------------------------------//

//-------------------------------------------//
typedef long int __fd_mask;
//-------------------------------------------//

/* fd_set for select and pselect.  */
typedef struct
  {
    /* XPG4.2 requires this member name.  Otherwise avoid the name
       from the global namespace.  */
#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
  } fd_set;

这里面很重要的一个部分: __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]

__fd_mask --> long int

__FD_SETSIZE --> 1024

__NFDBITS --> long int

__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]-->long int fds_bits[128]-->8 * 128个字节 --> 1024个位-->每一个位对应一个IO,分别有1和0两种状态对应事件的发生或者没有发生。

select的性能权限是什么呢?

我们在写上面select的过程中就能体会到select的一些权限。

1.我们需要重复的复制集合,而且在select的内部,需要将集合从用户区复制到内核区,等有事件到来,又需要将数据从内核区复制到用户区。

2.每次的需要遍历整个IO集合。

select的使用上的缺点在哪呢?

1.参数太多,容易搞错。一共5个参数,用户体验不好。

2.每次需要把待检测的IO集合进行拷贝,对性能有影响。

3.对IO的数量有限制。


        poll

        对于poll而言,本质上是对select的改进。但是只停留在了表面,并没有解决性能问题,只是在参数层面做了优化和解除了通过宏定义来设置的限制。本质上没啥好说的,我们就来简单实现下吧,熟悉熟悉代码。

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/poll.h>

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(struct sockaddr_in));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2048);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);
    
    // 准备一个事件消息的数组,这个长度由用户定义 --> 突破了select通过宏定义写死的缺陷
    struct pollfd fds[1024] = {0};
    fds[sockfd].fd = sockfd;
    fds[sockfd].events = POLLIN;
    int fd_in[1024] = {0};
    int maxFd = sockfd;
    fd_in[sockfd] = 1;

    while(1)
    {
        int nready = poll(fds, maxFd + 1, -1);

        if(fds[sockfd].revents & POLLIN)
        {
            struct sockaddr_in clientaddr;
            socklen_t len = sizeof(clientaddr);

            int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
            fds[clientfd].fd = clientfd;
            fds[clientfd].events = POLLIN;

            fd_in[clientfd] = clientfd;
            maxFd = clientfd;
        }

        int i = 0;
        for(i = sockfd + 1; i <= maxFd; ++i)
        {
            if(fds[i].revents & POLLIN)
            {
                char buffer[128] = {0};
				int count = recv(i, buffer, 128, 0);
                if(count == 0)
                {
                    fds[i].fd = -1;
                    fds[i].events = 0;

                    close(i);
                    continue;
                }

                send(i, buffer, count, 0);
            }
        }

    }


    return 0;
}


        epoll

        epoll对于Linux来说太重要了,如果没有epoll的存在,Linux只能停留在设备相关了开发了,epoll解决了select的问题。从设计层面来说,摒弃了之前的数组思维。那么epoll怎么设计的呢?为了方便理解,我们举一个例子,现在有一个快递站点,有用户,有一个快递员。对于用户来说寄快递只需要将快递放到快递站点,取快递只需要到快递站点去就行了,不用关系之后会不会有用户搬出和新用户的入住,将这层分割出来了。

        我们来用下吧。

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/epoll.h>

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    
    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(struct sockaddr_in));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2048);

    bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr));

    listen(sockfd, 10);

    // 这里的参数没有意义,为了兼用2.6以前版本。传参时需要大于0
    int epfd = epoll_create(1);

    // 一个epoll事件
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;

    // 把连接事件加入到集合中(本质上是一个红黑树)
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    // 存储epoll事件,这里我们暂定位1024
    struct epoll_event events[1024] = {0};

    while(1)
    {
        int nready = epoll_wait(epfd, events, 1024, -1);
        int i = 0;
        for(int i = 0; i < nready; ++i)
        {
            int connfd = events[i].data.fd;
            if(sockfd == connfd)
            {
                struct sockaddr_in clientaddr;   
                socklen_t len = sizeof(clientaddr);
                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
                
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
            }
            else if(events[i].events & EPOLLIN)
            {
                char buffer[10] = {0};
                int count = recv(connfd, buffer, 10, 0);
                if(count == 0)
                {
                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    close(i);
                    continue;
                }

                send(connfd, buffer, count, 0);
            }
        }
    }

    close(sockfd);

    return 0;
}

用法很简答哈,其实这样的代码有点问题,不能让epoll实现百万级别的并发。这里我们先不讨论,我们后面封住的时候会一步步优化代码的。

        epoll有存在两种触发机制,水平触发(LT)和边缘触发(ET),所谓的水平触发就是当缓冲区有数据的时候会一直触发,知道缓冲区为空。而边缘触发是指来数据的时候,触发一次。怎么测出来这样的效果,我们把接收的buffer变小,然后分别修改为ET和LT及上打印,很容易看出区别。篇幅问题,这里就演示了,很简答的。

        我们先提出两个问题:

        1.epoll里面有没有mmap?

        2.epoll是不是线程安全的?

        3.什么时候用水平触发合适,什么时候用边缘触发合适?

        对于第一个和第二个问题,我们需要阅读epoll的源码(路径:eventpoll.c),这里带大家一起看一下。(源码:The Linux Kernel Archives) --> 建议拷过来

        epoll里面有没有mmap?

        通过查看eventpoll.c,并没有发现关于mmap相关的API出现,可见epoll中是不包含mmap的。epoll的性能好是取决于他的设计方式和数据结构,减少遍历次数,不需要像select和poll一样需要将整个数组遍历,只需要将返回的列表中就绪的进行操作,使用红黑树结构能够将查找效率从O(N)缩减至log(N),当并发量越大时,效果越明显。

        epoll是不是线程安全的?

        通过查看eventpoll.c,它的过程是在内核太完成的,不涉及到用户空间和多线程竞争关系。但值得注意的时,如果在,当需要多个线程或者多个进程同时操作同一个epoll的文件描述符时,需要注意红黑树和就绪队列所在的空间存在竞争问题。

        什么时候用水平触发合适,什么时候用边缘触发合适?

        无论什么场景使用水平触发和边缘触发都可以实现,需要考虑的是在什么时候使用哪种方式会更合适。

        边缘触发对于类似于代理的那种方式,比如不对数据进行处理,直接转发的情况下使用边缘触发。当业务处理比较慢的情况,读缓冲区数据比较多的情况,需要处理完业务然后再此去读,这时候使用水平触发。

        select和poll使用的都是水平触发方式。


        我们总算是把socket基础部分讲完了,现在我们要正式的开始进行reactor的封装了。

reactor

        通常情况下,在epoll对IO的处理有两种方式,一种是面向IO的处理模式,一只种是面向事件的处理方式。

        面向IO的方式不方便封装和维护,面向事件的方式发生什么事件就调用什么回调函数,例如发生读IO事件,那么就调用响应的读回调函数。换一种说法,通过面向连接的方式能够将IO事件与读写存储以及事件回调封装到一起。

        对于reactor的封装,我们有要采用面向事件的方式,事实上reactor也是这么做的。


        step01:首先我们需要考虑处理的对象是什么?

        ---> 能够想到我们需要处理的是IO

        step02:然后我们需要考虑如何封装?

        --> 对于一个IO来说有哪些属性或者行为,IO对应的文件描述符,读数据、写数据、各种事件的处理。看起来就这么多。

        stop03:代码怎么体现呢?        

        --> 使用结构体或者类将他们组装在一起。所以我们可以设计以下的结构:

struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    // 事件处理 - 连接、读事件、写事件
    RCALLBACK accept_callback;
    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

        到目前为止,看起一切都是这么的顺其自然。

        stop04:现在我们已经设计好了一个IO需要处理的结构了,那么在使用的过程中,无可厚非会有很多个IO,为了方便我们定义一个数组来存储这些。当然你可以选择其他的数据结构进行存储。

struct conn_item connlist[1024] = {0};

        stop05:总的设计思路我们确定了下来,接下来我们需要把整个使用的框架先初步搭起来,因为这里我们使用epoll来做,本质上就是epoll的那一套。

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH		1024

typedef int (*RCALLBACK)(int fd);

struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    // 事件处理 - 连接、读事件、写事件
    RCALLBACK accept_callback;
    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

struct conn_item connlist[1024] = {0};

int accept_cb(int fd)
{
    return 0;
}

int read_cb(int fd)
{
    return 0;
}

int write_cb(int fd)
{
    return 0;
}

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket");
        return -1;
    }

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2048);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    // 将sockfd放入到监视集合中
    connlist[sockfd].fd = sockfd;
    connlist[sockfd].accept_callback = accept_cb;

    // 这里我们使用epoll
    int epfd = epoll_create(1);
    struct epoll_event ev;
    ev.data.fd = sockfd;
    ev.events = EPOLLIN;

    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    struct epoll_event events[1024] = {0};  // 就绪事件
    while(1)
    {
        int nready = epoll_wait(epfd, events, 1024, -1);

        for(int i = 0; i < nready; ++i)
        {
            // 发生什么事件,就处理什么事件
            if(events[i].events & EPOLLIN)
            {
                // 连接事件
                if(events[i].data.fd == sockfd)
                {
                    // struct sockaddr_in clientaddr;
                    // socklen_t len = sizeof(clientaddr);
                    // int connfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);

                    // ev.data.fd = connfd;
                    // ev.events = EPOLLIN;

                    // epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

                    // connlist[connfd].fd = connfd;
                    // connlist[connfd].read_callback = read_cb;
                    // connlist[connfd].write_callback = write_cb;

                    connlist[sockfd].accept_callback(sockfd);
                }
                else 
                {
                    // 读事件
                    int connfd = events[i].data.fd;
                    connlist[connfd].read_callback(connfd);
                }
            }
            else if(events[i].events & EPOLLOUT)
            {
                // 写事件
                int connfd = events[i].data.fd;
                connlist[connfd].write_callback(connfd);
            }
        }
    }


    return 0;
}

        stop06:现在我们做完了大部分工作了,只需要将回调函数实现就可以了。之后我们就开始优化我们打代码。

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH		1024

typedef int (*RCALLBACK)(int fd);

int accept_cb(int fd);
int read_cb(int fd);
int write_cb(int fd);


struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    // 事件处理 - 连接、读事件、写事件
    RCALLBACK accept_callback;
    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

struct conn_item connlist[1024] = {0};
int epfd;

int accept_cb(int fd)
{
    // 创建连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    int connfd = accept(fd, (struct sockaddr*)&clientaddr, &len);

    struct epoll_event ev;
    ev.data.fd = connfd;
    ev.events = EPOLLIN;

    epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

    connlist[connfd].fd = connfd;
    connlist[connfd].read_callback = read_cb;
    connlist[connfd].write_callback = write_cb;

    return connfd;
}

int read_cb(int fd)
{
    char* buffer = connlist[fd].rbuffer;
    int index = connlist[fd].rlen;

    int count = recv(fd, buffer + index, BUFFER_LENGTH - index, 0);
    if(0 == count)
    {
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        close(fd);
        return 0;
    }
    connlist[fd].rlen += count;

    // TODO
    printf("buffer:%s\n", buffer);
    memcpy(connlist[fd].wbuffer, buffer, count);
    connlist[fd].wlen = connlist[fd].rlen;

    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLOUT;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

    return count;
}

int write_cb(int fd)
{
    char* buffer = connlist[fd].wbuffer;
    int index = connlist[fd].wlen;

    printf("%s\n", buffer);
    int count = send(fd, buffer, index, 0);

    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

    return count;
}

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket");
        return -1;
    }

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2480);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    // 将sockfd放入到监视集合中
    connlist[sockfd].fd = sockfd;
    connlist[sockfd].accept_callback = accept_cb;

    // 这里我们使用epoll
    epfd = epoll_create(1);
    struct epoll_event ev;
    ev.data.fd = sockfd;
    ev.events = EPOLLIN;

    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    struct epoll_event events[1024] = {0};  // 就绪事件
    while(1)
    {
        int nready = epoll_wait(epfd, events, 1024, -1);

        for(int i = 0; i < nready; ++i)
        {
            // 发生什么事件,就处理什么事件
            if(events[i].events & EPOLLIN)
            {
                // 连接事件
                if(events[i].data.fd == sockfd)
                {
                    connlist[sockfd].accept_callback(sockfd);
                }
                else 
                {
                    // 读事件
                    int connfd = events[i].data.fd;
                    connlist[connfd].read_callback(connfd);
                }
            }
            else if(events[i].events & EPOLLOUT)
            {
                // 写事件
                int connfd = events[i].data.fd;
                connlist[connfd].write_callback(connfd);
            }
        }
    }

    close(sockfd);

    return 0;
}

      

  stop07:接下来我们来优化我们的代码,先在代码层进行。把一些公共的操作提取出来

  很明显,我们可以看到大量的epoll操作,这部分可以提到一个函数中:

void setEpoll(int fd, int mode)
{
    switch (mode) 
    {
        case 1:  // 添加
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
            break;
        }
        case 2:  // EPOLLOUT --> EPOLLIN
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
            break;
        }
        case 3:  // EPOLLIN --> EPOLLOUT
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLOUT;
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
            break;
        }
        case 4:  // 删除
        {
            epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
            break;
        }
        default:
            break;
    }
}

观察者一块,能不能融到一起

        我们想一想,这两个逻辑是不会同时发生的,而且发的IO事件肯定不是同一个,因为accept_callback是在sockfd中发生的,而read_callback是在connfd中发生的。每一个IO对应一个conn_item。所以这个很适合使用联合体进行。

【联合体:允许在同一内存空间中存储不同数据类型的成员变量,但同一时间只能使用其中一个成员变量。联合体的所有成员变量从同一个地址开始,占用的空间大小取最大成员变量的大小,这使得联合体在底层内存上非常高效】

        更有甚者,连接事件在epoll中也是读事件,而且使用是函数指针,发生事件的IO又不同,所以我们可以这样设计:(其他地方代码最相应修改即可)

struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    // 事件处理 - 连接、读事件、写事件
    // union
    // {
    //     RCALLBACK accept_callback;
    //     RCALLBACK read_callback;
    // }recv_t;

    // RCALLBACK accept_callback;
    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

完整代码

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH		1024

typedef int (*RCALLBACK)(int fd);

int read_cb(int fd);
int write_cb(int fd);
void setEpoll(int fd, int mode);

struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

struct conn_item connlist[1024] = {0};
int epfd;

int accept_cb(int fd)
{
    // 创建连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    int connfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    
    printf("connfd:%d\n", connfd);

    connlist[connfd].fd = fd;
    connlist[connfd].read_callback = read_cb;
    connlist[connfd].write_callback = write_cb;

    setEpoll(connfd, 1);

    return connfd;
}

int read_cb(int fd)
{
    char* buffer = connlist[fd].rbuffer;
    int index = connlist[fd].rlen;

    int count = recv(fd, buffer + index, BUFFER_LENGTH - index, 0);
    if(0 == count)
    {
        setEpoll(fd, 4);
        close(fd);
        return 0;
    }
    connlist[fd].rlen += count;

    // TODO
    printf("buffer:%s\n", buffer);
    memcpy(connlist[fd].wbuffer, buffer, count);
    connlist[fd].wlen = connlist[fd].rlen;

    setEpoll(fd, 3);

    return count;
}

int write_cb(int fd)
{
    char* buffer = connlist[fd].wbuffer;
    int index = connlist[fd].wlen;

    printf("%s\n", buffer);
    int count = send(fd, buffer, index, 0);

    setEpoll(fd, 2);

    return count;
}

void setEpoll(int fd, int mode)
{
    switch (mode) 
    {
        case 1:  // 添加
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
            break;
        }
        case 2:  // EPOLLOUT --> EPOLLIN
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
            break;
        }
        case 3:  // EPOLLIN --> EPOLLOUT
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLOUT;
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
            break;
        }
        case 4:  // 删除
        {
            epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
            break;
        }
        default:
            break;
    }
}

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket");
        return -1;
    }

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2480);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    // 将sockfd放入到监视集合中
    connlist[sockfd].fd = sockfd;
    connlist[sockfd].read_callback = accept_cb;

    // 这里我们使用epoll
    epfd = epoll_create(1);

    setEpoll(sockfd, 1);

    struct epoll_event events[1024] = {0};  // 就绪事件
    while(1)
    {
        int nready = epoll_wait(epfd, events, 1024, -1);

        for(int i = 0; i < nready; ++i)
        {
            // 发生什么事件,就处理什么事件
            int connfd = events[i].data.fd;
            if(events[i].events & EPOLLIN)
            {
                // 读事件
                connlist[connfd].read_callback(connfd);
            }
            else if(events[i].events & EPOLLOUT)
            {
                // 写事件
                connlist[connfd].write_callback(connfd);
            }
        }
    }

    close(sockfd);

    return 0;
}

后续 

        由于篇幅问题,已经在逐步实现reactor过程中设计到不同的知识点,我们不方便一次完成,如果感兴趣可以点个关注。

        后续文章将进行wsl测试已经对buffer进行优化,设计合理的用户缓冲区,而不是使用定长的buffer进行实现。

        然后会对代码进行调整,目前的设计不方便提取出来使用,我们最后会封装成一个库的方式,提供一个.h和.c文件,以此方便移植。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/111300.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Redis队列Stream

1 缘起 项目中处理文件的场景&#xff1a; 将文件处理请求放入队列&#xff0c; 一方面&#xff0c;缓解服务器文件处理压力&#xff1b; 另一方面&#xff0c;可以根据文件大小拆分到不同的队列&#xff0c;提高文件处理效率。 这是Java开发组Leader佳汇提出的文件处理方案&a…

网络安全—小白自学

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高&#xff1b; 二、则是发展相对成熟…

QT实现用本地资源管理器来打开文件夹

QString path"文件夹路径";QDesktopServices::openUrl(QUrl("file:"path, QUrl::TolerantMode)); 在windows中QT编程&#xff0c;使用资源管理器来打开指定本地文件夹的方法&#xff1a; 第一种&#xff1a;使用Qprocess命令&#xff08;相当于在cmd命令管…

测试C#调用Aplayer播放视频(2:VideoPlayer源码学习)

参考文献1除了介绍Aplayer组件的用法之外&#xff0c;还提供有demo下载以供学习&#xff0c;本文学习并记录其中的使用方式。   VideoPlayer项目使用C#在VS2013开发&#xff0c;其解决方案中包括VideoPlayer和VideoPlayer两个小项目&#xff0c;前者基于.net framework4.0&am…

【数据分析】上市公司半年报数据分析

前言 前文介绍过使用网络技术获取上市公司半年报数据的方法&#xff0c;本文将对获取到的数据进行简要的数据分析。 获取数据的代码介绍在下面的两篇文章中 【java爬虫】使用selenium获取某交易所公司半年报数据-CSDN博客 【java爬虫】公司半年报数据展示-CSDN博客 全量数…

C#开发DLL,CAPL调用(CAPL>> .NET DLL)

文章目录 展示说明新建类库工程C# 代码生成dllCAPL脚本调用dll,输出结果展示 ret为dll里函数返回的值。 说明 新建类库工程 在visual studio中建立。 C# 代码 using

Python构造代理IP池提高访问量

目录 前言 一、代理IP是什么 二、代理IP池是什么 三、如何构建代理 IP 池 1. 从网上获取代理 IP 地址 2. 对 IP 地址进行筛选 3. 使用筛选出来的 IP 地址进行数据的爬取 四、总结 前言 爬虫程序是批量获取互联网上的信息的重要工具&#xff0c;在访问目标网站时需要频…

启动Vue项目报错Error: error:0308010C:digital envelope routines::unsupported

问题描述 启动Vue项目报错Error: error:0308010C:digital envelope routines::unsupported 出现这个一般就是node版本的问题&#xff0c;通过命令查看node -v查看node版本&#xff1b; 百度查了好多&#xff0c;都让我降低node版本&#xff0c;属实太麻烦了 在不改node版本的…

[论文笔记]BGE

引言 今天介绍论文BGE,是智源开源的语义向量模型,BAAI General Embedding。 作者发布了C-Pack,一套显著推进中文嵌入领域的资源包。包括三个重要资源: 1) C-MTEB是一个全面的中文文本嵌入基准,涵盖了6个任务和35个数据集。 2) C-MTP是一个从标记和未标记的中文语料库中选…

均值、方差、标准差

1 中间值和均值 表现&#xff02;中间值&#xff02;的统计名词&#xff1a; a.均值:   mean&#xff0c;数列的算术平均值&#xff0c;反应了数列的集中趋势,等于有效数值的合除以有效数值的个数&#xff0e;b.中位值:  median&#xff0c;等于排序后中间位置的值&#x…

c++11新特性

文章目录 1. C11简介2. 统一的列表初始化2.1 &#xff5b;&#xff5d;初始化2.2 std::initializer_list 3. 声明3.1 auto3.2 decltype3.3 nullptr 4 范围for循环5. STL中一些变化 1. C11简介 2003年&#xff0c;C标准委员会提交了技术勘误表&#xff08;TC1&#xff09;&…

Python的错误和异常处理

一、错误和异常 编程中出现的错误大致可以分为两类&#xff1a;错误和异常。 (一)错误 错误又可以分为两类&#xff1a;语法错误和逻辑错误。 1. 语法错误 语法错误又称解析错误&#xff0c;它是指在编写程序时&#xff0c;程序的语法不符合Python语言的规范&#xff0c;导致…

BI零售数据分析,告别拖延症,及时掌握一线信息

在日常的零售数据分析中&#xff0c;经常会因为数据量太大&#xff0c;分析指标太多且计算组合多变而导致数据分析报表难产&#xff0c;零售运营决策被迫拖延症。随着BI数据可视化分析技术的发展&#xff0c;智能化、可视化、自助分析的BI数据分析逐渐成熟&#xff0c;形成一套…

如何使用navicat图形化工具远程连接MariaDB数据库【cpolar内网穿透】

公网远程连接MariaDB数据库【cpolar内网穿透】 文章目录 公网远程连接MariaDB数据库【cpolar内网穿透】1. 配置MariaDB数据库1.1 安装MariaDB数据库1.2 测试局域网内远程连接 2. 内网穿透2.1 创建隧道映射2.2 测试随机地址公网远程访问3. 配置固定TCP端口地址3.1 保留一个固定的…

Vue项目搭建及使用vue-cli创建项目、创建登录页面、与后台进行交互,以及安装和使用axios、qs和vue-axios

目录 1. 搭建项目 1.1 使用vue-cli创建项目 1.2 通过npm安装element-ui 1.3 导入组件 2 创建登录页面 2.1 创建登录组件 2.2 引入css&#xff08;css.txt&#xff09; 2.3 配置路由 2.5 运行效果 3. 后台交互 3.1 引入axios 3.2 axios/qs/vue-axios安装与使用 3.2…

Webpack常见的插件和模式

文章目录 一、认识插件Plugin1.认识Plugin 二、CleanWebpackPlugin三、HtmlWebpackPlugin1.生成index.html分析2.自定义HTML模板3.自定义模板数据填充 四、DefinePlugin1.DefinePlugin的介绍2.DefinePlugin的使用 五、Mode配置 一、认识插件Plugin 1.认识Plugin Webpack的另一…

【HMS Core】机器学习服务热门问题合集

【关键词】 机器学习服务、文本识别、身份证识别 【问题描述1】 机器学习服务的文本识别能力&#xff0c;是否支持草书等&#xff1f; 【解决方案】 草书是不支持的&#xff0c;目前建议使用较为规范的字体测试。 【问题描述2】 机器学习服务是否支持训练模型&#xff1f;…

Flink on yarn 加载失败plugins失效问题解决

Flink on yarn 加载失败plugins失效问题解决 flink版本&#xff1a;1.13.6 1. 问题 flink 任务运行在yarn集群,plugins加载失效,导致通过扩展资源获取任务参数失效 2. 问题定位 yarn容器的jar包及插件信息,jar包是正常上传 源码定位 加载plugins入口&#xff0c;TaskMana…

Mysql权限控制语句

1.创建用户 create user ky32localhost IDENTIFIED by 123456 create user&#xff1a;创建用户开头 ky32&#xff1a;用户名 localhost 新建的用户可以在哪些主机上登录 即可以使用ip地址&#xff0c;网段主机名 ky32localhost ky32192.168.233.22 ky32192.168.233.0/2…

如何在mac 安装 cocos 的 android环境

基本概念&#xff1a; Java: Java 是一种编程语言&#xff0c;由Sun Microsystems&#xff08;现在是 Oracle Corporation&#xff09;开发。Java 是一种跨平台的语言&#xff0c;可以用于开发各种应用程序&#xff0c;包括 Android 应用程序。Android 应用程序的核心代码通常用…