liunx网络套接字 | 实现基于tcp协议的echo服务

        前言:本节讲述linux网络下的tcp协议套接字相关内容。博主以实现tcp服务为主线,穿插一些小知识点。以先粗略实现,后精雕细琢为思路讲述实现服务的过程。下面开始我们的学习吧。

        ps:本节内容建议了解网络端口号的友友们观看哦。

目录

实现内容

线程池版本整体代码

准备文件

makefile

tcpserver.hpp

main.cc

tcpclient

version1运行结果

version2版本

version3版本

version4版本


实现内容

        本篇内容将要实现一个服务端, 一个客户端。 然后客户端用来链接服务端, 向服务端发送消息, 然后服务端能够接收到消息并将消息返回给客户端。 

        实现的版本有四个:

  •         version1:实现单执行流的客户服务echo服务, 就是服务端只为一个服务端进行服务。 
  •         version2:在version1的版本上, 添加进程, 实现多进程的客户服务echo服务, 就是服务端为多个客户端进行服务, 但是因为是多进程,所以开销大。
  •         version3:改进version2版本, 将多进程改成多线程。实现多线程的echo服务。 但是当用户很多的时候, 线程量太大, 无法控制。
  •         version4:终极版本, 改进version3, 以线程池为基础, 实现可控的多线程echo服务。 控制线程个数, 既保证了并发性, 又防止了用户太多,线程爆满的问题。

        博主先实现version1, 然后在version1的基础上进行改版。下面开始实现: 

线程池版本整体代码

tcpserver


#pragma once
#include "Log.hpp"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "ThreadPool.h"
#include "Task.h"
#include <sys/wait.h>
#include <unistd.h>
using namespace std;

const int defaultfd = -1;
const int defaultport = 8080;
const string defaultip = "0.0.0.0";
const int backlog = 10; // 直接用,一般不要设置的太大。


class TcpServer;  //声明

class ThreadData
{
public:
    ThreadData(int fd, string ip, uint16_t port, TcpServer* const t)
        : sockfd_(fd), clientip_(ip), clientport_(port), t_(t)
    {
    }

public:
    int sockfd_;
    string clientip_;
    uint16_t clientport_;
public:
    TcpServer* const t_;
};



Log lg;

enum
{
    SockError = 2,
    BindError,
    ListenError
};

class TcpServer
{
public:
    TcpServer(int port = defaultport, string ip = defaultip, int sockfd = defaultfd)
        : listensockfd_(sockfd), ip_(ip), port_(port)
    {
    }

    void InitServer()
    {
        listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (listensockfd_ < 0)
        {
            lg(Fatal, "create socket, errno: %d, strerror: %s", errno, strerror(errno));
            exit(SockError);
        }
        //
        lg(Info, "create socket success, sockfd: %d", listensockfd_);

        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));

        local.sin_family = AF_INET;
        local.sin_port = htons(port_);
        inet_aton(ip_.c_str(), &(local.sin_addr)); // 主机序列转网络学列。 inet_aton是一个线程安全的函数。

        // 绑定
        if (bind(listensockfd_, (sockaddr *)&local, sizeof(local)) < 0)
        {
            lg(Fatal, "bind error, errno: %d, strerror: %s", errno, strerror(errno));
            exit(BindError);
        }
        // tcp面向字节流, 是被动的, 所以要将对应的socket设置为监听状态。
        if (listen(listensockfd_, backlog) < 0) // backlock表示的是底层全连接队列的长度。 这个参数对意思, 不做解释。
        {
            lg(Fatal, "Listen error, errno: %d, strerror: %s", errno, strerror(errno));
            exit(ListenError);
        }
        lg(Info, "Listen has success");
    }



    void Start()
    {
        ThreadPool<Task>::GetInstance()->Start();
        lg(Info, "tcpServer is running...");
        for (;;) // tcp协议也是一种一直处于运行的服务
        {
            // tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
            // 1、获取新连接,
            struct sockaddr_in client; // 获取的是客户端的addr
            socklen_t len = sizeof(client);

            int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
            if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
            {
                lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
                continue;
            }
            uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
            char clientip[32];
            inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
            // 2、根据新连接进行通信
            lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);

  

            //version--4线程池版本
            Task task_(sockfd, clientip, clientport);
            ThreadPool<Task>::GetInstance()->Push(task_);
        }
    }

    ~TcpServer()
    {
    }

