网络程序 -- TCP版服务器

一 多进程版TCP服务器

1.1 核心功能

  对于之前编写的 字符串回响程序 来说,如果只有一个客户端进行连接并通信,是没有问题的,但如果有多个客户端发起连接请求,并尝试进行通信,服务器是无法应对的

  原因在于 服务器是一个单进程版本,处理连接请求 和 业务处理 是串行化执行的,如果想处理下一个连接请求,需要把当前的业务处理完成。

具体表现为下面这种情况:

 为什么客户端B会显示当前已经连接成功?

  这是因为是客户端是主动发起连接请求的一方,在请求发出后,如果出现连接错误,客户端就认为已经连接成功了,但实际上服务器还没有处理这个连接请求.

  这显然是服务器的问题,处理连接请求业务处理 应该交给两个不同的执行流完成,可以使用多进程或者多线程解决,这里先采用多进程的方案

  所以当前需要实现的网络程序核心功能为:当服务器成功处理连接请求后,fork 新建一个子进程,用于进行业务处理,原来的进程专注于处理连接请求。

1.2 创建子进程

注:当前的版本的修改只涉及 StartServer() 函数

创建子进程使用 fork() 函数,它的返回值含义如下

  • ret == 0 表示创建子进程成功,接下来执行子进程的代码
  • ret > 0 表示创建子进程成功,接下来执行父进程的代码
  • ret < 0 表示创建子进程失败

  子进程创建成功后,会继承父进程的文件描述符表,能轻而易举的获取客户端的 socket 套接字,从而进行网络通信

当然不止文件描述符表,得益于 写时拷贝 机制,子进程还会共享父进程的变量,当发生修改行为时,才会自己创建。

注意: 当子进程取走客户端的 socket 套接字进行通信后,父进程需要将其关闭(因为它不需要了),避免文件描述符泄漏

StartServer() 服务器启动函数 — 位于 server.hppTcpServer

// 进程创建、等待所需要的头文件
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>


 //启动服务器
        void StartServer(){
            // 忽略 SIGCHLD 信号
            //signal(SIGCHLD, SIG_IGN);
            while(!_quit){
                //1 处理连接请求
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int sock = accept(_listensock,(struct sockaddr*)&client,&len);

                //2 如果连接失败 继续尝试连接
                if(sock == -1){
                    std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;
                    continue;
                }

                // 连接成功,获取客户端信息
                std::string clientip = inet_ntoa(client.sin_addr);
                uint16_t clientport = ntohs(client.sin_port);
              
                std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl;

                //3 创建子进程 
                pid_t id=fork();
                if(id<0){
                    // 创建子进程失败,暂时不与当前客户端建立通信会话
                   close(sock);
                   std::cerr<<"Fork Fail!"<<std::endl;
                }
                else if( 0 == id){
                   //进入子进程
                   // 子进程拥有父进程相同的文件描述符,建议把不用的关闭
                 close(_listensock);
                  // 执行业务处理函数
                 //4 这里因为是字节流传递,一般而言我们会自己写一个函数
                 Service(sock,clientip,clientport);
                 exit(0);
                }
                else {
                  // 父进程需要等待子进程
                    pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待
                    //更改为非阻塞
                    // pid_t ret = waitpid(id,nullptr,WNOHANG);
                    if(ret == id){
                     std::cout << "Wait " << id << " success!";
                    }
                }
            }
        }

  虽然此时成功创建了子进程,但父进程(处理连接请求)仍然需要等待子进程退出后,才能继续运行,而不能和我们想象中一样单独进行处理连接请求函数,说白了就是 父进程现在处于阻塞等待状态,需要设置为 非阻塞等待.

1.3 设置非阻塞状态

设置父进程为非阻塞的方式有很多,这里来一一列举

方式一:通过参数设置为非阻塞等待(不推荐)

可以直接给 waitpid() 函数的参数3传递 WNOHANG,表示当前为 非阻塞等待.

pid_t ret = waitpid(id, nullptr, WNOHANG); // 设置为非阻塞式等待

  这种方法可行,但不推荐,原因如下:虽然设置成了非阻塞式等待,但父进程终究是需要通过 waitpid() 函数来尝试等待子进程,倘若父进程一直卡在 accept() 函数处,会导致子进程退出后暂时无人收尸,进而导致资源泄漏。

方式二:忽略 SIGCHLD 信号(推荐使用)

  这是一个子进程在结束后发出的信号,默认动作是什么都不做;父进程需要检测并回收子进程,我们可以直接忽略该信号,这里的忽略是个特例,只是父进程不对其进行处理,转而由 操作系统 对其负责,自动清理资源并进行回收,不会产生 僵尸进程。

 //启动服务器
        void StartServer(){
            // 忽略 SIGCHLD 信号
            signal(SIGCHLD, SIG_IGN);
            while(!_quit){
                //1 处理连接请求
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int sock = accept(_listensock,(struct sockaddr*)&client,&len);

                //2 如果连接失败 继续尝试连接
                if(sock == -1){
                    std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;
                    continue;
                }

                // 连接成功,获取客户端信息
                std::string clientip = inet_ntoa(client.sin_addr);
                uint16_t clientport = ntohs(client.sin_port);
              
                std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl;

                //3 创建子进程 
                pid_t id=fork();
                if(id<0){
                    // 创建子进程失败,暂时不与当前客户端建立通信会话
                   close(sock);
                   std::cerr<<"Fork Fail!"<<std::endl;
                }
                else if( 0 == id){
                   //进入子进程
                   // 子进程拥有父进程相同的文件描述符,建议把不用的关闭
                 close(_listensock);
                  // 执行业务处理函数
                 //4 这里因为是字节流传递,一般而言我们会自己写一个函数
                 Service(sock,clientip,clientport);
                 exit(0);
                }
                // else {
                //   // 父进程需要等待子进程
                //     //pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待
                //     //更改为非阻塞
                //      pid_t ret = waitpid(id,nullptr,WNOHANG);
                //     if(ret == id){
                //      std::cout << "Wait " << id << " success!";
                //     }
                // }
            }
        }

