TcpServer服务器管理模块(模块十)

目录

类功能

类定义

类实现

编译测试

server.cc

gdb测试断点

忽略SIGPIPE信号


类功能

类定义

// TcpServer服务器管理模块(即全部模块的整合)
class TcpServer
{
private:
    uint64_t _next_id;                                  // 这是一个自动增长的连接ID
    int _port;
    int _timeout;                                       // 这是非活跃连接的统计时间--多长时间无通信就是非活跃连接
    bool _enable_inactive_release;                      // 是否启动非活跃连接超时销毁的判断标志
    EventLoop _baseloop;                                // 这是主线程的EventLoop对象,负责监听事件的处理
    Acceptor _acceptor;                                 // 这是监听套接字的管理对象
    LoopThreadPool _pool;                               // 这是从属EventLoop线程池
    std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有连接对应的share_ptr对象,这里面的对象被删除,就意味这某一个连接被删除

    using ConnectedCallback = std::function<void(const PtrConnection &)>;
    using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
    using ClosedCallback = std::function<void(const PtrConnection &)>;
    using AnyEventCallback = std::function<void(const PtrConnection &)>;
    using Functor = std::function<void()>;
    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;
private:
    void NewConnection(int fd); // 为新连接构造一个Connection进行管理
    void RemoveConnection(); // 从管理Connection的_conns移除连接信息
public:
    TcpServer();
    void SetThreadCount(int count);
    void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
    void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }
    void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
    void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }
    void EnableInactiveRelease(int timeout);
    void RunAfter(const Functor &task, int delay);  // 用于添加一个定时任务
    void Start();
};

类实现

// TcpServer服务器管理模块(即全部模块的整合)
class TcpServer
{
private:
    uint64_t _next_id; // 这是一个自动增长的连接ID
    int _port;
    int _timeout;                                       // 这是非活跃连接的统计时间--多长时间无通信就是非活跃连接
    bool _enable_inactive_release;                      // 是否启动非活跃连接超时销毁的判断标志
    EventLoop _baseloop;                                // 这是主线程的EventLoop对象,负责监听事件的处理
    Acceptor _acceptor;                                 // 这是监听套接字的管理对象
    LoopThreadPool _pool;                               // 这是从属EventLoop线程池
    std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有连接对应的share_ptr对象,这里面的对象被删除,就意味这某一个连接被删除

    using ConnectedCallback = std::function<void(const PtrConnection &)>;
    using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
    using ClosedCallback = std::function<void(const PtrConnection &)>;
    using AnyEventCallback = std::function<void(const PtrConnection &)>;
    using Functor = std::function<void()>;
    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;

private:
    void RunAfterInLoop(const Functor &task, int delay)
    {
        _next_id++;
        _baseloop.TimerAdd(_next_id, delay, task);
    }
    // 为新连接构造一个Connection进行管理
    void NewConnection(int fd)
    {
        _next_id++;
        PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
        conn->SetMessageCallback(_message_callback);
        conn->SetClosedCallback(_closed_callback);
        conn->SetConnectedCallback(_connected_callback);
        conn->SetAnyEventCallback(_event_callback);
        conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
        if (_enable_inactive_release)
            conn->EnableInactiveRelease(10); // 启动非活跃超时销毁
        conn->Established();                 // 就绪初始化
        _conns.insert(std::make_pair(_next_id, conn));
    }
    void RemoveConnectionInLoop(const PtrConnection &conn)
    {
        int id = conn->Id();
        auto it = _conns.find(id);
        if (it != _conns.end())
        {
            _conns.erase(it);
        }
    }
    // 从管理Connection的_conns移除连接信息
    void RemoveConnection(const PtrConnection &conn)
    {
        _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));
    }

public:
    TcpServer(int port) : _port(port),
                          _next_id(0),
                          _enable_inactive_release(false),
                          _acceptor(&_baseloop, port),
                          _pool(&_baseloop)
    {
        _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
        _acceptor.Listen(); // 将监听套接字挂到baseloop上
    }
    void SetThreadCount(int count) { return _pool.SetThreadCount(count); }
    void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
    void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }
    void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
    void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }
    void EnableInactiveRelease(int timeout) { _timeout = timeout, _enable_inactive_release = true; }
    // 用于添加一个定时任务
    void RunAfter(const Functor &task, int delay)
    {
        _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));
    }
    void Start()
    {
        _pool.Create(); // 创建线程池中的从属线程
        _baseloop.Start();
    }
};

编译测试

为了便于测试整合,创建了一个新的文件server.cc

server.cc

#include "../source/server.hpp"

void OnConnected(const PtrConnection &conn)
{
    DBG_LOG("NEW CONNECTION:%p", conn.get());
}
void OnClosed(const PtrConnection &conn)
{
    DBG_LOG("CLOSE CONNECTION:%p", conn.get());
}
void OnMessage(const PtrConnection &conn, Buffer *buf)
{
    DBG_LOG("%s", buf->ReadPosition());
    buf->MoveReadOffset(buf->ReadAbleSize());
    std::string str = "Hello World";
    conn->Send(str.c_str(), str.size());
    conn->Shutdown(); // 调用关闭接口
}

int main()
{
    TcpServer server(8500);
    server.SetThreadCount(2);
    server.EnableInactiveRelease(10);
    server.SetClosedCallback(OnClosed);
    server.SetConnectedCallback(OnConnected);
    server.SetMessageCallback(OnMessage);
    server.Start();
    return 0;
}

服务端

客户端

符合预期

gdb测试断点

在测试的过程中,出现了一些小问题,通过断点进行处理

忽略SIGPIPE信号

前几天测试发生的问题,可以依靠创建下面一个类来忽略该信号的触发

忽略SIGPIPE信号,当连接断开的时候,如果我们继续向对端send发送信息,就会触发异常,即SIGPIPE异常,这个就是导致客户端异常退出的原因

// 忽略SIGPIPE信号,当连接断开的时候,如果我们继续向对端send发送信息,就会触发异常,即SIGPIPE异常,这个就是导致客户端异常退出的原因
class NetWork{
    public:
        NetWork(){
            DBG_LOG("SIGPIPE INIT");
            signal(SIGPIPE, SIG_IGN); // 忽视SIGPIPE异常,这个会导致进程退出
        }
};
static NetWork nw;  // 这个是为了执行里面的构造函数

服务器主体源码

因为写到这里已经算是到了一定程度了,也就是说服务器的部分已经基本完成,后续会以次为基础,创建一个回显服务器,这里就直接贴代码了,不要嫌长

#ifndef __M_SERVER_H__  
#define __M_SERVER_H__
#include <iostream>
#include <vector>
#include <cstdint>
#include <cassert>
#include <ctime>
#include <cstring>
#include <string>
#include <unistd.h>
#include <typeinfo>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <fcntl.h>
#include <functional>
#include <signal.h>
#include <unordered_map>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <sys/socket.h>
#include <sys/types.h>