private:
    int listensockfd_; // 监听套接字, 只用来升起服务器, 接收链接

    uint16_t port_;
    string ip_;
};

 main.cc

#include"tcpserver.hpp"
#include<memory>

int main(int argc, char* argv[])
{
    if (argc != 2)
    {
        cout << "has return" << endl;
        return 1;
    }
    //
    uint16_t port = stoi(argv[1]);

    unique_ptr<TcpServer> tcpsvr(new TcpServer(port));
    tcpsvr->InitServer();
    tcpsvr->Start();
    
    return 0;
}

 tcpclient

#include<iostream>
using namespace std;
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<string.h>
#include<netinet/in.h>

int main(int argc, char* argv[])
{    
    //处理argc, argv[]
    if (argc != 3)
    {
        cout << "has return " << endl;
        return 1;
    }
    //
    uint16_t serverport = stoi(argv[2]);
    string serverip = argv[1];
    
    //创建addr结构体, 设置端口号ip地址
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        cout << "socket error" << endl;
        return 1;
    }
    sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(serverport);
    inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));

    //1、客户端要绑定端口号, 但是不需要显示的绑定, 而是系统进行随机端口的绑定。 
    int n = connect(sockfd,(sockaddr*)&server, sizeof(server));
    if (n < 0) 
    {
        cerr << "connect error..." << endl;
        return 2;
    }


    //2、发送信息, 接收信息。
    string message;
    while (true)
    {
        cout << "Please Enter# ";
        getline(cin, message);

        //
        write(sockfd, message.c_str(), message.size());

        char inbuffer[4096];
        int n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
        if (n > 0)
        {
            inbuffer[n] = 0;
            cout << inbuffer << endl;
        }
    }
    
    return 0;
}

ThreadPool


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>

//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{
    pthread_t tid_;
    string name_;   
};


template<class T>
class ThreadPool
{
    static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)

private:
    //加锁解锁
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

    //唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }

    bool IsQueueEmpty()
    {
        return tasks_.empty();
    }



public:

    //线程要执行的函数
    static void* HandlerTask(void* args)
    {
        ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
        while(true) 
        {
            tp->Lock();
            while (tp->tasks_.empty())
            {
                tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。
            }
            //否则就去拿到tasks里面的任务
            T t = tp->tasks_.front();
            tp->tasks_.pop();
            
            //
            tp->Unlock();
            t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    

        }
    }

    //运行这个线程池, 也就是先将线程创建出来。 然后去运行线程
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name_ = "thread-";
            threads_[i].name_ += to_string(i);   
            pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
        }
    }

    //主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理
    void Push(const T& t)
    {
        Lock();
        tasks_.push(t);
        Wakeup();

        Unlock();
    }

    //获取单例
    //改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么
    //要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。 
    //解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够
    //进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连
    //第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!
    static ThreadPool<T>* GetInstance(int num = defaultnum)
    {
        if (tp_ == nullptr)
        {
            pthread_mutex_lock(&tp_->lock_);
            
            if (tp_ == nullptr) 
            {
                tp_ = new ThreadPool<T>(num);
            }

            pthread_mutex_unlock(&tp_->lock_);
        }
        
        return tp_;
    }



private:
    //构造函数私有化, 只有Getinstance里面才能创建。 
    ThreadPool(int num = defaultnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }

    //单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。 
    ThreadPool(const ThreadPool<T>& tp) = delete;
    const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }



private:
    vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,
    queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 

    pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 

    pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 

    

    static pthread_mutex_t lock_;         //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。
    static ThreadPool<T>* tp_;    //tp指针, 这就是唯一个单例对象。 

};

template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;

template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

Task

#include <iostream>
using namespace std;
#include"Log.hpp"
#include <vector>
#include <string>
extern Log lg;

// Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:
    // 构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算
    Task(int sockfd, string clientip, uint16_t clientport)
        :sockfd_(sockfd), clientip_(clientip), clientport_(clientport)
    {
    }

    ~Task() {}

    // 执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。
    void run()
    {
        char buffer[4096];
        string temp = clientip_;
        while (true)
        {
            ssize_t n = read(sockfd_, buffer, sizeof(buffer));
            if (n > 0)
            {
                buffer[n] = 0;
                cout << "client say#: " << buffer << endl;
                string echo_string("tcpserver echo#  " + (string)buffer);

                write(sockfd_, echo_string.c_str(), echo_string.size());
            }
            else if (n == 0)
            {
                lg(Info, "quit sockfd:%d ", sockfd_);
                exit(1);
            }
            else
            {
                lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd_, clientport_, temp.c_str());
            }
            //
        }
    }

    // 仿函数, 为了方便我们的对象能够像函数一样使用。
    void operator()()
    {
        run();
    }

private:
    // 每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_
    int sockfd_;
    string clientip_;
    uint16_t clientport_;
};
    

 

准备文件

先准备好文件。 tcpserver.hpp实现服务的接口, main.cc运行服务端。 然后tcpclient.cc运行客户端。 

 

makefile

makefile不解释, 直接上代码(这里加上-g是为了后续方便调试, 也可以不加)

.PHONY:all
all: tcpserver.exe tcpclient.exe

tcpserver.exe:main.cc
	g++ -o $@ $^ -std=c++11 -lpthread -g
tcpclient.exe:tcpclient.cc
	g++ -o $@ $^ -std=c++11 -lpthread -g

.PHONY:clean
clean:
	rm -rf tcpserver.exe tcpclient.exe

 tcpserver.hpp

先来看框架

#pragma once
#include "Log.hpp"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include"ThreadPool.h"
#include"Task.h"
#include <sys/wait.h>
#include <unistd.h>
using namespace std;

const int defaultfd = -1;
const int defaultport = 8080;
const string defaultip = "0.0.0.0";
const int backlog = 10; // 直接用,一般不要设置的太大。

Log lg;

enum
{
    SockError = 2,
    BindError,
    ListenError
};

class TcpServer
{
public:
    //构造函数, 将服务端的端口号, ip地址传过来
    TcpServer(int port = defaultport, string ip = defaultip, int sockfd = defaultfd)
        : listensockfd_(sockfd), ip_(ip), port_(port)
    {}

    //初始化服务端,分两步:绑定和监听
    void InitServer()
    {
        //绑定
        
        //监听
    }

    
    //运行服务端
    void Start()
    {

    }
    

    //执行相应的服务
    void Service(int sockfd, const string &clientip, uint16_t clientport)
    {

    } 
    
    //析构函数
    ~TcpServer()
    {}

private:
    int listensockfd_; // 监听套接字, 只用来升起服务器, 接收链接

    uint16_t port_;
    string ip_;
};

然后初始化就是创建sockaddr结构体, 创建套接字, 然后绑定。 因为tcp是面向字节流的。 所以还要对网卡进行监听。下面是初始化服务。


    //对服务端进行初始化
    void InitServer()
    {
        listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (listensockfd_ < 0)
        {
            lg(Fatal, "create socket, errno: %d, strerror: %s", errno, strerror(errno));
            exit(SockError);
        }
        //
        lg(Info, "create socket success, sockfd: %d", listensockfd_);

        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));

        local.sin_family = AF_INET;
        local.sin_port = htons(port_);
        inet_aton(ip_.c_str(), &(local.sin_addr)); // 主机序列转网络学列。 inet_aton是一个线程安全的函数。

        // 绑定
        if (bind(listensockfd_, (sockaddr *)&local, sizeof(local)) < 0)
        {
            lg(Fatal, "bind error, errno: %d, strerror: %s", errno, strerror(errno));
            exit(BindError);
        }
        // tcp面向字节流, 是被动的, 所以要将对应的socket设置为监听状态。
        if (listen(listensockfd_, backlog) < 0) // backlock表示的是底层全连接队列的长度。 这个参数对意思, 不做解释。
        {
            lg(Fatal, "Listen error, errno: %d, strerror: %s", errno, strerror(errno));
            exit(ListenError);
        }
        lg(Info, "Listen has success");
    }

        然后是运行服务, 运行服务也是分两步: accept与客户端建立连接。 然后执行服务。

    void Start()
    {

        lg(Info, "tcpServer is running...");
        for (;;) // tcp协议也是一种一直处于运行的服务
        {
            // tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
            // 1、获取新连接,
            struct sockaddr_in client; // 获取的是客户端的addr
            socklen_t len = sizeof(client);

            int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
            if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
            {
                lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
                continue;
            }
            uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
            char clientip[32];
            inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
            // 2、根据新连接进行通信
            lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);

            //version--1 单进程版
            Service(sockfd, clientip, clientport);
            close(sockfd);

        }
    }

        然后执行的服务是echo服务, 就是先接受客户端发来的信息, 然后将信息加工一下发回去。


    void Service(int sockfd, const string &clientip, uint16_t clientport)
    {
        char buffer[4096];
        while (true)
        {
            ssize_t n = read(sockfd, buffer, sizeof(buffer) - 1);
            if (n > 0)
            {
                buffer[n] = 0;
                cout << "client say#: " << buffer << endl;
                string message("tcpserver echo#  " + string(buffer));
                //
                write(sockfd, message.c_str(), message.size());
            }
            else if (n == 0)
            {
                lg(Info, "quit sockfd:%d ", sockfd);
                exit(1);
            }
            else
            {
                lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip.c_str());
            }
        }
    }

         上面就是整个的代码。

        我们这里说一下accept这个代码 

        accept代码的第一个参数是sockfd, 就是网卡的文件fd。 然后第二个参数和第三个参数都是输出型参数。 能够将对方也就是客户端的sockaddr带出来。 

