计算机网络socket编程(5)_TCP网络编程实现echo_server

个人主页:C++忠实粉丝
欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C++忠实粉丝 原创

计算机网络socket编程(5)_TCP网络编程实现echo_server

收录于专栏【计算机网络】
本专栏旨在分享学习计算机网络的一点学习笔记,欢迎大家在评论区交流讨论💌 
 

目录

功能介绍  

InetAddr.hpp

LockGuard.hpp

Log.hpp

Thread.hpp

ThreadPool.hpp

TcpServer.hpp

TcpServerMain.cc

TcpClientMain.cc

效果展示  


功能介绍  

和上回 UDP 网络编程一样, 实现简单的 echo_server, 不过, 这里我们 TCP 网络编程使用了 多线程, 不过大体都差不多~ 

还有就是网络编程代码真的是又多又杂, 有的时候我自己都烦, 没办法网络部分就是这样的, 我最近会尽快更完这个 socket 编程, 提早进入概念部分, 一直编程感觉少了什么~ 还得跟概念结合起来看, 感兴趣的宝子们不要忘记了点赞关注哦! 我现在在网络部分真的待不了一点, 希望我能尽快挣脱网络, 更新数据库 MySQL 的东西吧!  

InetAddr.hpp

这个类封装了 sockaddr_in 结构体,用于简化对 IP 地址和端口的处理。其核心功能是将网络字节序的 sockaddr_in 地址转换为易于操作的主机字节序的 IP 地址字符串和端口号,并提供相关的成员函数来获取这些信息。 

#pragma once

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

class InetAddr
{
private:
    void ToHost(const struct sockaddr_in &addr)
    {
        _port = ntohs(addr.sin_port);
        // _ip = inet_ntoa(addr.sin_addr);
        char ip_buf[32];
        // inet_p to n
        // p: process
        // n: net
        // inet_pton(int af, const char *src, void *dst);
        // inet_pton(AF_INET, ip.c_str(), &addr.sin_addr.s_addr);
        ::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf));
        _ip = ip_buf;
    }

public:
    InetAddr(const struct sockaddr_in &addr):_addr(addr)
    {
        ToHost(addr);
    }
    InetAddr()
    {}
    bool operator == (const InetAddr &addr)
    {
        return (this->_ip == addr._ip && this->_port == addr._port);
    }
    std::string Ip()
    {
        return _ip;
    }
    uint16_t Port()
    {
        return _port;
    }
    struct sockaddr_in Addr()
    {
        return _addr;
    }
    std::string AddrStr()
    {
        return _ip + ":" + std::to_string(_port);
    }
    ~InetAddr()
    {
    }

private:
    std::string _ip;
    uint16_t _port;
    struct sockaddr_in _addr;
};

私有成员变量

_ip:存储 IP 地址的字符串(如 "192.168.0.1")。

_port:存储端口号。

_addr:存储一个 sockaddr_in 结构体,用于保存 IP 地址和端口。

私有成员函数 ToHost

ToHost 函数的作用是将一个 sockaddr_in 地址结构转换为 InetAddr 类的成员 _ip 和 _port。

ntohs(addr.sin_port):将网络字节顺序的端口号(从 sockaddr_in 中获取)转换为主机字节顺序。网络字节顺序是大端模式,而主机字节顺序通常取决于平台。

inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf)):将 sockaddr_in 中的 IP 地址(以二进制形式存储)转换为点分十进制字符串表示(如 "192.168.0.1")。

这里 inet_ntopntohs 用于处理网络字节序和主机字节序的转换,确保 IP 地址和端口在不同环境下的正确性。

构造函数 InetAddr(const struct sockaddr_in &addr)

该构造函数接受一个 sockaddr_in 类型的参数 addr,并调用 ToHost 方法将其转换为 InetAddr 类内部的 _ip 和 _port。

_addr(addr):将 sockaddr_in 结构体存储在 _addr 中。

默认构造函数 InetAddr()

默认构造函数没有做任何事情。它用于创建一个空的 InetAddr 对象

运算符重载 ==

重载了 == 运算符,用于比较两个 InetAddr 对象是否相等。它通过比较 ip 和 port 字段来判断是否相同。

成员函数 Ip

返回当前 InetAddr 对象的 IP 地址。

成员函数 Port

返回当前 InetAddr 对象的端口号。

成员函数 Addr

返回存储的 sockaddr_in 结构体。sockaddr_in 包含了完整的 IP 地址和端口信息。

成员函数 AddrStr

返回一个格式化的字符串,表示 IP 地址和端口,格式为 "ip:port"(例如 "192.168.0.1:8080")。

LockGuard.hpp

#pragma once

#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t *_mutex;
};

构造函数 LockGuard(pthread_mutex_t *mutex)