强烈推荐使用该方案,因为操作简单,并且没有后患之忧。

方式三:设置 SIGCHLD 信号的处理动作为子进程回收(不是很推荐)

  当子进程退出并发送该信号时,执行父进程回收子进程的操作。

  设置 SIGCHLD 信号的处理动作为 回收子进程后,父进程同样不必再考虑回收子进程的问题

  注意: 因为现在处于 TcpServer 类中,handler() 函数需要设置为静态(避免隐含的 this 指针),避免不符合 signal() 函数中信号处理函数的参数要求。

 // 需要设置为静态
        static void handler(int signo){
            printf("进程 %d 捕捉到了 %d 号信号\n", getpid(), signo);
            // 这里的 -1 表示父进程等待时,只要是已经退出了的子进程,都可以进行回收
            while (1){
                pid_t ret = waitpid(-1, NULL, WNOHANG);
                if (ret > 0)
                    printf("父进程: %d 已经成功回收了 %d 号进程\n", getpid(), ret);
                else
                    break;
            }
            printf("子进程回收成功\n");
        }
        
        //启动服务器
        void StartServer(){
            // 设置 SIGCHLD 信号的处理动作
            signal(SIGCHLD, handler);
            // 忽略 SIGCHLD 信号
            // signal(SIGCHLD, SIG_IGN);
            while(!_quit){
                //1 处理连接请求
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int sock = accept(_listensock,(struct sockaddr*)&client,&len);

                //2 如果连接失败 继续尝试连接
                if(sock == -1){
                    std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;
                    continue;
                }

                // 连接成功,获取客户端信息
                std::string clientip = inet_ntoa(client.sin_addr);
                uint16_t clientport = ntohs(client.sin_port);
              
                std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl;

                //3 创建子进程 
                pid_t id=fork();
                if(id<0){
                    // 创建子进程失败,暂时不与当前客户端建立通信会话
                   close(sock);
                   std::cerr<<"Fork Fail!"<<std::endl;
                }
                else if( 0 == id){
                   //进入子进程
                   // 子进程拥有父进程相同的文件描述符,建议把不用的关闭
                 close(_listensock);
                  // 执行业务处理函数
                 //4 这里因为是字节流传递,一般而言我们会自己写一个函数
                 Service(sock,clientip,clientport);
                 exit(0);
                }
                  else {
                  // 父进程需要等待子进程
                    pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待
                    //更改为非阻塞
                    // pid_t ret = waitpid(id,nullptr,WNOHANG);
                    if(ret == id){
                     std::cout << "Wait " << id << " success!";
                    }
                }
            }
        }

为什么不是很推荐这种方法?因为这种方法实现起来比较麻烦,不如直接忽略 SIGCHLD 信号

方式四:设置孙子进程(不是很推荐)

  众所周知,父进程只需要对子进程负责,至于孙子进程交给子进程负责,如果某个子进程的父进程终止运行了,那么它就会变成 孤儿进程,父进程会变成 1 号进程,也就是由操作系统领养,回收进程的重担也交给了操作系统

  可以利用该特性,在子进程内部再创建一个子进程(孙子进程),然后子进程退出,父进程可以直接回收(不必阻塞),子进程(孙子进程)的父进程变成 1 号进程

  这种实现方法比较巧妙,而且与我们后面即将学到的 守护进程 有关

  注意: 使用这种方式时,父进程是需要等待子进程退出的。

   这种方法代码也很简单,我们也不再做过多示例,但依旧不推荐,因为倘若连接请求变多,会导致孤儿进程变多,孤儿进程由操作系统接管,数量变多会给操作系统带来负担

  以上就是设置 非阻塞 的四种方式,推荐使用方式二:忽略 SIGCHLD 信号。

  至此我们的 字符串回响程序 可以支持多客户端了。

细节补充:当子进程取走 sock 套接字进行网络通信后,父进程就不需要使用 sock 套接字了,可以将其进行关闭,下次连接时继续使用,避免文件描述符不断增长。

StartServer() 服务器启动函数 — 位于 server.hpp 服务器头文件中的 TcpServer

// 启动服务器
void StartServer()
{
    // 忽略 SIGCHLD 信号
    signal(SIGCHLD, SIG_IGN);

    while (!_quit)
    {
        // 1.处理连接请求
        // ...

        // 2.如果连接失败,继续尝试连接
        // ...

        // 连接成功,获取客户端信息
        // ...

        // 3.创建子进程
        // ...

        close(sock); // 父进程不再需要资源(建议关闭)
    }
}

  这个补丁可以减少资源消耗,建议加上,前面是忘记加了,并且不太好修改,server.hpp 服务器头文件完整代码如下:
 

// server.hpp

#pragma once

#include <signal.h>
#include<iostream>
#include<string>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"err.hpp"
#include<cstring>
#include<unistd.h>
#include<cerrno>
#include <sys/types.h>
#include <sys/wait.h>

namespace My_server{