#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG

#define LOG(level, format, ...)                                                                                        \
    do                                                                                                                 \
    {                                                                                                                  \
        if (level < LOG_LEVEL)                                                                                         \
            break;                                                                                                     \
        time_t t = time(NULL);                                                                                         \
        struct tm *ltm = localtime(&t);                                                                                \
        char tmp[32] = {0};                                                                                            \
        strftime(tmp, 31, "%H:%M:%S", ltm);                                                                            \
        fprintf(stdout, "[%p %s %s:%d] " format "\n", (void *)pthread_self(), tmp, __FILE__, __LINE__, ##__VA_ARGS__); \
    } while (0)

#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)

// 缓冲区类
#define BUFFER_DEFAULT_SIZE 1024 // Buffer 默认起始大小
class Buffer
{
private:
    std::vector<char> _buffer; // 使用vector进行内存空间管理
    uint64_t _reader_idx;      // 读偏移
    uint64_t _writer_idx;      // 写偏移
public:
    Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}
    char *Begin() { return &*_buffer.begin(); }
    // 获取当前写入起始地址
    char *WritePosition() { return Begin() + _writer_idx; }
    // 获取当前读取起始地址
    char *ReadPosition() { return Begin() + _reader_idx; }
    // 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移
    uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
    // 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间
    uint64_t HeadIdleSize() { return _reader_idx; }
    // 获取可读数据大小 = 写偏移 - 读偏移
    uint16_t ReadAbleSize() { return _writer_idx - _reader_idx; };
    // 将读偏移向后移动
    void MoveReadOffset(uint64_t len)
    {
        if (len == 0)
            return;
        // 向后移动的大小, 必须小于可读数据大小
        assert(len <= ReadAbleSize());
        _reader_idx += len;
    }
    // 将写偏移向后移动
    void MoveWriteOffset(uint64_t len)
    {
        // 向后移动的大小,必须小于当前后边的空闲空间大小
        assert(len <= TailIdleSize());
        _writer_idx += len;
    }
    // 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容)
    void EnsureWriteSpace(uint64_t len)
    {
        // 如果末尾空闲空间大小足够,直接返回
        if (TailIdleSize() >= len)
        {
            return;
        }
        // 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够,够了就将数据移动到起始位置
        if (len <= TailIdleSize() + HeadIdleSize())
        {
            // 将数据移动到起始位置
            uint64_t rsz = ReadAbleSize();                            // 把当前数据大小先保存起来
            std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 把可读数据拷贝到起始位置
            _reader_idx = 0;                                          // 将读偏移归0
            _writer_idx = rsz;                                        // 将写位置置为可读数据大小, 因为当前的可读数据大小就是写偏移量
        }
        else
        {
            // 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可
            _buffer.resize(_writer_idx + len);
        }
    }
    // 写入数据
    void Write(const void *data, uint64_t len)
    {
        // 1.保证有足够空间, 2.拷贝数据进去
        EnsureWriteSpace(len);
        const char *d = (const char *)data;
        std::copy(d, d + len, WritePosition());
    }
    void WriteAndPush(const void *data, uint64_t len)
    {
        Write(data, len);
        MoveWriteOffset(len);
    }
    void WriteString(const std::string &data)
    {
        return Write(data.c_str(), data.size());
    }
    void WriteStringAndPush(const std::string &data)
    {
        WriteString(data);
        MoveWriteOffset(data.size());
    }
    void WriteBuffer(Buffer &data)
    {
        return Write(data.ReadPosition(), data.ReadAbleSize());
    }
    void WriteBufferAndPush(Buffer &data)
    {
        WriteBuffer(data);
        MoveWriteOffset(data.ReadAbleSize());
    }
    // 读取数据
    void Read(void *buf, uint64_t len)
    {
        // 要求获取的数据大小必须小于可读数据大小
        assert(len <= ReadAbleSize());
        std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);
    }
    void ReadAndPop(void *buf, uint64_t len)
    {
        Read(buf, len);
        MoveReadOffset(len);
    }
    std::string ReadAsString(uint64_t len)
    {
        // 要求获取的数据大小必须小于可读数据大小
        assert(len <= ReadAbleSize());
        std::string str;
        str.resize(len);
        Read(&str[0], len); // 这里不直接用str.c_str()的原因是,这个的返回值是const类型
        return str;
    }
    std::string ReadAsStringAndPop(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        std::string str = ReadAsString(len);
        MoveReadOffset(len);
        return str;
    }
    char *FindCRLF()
    {
        char *res = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());
        return res;
    }
    // 这种情况针对的是,通常获取一行数据
    std::string GetLine()
    {
        char *pos = FindCRLF();
        if (pos == NULL)
            return "";
        // +1 是为了把换行字符也取出来
        return ReadAsString(pos - ReadPosition() + 1);
    }
    std::string GetLineAndPop()
    {
        std::string str = GetLine();
        MoveReadOffset(str.size());
        return str;
    }

    // 清空缓冲区
    void Clear()
    {
        // 只需要将偏移量归0即可
        _reader_idx = 0;
        _writer_idx = 0;
    }
};