构造函数接收一个 pthread_mutex_t* 类型的指针作为参数,并在构造时通过 pthread_mutex_lock 来锁定该互斥量。

mutex 参数是指向一个 pthread_mutex_t 类型的指针,这个互斥量将用来保护临界区。

_mutex(mutex) 是初始化成员变量 _mutex 的成员初始化列表,它将构造函数的参数 mutex 的值赋给 _mutex 成员变量。

pthread_mutex_lock(_mutex) 调用会尝试锁定互斥量 _mutex。如果互斥量已经被其他线程锁定,当前线程会被阻塞,直到该互斥量变为可用。

析构函数 ~LockGuard()

析构函数负责在 LockGuard 对象生命周期结束时自动解锁互斥量。

当 LockGuard 对象的作用域结束时,析构函数会自动被调用。

pthread_mutex_unlock(_mutex) 会释放锁,即解锁互斥量。这样可以确保即使在发生异常或提前返回的情况下,互斥量也能被正确解锁,从而避免死锁。

成员变量 _mutex

_mutex 是一个指向 pthread_mutex_t 类型的指针,它保存了传递给构造函数的互斥量地址。这个指针将用于在构造和析构中对互斥量进行锁定和解锁操作。

关键特点:

自动锁定:当 LockGuard 对象被创建时,构造函数自动锁定互斥量。

自动解锁:当 LockGuard 对象超出作用域时,析构函数自动解锁互斥量。

简化代码:使用 LockGuard 类可以避免手动调用 pthread_mutex_lock 和 pthread_mutex_unlock,并且保证解锁操作一定会发生。

Log.hpp

#pragma once

#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "LockGuard.hpp"

namespace log_ns
{

    enum
    {
        DEBUG = 1,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    std::string LevelToString(int level)
    {
        switch (level)
        {
        case DEBUG:
            return "DEBUG";
        case INFO:
            return "INFO";
        case WARNING:
            return "WARNING";
        case ERROR:
            return "ERROR";
        case FATAL:
            return "FATAL";
        default:
            return "UNKNOWN";
        }
    }

    std::string GetCurrTime()
    {
        time_t now = time(nullptr);
        struct tm *curr_time = localtime(&now);
        char buffer[128];
        snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
                 curr_time->tm_year + 1900,
                 curr_time->tm_mon + 1,
                 curr_time->tm_mday,
                 curr_time->tm_hour,
                 curr_time->tm_min,
                 curr_time->tm_sec);
        return buffer;
    }

    class logmessage
    {
    public:
        std::string _level;
        pid_t _id;
        std::string _filename;
        int _filenumber;
        std::string _curr_time;
        std::string _message_info;
    };

#define SCREEN_TYPE 1
#define FILE_TYPE 2

    const std::string glogfile = "./log.txt";
    pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;

    // log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );
    class Log
    {
    public:
        Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
        {
        }
        void Enable(int type)
        {
            _type = type;
        }
        void FlushLogToScreen(const logmessage &lg)
        {
            printf("[%s][%d][%s][%d][%s] %s",
                   lg._level.c_str(),
                   lg._id,
                   lg._filename.c_str(),
                   lg._filenumber,
                   lg._curr_time.c_str(),
                   lg._message_info.c_str());
        }
        void FlushLogToFile(const logmessage &lg)
        {
            std::ofstream out(_logfile, std::ios::app);
            if (!out.is_open())
                return;
            char logtxt[2048];
            snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
                     lg._level.c_str(),
                     lg._id,
                     lg._filename.c_str(),
                     lg._filenumber,
                     lg._curr_time.c_str(),
                     lg._message_info.c_str());
            out.write(logtxt, strlen(logtxt));
            out.close();
        }
        void FlushLog(const logmessage &lg)
        {
            // 加过滤逻辑 --- TODO

            LockGuard lockguard(&glock);
            switch (_type)
            {
            case SCREEN_TYPE:
                FlushLogToScreen(lg);
                break;
            case FILE_TYPE:
                FlushLogToFile(lg);
                break;
            }
        }
        void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
        {
            logmessage lg;

            lg._level = LevelToString(level);
            lg._id = getpid();
            lg._filename = filename;
            lg._filenumber = filenumber;
            lg._curr_time = GetCurrTime();

            va_list ap;
            va_start(ap, format);
            char log_info[1024];
            vsnprintf(log_info, sizeof(log_info), format, ap);
            va_end(ap);
            lg._message_info = log_info;

            // 打印出来日志
            FlushLog(lg);
        }
        ~Log()
        {
        }

    private:
        int _type;
        std::string _logfile;
    };

    Log lg;

#define LOG(Level, Format, ...)                                        \
    do                                                                 \
    {                                                                  \
        lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
    } while (0)
#define EnableScreen()          \
    do                          \
    {                           \
        lg.Enable(SCREEN_TYPE); \
    } while (0)