       // 默认端口号
        const uint16_t default_port = 8088;
        //全连接队列的最大长度
        const int backlog = 32;
        using func_t =std::function<std::string(std::string)>;

    class server
    {
    private:
        /* data */
        //套接字
        int _listensock;
        //端口号
        uint16_t _port;
        // 判断服务器是否结束运行
        bool _quit;
        // 外部传入的回调函数
        func_t _func;
    public:

        server(const func_t &func,const uint16_t &port = default_port)
         :_func(func)
         ,_port(port)
         ,_quit(false)
        {}

        ~server(){}

        //初始化服务器
        void InitServer(){
            
            //1 创建套接字
            _listensock = socket(AF_INET,SOCK_STREAM,0);
            if(_listensock == -1){
                //绑定失败
                std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl;
                exit(SOCKET_ERR);
            }
            std::cout<<"Create Socket Success!" <<_listensock<<std::endl;

            //2 绑定端口号和IP地址
            struct sockaddr_in local;
            bzero(&local,sizeof(local));
            
            local.sin_family = AF_INET;
            local.sin_port = htons(_port);
            local.sin_addr.s_addr = INADDR_ANY;

            if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){
                std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl;
                exit(BIND_ERR);
            }

            //3 开始监听
            if(listen(_listensock,backlog)== -1){
                std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl;
                //新增一个报错
                exit(LISTEN_ERR);
            }
             std::cout<<"Listen Success!"<<std::endl;
        }



    // // 需要设置为静态
    //     static void handler(int signo){
    //         printf("进程 %d 捕捉到了 %d 号信号\n", getpid(), signo);
    //         // 这里的 -1 表示父进程等待时,只要是已经退出了的子进程,都可以进行回收
    //         while (1){
    //             pid_t ret = waitpid(-1, NULL, WNOHANG);
    //             if (ret > 0)
    //                 printf("父进程: %d 已经成功回收了 %d 号进程\n", getpid(), ret);
    //             else
    //                 break;
    //         }
    //         printf("子进程回收成功\n");
    //     }
        
        //启动服务器
        void StartServer(){
            // 设置 SIGCHLD 信号的处理动作
            //signal(SIGCHLD, handler);
            // 忽略 SIGCHLD 信号
            signal(SIGCHLD, SIG_IGN);
            while(!_quit){
                //1 处理连接请求
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int sock = accept(_listensock,(struct sockaddr*)&client,&len);

                //2 如果连接失败 继续尝试连接
                if(sock == -1){
                    std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;
                    continue;
                }

                // 连接成功,获取客户端信息
                std::string clientip = inet_ntoa(client.sin_addr);
                uint16_t clientport = ntohs(client.sin_port);
              
                std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl;

                //3 创建子进程 
                pid_t id=fork();
                if(id<0){
                    // 创建子进程失败,暂时不与当前客户端建立通信会话
                   close(sock);
                   std::cerr<<"Fork Fail!"<<std::endl;
                }
                else if( 0 == id){
                   //进入子进程
                   // 子进程拥有父进程相同的文件描述符,建议把不用的关闭
                 close(_listensock);
                  // 执行业务处理函数
                 //4 这里因为是字节流传递,一般而言我们会自己写一个函数
                 Service(sock,clientip,clientport);
                 exit(0);
                }
                // else {
                //   // 父进程需要等待子进程
                //     pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待
                //     //更改为非阻塞
                //     // pid_t ret = waitpid(id,nullptr,WNOHANG);
                //     if(ret == id){
                //      std::cout << "Wait " << id << " success!";
                //     }
                // }
                
                close(sock); // 父进程不再需要资源(建议关闭)
            }
        }


        void Service(int sock,std::string &clientip,const uint16_t &clientport){

            char buff[1024];
            std::string who = clientip + "-" + std::to_string(clientport);
            while(true){
                // 以字符串格式读取,预留\0的位置
                ssize_t n = read(sock,buff,sizeof(buff)-1);
                if(n>0){
                    //读取成功
                    buff[n]='\0';
                    std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl;
                    //实际处理可以交给上层逻辑指定
                    std::string respond = _func(buff);
                    write(sock,buff,strlen(buff));
                }
                else if(n==0){
                  //表示当前读到了文件末尾,结束读取
                 std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl;
                 close(sock);
                 break;
               }
                else{
                  // 读取出问题(暂时)
                  std::cerr << "Read Fail!" << strerror(errno) << std::endl;
                  close(sock); // 关闭文件描述符
                   break;
               }    
            }
        }
    };
    
}

二 多线程版服务器

2.1 核心功能

通过多线程,实现支持多客户端同时通信的服务器

核心功能:服务器与客户端成功连接后,创建一个线程,服务于客户端的业务处理

'这里先通过 原生线程库 模拟实现.

2.2 使用原生线程库

  线程的回调函数中需要 Service() 业务处理函数中的所有参数,同时也需要具备访问 Service() 业务处理函数的能力,单凭一个 void* 的参数是无法解决的,为此可以创建一个类,里面可以包含我们所需要的参数。

ThreadData 类 — 位于 server.hpp 服务器头文件中。

   //包含我们所需参数的类型
    class ThreadData{
      public:
         ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr)
          :_sock(sock)
          ,_clientip(ip)
          ,_clientport(port)
          ,_current(ptr)
         {}
      public:
        int _sock;
        std::string _clientip;
        uint16_t _clientport;
        server* _current;
    };

接下来就可以考虑如何借助多线程了