// 套接字类
#define MAX_LISTEN 1024
class Socket
{
private:
    int _sockfd;

public:
    Socket() : _sockfd(-1) {}
    Socket(int fd) : _sockfd(fd) {}
    ~Socket() { Close(); };
    int Fd() { return _sockfd; }
    // 创建套接字
    bool Create()
    {
        // int socket(int domain, int type, int protocol)
        _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (_sockfd < 0)
        {
            ERR_LOG("CREATE SOCKET FAILED!");
            return false;
        }
        return true;
    }
    // 绑定地址信息
    bool Bind(const std::string &ip, uint16_t port)
    {
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        socklen_t len = sizeof(struct sockaddr_in);
        // int bind(int sockfd, struct sockaddr* addr, socklen_t len)
        int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
        if (ret < 0)
        {
            ERR_LOG("BIND ADDRESS FAILED!");
            return false;
        }
        return true;
    }
    // 开始监听
    bool Listen(int backlog = MAX_LISTEN)
    {
        // int listen(int backlog)
        int ret = listen(_sockfd, backlog);
        if (ret < 0)
        {
            ERR_LOG("SOCKET LISTEN FAILED!");
            return false;
        }
        return true;
    }
    // 向服务器发起连接
    bool Connect(const std::string &ip, uint16_t port)
    {
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        socklen_t len = sizeof(struct sockaddr_in);
        // int connect(int sockfd, struct sockaddr* addr, socklen_t len)
        int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
        if (ret < 0)
        {
            ERR_LOG("CONNECT SERVER FAILED!");
            return false;
        }
        return true;
    }
    // 获取新连接
    int Accept()
    {
        // int accept(int sockfd, struct sockaddr *addr, socklen_t *len);
        int newfd = accept(_sockfd, NULL, NULL);
        if (newfd < 0)
        {
            ERR_LOG("SOCKET ACCEPT FAILED!");
            return -1;
        }
        return newfd;
    }
    // 接收数据
    ssize_t Recv(void *buf, size_t len, int flag = 0) // 0 阻塞
    {
        // ssize_t recv(int sockfd, void *buf, size_t len, int flag)
        ssize_t ret = recv(_sockfd, buf, len, flag);
        if (ret <= 0)
        {
            // EAGAIN 当前的接收缓冲区中没用数据了,在非阻塞的情况下才有这个错误
            // EINTR 表示当前socket的阻塞等待,被信号打断了
            if (errno == EAGAIN || errno == EINTR)
            {
                return 0; // 表示这次没用接收到数据
            }
            ERR_LOG("SOCKET RECV FAILED!");
            return -1;
        }
        return ret; // 实际接收的数据长度
    }
    ssize_t NonBlockRecv(void *buf, size_t len)
    {
        return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
    }
    // 发送数据
    ssize_t Send(const void *buf, size_t len, int flag = 0)
    {
        // ssize_t send(int sockfd, void *data, size_t len, int flag)
        ssize_t ret = send(_sockfd, buf, len, flag);
        if (ret < 0)
        {
            if (errno == EAGAIN || errno == EINTR)
            {
                return 0;
            }
            ERR_LOG("SOCKET SEND FAILED!!");
            return -1;
        }
        return ret; // 实际发送的数据长度
    }
    ssize_t NonBlockSend(void *buf, size_t len)
    {
        if (len == 0)
            return 0;
        return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
    }
    // 关闭套接字
    void Close()
    {
        if (_sockfd != -1)
        {
            close(_sockfd);
            _sockfd = -1;
        }
    }
    // 创建一个服务器连接
    bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) // 接收全部
    {
        // 1.创建套接字 2.绑定地址 3.开始监听 4.设置非阻塞 5.启动地址重用
        if (Create() == false)
            return false;
        if (block_flag)
            NonBlock(); // 默认阻塞
        if (Bind(ip, port) == false)
            return false;
        if (Listen() == false)
            return false;
        ReuseAddress();
        return true;
    }
    // 创建一个客户端连接
    bool CreateClient(uint16_t port, const std::string &ip)
    {
        // 1.创建套接字 2.指向连接服务器
        if (Create() == false)
            return false;
        if (Connect(ip, port) == false)
            return false;
        return true;
    }
    // 设置套接字选项 -- 开启地址端口重用
    void ReuseAddress()
    {
        // int setsockopt(int fd, int level, int optname, void *val, int vallen)
        int val = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int)); // 地址
        val = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int)); // 端口号
    }
    // 设置套接字阻塞属性 -- 设置为非阻塞
    void NonBlock()
    {
        // int fcntl(int fd, int cmd, .../*arg*/)
        int flag = fcntl(_sockfd, F_GETFL, 0);
        fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
    }
};

class Poller; // 整合测试1:声明
class EventLoop;
// Channel类
class Channel
{
private:
    int _fd;
    EventLoop *_loop;
    uint32_t _events;  // 当前需要监控的事件
    uint32_t _revents; // 当前连接触发的事件
    using EventCallback = std::function<void()>;
    EventCallback _read_callback;  // 可读事件被触发的回调函数
    EventCallback _write_callback; // 可写事件被触发的回调函数
    EventCallback _error_callback; // 错误事件被触发的回调函数
    EventCallback _close_callback; // 连接断开事件被触发的回调函数
    EventCallback _event_callback; // 任意事件被触发的回调函数
public:
    Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}
    int Fd() { return _fd; }
    uint32_t Events() { return _events; } // 获取想要监控的事件
    void SetREvents(uint32_t events) { _revents = events; }
    void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } // 设置实际就绪的事件
    void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
    void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
    void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
    void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
    // 当前是否监控了可读
    bool ReadAble() { return (_events & EPOLLIN); }
    // 当前是否监控了可写
    bool WriteAble() { return (_events & EPOLLOUT); }
    // 启动读事件监控
    void EnableRead()
    {
        _events |= EPOLLIN;
        Update();
    }
    // 启动写事件监控
    void EnableWrite()
    {
        _events |= EPOLLOUT;
        Update();
    }
    // 关闭读事件监控
    void DisableRead()
    {
        _events &= ~EPOLLIN;
        Update();
    }
    // 关闭写事件监控
    void DisableWrite()
    {
        _events &= ~EPOLLOUT;
        Update();
    }
    // 关闭所有事件监控
    void DisableAll()
    {
        _events = 0;
        Update();
    }
    // 移除监控
    void Remove(); // 声明和实现要分离,因为实现的时候是不知道里面有什么函数成员的
    void Update(); // 这两个特殊,所以把实现放在Poller类的下面进行实现
    // 事件处理,一旦触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定
    void HandleEvent()
    {
        // 第二参数,对方关闭连接,第三参数,带外数据
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            if (_event_callback) // 不管任何事件,都调用的回调函数
                _event_callback();
            if (_read_callback)
                _read_callback();
        }
        // 有可能会释放连接的操作事件,一次只处理一个
        if (_revents & EPOLLOUT)
        {
            if (_event_callback)
                _event_callback(); // 放到事件处理完毕后调用,刷新活跃度
            if (_write_callback)
                _write_callback();
        }
        else if (_revents & EPOLLERR)
        {
            if (_event_callback)
                _event_callback();
            if (_error_callback)
                _error_callback();
        }
        else if (_revents & EPOLLHUP)
        {
            if (_event_callback)
                _event_callback();
            if (_close_callback)
                _close_callback();
        }
    }
};

// Poller描述符监控类
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
    int _epfd;
    struct epoll_event _evs[MAX_EPOLLEVENTS];
    std::unordered_map<int, Channel *> _channels;

private:
    // 对epoll的直接操作
    void Update(Channel *channel, int op)
    {
        // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev)
        int fd = channel->Fd();
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = channel->Events();
        int ret = epoll_ctl(_epfd, op, fd, &ev);
        if (ret < 0)
        {
            ERR_LOG("EPOLLCTL FAILED!");
        }
        return;
    }
    // 判断一个Channel 是否已经添加了事件监控
    bool HasChannel(Channel *channel)
    {
        auto it = _channels.find(channel->Fd());
        if (it == _channels.end())
        {
            return false;
        }
        return true;
    }