#define EnableFILE()          \
    do                        \
    {                         \
        lg.Enable(FILE_TYPE); \
    } while (0)
};

日志系统, 我们的老演员了, 这里就不再多介绍了~~

Thread.hpp

#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>

namespace ThreadMoudle
{
    // 线程要执行的方法,后面我们随时调整
    // typedef void (*func_t)(ThreadData *td); // 函数指针类型

    // typedef std::function<void()> func_t;
    using func_t = std::function<void(const std::string&)>;

    class Thread
    {
    public:
        void Excute()
        {
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }
    public:
        Thread(const std::string &name, func_t func):_name(name), _func(func)
        {
        }
        static void *ThreadRoutine(void *args) // 新线程都会执行该方法!
        {
            Thread *self = static_cast<Thread*>(args); // 获得了当前对象
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if(n != 0) return false;
            return true;
        }
        std::string Status()
        {
            if(_isrunning) return "running";
            else return "sleep";
        }
        void Stop()
        {
            if(_isrunning)
            {
                ::pthread_cancel(_tid);
                _isrunning = false;
            }
        }
        void Join()
        {
            ::pthread_join(_tid, nullptr);
        }
        std::string Name()
        {
            return _name;
        }
        ~Thread()
        {
        }

    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func; // 线程要执行的回调函数
    };
} // namespace ThreadModle

线程函数类型 func_t

using func_t = std::function<void(const std::string&)>;

这里使用了 std::function 来定义线程要执行的函数类型。func_t 是一个函数对象类型,它表示接受一个 std::string 类型参数并返回 void 的函数。使用 std::function 的好处是,它可以适配普通函数、lambda 表达式以及成员函数等,使得线程任务的定义更加灵活。

Thread 类

Thread 类是该代码的核心,封装了 POSIX 线程的管理操作,包括线程的创建、执行、停止、等待和状态查询。

成员变量

_name: 线程的名称,用于标识线程。

_tid: 线程标识符 (pthread_t 类型),用于标识线程。

_isrunning: 布尔变量,表示线程是否正在运行。

_func: 线程执行的任务(即回调函数),使用 func_t 类型存储。

构造函数

Thread 类的构造函数接收线程名称 (name) 和线程任务 (func) 作为参数,并初始化相关成员变量。_isrunning 被初始化为 false,表示线程在创建时默认处于非运行状态。

线程执行方法 Excute

Excute 方法是线程执行的主体部分,它首先将 _isrunning 设置为 true,然后执行通过构造函数传入的任务函数 _func。执行完后,将 _isrunning 设置为 false,表示线程已经结束执行。

线程例程 ThreadRoutine

ThreadRoutine 是一个静态方法,它会作为线程的入口函数。当创建线程时,系统会调用这个函数。

在 ThreadRoutine 中,首先通过 static_cast 将传入的 void* 类型的参数转换为 Thread* 类型,这样我们就可以访问到线程的成员变量和方法。

然后调用 self->Excute(),即执行线程实际的工作。

启动线程 Start

Start 方法通过 pthread_create 创建一个新的线程。pthread_create 会接受线程标识符 _tid、线程属性(这里是 nullptr,即默认属性)、线程入口函数(这里是 ThreadRoutine)以及线程传递的参数(这里是 this,即当前对象)。

如果线程创建成功,返回 true;否则返回 false。

查询线程状态 Status

Status 方法返回当前线程的状态。如果线程正在执行(_isrunning == true),返回 "running",否则返回 "sleep"。

停止线程 Stop

Stop 方法用于停止正在运行的线程。它调用 pthread_cancel 来请求终止指定线程 _tid。然后将 _isrunning 设置为 false,表示线程已停止。

等待线程完成 Join

Join 方法用于等待线程执行完毕。它通过 pthread_join 阻塞当前线程,直到指定线程 _tid 执行完毕。

获取线程名称 Name

Name 方法返回线程的名称。

ThreadPool.hpp

这段代码实现了一个 线程池 模式,提供了多线程处理任务的能力,能够管理多个线程执行任务,并且通过线程池单例模式(Singleton)来确保只会创建一个线程池实例。 

#pragma once

#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"

using namespace ThreadMoudle;
using namespace log_ns;

static const int gdefaultnum = 10;

void test()
{
    while (true)
    {
        std::cout << "hello world" << std::endl;
        sleep(1);
    }
}