线程创建后,需要关闭不必要的 socket 套接字吗?

  • 不需要,线程之间是可以共享这些资源的,无需关闭

如何设置主线程不必等待次线程退出?

  • 可以把次线程进行分离

  所以接下来我们需要在连接成功后,创建次线程,利用已有信息构建 ThreadData 对象,为次线程编写回调函数(最终目的是为了执行 Service() 业务处理函数)

注意: 因为当前在类中,线程的回调函数需要使用 static 设置为静态函数。

server.hpp 服务器头文件

// server.hpp

#pragma once

#include<iostream>
#include<string>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"err.hpp"
#include<cstring>
#include<unistd.h>
#include<cerrno>

namespace My_server{

    // 默认端口号
    const uint16_t default_port = 8088;
    //全连接队列的最大长度
    const int backlog = 32;
    using func_t = std::function<std::string(std::string)>;
    
    //前置声明
    class server;
    //包含我们所需参数的类型
    class ThreadData{
      public:
         ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr)
          :_sock(sock)
          ,_clientip(ip)
          ,_clientport(port)
          ,_current(ptr)
         {}
      public:
        int _sock;
        std::string _clientip;
        uint16_t _clientport;
        server* _current;
    };

    class server
    {
    private:
        /* data */
        //套接字
        int _listensock;
        //端口号
        uint16_t _port;
        // 判断服务器是否结束运行
        bool _quit;
        // 外部传入的回调函数
        func_t _func;
    public:

        server(const func_t &func,const uint16_t &port = default_port)
         :_func(func)
         ,_port(port)
         ,_quit(false)
        {}

        ~server(){}

        //初始化服务器
        void InitServer(){
            //1 创建套接字
            _listensock = socket(AF_INET,SOCK_STREAM,0);
            if(_listensock == -1){
                //绑定失败
                std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl;
                exit(SOCKET_ERR);
            }
            std::cout<<"Create Socket Success!" <<_listensock<<std::endl;

            //2 绑定端口号和IP地址
            struct sockaddr_in local;
            bzero(&local,sizeof(local));
            
            local.sin_family = AF_INET;
            local.sin_port = htons(_port);
            local.sin_addr.s_addr = INADDR_ANY;

            if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){
                std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl;
                exit(BIND_ERR);
            }

            //3 开始监听
            if(listen(_listensock,backlog)== -1){
                std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl;
                //新增一个报错
                exit(LISTEN_ERR);
            }
             std::cout<<"Listen Success!"<<std::endl;
        }
        //启动服务器
        void StartServer(){

            while(!_quit){
                //1 处理连接请求
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int sock = accept(_listensock,(struct sockaddr*)&client,&len);

                //2 如果连接失败 继续尝试连接
                if(sock == -1){
                    std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;
                    continue;
                }

                // 连接成功,获取客户端信息
                std::string clientip = inet_ntoa(client.sin_addr);
                uint16_t clientport = ntohs(client.sin_port);

                std::cout<<"Server accept"<< clientip + "-"<< clientport <<sock<<" from "<<_listensock << "success!"<<std::endl;

                // 3.创建线程及所需要的线程信息类
                ThreadData* td = new ThreadData(sock, clientip, clientport, this);
                pthread_t p;
                pthread_create(&p, nullptr, Routine, td);
            }
        }

        // 线程回调函数
        static void* Routine(void* args){
            // 线程分离
            pthread_detach(pthread_self());

            ThreadData* td = static_cast<ThreadData*>(args);

            // 调用业务处理函数
            td->_current->Service(td->_sock, td->_clientip, td->_clientport);

            // 销毁对象
            delete td;
            return nullptr;
        }

        void Service(int sock,std::string &clientip,const uint16_t &clientport){

            char buff[1024];
            std::string who = clientip + "-" + std::to_string(clientport);
            while(true){
                // 以字符串格式读取,预留\0的位置
                ssize_t n = read(sock,buff,sizeof(buff)-1);
                if(n>0){
                    //读取成功
                    buff[n]='\0';
                    std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl;
                    //实际处理可以交给上层逻辑指定
                    std::string respond = _func(buff);
                    write(sock,buff,strlen(buff));
                }
                else if(n==0){
                  //表示当前读到了文件末尾,结束读取
                 std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl;
                 close(sock);
                 break;
               }
                else{
                  // 读取出问题(暂时)
                  std::cerr << "Read Fail!" << strerror(errno) << std::endl;
                  close(sock); // 关闭文件描述符
                   break;
               }    
            }
        }
    };
    
}

因为当前使用了 原生线程库,所以在编译时,需要加上 -lpthread

Makefile 文件

.PHONY:all
all:server client

server:server.cc
	g++ -o $@ $^ -std=c++11 -lpthread

	
client:client.cc
	g++ -o $@ $^ -std=c++11 -lpthread

.PHONY:clean
clean:
	rm -rf server client

  使用 原生线程库 过于单薄了,并且这种方式存在问题:连接都准备好了,才创建线程,如果创建线程所需要的资源较多,会拖慢服务器整体连接效率

为此可以改用之前实现的 线程池

三 线程池版服务器

3.1 ThreadPool.hpp 线程池头文件

#pragma once

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型

namespace My_pool{

  const int THREAD_NUM = 10;
  
    template<class T>
    class ThreadPool
    {
    private:
        ThreadPool(int num = THREAD_NUM)
            :_num(num)
        {}

        ~ThreadPool(){
            // 等待线程退出
            for(auto &t : _threads)
                t.join();
        }