public:
    Poller()
    {
        _epfd = epoll_create(MAX_EPOLLEVENTS); // 这个值大于0就行了,无用处
        if (_epfd < 0)
        {
            ERR_LOG("EPOLL CREATE FAILED!");
            abort(); // 退出程序
        }
    }
    // 添加或修改监控事件
    void UpdateEvent(Channel *channel)
    {
        bool ret = HasChannel(channel);
        if (ret == false)
        {
            // 不存在则添加
            _channels.insert(std::make_pair(channel->Fd(), channel));
            return Update(channel, EPOLL_CTL_ADD);
        }
        return Update(channel, EPOLL_CTL_MOD);
    }
    // 移除监控
    void RemoveEvent(Channel *channel)
    {
        auto it = _channels.find(channel->Fd());
        if (it != _channels.end())
        {
            _channels.erase(it);
        }
        Update(channel, EPOLL_CTL_DEL);
    }
    // 开始监控, 返回活跃连接
    void Poll(std::vector<Channel *> *active)
    {
        // int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
        int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // -1阻塞监控
        if (nfds < 0)
        {
            if (errno == EINTR) // 信号打断
            {
                return;
            }
            ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));
            abort();
        }
        for (int i = 0; i < nfds; i++) // 添加活跃信息
        {
            auto it = _channels.find(_evs[i].data.fd); // 没找到就说明不在我们的管理之下,这是不正常的
            assert(it != _channels.end());
            it->second->SetREvents(_evs[i].events); // 设置实际就绪的事件
            active->push_back(it->second);
        }
        return;
    }
};

// timerwheel时间轮定时器类
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
    uint64_t _id;         // 定时器任务对象
    uint32_t _timeout;    // 定时任务的超时时间
    bool _canceled;       // false-表示没有被取消,true-表示被取消
    TaskFunc _task_cb;    // 定时器要执行的定时任务
    ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
public:
    TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}
    ~TimerTask()
    {
        if (_canceled == false)
            _task_cb();
        _release();
    }
    void Cancel() { _canceled = true; }
    void SetRelease(const ReleaseFunc &cb) { _release = cb; }
    uint32_t DelayTime() { return _timeout; } // 返回时间
};

class TimerWheel
{
private:
    using WeakTask = std::weak_ptr<TimerTask>;
    using PtrTask = std::shared_ptr<TimerTask>;
    int _tick;     // 当前的的秒针,走到哪里哪里就释放执行
    int _capacity; // 表盘最大数量 -- 其实就是最大延迟时间
    std::vector<std::vector<PtrTask>> _wheel;
    // 用weak_ptr来构造出新的shared_ptr用来计数,不过后续要记得释放
    std::unordered_map<uint64_t, WeakTask> _timers;

    EventLoop *_loop;
    int _timerfd; // 定时器描述符 -- 可读事件回调就是读取计数器,执行定时任务
    std::unique_ptr<Channel> _timer_channel;

private:
    void RemoveTimer(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it != _timers.end())
        {
            _timers.erase(it);
        }
    }
    static int CreateTimerfd()
    {
        // int timerfd_create(int clockid, int flags);
        int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
        if (timerfd < 0)
        {
            ERR_LOG("TIMERFD CREATE FAILED!");
            abort();
        }
        // int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec);
        struct itimerspec itime;
        itime.it_value.tv_sec = 1;                 // 设置 秒钟
        itime.it_value.tv_nsec = 0;                // 设置 纳秒 第一次超时时间为1s后
        itime.it_interval.tv_sec = 1;              // 同上
        itime.it_interval.tv_nsec = 0;             // 第一次超时后,每隔超时的间隔时
        timerfd_settime(timerfd, 0, &itime, NULL); // 0代表阻塞式
        return timerfd;
    }
    void ReadTimefd()
    {
        uint64_t times;
        int ret = read(_timerfd, &times, 8);
        if (ret < 0)
        {
            ERR_LOG("READ TIMERFD FAILED!");
            abort();
        }
        return;
    }
    // 这个函数应该每秒钟被执行一次,相当于秒钟向后走了一步
    void RunTimerTask()
    {
        _tick = (_tick + 1) % _capacity;
        _wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉.从而执行函数
    }
    void OnTime()
    {
        ReadTimefd();
        RunTimerTask();
    }
    void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) // 添加定时任务
    {
        PtrTask pt(new TimerTask(id, delay, cb));                      // 实例化定时任务对象
        pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); // 第0个位置是隐藏的this指针。再把任务id绑定进去
        int pos = (_tick + delay) % _capacity;
        _wheel[pos].push_back(pt);
        _timers[id] = WeakTask(pt);
    }
    // 刷新/延迟定时任务
    void TimerRefreshInLoop(uint64_t id)
    {
        // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来, 添加到轮子中
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return; // 没找到定时任务, 没法刷新,没法延迟
        }
        PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr
        int delay = pt->DelayTime();    // 获取到了初始的延迟时间
        int pos = (_tick + delay) % _capacity;
        _wheel[pos].push_back(pt);
    }
    void TimerCancelInLoop(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return; // 没找到定时任务, 没法刷新,没法延迟
        }
        PtrTask pt = it->second.lock(); // 当还没有过期才进行取消
        if (pt)
            pt->Cancel();
    }

public:
    TimerWheel(EventLoop *loop) : _capacity(60), _tick(0), _wheel(_capacity), _loop(loop),
                                  _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd))
    {
        _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
        _timer_channel->EnableRead(); // 启动读事件监控
    }
    /*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*/
    /*如果不想加锁,那就把对定期的所有操作,都放在一个线程中进行*/
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);
    // 刷新/延迟定时任务
    void TimerRefresh(uint64_t id);
    void TimerCancel(uint64_t id);
    /*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,对应的EventLoop线程内执行*/
    bool HasTimer(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return false; // 没找到定时任务, 没法刷新,没法延迟
        }
        return true;
    }
};

// EventLoop事件监控处理类
class EventLoop
{
private:
    using Functor = std::function<void()>;
    std::thread::id _thread_id;              // 线程ID
    int _event_fd;                           // eventfd唤醒IO事件监控有可能导致的阻塞
    std::unique_ptr<Channel> _event_channel; // 智能指针
    Poller _poller;                          // 进行所有描述符的事件监控
    std::vector<Functor> _tasks;             // 任务池
    std::mutex _mutex;                       // 实现任务池操作的线程安全
    TimerWheel _timer_wheel;                 // 定时器模块
public:
    //  执行任务池中的所有任务
    void RunAllTask()
    {
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.swap(functor);
        }
        for (auto &f : functor)
        {
            f();
        }
        return;
    }
    static int CreateEventFd()
    {
        int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (efd < 0)
        {
            ERR_LOG("CREATE EVENTFD FAILED!!");
            abort(); // 让程序异常退出
        }
        return efd;
    }
    void ReadEventfd()
    {
        uint64_t res = 0;
        int ret = read(_event_fd, &res, sizeof(res));
        if (ret < 0)
        {
            // EINTR -- 被信号打断, EAGAIN -- 表示无数据可读
            if (errno == EINTR || EAGAIN)
            {
                return;
            }
            ERR_LOG("READ EVENTFD FAILED!");
            abort();
        }
        return;
    }
    void WeakUpEventFd()
    {
        uint64_t val = 1;
        int ret = write(_event_fd, &val, sizeof(val));
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            ERR_LOG("READ EVENTFD FAILED!");
            abort();
        }
        return;
    }