template <typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void WakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    bool IsEmpty()
    {
        return _task_queue.empty();
    }
    void HandlerTask(const std::string &name) // this
    {
        while (true)
        {
            // 取任务
            LockQueue();
            while (IsEmpty() && _isrunning)
            {
                _sleep_thread_num++;
                LOG(INFO, "%s thread sleep begin!\n", name.c_str());
                Sleep();
                LOG(INFO, "%s thread wakeup!\n", name.c_str());
                _sleep_thread_num--;
            }
            // 判定一种情况
            if (IsEmpty() && !_isrunning)
            {
                UnlockQueue();
                LOG(INFO, "%s thread quit\n", name.c_str());
                break;
            }

            // 有任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            // 处理任务
            t(); // 处理任务,此处不用/不能在临界区中处理
            // std::cout << name << ": " << t.result() << std::endl;
            // LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());
        }
    }
    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread-" + std::to_string(i + 1);
            _threads.emplace_back(threadname, func);
            LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());
        }
    }
    void Start()
    {
        _isrunning = true;
        for (auto &thread : _threads)
        {
            LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
            thread.Start();
        }
    }
    ThreadPool(int thread_num = gdefaultnum)
        : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    ThreadPool(const ThreadPool<T> &) = delete;
    void operator=(const ThreadPool<T> &) = delete;

public:
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        WakeupAll();
        UnlockQueue();
        LOG(INFO, "Thread Pool Stop Success!\n");
    }

    // 如果是多线程获取单例呢?
    static ThreadPool<T> *GetInstance()
    {
        if (_tp == nullptr)
        {
            LockGuard lockguard(&_sig_mutex);
            if (_tp == nullptr)
            {
                LOG(INFO, "create threadpool\n");
                // thread-1 thread-2 thread-3....
                _tp = new ThreadPool<T>();
                _tp->Init();
                _tp->Start();
            }
            else
            {
                LOG(INFO, "get threadpool\n");
            }
        }
        return _tp;
    }

    void Equeue(const T &in)
    {
        LockQueue();
        if (_isrunning)
        {
            _task_queue.push(in);
            if (_sleep_thread_num > 0)
                Wakeup();
        }
        UnlockQueue();
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    int _thread_num;
    std::vector<Thread> _threads;
    std::queue<T> _task_queue;
    bool _isrunning;

    int _sleep_thread_num;

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    // 单例模式
    // volatile static ThreadPool<T> *_tp;
    static ThreadPool<T> *_tp;
    static pthread_mutex_t _sig_mutex;
};

template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;

类定义:ThreadPool<T>

ThreadPool 类是一个模板类,能够处理类型为 T 的任务。ThreadPool 负责管理一组线程,这些线程会从任务队列中取出任务并执行。

成员变量:

线程池大小:_thread_num 表示线程池中线程的数量。

线程队列:_threads 用来存储所有创建的线程。

任务队列:_task_queue 是一个 std::queue,存储待处理的任务。

运行状态:_isrunning 标志线程池是否正在运行。

空闲线程数:_sleep_thread_num 表示当前空闲的线程数量。

同步机制:

_mutex:用于保护任务队列的互斥锁。

_cond:用于线程同步的条件变量,确保线程池在没有任务时可以进入等待状态。

单例模式:

_tp:一个静态指针,用于存储线程池的唯一实例。

_sig_mutex:一个静态互斥锁,用于控制对 _tp 的访问,确保线程池的单例实现是线程安全的。

成员函数:

(1) 同步操作:

LockQueue() 和 UnlockQueue():这些是保护任务队列的互斥锁方法,确保在访问任务队列时线程是同步的,避免并发冲突。

Wakeup() 和 WakeupAll():分别是唤醒一个或所有线程的函数。当有任务加入时,如果某些线程在等待任务,这些函数可以用来通知线程继续工作。

Sleep():如果任务队列为空,线程会调用这个函数进入等待状态,直到有新的任务被加入到队列中。

IsEmpty():检查任务队列是否为空。

(2) 任务处理:

HandlerTask():这是线程执行的主要任务。每个线程会不断地从任务队列中获取任务并执行,直到线程池被停止。

线程首先会尝试从任务队列中取出任务。

如果队列为空且线程池仍然在运行,线程将会休眠,直到有新任务到来。

如果线程池已停止并且队列为空,线程将退出。

如果队列非空,线程会执行任务。

(3) 初始化和启动:

Init():为线程池中的每个线程创建一个 Thread 对象,并绑定任务处理函数 HandlerTask(),然后将线程添加到 _threads 向量中。

Start():启动所有线程。

(4) 停止:

Stop():停止线程池,首先设置 _isrunning = false,然后唤醒所有处于等待状态的线程。

(5) 单例实现:

GetInstance():这是一个线程安全的单例实现,使用双重检查锁定(Double-Checked Locking)来确保线程池实例 _tp 只会被创建一次。_sig_mutex 用于同步对 _tp 的访问。

(6) 任务队列:

Equeue():将任务 in 添加到任务队列中。任务加入后,如果有空闲线程,某些线程会被唤醒来处理这些任务。

(7) 析构函数:

pthread_mutex_destroy(&_mutex) 和 pthread_cond_destroy(&_cond) 用于销毁互斥锁和条件变量,释放相关资源。