main.cc

        主函数就是接收到传来的端口号, 创建服务端然后初始化并运行起来。

#include"tcpserver.hpp"
#include<memory>

int main(int argc, char* argv[])
{
    if (argc != 2)
    {
        cout << "has return" << endl;
        return 1;
    }
    //
    uint16_t port = stoi(argv[1]);

    unique_ptr<TcpServer> tcpsvr(new TcpServer(port));
    tcpsvr->InitServer();
    tcpsvr->Start();
    
    return 0;
}   

 tcpclient

        先看客户端的框架, 就是先链接服务端。 然后就给服务端发信息,接收信息。 (接收到的这个信息是被服务端处理过的)

#include<iostream>
using namespace std;
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<string.h>
#include<netinet/in.h>

int main(int argc, char* argv[])
{
    //处理argc, argv[]
    
    //先连接


    //再发数据接受数据
    
    return 0;
}

处理argc的时候, 因为我们的参数一定是三个。即, 一个程序名一个端口号, 一个IP地址。 所以如果argc不是等于3的话直接返回。 为3的话就将端口号以及ip地址保存一下。 其中ip地址是要连接到的服务端的ip地址, 端口号是要连接到服务端的端口号。

    //处理argc, argv[]
    if (argc != 3)
    {
        cout << "has return " << endl;
        return 1;
    }
    //
    uint16_t serverport = stoi(argv[2]);
    string serverip = argv[1];

然后创建addr结构,连接到服务端。 

    //创建addr结构体, 设置端口号ip地址
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        cout << "socket error" << endl;
        return 1;
    }
    sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(serverport);
    inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));

    //1、客户端要绑定端口号, 但是不需要显示的绑定, 而是系统进行随机端口的绑定。 
    int n = connect(sockfd,(sockaddr*)&server, sizeof(server));
    if (n < 0) 
    {
        cerr << "connect error..." << endl;
        return 2;
    }

最后就是收发消息, 这里创建循环, 让我们可以执行多次服务, 多次收发消息。

    //2、发送信息, 接收信息。
    string message;
    while (true)
    {
        cout << "Please Enter# ";
        getline(cin, message);

        //
        write(sockfd, message.c_str(), message.size());

        char inbuffer[4096];
        int n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
        if (n > 0)
        {
            inbuffer[n] = 0;
            cout << inbuffer << endl;
        }
    }
    

然后我们就能够运行了。下面是运行结果。

version1运行结果

可以看到已经可以收发消息。