public:
    EventLoop() : _thread_id(std::this_thread::get_id()),
                  _event_fd(CreateEventFd()),
                  _event_channel(new Channel(this, _event_fd)),
                  _timer_wheel(this)
    {
        // 给eventfd添加可读事件回调函数,读取eventfd事件通知次数
        _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
        // 启动eventfd的读事件监控
        _event_channel->EnableRead();
    }
    // 三步走--事件监控-》就绪事件处理-》执行任务
    void Start()
    {
        while (1)
        {
            // 1.事件监控
            std::vector<Channel *> actives;
            _poller.Poll(&actives);
            // 2.事件处理
            for (auto &channel : actives)
            {
                channel->HandleEvent();
            }
            // 3.执行任务
            RunAllTask();
        }
    }
    // 用于判断当前线程是否是EventLoop对应的线程
    bool IsInLoop()
    {
        return (_thread_id == std::this_thread::get_id());
    }
    void AssertInLoop()
    {
        assert(_thread_id == std::this_thread::get_id());
    }
    // 判断将要执行的任务是否处于当前线程中,如果是则执行,否则压入队列
    void RunInLoop(const Functor &cb)
    {
        if (IsInLoop())
        {
            return cb();
        }
        return QueueInLoop(cb);
    }
    // 将操作压入任务池
    void QueueInLoop(const Functor &cb)
    {
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.push_back(cb);
        }
        // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞
        // 其实就是给eventfd写入一个数据,eventfd就会触发可读事件
        WeakUpEventFd();
    }
    // 添加/修改描述符的事件监控
    void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
    // 移除描述符的监控
    void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }
    void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }
    void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }
    bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};

// EventLoop实例化管理类
class LoopThread
{
private:
    /* 用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/
    std::mutex _mutex;             // 互斥锁
    std::condition_variable _cond; // 条件变量
    EventLoop *_loop;              // EventLoop指针变量,这个对象需要在线程内实例化
    std::thread _thread;           // EventLoop对应的线程
private:
    /*实例化EventLoop对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能*/
    void ThreadEntry()
    {
        EventLoop loop;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _loop = &loop;
            _cond.notify_all();
        }
        loop.Start();
    }

public:
    /*创建线程,设定线程入口函数*/
    LoopThread() : _loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
    /*返回当前线程关联的EventLoop对象指针*/
    EventLoop *GetLoop()
    {
        EventLoop *loop = NULL;
        {
            std::unique_lock<std::mutex> lock(_mutex); // 加锁
            _cond.wait(lock, [&]()
                       { return _loop != NULL; }); // loop为NULL就一直阻塞
            loop = _loop;
        }
        return loop;
    }
};

// LoopThreadPool管理LoopThread创建主从线程类
class LoopThreadPool
{
private:
    int _thread_count;
    int _next_idx;
    EventLoop *_baseloop;
    std::vector<LoopThread *> _threads;
    std::vector<EventLoop *> _loops;

public:
    LoopThreadPool(EventLoop *baseloop) : _thread_count(0), _next_idx(0), _baseloop(baseloop) {}
    void SetThreadCount(int count) { _thread_count = count; }
    void Create()
    {
        if (_thread_count > 0)
        {
            _threads.resize(_thread_count); // 里面存放的是指针不是对象,所以可以直接扩大
            _loops.resize(_thread_count);
            for (int i = 0; i < _thread_count; i++)
            {
                _threads[i] = new LoopThread();     // 构造无参数
                _loops[i] = _threads[i]->GetLoop(); // 在上一句构造线程还没有创建完成会一直阻塞,因此不用担心在创建期间就分配连接
            }
        }
    }
    EventLoop *NextLoop()
    {
        if (_thread_count == 0)
            return _baseloop; // 没有从线程就直接返回主线程
        _next_idx = (_next_idx + 1) % _thread_count;
        return _loops[_next_idx];
    }
};

class Any
{
private:
    class holder
    {
    public:
        virtual ~holder() {}
        virtual const std::type_info &type() = 0;
        virtual holder *clone() = 0;
    };
    template <class T>
    class placeholder : public holder
    {
    public:
        placeholder(const T &val) : _val(val) {}
        // 获取子类对象保存的数据类型
        virtual const std::type_info &type() { return typeid(T); }
        // 针对当前的对象自身,克隆出一个新的子类对象
        virtual holder *clone() { return new placeholder(_val); }
        // 析构用数据自身的就行了
    public:
        T _val;
    };
    holder *_content;

public:
    Any() : _content(nullptr) {}
    template <class T>
    Any(const T &val) : _content(new placeholder<T>(val)) {}
    Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}
    ~Any() { delete _content; }

    Any &swap(Any &other)
    {
        std::swap(_content, other._content);
        return *this;
    }

    // 返回子类对象保存的数据的指针
    template <class T>
    T *get()
    {
        // 想要获取的数据类型,必须和保存的数据类型一致
        assert(typeid(T) == _content->type());
        return &((placeholder<T> *)_content)->_val;
    }
    // 赋值运算符的重载函数
    template <class T>
    Any &operator=(const T &val)
    {
        // 为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放了
        Any(val).swap(*this);
        return *this;
    }
    Any &operator=(const Any &other)
    {
        Any(other).swap(*this);
        return *this;
    }
};

// 连接管理类
class Connection;
// DISCONNECTED -- 连接关闭状态  CONNECTING -- 连接建立成功-待处理状态
// CONNECTED -- 连接建立完成,各种设置已完成,可以通信状态    DISCONNECTING -- 待关闭状态
typedef enum
{
    DISCONNECTED,
    CONNECTING,
    CONNECTED,
    DISCONNECTING
} ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:
    uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找
    // uint64_t _timer_id; // 定时器ID,必须是唯一的,这块是为了简化操作使用conn_id作为定时器
    int _sockfd;                   // 连接关联的文件描述符
    bool _enable_inactive_release; // 连接是否启动非活跃的判断标志,默认为false
    EventLoop *_loop;              // 连接所关联的一个EventLoop
    ConnStatu _statu;              // 连接状态
    Socket _socket;                // 套接字操作管理
    Channel _channel;              // 连接的事件管理
    Buffer _in_buffer;             // 输入缓冲区--存放从socket中读取到的数据
    Buffer _out_buffer;            // 输出缓冲区--存放要发送给对端的数据
    Any _context;                  // 请求的接收处理上下文

    /*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*/
    /*换句话来说,这几个回调都是组件使用者使用的*/
    using ConnectedCallback = std::function<void(const PtrConnection &)>;
    using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
    using ClosedCallback = std::function<void(const PtrConnection &)>;
    using AnyEventCallback = std::function<void(const PtrConnection &)>;
    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;
    /*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*/
    /*就应该从管理的地方移除掉自己的信息*/
    ClosedCallback _server_closed_callback;