线程池的工作流程:

初始化和启动线程池:

通过 ThreadPool<T>::GetInstance() 获取线程池的唯一实例(如果还没有创建)。

调用 ThreadPool::Start() 启动线程池中的所有线程,每个线程执行 HandlerTask() 函数。

任务处理:

任务通过 ThreadPool::Equeue() 被加入到任务队列 _task_queue 中。

线程在 HandlerTask() 中从队列中取任务并执行。

线程阻塞与唤醒:

如果队列为空,线程会调用 Sleep() 进入等待状态,直到有新任务被添加。

当任务被加入队列时,如果有空闲线程,它们会被唤醒执行任务。

停止线程池:

调用 ThreadPool::Stop() 停止线程池,设置 _isrunning = false,并唤醒所有等待中的线程。每个线程在执行完当前任务后退出。

单例模式的线程安全性分析:

ThreadPool<T>::GetInstance() 中使用了双重检查锁定(Double-Checked Locking)来实现线程安全的单例模式:

第一次检查 _tp == nullptr 是为了减少锁的竞争。

第二次检查 _tp == nullptr 是为了确保在锁定后 _tp 仍然没有被创建(避免其他线程已经创建了 ThreadPool 实例)。

LockGuard 用于确保在操作 _tp 时,访问是线程安全的。

TcpServer.hpp

这个 TcpServer 类是一个简单的 TCP 服务器实现,它能够接受客户端的连接,并处理客户端发送的消息。它使用线程池来处理每个客户端的连接,避免了多进程和单线程模型的缺点。 

#pragma once
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>
#include "Log.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"

using namespace log_ns;

enum
{
    SOCKET_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERR
};

const static int gport = 8888;
const static int gsock = -1;
const static int gblcklog = 8;

using task_t = std::function<void()>;

class TcpServer
{
public:
    TcpServer(uint16_t port = gport)
        : _port(port),
          _listensockfd(gsock),
          _isrunning(false)
    {
    }
    void InitServer()
    {
        // 1. 创建socket
        _listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);
        if (_listensockfd < 0)
        {
            LOG(FATAL, "socket create error\n");
            exit(SOCKET_ERROR);
        }
        LOG(INFO, "socket create success, sockfd: %d\n", _listensockfd); // 3

        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(_port);
        local.sin_addr.s_addr = INADDR_ANY;

        // 2. bind sockfd 和 Socket addr
        if (::bind(_listensockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            LOG(FATAL, "bind error\n");
            exit(BIND_ERROR);
        }
        LOG(INFO, "bind success\n");

        // 3. 因为tcp是面向连接的,tcp需要未来不断地能够做到获取连接
        if (::listen(_listensockfd, gblcklog) < 0)
        {
            LOG(FATAL, "listen error\n");
            exit(LISTEN_ERR);
        }
        LOG(INFO, "listen success\n");
    }
    class ThreadData
    {
    public:
        int _sockfd;
        TcpServer *_self;
        InetAddr _addr;
    public:
        ThreadData(int sockfd, TcpServer *self, const InetAddr &addr):_sockfd(sockfd), _self(self), _addr(addr)
        {}
    };
    void Loop()
    {
        // signal(SIGCHLD, SIG_IGN);
        _isrunning = true;
        while (_isrunning)
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            // 4. 获取新连接
            int sockfd = ::accept(_listensockfd, (struct sockaddr *)&client, &len);
            if (sockfd < 0)
            {
                LOG(WARNING, "accept error\n");
                continue;
            }
            InetAddr addr(client);
            LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", addr.AddrStr().c_str(), sockfd);

            // version 0 --- 不靠谱版本
            // Service(sockfd, addr);

            // version 1 --- 多进程版本
            // pid_t id = fork();
            // if (id == 0)
            // {
            //     // child
            //     ::close(_listensockfd); // 建议!

            //     if(fork() > 0) exit(0);

            //     Service(sockfd, addr);
            //     exit(0);
            // }
            // // father
            // ::close(sockfd);
            // int n = waitpid(id, nullptr, 0);
            // if (n > 0)
            // {
            //     LOG(INFO, "wait child success.\n");
            // }

            // version 2 ---- 多线程版本 --- 不能关闭fd了,也不需要了
            // pthread_t tid;
            // ThreadData *td = new ThreadData(sockfd, this, addr);
            // pthread_create(&tid, nullptr, Execute, td); // 新线程进行分离

            // version 3 ---- 线程池版本 int sockfd, InetAddr addr
            task_t t = std::bind(&TcpServer::Service, this, sockfd, addr);
            ThreadPool<task_t>::GetInstance()->Equeue(t);
        }
        _isrunning = false;
    }
    static void *Execute(void *args)
    {
        pthread_detach(pthread_self());
        ThreadData *td = static_cast<ThreadData *>(args);
        td->_self->Service(td->_sockfd, td->_addr);
        delete td;
        return nullptr;
    }
    void Service(int sockfd, InetAddr addr)
    {
        // 长服务
        while (true)
        {
            char inbuffer[1024]; // 当做字符串
            ssize_t n = ::read(sockfd, inbuffer, sizeof(inbuffer) - 1);
            if (n > 0)
            {
                inbuffer[n] = 0;
                LOG(INFO, "get message from client %s, message: %s\n", addr.AddrStr().c_str(), inbuffer);

                std::string echo_string = "[server echo] #";
                echo_string += inbuffer;
                write(sockfd, echo_string.c_str(), echo_string.size());
            }
            else if (n == 0)
            {
                LOG(INFO, "client %s quit\n", addr.AddrStr().c_str());
                break;
            }
            else
            {
                LOG(ERROR, "read error: %s\n", addr.AddrStr().c_str());
                break;
            }
        }
        ::close(sockfd);
    }

    ~TcpServer() {}