        // 删除拷贝构造
        ThreadPool(const ThreadPool<T> &) = delete;

    public:
        static ThreadPool<T>* getInstance(){
            // 双检查
            if(_inst == nullptr){
                // 加锁
                LockGuard lock(&_mtx);
                if(_inst == nullptr){
                    // 创建对象
                    _inst = new ThreadPool<T>();
                    // 初始化及启动服务
                    _inst->init();
                    _inst->start();
                }
            }

            return _inst;
        }

    public:
        void init(){
            // 创建一批线程
            for(int i = 0; i < _num; i++)
                _threads.push_back(Thread(i, threadRoutine, this));
        }

        void start(){
            // 启动线程
            for(auto &t : _threads)
                t.run();
        }

        // 提供给线程的回调函数(已修改返回类型为 void)
        static void threadRoutine(void *args){
            // 避免等待线程,直接剥离
            pthread_detach(pthread_self());
            auto ptr = static_cast<ThreadPool<T>*>(args);
            while (true){
                // 从CP模型中获取任务
                T task = ptr->popTask();
                task(); // 回调函数
            }
        }


        // 装载任务
        void pushTask(const T& task){
            _blockqueue.Push(task);
        }
    
    protected:
        T popTask(){
            T task;
            _blockqueue.Pop(&task);
            return task;
        }

    private:
        std::vector<Thread> _threads;
        int _num; // 线程数量
        My_Queue::BlockingQueue<T> _blockqueue; // 阻塞队列
        // 创建静态单例对象指针及互斥锁
        static ThreadPool<T> *_inst;
        static pthread_mutex_t _mtx;
    };

    // 初始化指针
    template<class T>
    ThreadPool<T>* ThreadPool<T>::_inst = nullptr;

    // 初始化互斥锁
    template<class T>
    pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;
}

3.2 Thread.hpp 封装实现的线程库头文件

#pragma once 

#include<iostream>
#include<pthread.h>
#include<string>


//代表线程状态
enum class Status{
    NEW = 0,
    RUNNING ,
    EXIT
};

// 参数。返回值为void* 返回值的函数类型
typedef void (*func_t)(void*);

class Thread
{
private:
    pthread_t _tid; // 线程 ID
    std::string _name; // 线程名
    Status _status; // 线程状态
    func_t _func; // 线程回调函数
    void* _args; // 传递给回调函数的参数
public:
    Thread(int num=0,func_t func = nullptr,void *args = nullptr)
     :_tid(num)
     ,_func(func)
     ,_status(Status::NEW)
     ,_args(args)
    {
        char name[1024];
        snprintf(name,sizeof(name),"thread - %d",num);
        _name = name;
    }

    ~Thread(){}

    //获取线程名
    std::string getName() const{
        return _name;
    }

    // 获取状态
    Status getStatus() const{
        return _status;
    }

    // 回调方法
    static void* runHelper(void *args){
        Thread * myThis = static_cast<Thread*>(args);
        myThis->_func(myThis->_args);
        return nullptr;
    }

    //启动线程
    void run(){
        int ret = pthread_create(&_tid,nullptr,runHelper,this);
        if(0 != ret){
            std::cerr << "Thread create fail!"<<std::endl;
            exit(1);
        }
        _status = Status::RUNNING;
    }

    // 线程等待
    void join(){
        int ret = pthread_join(_tid,nullptr);
        if(0 != ret){
            if(0 != ret){
            std::cerr << "Thread join fail!"<<std::endl;
            exit(1);
           }
        }
         _status = Status::EXIT;
    }

};

3.3 BlockingQueue.hpp 生产者消费者模型头文件

#pragma once

#include <queue>
#include <mutex>
#include <pthread.h>
#include "LockGuard.hpp"

namespace My_Queue{
    
 const int DEF_SIZE = 10;
    
    template<class T>
    class BlockingQueue
    {
    private:
       // 任务队列
        std::queue<T> _queue;
        size_t _cap; // 阻塞队列的容量
        pthread_mutex_t _mtx; // 互斥锁
        pthread_cond_t _pro_cond; // 生产者条件变量
        pthread_cond_t _con_cond; // 消费者条件变量
    public:
        BlockingQueue(size_t cap = DEF_SIZE)
          :_cap(cap)
        {
            // 初始化锁与条件变量
            pthread_mutex_init(&_mtx,nullptr);
            pthread_cond_init(&_pro_cond,nullptr);
            pthread_cond_init(&_con_cond,nullptr);
        }

        ~BlockingQueue(){
            //销毁锁与条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_pro_cond);
            pthread_cond_destroy(&_con_cond);
        }

      // 生产数据(入队)
        void Push(const T& inData){
            // 加锁(RAII风格)
            LockGuard lock(&_mtx);
            // 循环判断条件是否满足
            while(IsFull()){
                pthread_cond_wait(&_pro_cond, &_mtx);
            }
            _queue.push(inData);
            // 可以加策略唤醒,比如生产一半才唤醒消费者
            pthread_cond_signal(&_con_cond);
            // 自动解锁
        }

        // 消费数据(出队)
        void Pop(T* outData){
            // 加锁(RAII 风格)
            LockGuard lock(&_mtx);
            // 循环判读条件是否满足
            while(IsEmpty()) {
                pthread_cond_wait(&_con_cond, &_mtx);
            }
            *outData = _queue.front();
            _queue.pop();
            // 可以加策略唤醒,比如消费完后才唤醒生产者
            pthread_cond_signal(&_pro_cond);
            // 自动解锁
        }
        private:
        