version2版本

        version2版本是创建多进程。 但是, 要知道, 我们的父进程创建子进程后要进行等待。 否则会造成内存泄漏。 但是我们父进程等待, 父进程又不能向下运行代码了, 就不能继续创建子进程了。 所以, 为了解决这个问题。 我们就可以创建一个孤儿进程。 让子进程创建好孙子进程后直接退出, 将孙子进程托孤。 父进程等待子进程后就继续向下执行。 这样就能创建一批孙子进程并发访问!!!


    void Start()
    {

        lg(Info, "tcpServer is running...");
        for (;;) // tcp协议也是一种一直处于运行的服务
        {
            // tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
            // 1、获取新连接,
            struct sockaddr_in client; // 获取的是客户端的addr
            socklen_t len = sizeof(client);

            int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
            if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
            {
                lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
                continue;
            }
            uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
            char clientip[32];
            inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
            // 2、根据新连接进行通信
            lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);

            // // version--1 单进程版
            // Service(sockfd, clientip, clientport);
            // close(sockfd);

            //version--2 多进程版
            pid_t id = fork();
            if (id == 0)
            {
                close(listensockfd_);
                if (fork() > 0)
                {
                    exit(0);
                }
                Service(sockfd, clientip, clientport);
                close(sockfd);
                exit(0);
                //child
            }
            close(sockfd);
            pid_t rid = waitpid(id, nullptr, 0);
            
        }
    }

 version3版本

        第三版本是多线程版本, 多线程同样有主线程等待的问题,我们的主线程一旦要等待子线程, 那么就不能向后执行了。 所以为了能够并发, 就要对子线程进行分离。 

        下面是代码的改动:

        第一个改动地方就是创建一个ThreadData类:


class TcpServer;  //声明

//创建这个类是为了能够将服务端对象传给线程去执行
class ThreadData
{
public:
    ThreadData(int fd, string ip, uint16_t port, TcpServer* const t)
        : sockfd_(fd), clientip_(ip), clientport_(port), t_(t)
    {
    }

public:
    int sockfd_;
    string clientip_;
    uint16_t clientport_;
public:
    TcpServer* const t_;
};

        然后第二个改动的地方就是线程要执行的动作:


    static void *pthrun(void *args)
    {
        pthread_detach(pthread_self()); // 子线程直接分离
        // 一个进程打开的所有的文件描述符表, 其他进程能看到呢?
        ThreadData* td = static_cast<ThreadData*>(args);
        td->t_->Service(td->sockfd_, td->clientip_, td->clientport_);

    }

        第三个改动就是start函数里面执行服务的代码部分:

    void Start()
    {

        lg(Info, "tcpServer is running...");
        for (;;) // tcp协议也是一种一直处于运行的服务
        {
            // tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
            // 1、获取新连接,
            struct sockaddr_in client; // 获取的是客户端的addr
            socklen_t len = sizeof(client);

            int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
            if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
            {
                lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
                continue;
            }
            uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
            char clientip[32];
            inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
            // 2、根据新连接进行通信
            lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);

            // // version--1 单进程版
            // Service(sockfd, clientip, clientport);
            // close(sockfd);

            //version--2 多进程版
            // pid_t id = fork();
            // if (id == 0)
            // {
            //     close(listensockfd_);
            //     if (fork() > 0)
            //     {
            //         exit(0);
            //     }
            //     Service(sockfd, clientip, clientport);
            //     close(sockfd);
            //     exit(0);
            //     //child
            // }
            // close(sockfd);
            // pid_t rid = waitpid(id, nullptr, 0);

            // version--3多线程版本
            ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);
            pthread_t tid;
            pthread_create(&tid, nullptr, pthrun, td);
            
        }
    }

运行结果

一开始一个server服务, 然后我们添加一个client就增加一个server服务, 增加一个client就增加一个服务。

version4版本

        version4线程池版本, version4的线程池版本就是我们创建好线程池。 然后创建一个任务类, 将我们要执行的服务作为任务类的一个结构。 这就要求我们的这个任务类里面必须有我们的ip, port, sockfd这样的。 字段。 并且我们还要引入几个头文件, 下面为代码:

首先要有两个新文件, 一个包含task类, 一个包含线程池类。

我们让task类里面包含服务的方法。就是run方法。


#include <iostream>
using namespace std;
#include"Log.hpp"
#include <vector>
#include <string>
extern Log lg;

// Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:
    // 构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算
    Task(int sockfd, string clientip, uint16_t clientport)
        :sockfd_(sockfd), clientip_(clientip), clientport_(clientport)
    {
    }

    ~Task() {}

    // 执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。
    void run()
    {
        char buffer[4096];
        string temp = clientip_;
        while (true)
        {
            ssize_t n = read(sockfd_, buffer, sizeof(buffer));
            if (n > 0)
            {
                buffer[n] = 0;
                cout << "client say#: " << buffer << endl;
                string echo_string("tcpserver echo#  " + (string)buffer);

                write(sockfd_, echo_string.c_str(), echo_string.size());
            }
            else if (n == 0)
            {
                lg(Info, "quit sockfd:%d ", sockfd_);
                exit(1);
            }
            else
            {
                lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd_, clientport_, temp.c_str());
            }
            //
        }
    }

    // 仿函数, 为了方便我们的对象能够像函数一样使用。
    void operator()()
    {
        run();
    }

private:
    // 每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_
    int sockfd_;
    string clientip_;
    uint16_t clientport_;
};

 然后线程池代友友们如果没写过可以自己实现一个,或者直接用博主的, 如下:


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>

//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{
    pthread_t tid_;
    string name_;   
};

template<class T>
class ThreadPool
{
    static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)

private:
    //加锁解锁
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

    //唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }

    bool IsQueueEmpty()
    {
        return tasks_.empty();
    }

public:

    //线程要执行的函数
    static void* HandlerTask(void* args)
    {
        ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
        while(true) 
        {
            tp->Lock();
            while (tp->tasks_.empty())
            {
                tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。
            }
            //否则就去拿到tasks里面的任务
            T t = tp->tasks_.front();
            tp->tasks_.pop();
            
            //
            tp->Unlock();
            t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    

        }
    }

    //运行这个线程池, 也就是先将线程创建出来。 然后去运行线程
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name_ = "thread-";
            threads_[i].name_ += to_string(i);   
            pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
        }
    }

    //主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理
    void Push(const T& t)
    {
        Lock();
        tasks_.push(t);
        Wakeup();

        Unlock();
    }

    //获取单例
    //改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么
    //要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。 
    //解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够
    //进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连
    //第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!
    static ThreadPool<T>* GetInstance(int num = defaultnum)
    {
        if (tp_ == nullptr)
        {
            pthread_mutex_lock(&tp_->lock_);
            
            if (tp_ == nullptr) 
            {
                tp_ = new ThreadPool<T>(num);
            }

            pthread_mutex_unlock(&tp_->lock_);
        }
        
        return tp_;
    }

private:
    //构造函数私有化, 只有Getinstance里面才能创建。 
    ThreadPool(int num = defaultnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }

    //单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。 
    ThreadPool(const ThreadPool<T>& tp) = delete;
    const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }

private:
    vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,
    queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 

    pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 

    pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 

    static pthread_mutex_t lock_;         //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。
    static ThreadPool<T>* tp_;    //tp指针, 这就是唯一个单例对象。 

};

template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;

template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

        然后就是start运行函数


    void Start()
    {
        ThreadPool<Task>::GetInstance()->Start();
        lg(Info, "tcpServer is running...");
        for (;;) // tcp协议也是一种一直处于运行的服务
        {
            // tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
            // 1、获取新连接,
            struct sockaddr_in client; // 获取的是客户端的addr
            socklen_t len = sizeof(client);

            int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
            if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
            {
                lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
                continue;
            }
            uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
            char clientip[32];
            inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
            // 2、根据新连接进行通信
            lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);

            // // version--1 单进程版
            // Service(sockfd, clientip, clientport);
            // close(sockfd);

            //version--2 多进程版
            // pid_t id = fork();
            // if (id == 0)
            // {
            //     close(listensockfd_);
            //     if (fork() > 0)
            //     {
            //         exit(0);
            //     }
            //     Service(sockfd, clientip, clientport);
            //     close(sockfd);
            //     exit(0);
            //     //child
            // }
            // close(sockfd);
            // pid_t rid = waitpid(id, nullptr, 0);
            

            // // version--3多线程版本
            // ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);
            // pthread_t tid;
            // pthread_create(&tid, nullptr, pthrun, td);

            //version--4线程池版本
            Task task_(sockfd, clientip, clientport);
            ThreadPool<Task>::GetInstance()->Push(task_);
        }
    }

 运行结果:

我们可以看到, 只要已启动服务端的瞬间, 就能创建出6个线程(一个主线程, 五个分线程)

 

 ——————以上就是本节全部内容哦, 如果对友友们有帮助的话可以关注博主, 方便学习更多知识哦!!!   

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/905571.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【果蔬识别】Python+卷积神经网络算法+深度学习+人工智能+机器学习+TensorFlow+计算机课设项目+算法模型

一、介绍 果蔬识别系统&#xff0c;本系统使用Python作为主要开发语言&#xff0c;通过收集了12种常见的水果和蔬菜&#xff08;‘土豆’, ‘圣女果’, ‘大白菜’, ‘大葱’, ‘梨’, ‘胡萝卜’, ‘芒果’, ‘苹果’, ‘西红柿’, ‘韭菜’, ‘香蕉’, ‘黄瓜’&#xff09;…

isp框架代码理解

一、整体框架如下&#xff1a; 1 外层的src中 1.1 从camera.c->task.c&#xff1a;封装了3层&#xff0c;透传到某个功能的本级。 1.2 core.c和capability.c中实现&#xff1a;开机初始化加载参数。2. plat/src中 2.1 fun.c中继task.c又透传了一层&#xff1b;以及最后功能…

VuePress文档初始化请求过多问题探讨

1. 背景 公司有部门使用VuePress 1.0搭建平台的帮助文档&#xff0c;前期文档不是很多&#xff0c;所以没有暴露出特别明显的问题。但随着文档的逐步迭代和内容的增多&#xff0c;出现了大量的并发请求&#xff0c;总共有218个请求&#xff0c;导致服务器带宽耗尽、响应速度下降…

Paimon x StarRocks 助力喜马拉雅构建实时湖仓

作者&#xff1a;王琛 喜马拉雅数仓专家 小编导读&#xff1a; 本文将介绍喜马拉雅直播的业务现状及数据仓库架构的迭代升级&#xff0c;重点分享基于 Flink Paimon StarRocks 实现实时湖仓的架构及其成效。我们通过分钟级别的收入监控、实时榜单生成、流量监测和盈亏预警&am…

Virtuoso使用layout绘制版图、使用Calibre验证DRC和LVS

1 绘制版图 1.1 进入Layout XL 绘制好Schmatic后&#xff0c;在原理图界面点击Launch&#xff0c;点击Layout XL进入版图绘制界面。 1.2 导入元件 1、在Layout XL界面左下角找到Generate All from Source。 2、在Generate Layout界面&#xff0c;选中“Instance”&#…

vscode插件-08 Golang

文章目录 Go安装其他必须软件 Go Go语言环境&#xff0c;只需安装这一个插件。然后通过vscode命令下载安装其他go环境需要的内容。 程序调试&#xff0c;需要创建.vscode文件夹并编写launch.json文件。 安装其他必须软件 ctrlshiftp&#xff0c;调出命令面板&#xff0c;输入…

Linux系列-vim的使用

&#x1f308;个人主页&#xff1a;羽晨同学 &#x1f4ab;个人格言:“成为自己未来的主人~” vim的使用 vim是多模式编辑器&#xff0c;不同的是vim是vi的升级版本&#xff0c;它不仅兼容vi的所有指令&#xff0c;而且还有一些新的特性在里面&#xff0c;比如语法加亮&am…

强化学习DQN实践(gymnasium+pytorch)

Pytorch官方教程中有强化学习教程&#xff0c;但是很多中文翻译都太老了&#xff0c;里面的代码也不能跑了 这篇blog按照官方最新教程实现&#xff0c;并加入了一些个人理解 工具 gymnasium&#xff1a;由gym升级而来&#xff0c;官方定义&#xff1a;An API standard for rei…

2024快手面试算法题-生气传染

问题描述 思路分析 生气只会向后传播&#xff0c;最后一个生气的人一定是最长连续没有生气的人中的最后一个人&#xff0c;前提是前面得有一个人生气。 注意&#xff0c;一次只能传播一个人&#xff0c;比如示例1&#xff0c;第一次只会传播给第一个P&#xff0c;不会传播给第…

入门 | Kafka数据使用vector消费到Loki中使用grafana展示