private:
    uint16_t _port;
    int _listensockfd;
    bool _isrunning;
};

1. 类结构和成员变量

成员变量:

_port:服务器监听的端口号,默认为 8888。

_listensockfd:服务器的监听套接字描述符。

_isrunning:标志服务器是否在运行,控制服务器的生命周期。

枚举常量:

SOCKET_ERROR:表示创建套接字失败时的错误码。

BIND_ERROR:表示绑定地址失败时的错误码。

LISTEN_ERR:表示监听失败时的错误码。

常量:

gport:默认的监听端口号。

gsock:默认的套接字标识符,表示未初始化的套接字。

gblcklog:用于 listen() 调用的 backlog 参数,指定操作系统允许的最大连接数。

task_t:使用 std::function<void()> 定义的任务类型,代表线程池中将要执行的任务。这里用来封装客户端连接的处理工作。

2. TcpServer 类的主要函数

TcpServer::InitServer()

该函数用于初始化服务器,完成以下工作:

创建套接字:

使用 ::socket() 创建一个 TCP 套接字。

如果创建失败,输出错误日志并退出。

绑定地址:

使用 ::bind() 将套接字与指定的地址和端口绑定。

如果绑定失败,输出错误日志并退出。

监听连接:

使用 ::listen() 将套接字设为监听状态,等待客户端连接。

如果监听失败,输出错误日志并退出。

TcpServer::Loop()

该函数是服务器的主循环,它负责接受客户端的连接并将连接交给线程池处理:

在循环中,调用 ::accept() 接受来自客户端的连接请求。

如果连接成功,创建一个 InetAddr 对象以保存客户端的 IP 地址和端口信息。

将处理任务(客户端连接的处理)封装为一个 task_t,然后将任务提交给线程池。

TcpServer::Service()

这是处理客户端请求的核心函数:

该函数会从客户端套接字中读取数据(通过 ::read())。

如果读取成功,则将客户端发送的消息打印出来,并以 "server echo" 开头将数据返回给客户端。

如果读取到的数据为空(客户端关闭连接),则输出日志并关闭套接字。

如果发生读取错误,则输出错误日志并关闭套接字。

TcpServer::Execute()

这是一个静态函数,用于在线程池中执行处理任务。该函数被线程池中的线程调用,处理客户端请求:

它首先将线程标记为分离状态,以便线程结束后自动释放资源。

然后它调用 Service() 来处理客户端请求。

处理完成后,销毁 ThreadData 对象。

3. 线程池的使用

服务器使用线程池来处理每个客户端的连接。每当有新连接时,创建一个 task_t(封装了 TcpServer::Service() 方法),然后将任务提交给线程池:

这样线程池中的线程会并行处理这些任务,避免了每次都创建新线程的开销。

4. ThreadData 类

ThreadData 类用于封装每个客户端连接的相关信息:

sockfd:客户端的套接字。

self:指向当前 TcpServer 对象的指针。

addr:客户端的地址信息。

该类主要用于在线程中传递参数,它的生命周期由线程池中的线程控制。

5. 错误处理

在整个过程中,如果发生错误(如创建套接字失败、绑定失败、监听失败等),会通过 LOG() 打印详细错误信息,并通过 exit() 终止程序。可以在实际应用中根据需求改进错误处理逻辑(如重新尝试,或者返回错误状态)。

6. 程序退出

服务器的退出主要由 Loop() 中的 _isrunning 控制。当前 _isrunning 为 false 时,Loop() 会退出。程序会继续执行后续的清理工作,并最终终止。

TcpServerMain.cc

#include "TcpServer.hpp"

#include <memory>