        //判断是否为满
        bool IsFull(){
            return _queue.size() == _cap;
        }
        //判断是否为空
        bool IsEmpty(){
            return _queue.empty();
        }
    };

}

3.4 LockGuard.hpp 自动化锁头文件

#pragma once

#include<pthread.h>

class LockGuard
{
private:
    pthread_mutex_t* _pmtx;
public:
    LockGuard(pthread_mutex_t *pmtx)
    :_pmtx(pmtx)
    {
       //加锁
       pthread_mutex_lock(_pmtx);
    }
    ~LockGuard(){
        //解锁
        pthread_mutex_unlock(_pmtx);
    }
};

 3.5 Task.hpp 任务类

  现在需要修改 Task.hpp 任务头文件中的 Task 任务类,将其修改为一个服务于 网络通信中业务处理 的任务类(也就是 Service() 业务处理函数)

   在 Service() 业务处理函数中,需要包含 socket 套接字、客户端 IP、客户端端口号 等必备信息,除此之外,我们还可以将 可调用对象(Service() 业务处理函数) 作为参数传递给 Task 对象.

#pragma once

#include <string>
#include <functional>

namespace My_task{

    // Service() 业务处理函数的类型
    using cb_t = std::function<void(int, std::string, uint16_t)>;

    class Task{
    public:
        // 可以再提供一个默认构造(防止部分场景中构建对象失败)
        Task()
        {}

        Task(int sock, const std::string& ip, const uint16_t& port, const cb_t& cb)
            :_sock(sock)
            ,_ip(ip)
            ,_port(port)
            ,_cb(cb)
        {}

        // 重载运算操作,用于回调 [业务处理函数]
        void operator()(){
            // 直接回调 cb [业务处理函数] 即可
            _cb(_sock, _ip, _port);
        }

    private:
        int _sock;
        std::string _ip;
        uint16_t _port;
        cb_t _cb; // 回调函数
    };
}

3.6 server.hpp 头文件

准备工作完成后,接下来就是往 server.hpp 服务器头文件中添加组件了

注意:

  • 在构建 Task 对象时,需要使用 bind 绑定类内函数,避免参数不匹配
  • 当前的线程池是单例模式,在 Task 任务对象构建后,通过线程池操作句柄 push 对象即可
// server.hpp

#pragma once

#include<iostream>
#include<string>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"err.hpp"
#include<cstring>
#include<unistd.h>
#include<cerrno>
#include"ThreadPool.hpp"
#include"Task.hpp"

namespace My_server{

    // 默认端口号
    const uint16_t default_port = 1111;
    //全连接队列的最大长度
    const int backlog = 32;
    using func_t = std::function<std::string(std::string)>;
    
    //前置声明
    class server;
    //包含我们所需参数的类型
    class ThreadData{
      public:
         ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr)
          :_sock(sock)
          ,_clientip(ip)
          ,_clientport(port)
          ,_current(ptr)
         {}
      public:
        int _sock;
        std::string _clientip;
        uint16_t _clientport;
        server* _current;
    };

    class server
    {
    private:
        /* data */
        //套接字
        int _listensock;
        //端口号
        uint16_t _port;
        // 判断服务器是否结束运行
        bool _quit;
        // 外部传入的回调函数
        func_t _func;
    public:

        server(const func_t &func,const uint16_t &port = default_port)
         :_func(func)
         ,_port(port)
         ,_quit(false)
        {}

        ~server(){}

        //初始化服务器
        void InitServer(){
            //1 创建套接字
            _listensock = socket(AF_INET,SOCK_STREAM,0);
            if(_listensock == -1){
                //绑定失败
                std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl;
                exit(SOCKET_ERR);
            }
            std::cout<<"Create Socket Success!" <<_listensock<<std::endl;

            //2 绑定端口号和IP地址
            struct sockaddr_in local;
            bzero(&local,sizeof(local));
            
            local.sin_family = AF_INET;
            local.sin_port = htons(_port);
            local.sin_addr.s_addr = INADDR_ANY;

            if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){
                std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl;
                exit(BIND_ERR);
            }

            //3 开始监听
            if(listen(_listensock,backlog)== -1){
                std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl;
                //新增一个报错
                exit(LISTEN_ERR);
            }
             std::cout<<"Listen Success!"<<std::endl;
        }
        //启动服务器
        void StartServer(){

            while(!_quit){
                //1 处理连接请求
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int sock = accept(_listensock,(struct sockaddr*)&client,&len);

                //2 如果连接失败 继续尝试连接
                if(sock == -1){
                    std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;
                    continue;
                }

                // 连接成功,获取客户端信息
                std::string clientip = inet_ntoa(client.sin_addr);
                uint16_t clientport = ntohs(client.sin_port);

                std::cout<<"Server accept"<< clientip + "-"<< clientport <<sock<<" from "<<_listensock << "success!"<<std::endl;

                 // 3.构建任务对象 注意:使用 bind 绑定 this 指针
                My_task::Task t(sock, clientip, clientport, std::bind(&server::Service, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

                // 4.通过线程池操作句柄,将任务对象 push 进线程池中处理
               //s
               //std::cout<<std::endl<<"push Task"<<std::endl;
                My_pool::ThreadPool<My_task::Task>::getInstance()->pushTask(t);
            }
        }

    
        void Service(int sock,const std::string &clientip,const uint16_t &clientport){

            char buff[1024];
            std::string who = clientip + "-" + std::to_string(clientport);
            while(true){
                // 以字符串格式读取,预留\0的位置
                ssize_t n = read(sock,buff,sizeof(buff)-1);
                if(n>0){
                    //读取成功
                    buff[n]='\0';
                    std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl;
                    //实际处理可以交给上层逻辑指定
                    std::string respond = _func(buff);
                    write(sock,buff,strlen(buff));
                }
                else if(n==0){
                  //表示当前读到了文件末尾,结束读取
                 std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl;
                 close(sock);
                 break;
               }
                else{
                  // 读取出问题(暂时)
                  std::cerr << "Read Fail!" << strerror(errno) << std::endl;
                  close(sock); // 关闭文件描述符
                   break;
               }    
            }
        }
    };
    
}