private:
    /*五个channel的事件回调函数*/
    // 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback
    void HandleRead()
    {
        // 1.接收socket的数据,放到缓冲区
        char buf[65536];
        ssize_t ret = _socket.NonBlockRecv(buf, 65535);
        if (ret < 0)
        {
            // 出错了,不能直接关闭连接
            return ShutdownInLoop();
        }
        // 这里的等于0表示的是没有读取到数据,而并不是连接断开了,连接断开返回的是-1
        // 将数据放入输入缓冲区,写入之后顺便将写偏移向后移动
        _in_buffer.WriteAndPush(buf, ret);
        // 2.调用message_callback进行业务处理
        if (_in_buffer.ReadAbleSize() > 0)
        {
            // shard_from_this--从当前对象自身获取自身的shared_ptr管理对象
            return _message_callback(shared_from_this(), &_in_buffer);
        }
    }
    // 描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送
    void HandleWrite()
    {
        // _out_buffer中保存的就是要发送的数据
        ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
        if (ret < 0)
        {
            // 发送错误就应该关闭连接了
            if (_in_buffer.ReadAbleSize() > 0)
            {
                _message_callback(shared_from_this(), &_in_buffer);
            }
        }
        _out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动
        if (_out_buffer.ReadAbleSize() == 0)
        {
            _channel.DisableWrite(); // 没有数据待发送,关闭写事件监控
            // 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放
            if (_statu == DISCONNECTING)
            {
                return ReleaseInLoop(); // 这时候就是实际的关闭释放操作了
            }
        }
        return;
    }
    // 描述符触发挂断事件
    void HandleClose()
    {
        /*一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接*/
        if (_in_buffer.ReadAbleSize() > 0)
        {
            _message_callback(shared_from_this(), &_in_buffer);
        }
        return ReleaseInLoop();
    }
    // 描述符触发出错事件
    void HandleError()
    {
        return HandleClose();
    }
    // 描述符触发任意事件: 1.刷新连接活跃度--延迟定时销毁任务 2.调用组件使用者的任意事件回调
    void HandleEvent()
    {
        if (_enable_inactive_release == true)
        {
            _loop->TimerRefresh(_conn_id);
        }
        if (_event_callback)
        {
            _event_callback(shared_from_this());
        }
    }
    // 连接获取之后,所处的状态要进行各种设置(给channel设置事件回调,启动读监控,调用回调函数)
    void EstablishedInLoop()
    {
        // 1.修改连接状态   2.启动读事件监控    3.调用回调函数
        assert(_statu == CONNECTING); // 当前状态必须一定是上层的半连接状态
        _statu = CONNECTED;           // 当前函数执行完毕,则连接进入已完成连接状态
        // 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁
        _channel.EnableRead();
        if (_connected_callback)
            _connected_callback(shared_from_this());
    }
    // 这个接口才是实际的释放接口
    void ReleaseInLoop()
    {
        // 1.修改连接状态,将其置为DISCONNECTED
        _statu = DISCONNECTED;
        // 2.移除连接的事件监控
        _channel.Remove();
        // 3.关闭描述符
        _socket.Close();
        // 4.如果当前定时器队列中还有定时销毁任务,则取消任务
        if (_loop->HasTimer(_conn_id))
            CancelInactiveReleaseInLoop();
        // 5.调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数
        if (_closed_callback)
            _closed_callback(shared_from_this());
        // 移除服务器内部管理的连接信息
        if (_server_closed_callback)
            _server_closed_callback(shared_from_this());
    }
    // 这个并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控
    void SendInLoop(Buffer &buf)
    {
        if (_statu == DISCONNECTED)
            return;
        _out_buffer.WriteBufferAndPush(buf); // 可以在这个函数后面加上const表示不修改this
        if (_channel.WriteAble() == false)
        {
            _channel.EnableWrite();
        }
    }
    // 这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送
    void ShutdownInLoop()
    {
        _statu = DISCONNECTING; // 设置连接为半关闭状态
        if (_in_buffer.ReadAbleSize() > 0)
        {
            if (_message_callback)
                _message_callback(shared_from_this(), &_in_buffer);
        }
        // 要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭
        if (_out_buffer.ReadAbleSize() > 0)
        {
            if (_channel.WriteAble() == false)
            {
                _channel.EnableWrite();
            }
        }
        if (_out_buffer.ReadAbleSize() == 0)
        {
            ReleaseInLoop();
        }
    }
    // 启动非活跃连接超时释放规则
    void EnableInactiveReleaseInLoop(int sec)
    {
        // 1.将判断标志 _enable_inactive_release 置为true
        _enable_inactive_release = true;
        // 2.如果当前定时销毁任务已经存在,那就刷新一下延迟即可
        if (_loop->HasTimer(_conn_id))
        {
            return _loop->TimerRefresh(_conn_id);
        }
        // 3.如果不存在定时销毁任务,则新增
        _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::ReleaseInLoop, this));
    }
    void CancelInactiveReleaseInLoop()
    {
        _enable_inactive_release = false;
        if (_loop->HasTimer(_conn_id))
        {
            _loop->TimerCancel(_conn_id);
        }
    }
    void UpgradeInLoop(const Any &context,
                       const ConnectedCallback &conn,
                       const MessageCallback &msg,
                       const ClosedCallback &closed,
                       const AnyEventCallback &event)
    {
        _context = context;
        _connected_callback = conn;
        _message_callback = msg;
        _closed_callback = closed;
        _event_callback = event;
    }