// ./tcpserver 8888
int main(int argc, char *argv[])
{
    if(argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
        exit(0);
    }
    uint16_t port = std::stoi(argv[1]);
    std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);
    tsvr->InitServer();
    tsvr->Loop();

    return 0;
}

TcpClientMain.cc

#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

// ./tcpclient server-ip server-port
int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;
        exit(0);
    }
    std::string serverip = argv[1];
    uint16_t serverport = std::stoi(argv[2]);

    // 1. 创建socket
    int sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        std::cerr << "create socket error" << std::endl;
        exit(1);
    }

    // 注意:不需要显示的bind,但是一定要有自己的IP和port,所以需要隐式的bind,OS会自动bind sockfd,用自己的IP和随机端口号
    // 什么时候进行自动bind?If the connection or binding succeeds
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(serverport);
    ::inet_pton(AF_INET, serverip.c_str(), &server.sin_addr);

    int n = ::connect(sockfd, (struct sockaddr *)&server, sizeof(server));
    if (n < 0)
    {
        std::cerr << "connect socket error" << std::endl;
        exit(2);
    }

    while(true)
    {
        std::string message;
        std::cout << "Enter #";
        std::getline(std::cin, message);

        write(sockfd, message.c_str(), message.size());

        char echo_buffer[1024];
        n = read(sockfd, echo_buffer, sizeof(echo_buffer));
        if(n > 0)
        {
            echo_buffer[n] = 0;
            std::cout << echo_buffer << std::endl;
        }
        else
        {
            break;
        }
    }
    ::close(sockfd);
    return 0;
}

效果展示  

虽然做的有点粗糙, 但是完成的还不错!

下一章还是 TCP socket 编程, 实现命令处理的功能, 处理从客户端接收到的命令,检查这些命令的安全性,并执行这些命令。好了, 篇幅已经很长了, 我们下期在见~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/923920.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Stable Diffusion 3详解

&#x1f33a;系列文章推荐&#x1f33a; 扩散模型系列文章正在持续的更新&#xff0c;更新节奏如下&#xff0c;先更新SD模型讲解&#xff0c;再更新相关的微调方法文章&#xff0c;敬请期待&#xff01;&#xff01;&#xff01;&#xff08;本文及其之前的文章均已更新&…

网络安全原理与技术思考题/简答题

作业1&#xff08;第1章、第2章、第8章&#xff09; 1. 网络安全的基本属性有哪些&#xff1f;简单解释每个基本属性的含义。网络安全的扩展属性包括哪些&#xff1f; 基本属性&#xff1a; 1.机密性(Confidentiality)&#xff1a; 含义&#xff1a;确保信息不被未授权的用户…

【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)

之前学过的kv类型上面的算子 groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的 mapValues keys values flatMapValues 普通算子&#xff0c;管道形式的算子 shuffle的过程是因为数据产生了打乱重分&#xff0c;分组、排序、join等…

Java代码实现数字信封

1. 前言 本篇博客是工作经验总结&#xff0c;如果您发现此篇博客有疏漏或有待改进之处&#xff0c;欢迎评论区交流。 2. 数字信封 数字信封使用的是接收者的非对称密钥对。即&#xff1a;用接收者的公钥加密&#xff0c;且只能由接收者的私钥解密。其实现过程如下&#xff1a;…

第 4 章 Java 并发包中原子操作类原理剖析

原子变量操作类 AtomicLong 是原子性递增或者递减类&#xff0c;其内部使用 Unsafe 来实现&#xff0c;AtomicLong类也是在 rt.jar 包下面的&#xff0c;AtomicLong 类就是通过 BootStarp 类加载器进行加载的。这里的原子操作类都使用 CAS 非阻塞算法 private static final lon…

Android调起系统分享图片到其他应用

Android调起系统分享图片到其他应用 有时候分享不想接第三方的&#xff0c;其实如果你的分享要求不是很高&#xff0c;调系统的分享也是可以的。 一、思路&#xff1a; 用intent.action Intent.ACTION_SEND 二、效果图&#xff1a; 三、关键代码&#xff1a; //这个是分享…

C++中虚继承为什么可以解决菱形继承的数据冗余问题

在C中菱形继承会有数据冗余的问题发生&#xff0c;我们可以使用虚继承来解决&#xff0c;那虚继承的原理是什么&#xff0c;为什么它可以解决这个问题。 菱形继承的数据冗余问题 class A { public:int data; };class B : public A {};class C : public A {};class D : public…

LSA详情与特殊区域

LSA是构成LSDB的重要原材料&#xff0c;在OSPF中发挥很大作用。 报文 通用头部 LS age&#xff1a;LSA寿命&#xff0c;0-3600s Options&#xff1a;可选项 LS type&#xff1a;LSA类型&#xff0c;三要素之一 Link State ID&#xff1a;LSAID 三要素之一 Advertising Ro…

Kubeadm 安装 Kubernetes 高可用集群 v1.30.0