接下来编译并运行程序,当服务器启动后(此时无客户端连接),只有一个线程,这是因为我们当前的 线程池 是基于 懒汉模式 实现的,只有当第一次使用时,才会创建线程.

接下来启动客户端,可以看到确实创建了一批次线程(十个)

  看似程序已经很完善了,其实隐含着一个大问题:当前线程池中的线程,本质上是在回调一个 while(true) 死循环函数,当连接的客户端大于线程池中的最大线程数时,会导致所有线程始终处于满负载状态,直接影响就是连接成功后,无法再创建通信会话(倘若客户端不断开连接,线程池中的线程就无力处理其他客户端的会话)

  说白了就是 线程池 比较适合用于处理短任务,对于当前的场景来说,线程池 不适合建立持久通信会话,应该将其用于处理 read 读取、write 写入 任务.

  如果想解决这个问题,有两个方向:Service() 函数中支持一次 [收 / 发],或者多线程+线程池,多线程用于构建通信会话,线程池则用于处理 [收 / 发] 任务

前者实现起来比较简单,无非就是把 Service() 业务处理函数中的 while(true) 循环去掉

Service() 业务处理函数


        void Service(int sock,const std::string &clientip,const uint16_t &clientport){

            char buff[1024];
            std::string who = clientip + "-" + std::to_string(clientport);
           
                // 以字符串格式读取,预留\0的位置
                ssize_t n = read(sock,buff,sizeof(buff)-1);
                if(n>0){
                    //读取成功
                    buff[n]='\0';
                    std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl;
                    //实际处理可以交给上层逻辑指定
                    std::string respond = _func(buff);
                    write(sock,buff,strlen(buff));
                }
                else if(n==0){
                  //表示当前读到了文件末尾,结束读取
                 std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl;
                 close(sock);
               }
                else{
                  // 读取出问题(暂时)
                  std::cerr << "Read Fail!" << strerror(errno) << std::endl;
                  close(sock); // 关闭文件描述符
               }    
            
        }

至于后者就比较麻烦了,需要结合 高级IO 相关知识,这里不再阐述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/578634.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构 - 队列 [动画+代码注释超详解],萌新轻松上手!!!

一. 队列的概念 队列是一种特殊的线性表&#xff0c;用于存储元素&#xff0c;并且按照先进先出(First In First Out)的顺序进行管理&#xff0c;这意味着最先加入队列的元素将会是最先从队列中被移除的元素 队列的原型&#xff1a;只允许在一端进行插入数据的操作&#xff0c…

【嵌入式AI开发】轻量级卷积神经网络MobileNetV2详解

前言:MobileNetV2网络先升维后降维,在降维时使用线性激活函数,带残差的Inverted bottleck模块,防止ReLU信息丢失。在图像分类、目标检测、语义分割等任务上实现了网络轻量化、速度和准确度的权衡。 回顾MobileNetV1的理论和MobileNetV2项目实战可查阅如下链接: 【嵌入式AI…

使用SDRPI运行openwifi和设置网口

目录 一 制作启动盘 二 使用串口的方式启动openwifi 三 无线连接 四 网口设置&#xff0c;有线连接 五 使用SSH登录 一 制作启动盘 在github上下载img文件&#xff0c;由于github上下载速度比较慢&#xff0c;我会上传网盘链接 githun下载img文件地址: https://git…

OS对软件的管理,进程,PCB、子进程

进程 可执行程序加载到内存中&#xff0c;操作系统为内个程序都形成一个PCB对象&#xff08;结构体对象&#xff09;&#xff0c;PCB里存放着这个程序的所有的属性。进程可执行程序PCB &#xff0c;CPU执行程序也是先通过该程序的PCB找到相应的程序代码&#xff0c;然后一条一…

SpringCloud之Hystrix

Hystrix理解 熔断器本身是一种开关装置&#xff0c;用于在电路上保护线路过载。当线路中有电器发生短路时&#xff0c;熔断器能够及时切断故障电路&#xff0c;防止发生过载、发热甚至起火等严重后果。这种保护机制被借鉴到分布式系统的设计中&#xff0c;形成了类似Hystrix中…

基于SpringBoot+Vue校园二手交易系统的设计与实现

系统介绍 自从新冠疫情爆发以来&#xff0c;各个线下实体越来越难做&#xff0c;线下购物的人也越来越少&#xff0c;随之带来的是一些不必要的浪费&#xff0c;尤其是即将毕业的大学生&#xff0c;各种用品不方便携带走导致被遗弃&#xff0c;造成大量的浪费。本系统目的就是让…

安装好fedora_kde系统后的操作

文章目录 1 前言2 办公软件2.1 输入法2.1.1 安装 fcitx52.1.2 安装 fcitx5-rime2.1.3 安装 東風破2.1.4 使用 東風破 安装 郭斌勇 大神的 新世纪五笔 项目2.1.5 配置 fcitx5-rime2.1.6 重新部署 3 感谢阅读~ 1 前言 本文用的是 fedora 40 kde plasma 6。 因为有很多的软件都同时…