public:
    Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(conn_id), _sockfd(sockfd),
                                                                _enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),
                                                                _channel(loop, _sockfd)
    {
        _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
        _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
        _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
        _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
        _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
    }
    ~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }
    // 获取管理的文件描述符
    int Fd() { return _sockfd; }
    // 获取连接ID
    int Id() { return _conn_id; }
    // 是否处于CONNECTED状态
    bool Connected() { return (_statu == CONNECTED); }
    // 设置上下文--连接建立完成时进行调用
    void SetContext(const Any &context) { _context = context; }
    // 获取上下文,返回的是指针
    Any *GetContext() { return &_context; }
    void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
    void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }
    void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
    void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }
    void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }
    // 连接建立就绪后,进行channel回调设置,启动读监控,调用_connect_callback
    void Established()
    {
        _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
    }
    // 发送数据,将数据发送到发送缓冲区,启动写事件监控
    void Send(const char *data, size_t len)
    {
        // 外界传入的data,可能是个临时空间,我们现在只是把发送操作压入了任务池,有可能并没有被执行
        // 因此有可能执行的时候,data指向的空间有可能已经被释放了
        Buffer buf;
        buf.WriteAndPush(data, len);
        _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, buf));
    }
    // 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理
    void Shutdown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));
    }
    // 启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务
    void EnableInactiveRelease(int sec)
    {
        _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
    }
    // 取消非活跃销毁
    void CancelInactiveRelease()
    {
        _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));
    }
    // 切换协议--重置上下文以及阶段性处理函数--而是这个接口必须在EventLoop线程中立即执行
    // 防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了
    void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,
                 const ClosedCallback &closed, const AnyEventCallback &event)
    {
        _loop->AssertInLoop();
        _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
    }
};

// 监听套接字管理类
class Acceptor
{
private:
    Socket _socket;   // 用于创建监听套接字
    EventLoop *_loop; // 用于对监听套接字进行事件监控
    Channel _channel; // 用于对监控套接字进行事件管理

    using AcceptCallback = std::function<void(int)>;
    AcceptCallback _accept_callback;

private:
    /*监听套接字的读事件回调处理函数 -- 获取新连接,调用_accept_callback函数进行新连接管理*/
    void HandleRead()
    {
        int newfd = _socket.Accept();
        if (newfd < 0)
        {
            return;
        }
        if (_accept_callback)
            _accept_callback(newfd);
    }
    int CreateServer(int port)
    {
        bool ret = _socket.CreateServer(port);
        assert(ret == true);
        return _socket.Fd();
    }

public:
    /* 不能将启动读监控,放到构造函数中,必须在设置回调函数后,再去启动*/
    /* 否则有可能造成启动监控后,立即有事件,处理的时候回调函数还没有设置:新连接得不到处理,且资源泄漏*/
    Acceptor(EventLoop *loop, int port) : _socket(CreateServer(port)), _loop(loop),
                                          _channel(loop, _socket.Fd())
    {
        _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
    }
    void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }
    void Listen() { _channel.EnableRead(); }
};

// TcpServer服务器管理模块(即全部模块的整合)
class TcpServer
{
private:
    uint64_t _next_id; // 这是一个自动增长的连接ID
    int _port;
    int _timeout;                                       // 这是非活跃连接的统计时间--多长时间无通信就是非活跃连接
    bool _enable_inactive_release;                      // 是否启动非活跃连接超时销毁的判断标志
    EventLoop _baseloop;                                // 这是主线程的EventLoop对象,负责监听事件的处理
    Acceptor _acceptor;                                 // 这是监听套接字的管理对象
    LoopThreadPool _pool;                               // 这是从属EventLoop线程池
    std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有连接对应的share_ptr对象,这里面的对象被删除,就意味这某一个连接被删除

    using ConnectedCallback = std::function<void(const PtrConnection &)>;
    using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
    using ClosedCallback = std::function<void(const PtrConnection &)>;
    using AnyEventCallback = std::function<void(const PtrConnection &)>;
    using Functor = std::function<void()>;
    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;

private:
    void RunAfterInLoop(const Functor &task, int delay)
    {
        _next_id++;
        _baseloop.TimerAdd(_next_id, delay, task);
    }
    // 为新连接构造一个Connection进行管理
    void NewConnection(int fd)
    {
        _next_id++;
        PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
        conn->SetMessageCallback(_message_callback);
        conn->SetClosedCallback(_closed_callback);
        conn->SetConnectedCallback(_connected_callback);
        conn->SetAnyEventCallback(_event_callback);
        conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
        if (_enable_inactive_release)
            conn->EnableInactiveRelease(10); // 启动非活跃超时销毁
        conn->Established();                 // 就绪初始化
        _conns.insert(std::make_pair(_next_id, conn));
    }
    void RemoveConnectionInLoop(const PtrConnection &conn)
    {
        int id = conn->Id();
        auto it = _conns.find(id);
        if (it != _conns.end())
        {
            _conns.erase(it);
        }
    }
    // 从管理Connection的_conns移除连接信息
    void RemoveConnection(const PtrConnection &conn)
    {
        _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));
    }

public:
    TcpServer(int port) : _port(port),
                          _next_id(0),
                          _enable_inactive_release(false),
                          _acceptor(&_baseloop, port),
                          _pool(&_baseloop)
    {
        _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
        _acceptor.Listen(); // 将监听套接字挂到baseloop上
    }
    void SetThreadCount(int count) { return _pool.SetThreadCount(count); }
    void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
    void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }
    void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
    void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }
    void EnableInactiveRelease(int timeout) { _timeout = timeout, _enable_inactive_release = true; }
    // 用于添加一个定时任务
    void RunAfter(const Functor &task, int delay)
    {
        _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));
    }
    void Start()
    {
        _pool.Create(); // 创建线程池中的从属线程
        _baseloop.Start();
    }
};

// 移除监控
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void TimerWheel::TimerRefresh(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}

// 忽略SIGPIPE信号,当连接断开的时候,如果我们继续向对端send发送信息,就会触发异常,即SIGPIPE异常,这个就是导致客户端异常退出的原因
class NetWork{
    public:
        NetWork(){
            DBG_LOG("SIGPIPE INIT");
            signal(SIGPIPE, SIG_IGN); // 忽视SIGPIPE异常,这个会导致进程退出
        }
};
static NetWork nw;  // 这个是为了执行里面的构造函数
// 预编译是为了防止头文件重复包含
#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/418951.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【详识JAVA语言】猜数字游戏

游戏规则: 系统自动生成一个随机整数(1-100), 然后由用户输入一个猜测的数字. 如果输入的数字比该随机数小, 提示 "低 了", 如果输入的数字比该随机数大, 提示 "高了" , 如果输入的数字和随机数相等, 则提示 "猜对了" . 参考代码 import java.…

Zynq—AD9238数据采集DDR3缓存千兆以太网发送实验(前导)

ACM9238 高速双通道ADC模块自助服务手册AD9238 Zynq—AD9238数据采集DDR3缓存千兆以太网发送实验&#xff08;一&#xff09;-CSDN博客 一、AD9238 模块在各方面参数性能上与AD9226保持一致。但是在设计上优化了信号调理电路&#xff0c;将单端信号先转成差分信号&#xff0c…

VS2015报错:error MSB8020和MSB8036的解决方案

VS2015编译报错&#xff1a;error MSB8020 提示信息&#xff1a;error MSB8020: The build tools for v141 (Platform Toolset ‘v141’) cannot be found. To build using the v141 build tools, please install v141 build tools. Alternatively, you may upgrade to the c…