1、修改主机名&#xff08;各个节点&#xff09; hostnamectl set-hostname xxx2、hosts 文件加入主机名&#xff08;全部节点&#xff09; cat /etc/hosts 192.168.88.5 master1 192.168.88.6 master2 192.168.88.7 master3 192.168.88.8 node13、关闭防火墙&#xff08;全部…

泥石流灾害风险评估与模拟丨AI与R语言、ArcGIS、HECRAS融合,提升泥石流灾害风险预测的精度和准确性

目录 第一章 理论基础 第二章 泥石流风险评估工具 第三章 数据准备与因子提取 第四章 泥石流灾害评价 第五章 HECRAS软件的应用 第六章 操作注意事项与模型优化 泥石流灾害的频发与严重后果&#xff0c;已成为全球范围内防灾减灾工作的重大挑战。随着科技的不断进步&…

自由学习记录(25)

只要有修改&#xff0c;子表就不用元表的参数了&#xff0c;用自己的参数&#xff08;只不过和元表里的那个同名&#xff09; 子表用__index“继承”了父表的值&#xff0c;此时子表仍然是空表 一定是创建这样一个同名的变量在原本空空的子表里&#xff0c; 传参要传具体的变…

leetcode 3206. 交替组 I 简单

给你一个整数数组 colors &#xff0c;它表示一个由红色和蓝色瓷砖组成的环&#xff0c;第 i 块瓷砖的颜色为 colors[i] &#xff1a; colors[i] 0 表示第 i 块瓷砖的颜色是 红色 。colors[i] 1 表示第 i 块瓷砖的颜色是 蓝色 。 环中连续 3 块瓷砖的颜色如果是 交替 颜色&…

彻底解决 macOS 下Matplotlib 中文显示乱码问题

彻底解决 macOS 下Matplotlib 中文显示乱码问题 在使用 Python 的 Matplotlib 库进行数据可视化时&#xff0c;中文字符的显示常常会出现乱码问题&#xff0c;尤其在 macOS 系统上。在网上找了一大堆方法&#xff0c;花了很久&#xff0c;发现不是要安装各种字体就是要改配置&…

深度学习笔记24_天气预测

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 一、我的环境 1.语言环境&#xff1a;Python 3.9 2.编译器&#xff1a;Pycharm 3.深度学习环境&#xff1a;TensorFlow 2.10.0 二、GPU设置…

podman 源码 5.3.1编译

1. 构建环境 在麒麟V10服务器操作系统上构建&#xff1a;Kylin-Server-V10-GFB-Release-2204-Build03-ARM64.iso。由于只是编译 podman 源码&#xff0c;没必要特地在物理机或服务上安装一个这样的操作系统&#xff0c;故采用在虚拟机里验证。 2. 安装依赖 参考资料&#xf…

【K8S系列】深入解析 Kubernetes 中的 Deployment

Kubernetes&#xff08;K8s&#xff09;是一个开源的容器编排平台&#xff0c;旨在自动化应用程序的部署、扩展和管理。在 Kubernetes 中&#xff0c;Deployment 是一种用于管理无状态应用的工作负载资源&#xff0c;提供了丰富的功能&#xff0c;包括版本控制、滚动更新和回滚…

玩转 Burp Suite (1)

内容预览 ≧∀≦ゞ 玩转 Burp Suite (1)声明Burp Suite 简介Dashboard&#xff08;仪表盘&#xff09;1. 默认任务管理2. 暂停任务3. 新建扫描任务4. 使用总结 Target&#xff08;目标&#xff09;1. SIte Map &#xff08;站点地图&#xff09;2. Scope&#xff08;范围&#…

【ArcGISPro】Sentinel-2数据处理

错误 默认拉进去只组织了4个波段,但是实际有12个波段 解决方案 数据下载 Sentinel-2 数据下载-CSDN博客 数据处理 数据查看 创建镶嵌数据集 在数据管理工具箱中找到创建镶嵌数据集

智慧环保大数据解决方案

1. 智慧环保概述 智慧环保是“数字环保”的延伸&#xff0c;借助物联网技术整合环境监控对象&#xff0c;通过云计算实现环境管理与决策的智能化。其核心在于快速感知城市环境指标&#xff0c;保障人体健康与生命安全。 2. 智慧环保总体目标 智慧环保的总体目标是建立全面感…

【H2O2|全栈】JS进阶知识(八)ES6(4)

目录 前言 开篇语 准备工作 浅拷贝和深拷贝 浅拷贝 概念 常见方法 弊端 案例 深拷贝 概念 常见方法 弊端 逐层拷贝 原型 构造函数 概念 形式 成员 弊端 显式原型和隐式原型 概念 形式 constructor 概念 形式 原型链 概念 形式 结束语 前言 开篇语…