2024全国大学生高新技术竞赛——算法智星挑战赛(A~J)

好多都是之前的原题&#xff0c;甚至有上次第二届全国大学生信息技术认证挑战赛的原题&#xff0c;刚打完又来一遍&#xff0c;没绷住。 A. 手机 原题之一&#xff0c;具体出处忘了 最无脑的方法直接用map记录每个按下的值就行了&#xff0c;代码仅供参考。 #include <bit…

MATLAB矩阵

MATLAB 矩阵 矩阵是数字的二维数组。 在MATLAB中&#xff0c;您可以通过在每行中以逗号或空格分隔的数字输入元素并使用分号标记每行的结尾来创建矩阵。 例如&#xff0c;让我们创建一个45矩阵一- 示例 a [ 1 2 3 4 5; 2 3 4 5 6; 3 4 5 6 7; 4 5 6 7 8] MATLAB将执行上述语…

pycharm使用ssh连接服务器

1、具体流程 打开pycharm – File – Setting 输入服务器的IP地址&#xff0c;端口号、登录账号名 输入登陆账号的密码 下一步 一些初级设置 2、一些需要注意的小问题 2.1 更改代码地址 2.2 本地代码上传到服务器 首先在需要上传文件右键 2.3 在服务器的环境中上新安装库&am…

Docker 简单使用及安装常用软件

一、Docker 安装、配置与卸载 1.1、Docker 安装 # 1.安装gcc环境 yum -y install gcc gcc-c && \# 2. 卸载docker旧版本&#xff08;可能之前有安装&#xff09; yum -y remove docker docker-common docker-selinux docker-engine && \# 3. 安装依赖的软件包…

【项目学习01_2024.04.27_Day01】

学习笔记 项目学习链接第2章 内容管理模块v3.11 模块需求分析1.1 什么是需求分析1.2 模块介绍1.3 业务流程1.4 界面原型 2 创建模块工程2.1 模块工程结构父工程和子工程之间的继承关系以及工程与工程之间的依赖关系&#xff0c;通俗理解&#xff1a;2.2 创建模块工程\pom\含义及…

go 安装软件报go.mod file not found

执行 go get -u github.com/go-sql-driver/mysql 下载mysql 报错 解决方法: 控制台&#xff1a;输入go env 返回如下&#xff1a; 红圈值为NUL&#xff0c;需要设置GOMOD的值, 然后再控制台执行 &#xff08;1&#xff09;mkdir mod (2)go mod init mod 然后再执行下载&…

AIGC——什么是人工智能生成内容

人工智能生成内容&#xff08;AIGC&#xff09;是当今数字时代的一个引人注目的前沿技术&#xff0c;它借助深度学习和自然语言处理等技术&#xff0c;使计算机系统具备了生成高质量文本、图像、音频等多媒体内容的能力。AIGC的出现不仅推动了信息技术的发展&#xff0c;也在多…

【匹配】匈牙利匹配算法

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 匈牙利匹配算法 1. 正文 1.1 基础概念 二分图 顶点分为两个集合&#xff0c;集合间顶点相连&#xff0c;集合内点不相连 匹配 一个匹配就是一个边的…

Linux基础——Linux开发工具(make/makefile,git)

前言&#xff1a;在经过前面两篇学习&#xff0c;大家对Linux开发工具都有一定的了解&#xff0c;而在此之前最重要的两个工具就是vim&#xff0c;gcc。 如果对这两个工具不太了解&#xff0c;可以先阅读这两篇文章&#xff1a; Linux开发工具 (vim) Linux开发工具 (gcc/g) 首先…

汇智知了堂携手西华大学共探鸿蒙生态发展之路

近日&#xff0c;汇智知了堂有幸走进美丽的西华大学&#xff0c;为师生们带来了一场别开生面的鸿蒙专场讲座。本次讲座旨在深入解析鸿蒙生态的发展前景&#xff0c;增进同学们对鸿蒙系统的认识&#xff0c;同时展示汇智知了堂在产教融合领域的专业实力。 在讲座现场&#xff…

app渗透测试

1.夜神模拟器搭建流程 直接自定义安装 就可以了 如果是androd7本 修改为低于7版本的 调整夜神版本 2.burp设置代理 可以自己指定电脑ip windows cmd ifconfig 设置-添加-指定地址端口 然后导出证书或者在夜神模拟器使用指定的ip加端口访问下载 3.安装证书 如果是导出的…

(学习日记)2024.05.03:UCOSIII第五十七节:User文件夹函数概览(uCOS-III->Source文件夹)第三部分

写在前面&#xff1a; 由于时间的不足与学习的碎片化&#xff0c;写博客变得有些奢侈。 但是对于记录学习&#xff08;忘了以后能快速复习&#xff09;的渴望一天天变得强烈。 既然如此 不如以天为单位&#xff0c;以时间为顺序&#xff0c;仅仅将博客当做一个知识学习的目录&a…

Pytorch 计算深度模型的大小

计算模型大小的方法 卷积 时间复杂度 与 空间复杂度 的计算方式&#xff1a; C 通道的个数&#xff0c;K卷积核大小&#xff0c;M特征图大小&#xff0c;C_l-1是输入通道的个数&#xff0c;C_l是输出通道的个数 1 模型大小 MB 计算模型的大小的原理就是计算保存模型所需要…