一、Loki的基本介绍 1、基本介绍 Loki 是由 Grafana Labs 开发的一款水平可扩展、高性价比的日志聚合系统。它的设计初衷是为了有效地处理和存储大量的日志数据&#xff0c;与 Grafana 生态系统紧密集成&#xff0c;方便用户在 Grafana 中对日志进行查询和可视化操作。 从架构…

分类算法——逻辑回归 详解

逻辑回归&#xff08;Logistic Regression&#xff09;是一种广泛使用的分类算法&#xff0c;特别适用于二分类问题。尽管名字中有“回归”二字&#xff0c;逻辑回归实际上是一种分类方法。下面将从底层原理、数学模型、优化方法以及源代码层面详细解析逻辑回归。 1. 基本原理 …

【Spring MVC】DispatcherServlet 请求处理流程

一、 请求处理 Spring MVC 是 Spring 框架的一部分&#xff0c;用于构建 Web 应用程序。它遵循 MVC&#xff08;Model-View-Controller&#xff09;设计模式&#xff0c;将应用程序分为模型&#xff08;Model&#xff09;、**视图&#xff08;View&#xff09;和控制器&#x…

现代数字信号处理I--最佳线性无偏估计 BLUE 学习笔记

目录 1. 最佳线性无偏估计的由来 2. 简单线性模型下一维参数的BLUE 3. 一般线性模型下一维参数的BLUE 4. 一般线性模型下多维参数的BLUE 4.1 以一维情况说明Rao论文中的结论 4.2 矢量参数是MVUE的本质是矢量参数中的每个一维参数都是MVUE 4.3 一般线性模型多维参数BLUE的…

QT(绘图)

目录 QPainter QPainter 的一些关键步骤和使用方法&#xff1a; QPainter 的一些常用接口&#xff1a; 1. 基础绘制接口 2. 颜色和画刷设置 3. 图像绘制 4. 文本绘制 5. 变换操作 6. 渲染设置 7. 状态保存与恢复 8. 其它绘制方法 示例代码1&#xff1a; 示例代码…

【js逆向学习】某多多anti_content逆向(补环境)

文章目录 声明逆向目标逆向分析逆向过程总结 声明 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;不提供完整代码&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的…

【安全解决方案】深入解析:如何通过CDN获取用户真实IP地址

一、业务场景 某大型互联网以及电商公司为了防止客户端获取到真实的ip地址&#xff0c;以及达到保护后端业务服务器不被网站攻击&#xff0c;同时又可以让公安要求留存网站日志和排查违法行为&#xff0c;以及打击犯罪的时候&#xff0c;获取不到真实的ip地址&#xff0c;发现…

Java | Leetcode Java题解之第524题通过删除字母匹配到字典里最长单词

题目&#xff1a; 题解&#xff1a; class Solution {public String findLongestWord(String s, List<String> dictionary) {int m s.length();int[][] f new int[m 1][26];Arrays.fill(f[m], m);for (int i m - 1; i > 0; --i) {for (int j 0; j < 26; j) {…

python爬虫抓取豆瓣数据教程

环境准备 在开始之前&#xff0c;你需要确保你的Python环境已经安装了以下库&#xff1a; requests&#xff1a;用于发送HTTP请求。BeautifulSoup&#xff1a;用于解析HTML文档。 如果你还没有安装这些库&#xff0c;可以通过以下命令安装&#xff1a; pip install requests…

Python实现深度学习模型预测控制(tensorflow)DL-MPC(Deep Learning Model Predictive Control

链接&#xff1a;深度学习模型预测控制 &#xff08;如果认为有用&#xff0c;动动小手为我点亮github小星星哦&#xff09;&#xff0c;持续更新中…… 链接&#xff1a;WangXiaoMingo/TensorDL-MPC&#xff1a;DL-MPC&#xff08;深度学习模型预测控制&#xff09;是基于 P…

简单的ELK部署学习

简单的ELK部署学习 1. 需求 我们公司现在使用的是ELK日志跟踪&#xff0c;在出现问题的时候&#xff0c;我们可以快速定为到问题&#xff0c;并且可以对日志进行分类检索&#xff0c;比如对服务名称&#xff0c;ip , 级别等信息进行分类检索。此文章为本人学习了解我们公司的…