租赁回收系统开发详细流程-干货分享

1.需求分析&#xff1a;首先&#xff0c;需要明确系统的功能和特点。这包括确定租赁回收的物品类型、用户群体、业务流程等。通过需求分析&#xff0c;可以确保系统能够满足市场和用户的需求。 2.系统设计&#xff1a;在需求分析的基础上&#xff0c;进行系统的整体设计。这包…

最新基于SWAT-MODFLOW地表水与地下水耦合技术应用

耦合模型被应用到很多科学和工程领域来改善模型的性能、效率和结果&#xff0c;SWAT作为一个地表水模型可以较好的模拟主要的水文过程&#xff0c;包括地表径流、降水、蒸发、风速、温度、渗流、侧向径流等&#xff0c;但是对于地下水部分的模拟相对粗糙&#xff0c;考虑到SWAT…

第十四篇【传奇开心果系列】Python的文本和语音相互转换库技术点案例示例:深度解读Azure Cognitive Services个性化推荐系统

传奇开心果博文系列 系列博文目录Python的文本和语音相互转换库技术点案例示例系列 博文目录前言一、个性化推荐系统介绍和关键功能以及优势解说二、雏形示例代码三、个性化推荐示例代码四、实时推荐示例代码五、多种推荐算法示例代码六、易于集成示例代码七、数据安全和隐私保…

腾讯云安装MYSQL远程连接不上解决方案

推荐安装步骤博客&#xff0c;写的很详细&#xff0c;如果不会安装的话&#xff0c;可以根据安装步骤一直走。 Windows10下超详细Mysql安装_win10安装mysql-CSDN博客 修改 my.cnf或者my.ini 找到里面bind-address将bind-address 127.0.0.1设置成bind-address 0.0.0.0&#x…

stm32触发硬件错误位置定位

1.背景 1. 项目中&#xff0c;调试过程或者测试中都会出现程序跑飞问题&#xff0c;这个时候问题特别难查找。 2. 触发硬件错误往往是因为内存错误。这种问题特别难查找&#xff0c;尤其是产品到了测试阶段&#xff0c;而这个异常复现又比较难的情况下&#xff0c;简直头疼。…

FaceBook获取广告数据

1、访问 广告管理工具 确认自己登陆的账号下面能看到户。 ​ 2、使用 图谱Api探索工具 生成用户短期口令 ​ 3、get请求(或者浏览器直接打开)访问&#xff1a; https://graph.facebook.com/v19.0/me?fieldsid,name, email&access_token{上一步生成的口令} ​ 4、短期…

MySQL 自增列解析(Auto_increment)

MySQL数据库为列提供了一种自增属性&#xff0c;当列被定义为自增时。Insert语句对该列即使不提供值&#xff0c;MySQL也会自动为该列生成递增的唯一标识&#xff0c;因此这个特性广泛用于主键的自动生成。 一、自增列的用法 自增列具有自动生成序列值&#xff0c;整型&#…

SpringBoot源码解读与原理分析(三十七)SpringBoot整合WebMvc(二)DispatcherServlet的工作全流程

文章目录 前言12.4 DispatcherServlet的工作全流程12.4.1 DispatcherServlet#service12.4.2 processRequest12.4.3 doService12.4.3.1 isIncludeRequest的判断12.4.3.2 FlashMapManager的设计 12.4.4 doDispatch12.4.4.1 处理文件上传请求12.4.4.2 获取可用的Handler&#xff0…

FPGA之进位逻辑

进位逻辑&#xff08;Carry Logic&#xff09;Slice 中除了LUT&#xff0c;寄存器&#xff0c;触发器&#xff0c;锁存器外&#xff0c;还提供了专用的快速超前进位逻辑&#xff0c;可以在slice 中执行快速算术加法和减法。CLB 中的专用进位逻辑提高了算术功能&#xff08;如加…

【BBuf的CUDA笔记】十四,OpenAI Triton入门笔记三 FusedAttention

0x0. 前言 继续Triton的学习&#xff0c;这次来到 https://triton-lang.org/main/getting-started/tutorials/06-fused-attention.html 教程。也就是如何使用Triton来实现FlashAttention V2。对于FlashAttention和FlashAttention V2网上已经有非常多的介绍了&#xff0c;大家如…

PSO-CNN-LSTM多输入回归预测|粒子群算法优化的卷积-长短期神经网络回归预测(Matlab)——附代码数据

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、算法介绍&#xff1a; 四、完整程序数据分享下载&#xff1a; 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 本代码基于Matlab平台…

羊大师揭秘,羊奶营养与健康的双重礼赞

羊大师揭秘&#xff0c;羊奶营养与健康的双重礼赞 羊奶&#xff0c;一种古老而珍贵的饮品&#xff0c;自古以来就以其独特的营养价值和健康益处受到人们的青睐。它不仅是滋养的源泉&#xff0c;更是健康的守护者&#xff0c;为我们带来营养与健康的双重礼赞。 羊奶的营养价值不…

通过css修改video标签的原生样式

通过css修改video标签的原生样式 描述实现结果 描述 修改video标签的原生样式 实现 在控制台中打开设置&#xff0c;勾选显示用户代理 shadow DOM&#xff0c;就可以审查video标签的内部样式了 箭头处标出来的就是shodow DOM的内容&#xff0c;这些内容正常不可见的&#x…

vscode——远端配置及一些问题解决

vscode——远端配置 安装Remote -SSH插件配置config本地变化一些问题缺失核心关闭vscode自动更新 尝试写入管道不存在hostname -I 查出来的ip连不上 我们之前大概了解了vscode的本地设置&#xff0c;我们之前提过&#xff0c;vscode是一款编辑器&#xff0c;在文本编辑方面有着…

UI 自动化测试实战(二)| 测试数据的数据驱动

数据驱动就是通过数据的改变驱动自动化测试的执行&#xff0c;最终引起测试结果的改变。简单来说&#xff0c;就是参数化在自动化测试中的应用。 测试过程中使用数据驱动的优势主要体现在以下几点&#xff1a; 1.提高代码复用率&#xff0c;相同的测试逻辑只需编写一条测试用例…

【系统设计】高性能秒杀系统如何设计?

欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术的推送&#xff01; 在我后台回复 「资料」 可领取编程高频电子书&#xff01; 在我后台回复「面试」可领取硬核面试笔记&#xff01; 文章导读地址…

github-actions

文章目录 workflow触发器action市场contextsecrets 默认环境变量 workflow name: {{workflow name}} run-name: {{workflow runs name}}on: {{触发器}} #[push]env:{{定义workflow变量}}: valuejobs:{{job name}}:runs-on: {{运行机器}} #ubuntu-latestenv:{{定义job变量}}: v…