目录
概要
tcp_cli.cc
tcp_srv.cc
server.hpp
测试结果
第二次整合
概要
本主要是将以下模块进行整合测试
Poller模块与Channel模块整合-CSDN博客
时间轮设计-CSDN博客
timerfd的认识与基本使用-CSDN博客
整合基于的理念
tcp_cli.cc
#include "../source/server.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8500, "127.0.0.1");
for(int i = 0; i < 5; i++)
{
std::string str = "hello qingfengyuge!";
cli_sock.Send(str.c_str(), str.size());
char buf[1024] = {0};
cli_sock.Recv(buf, 1023);
DBG_LOG("%s", buf);
sleep(1);
}
while (1) sleep(1);
return 0;
}
tcp_srv.cc
#include "../source/server.hpp"
void HandleClose(Channel *channel)
{
DBG_LOG("close fd:%d", channel->Fd());
channel->Remove(); // 移除监控
delete channel;
}
void HandleRead(Channel *channel)
{
int fd = channel->Fd();
char buf[1024] = {0};
int ret = recv(fd, buf, 1023, 0);
if (ret <= 0)
{
return HandleClose(channel); // 关闭释放
}
DBG_LOG("%s", buf);
channel->EnableWrite(); // 启动可写事件
}
void HandleWrite(Channel *channel)
{
int fd = channel->Fd();
const char *data = "天气还不错!!";
int ret = send(fd, data, strlen(data), 0);
if (ret < 0)
{
return HandleClose(channel); // 关闭释放
}
channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{
return HandleClose(channel); // 关闭释放
}
void HandleEvent(EventLoop *loop, Channel *channel, uint64_t timerid)
{
loop->TimerRefresh(timerid);
}
void Acceptor(EventLoop *loop, Channel *lst_channel)
{
int fd = lst_channel->Fd();
int newfd = accept(fd, NULL, NULL);
if (newfd < 0)
{
return;
}
uint64_t timerid = rand() % 10000;
Channel *channel = new Channel(loop, newfd);
channel->SetReadCallback(std::bind(HandleRead, channel)); // 为通信套接字设置可读事件的回调函数
channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可写事件的回调函数
channel->SetCloseCallback(std::bind(HandleClose, channel)); // 关闭事件的回调函数
channel->SetErrorCallback(std::bind(HandleError, channel)); // 错误事件的回调函数
channel->SetEventCallback(std::bind(HandleEvent, loop, channel, timerid)); // 任意事件的回调函数
channel->EnableRead();
// 活跃连接的超时释放操作,10s后关闭连接
// 主要定时销毁任务,必须在启动读事件之前,因为有可能启动了事件监控后,就立即有了事件,但是这时候还没有任务
loop->TimerAdd(timerid, 10, std::bind(HandleClose, channel));
}
int main()
{
srand(time(NULL));
EventLoop loop;
Socket lst_sock;
bool ret = lst_sock.CreateServer(8500);
// 为监听套接字,创建一个Channel进行事件的管理,以及事件的处理
Channel channel(&loop, lst_sock.Fd());
// 回调中,获取新连接,为新连接创建Channel并且添加监控
channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));
channel.EnableRead(); // 启动可读事件监控
while (1)
{
loop.Start();
}
lst_sock.Close();
return 0;
}
server.hpp
#include <iostream>
#include <vector>
#include <cstdint>
#include <cassert>
#include <string>
#include <cstring>
#include <ctime>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <functional>
#include <sys/epoll.h>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <sys/eventfd.h>
#include <memory>
#include <sys/timerfd.h>
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG
#define LOG(level, format, ...) \
do \
{ \
if (level < LOG_LEVEL) \
break; \
time_t t = time(NULL); \
struct tm *ltm = localtime(&t); \
char tmp[32] = {0}; \
strftime(tmp, 31, "%H:%M:%S", ltm); \
fprintf(stdout, "[%s %s:%d] " format "\n", tmp, __FILE__, __LINE__, ##__VA_ARGS__); \
} while (0)
#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)
// 缓冲区类
#define BUFFER_DEFAULT_SIZE 1024 // Buffer 默认起始大小
class Buffer
{
private:
std::vector<char> _buffer; // 使用vector进行内存空间管理
uint64_t _reader_idx; // 读偏移
uint64_t _writer_idx; // 写偏移
public:
Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}
char *Begin() { return &*_buffer.begin(); }
// 获取当前写入起始地址
char *WirtePosition() { return Begin() + _writer_idx; }
// 获取当前读取起始地址
char *ReadPosition() { return Begin() + _reader_idx; }
// 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移
uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
// 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间
uint64_t HeadIdleSize() { return _reader_idx; }
// 获取可读数据大小 = 写偏移 - 读偏移
uint16_t ReadAbleSize() { return _writer_idx - _reader_idx; };
// 将读偏移向后移动
void MoveReadOffset(uint64_t len)
{
if (len == 0)
return;
// 向后移动的大小, 必须小于可读数据大小
assert(len <= ReadAbleSize());
_reader_idx += len;
}
// 将写偏移向后移动
void MoveWriteOffset(uint64_t len)
{
// 向后移动的大小,必须小于当前后边的空闲空间大小
assert(len <= TailIdleSize());
_writer_idx += len;
}
// 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容)
void EnsureWriteSpace(uint64_t len)
{
// 如果末尾空闲空间大小足够,直接返回
if (TailIdleSize() >= len)
{
return;
}
// 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够,够了就将数据移动到起始位置
if (len <= TailIdleSize() + HeadIdleSize())
{
// 将数据移动到起始位置
uint64_t rsz = ReadAbleSize(); // 把当前数据大小先保存起来
std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 把可读数据拷贝到起始位置
_reader_idx = 0; // 将读偏移归0
_writer_idx = rsz; // 将写位置置为可读数据大小, 因为当前的可读数据大小就是写偏移量
}
else
{
// 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可
_buffer.resize(_writer_idx + len);
}
}
// 写入数据
void Write(const void *data, uint64_t len)
{
// 1.保证有足够空间, 2.拷贝数据进去
EnsureWriteSpace(len);
const char *d = (const char *)data;
std::copy(d, d + len, WirtePosition());
}
void WirteAndPush(const void *data, uint64_t len)
{
Write(data, len);
MoveWriteOffset(len);
}
void WriteString(const std::string &data)
{
return Write(data.c_str(), data.size());
}
void WriteStringAndPush(const std::string &data)
{
WriteString(data);
MoveWriteOffset(data.size());
}
void WriteBuffer(Buffer &data)
{
return Write(data.ReadPosition(), data.ReadAbleSize());
}
void WirteBufferAndPush(Buffer &data)
{
WriteBuffer(data);
MoveWriteOffset(data.ReadAbleSize());
}
// 读取数据
void Read(void *buf, uint64_t len)
{
// 要求获取的数据大小必须小于可读数据大小
assert(len <= ReadAbleSize());
std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);
}
void ReadAndPop(void *buf, uint64_t len)
{
Read(buf, len);
MoveReadOffset(len);
}
std::string ReadAsString(uint64_t len)
{
// 要求获取的数据大小必须小于可读数据大小
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
Read(&str[0], len); // 这里不直接用str.c_str()的原因是,这个的返回值是const类型
return str;
}
std::string ReadAsStringAndPop(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str = ReadAsString(len);
MoveReadOffset(len);
return str;
}
char *FindCRLF()
{
char *res = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());
return res;
}
// 这种情况针对的是,通常获取一行数据
std::string GetLine()
{
char *pos = FindCRLF();
if (pos == NULL)
return "";
// +1 是为了把换行字符也取出来
return ReadAsString(pos - ReadPosition() + 1);
}
std::string GetLineAndPop()
{
std::string str = GetLine();
MoveReadOffset(str.size());
return str;
}
// 清空缓冲区
void Clear()
{
// 只需要将偏移量归0即可
_reader_idx = 0;
_writer_idx = 0;
}
};
// 套接字类
#define MAX_LISTEN 1024
class Socket
{
private:
int _sockfd;
public:
Socket() : _sockfd(-1) {}
Socket(int fd) : _sockfd(fd) {}
~Socket() { Close(); };
int Fd() { return _sockfd; }
// 创建套接字
bool Create()
{
// int socket(int domain, int type, int protocol)
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0)
{
ERR_LOG("CREATE SOCKET FAILED!");
return false;
}
return true;
}
// 绑定地址信息
bool Bind(const std::string &ip, uint64_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
// int bind(int sockfd, struct sockaddr* addr, socklen_t len)
int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("BIND ADDRESS FAILED!");
return false;
}
return true;
}
// 开始监听
bool Listen(int backlog = MAX_LISTEN)
{
// int listen(int backlog)
int ret = listen(_sockfd, backlog);
if (ret < 0)
{
ERR_LOG("SOCKET LISTEN FAILED!");
return false;
}
return true;
}
// 向服务器发起连接
bool Connect(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
// int connect(int sockfd, struct sockaddr* addr, socklen_t len)
int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("CONNECT SERVER FAILED!");
return false;
}
return true;
}
// 获取新连接
int Accept()
{
// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);
int newfd = accept(_sockfd, NULL, NULL);
if (newfd < 0)
{
ERR_LOG("SOCKET ACCEPT FAILED!");
return -1;
}
return newfd;
}
// 接收数据
ssize_t Recv(void *buf, size_t len, int flag = 0) // 0 阻塞
{
// ssize_t recv(int sockfd, void *buf, size_t len, int flag)
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0)
{
// EAGAIN 当前的接收缓冲区中没用数据了,在非阻塞的情况下才有这个错误
// EINTR 表示当前socket的阻塞等待,被信号打断了
if (errno == EAGAIN || errno == EINTR)
{
return 0; // 表示这次没用接收到数据
}
ERR_LOG("SOCKET RECV FAILED!");
return -1;
}
return ret; // 实际接收的数据长度
}
ssize_t NonBlockRecv(void *buf, size_t len)
{
return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
}
// 发送数据
ssize_t Send(const void *buf, size_t len, int flag = 0)
{
// ssize_t send(int sockfd, void *data, size_t len, int flag)
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0)
{
ERR_LOG("SOCKET SEND FAILED!");
return -1;
}
return ret; // 实际发送的数据长度
}
ssize_t NonBlockSend(void *buf, size_t len)
{
return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
}
// 关闭套接字
void Close()
{
if (_sockfd != -1)
{
close(_sockfd);
_sockfd = -1;
}
}
// 创建一个服务器连接
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) // 接收全部
{
// 1.创建套接字 2.绑定地址 3.开始监听 4.设置非阻塞 5.启动地址重用
if (Create() == false)
return false;
if (block_flag) // 默认阻塞
NonBlock();
if (Bind(ip, port) == false)
return false;
if (Listen() == false)
return false;
ReuseAddress();
return true;
}
// 创建一个客户端连接
bool CreateClient(uint16_t port, const std::string &ip)
{
// 1.创建套接字 2.指向连接服务器
if (Create() == false)
return false;
if (Connect(ip, port) == false)
return false;
return true;
}
// 设置套接字选项 -- 开启地址端口重用
void ReuseAddress()
{
// int setsockopt(int fd, int level, int optname, void *val, int vallen)
int val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int)); // 地址
val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int)); // 端口号
}
// 设置套接字阻塞属性 -- 设置为非阻塞
void NonBlock()
{
// int fcntl(int fd, int cmd, .../*arg*/)
int flag = fcntl(_sockfd, F_GETFL, 0);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
};
class Poller; // 整合测试1:声明
class EventLoop;
// Channel类
class Channel
{
private:
int _fd;
EventLoop *_loop;
uint32_t _events; // 当前需要监控的事件
uint32_t _revents; // 当前连接触发的事件
using EventCallback = std::function<void()>;
EventCallback _read_callback; // 可读事件被触发的回调函数
EventCallback _write_callback; // 可写事件被触发的回调函数
EventCallback _error_callback; // 错误事件被触发的回调函数
EventCallback _close_callback; // 连接断开事件被触发的回调函数
EventCallback _event_callback; // 任意事件被触发的回调函数
public:
Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}
int Fd() { return _fd; }
uint32_t Events() { return _events; } // 获取想要监控的事件
void SetREvents(uint32_t events) { _revents = events; }
void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } // 设置实际就绪的事件
void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
// 当前是否监控了可读
bool ReadAble() { return (_events & EPOLLIN); }
// 当前是否监控了可写
bool WriteAble() { return (_events & EPOLLOUT); }
// 启动读事件监控
void EnableRead()
{
_events |= EPOLLIN;
Update();
}
// 启动写事件监控
void EnableWrite()
{
_events |= EPOLLOUT;
Update();
}
// 关闭读事件监控
void DisableRead()
{
_events &= ~EPOLLIN;
Update();
}
// 关闭写事件监控
void DisableWrite()
{
_events &= ~EPOLLOUT;
Update();
}
// 关闭所有事件监控
void DisableAll()
{
_events = 0;
Update();
}
// 移除监控
void Remove(); // 声明和实现要分离,因为实现的时候是不知道里面有什么函数成员的
void Update(); // 这两个特殊,所以把实现放在Poller类的下面进行实现
// 事件处理,一旦触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定
void HandleEvent()
{
// 第二参数,对方关闭连接,第三参数,带外数据
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
if (_event_callback) // 不管任何事件,都调用的回调函数
_event_callback();
if (_read_callback)
_read_callback();
}
/*有可能会释放连接的操作事件,一次只处理一个*/
if (_revents & EPOLLOUT)
{
if (_event_callback)
_event_callback(); // 放到事件处理完毕后调用,刷新活跃度
if (_write_callback)
_write_callback();
}
else if (_revents & EPOLLERR)
{
if (_event_callback)
_event_callback();
if (_error_callback)
_error_callback();
}
else if (_revents & EPOLLHUP)
{
if (_event_callback)
_event_callback();
if (_close_callback)
_close_callback();
}
}
};
// Poller描述符监控类
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
private:
// 对epoll的直接操作
void Update(Channel *channel, int op)
{
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev)
int fd = channel->Fd();
struct epoll_event ev;
ev.data.fd = fd;
ev.events = channel->Events();
int ret = epoll_ctl(_epfd, op, fd, &ev);
if (ret < 0)
{
ERR_LOG("EPOLLCTL FAILED!");
}
return;
}
// 判断一个Channel 是否已经添加了事件监控
bool HasChannel(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it == _channels.end())
{
return false;
}
return true;
}
public:
Poller()
{
_epfd = epoll_create(MAX_EPOLLEVENTS); // 这个值大于0就行了,无用处
if (_epfd < 0)
{
ERR_LOG("EPOLL CREATE FAILED!");
abort(); // 退出程序
}
}
// 添加或修改监控事件
void UpdateEvent(Channel *channel)
{
bool ret = HasChannel(channel);
if (ret == false)
{
// 不存在则添加
_channels.insert(std::make_pair(channel->Fd(), channel));
return Update(channel, EPOLL_CTL_ADD);
}
return Update(channel, EPOLL_CTL_MOD);
}
// 移除监控
void RemoveEvent(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it != _channels.end())
{
_channels.erase(it);
}
Update(channel, EPOLL_CTL_DEL);
}
// 开始监控, 返回活跃连接
void Poll(std::vector<Channel *> *active)
{
// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // -1阻塞监控
if (nfds < 0)
{
if (errno == EINTR) // 信号打断
{
return;
}
ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));
abort();
}
for (int i = 0; i < nfds; i++) // 添加活跃信息
{
auto it = _channels.find(_evs[i].data.fd); // 没找到就说明不在我们的管理之下,这是不正常的
assert(it != _channels.end());
it->second->SetREvents(_evs[i].events); // 设置实际就绪的事件
active->push_back(it->second);
}
return;
}
};
// timerwheel时间轮定时器类
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
uint64_t _id; // 定时器任务对象
uint32_t _timeout; // 定时任务的超时时间
bool _canceled; // false-表示没有被取消,true-表示被取消
TaskFunc _task_cb; // 定时器要执行的定时任务
ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
public:
TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}
~TimerTask()
{
if (_canceled == false)
_task_cb();
_release();
}
void Cancel() { _canceled = true; }
void SetRelease(const ReleaseFunc &cb) { _release = cb; }
uint32_t DelayTime() { return _timeout; } // 返回时间
};
class TimerWheel
{
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; // 当前的的秒针,走到哪里哪里就释放执行
int _capacity; // 表盘最大数量 -- 其实就是最大延迟时间
std::vector<std::vector<PtrTask>> _wheel;
// 用weak_ptr来构造出新的shared_ptr用来计数,不过后续要记得释放
std::unordered_map<uint64_t, WeakTask> _timers;
EventLoop *_loop;
int _timerfd; // 定时器描述符 -- 可读事件回调就是读取计数器,执行定时任务
std::unique_ptr<Channel> _timer_channel;
private:
void RemoveTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it != _timers.end())
{
_timers.erase(it);
}
}
static int CreateTimerfd()
{
// int timerfd_create(int clockid, int flags);
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0)
{
ERR_LOG("TIMERFD CREATE FAILED!");
abort();
}
// int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec);
struct itimerspec itime;
itime.it_value.tv_sec = 1; // 设置 秒钟
itime.it_value.tv_nsec = 0; // 设置 纳秒 第一次超时时间为1s后
itime.it_interval.tv_sec = 1; // 同上
itime.it_interval.tv_nsec = 0; // 第一次超时后,每隔超时的间隔时
timerfd_settime(timerfd, 0, &itime, NULL); // 0代表阻塞式
return timerfd;
}
void ReadTimefd()
{
uint64_t times;
int ret = read(_timerfd, ×, 8);
if (ret < 0)
{
perror("READ TIMERFD FAILED!");
abort();
}
return;
}
// 这个函数应该每秒钟被执行一次,相当于秒钟向后走了一步
void RunTimerTask()
{
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉.从而执行函数
}
void OnTime()
{
ReadTimefd();
RunTimerTask();
}
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) // 添加定时任务
{
PtrTask pt(new TimerTask(id, delay, cb)); // 实例化定时任务对象
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); // 第0个位置是隐藏的this指针。再把任务id绑定进去
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
// 刷新/延迟定时任务
void TimerRefreshInLoop(uint64_t id)
{
// 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来, 添加到轮子中
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没找到定时任务, 没法刷新,没法延迟
}
PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr
int delay = pt->DelayTime(); // 获取到了初始的延迟时间
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
}
void TimerCancelInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没找到定时任务, 没法刷新,没法延迟
}
PtrTask pt = it->second.lock(); // 当还没有过期才进行取消
if (pt)
pt->Cancel();
}
public:
TimerWheel(EventLoop *loop) : _capacity(60), _tick(0), _wheel(_capacity), _loop(loop),
_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd))
{
_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
_timer_channel->EnableRead(); // 启动读事件监控
}
/*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*/
/*如果不想加锁,那就把对定期的所有操作,都放在一个线程中进行*/
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);
// 刷新/延迟定时任务
void TimerRefresh(uint64_t id);
void TimerCancel(uint64_t id);
/*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,对应的EventLoop线程内执行*/
bool HasTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return false; // 没找到定时任务, 没法刷新,没法延迟
}
return true;
}
};
// EventLoop事件监控处理类
class EventLoop
{
private:
using Functor = std::function<void()>;
std::thread::id _thread_id; // 线程ID
int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞
std::unique_ptr<Channel> _event_channel; // 智能指针
Poller _poller; // 进行所有描述符的事件监控
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 实现任务池操作的线程安全
TimerWheel _timer_wheel; // 定时器模块
public:
// 执行任务池中的所有任务
void RunAllTask()
{
std::vector<Functor> functor;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor);
}
for (auto &f : functor)
{
f();
}
return;
}
static int CreateEventFd()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0)
{
ERR_LOG("CREATE EVENTFD FAILED!!");
abort(); // 让程序异常退出
}
return efd;
}
void ReadEventfd()
{
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));
if (ret < 0)
{
// EINTR -- 被信号打断, EAGAIN -- 表示无数据可读
if (errno == EINTR || EAGAIN)
{
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
void WeakUpEventFd()
{
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
public:
EventLoop() : _thread_id(std::this_thread::get_id()),
_event_fd(CreateEventFd()),
_event_channel(new Channel(this, _event_fd)),
_timer_wheel(this)
{
// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
// 启动eventfd的读事件监控
_event_channel->EnableRead();
}
// 三步走--事件监控-》就绪事件处理-》执行任务
void Start()
{
// 1.事件监控
std::vector<Channel *> actives;
_poller.Poll(&actives);
// 2.事件处理
for (auto &channel : actives)
{
channel->HandleEvent();
}
// 3.执行任务
RunAllTask();
}
// 用于判断当前线程是否是EventLoop对应的线程
bool IsInLoop()
{
return (_thread_id == std::this_thread::get_id());
}
// 判断将要执行的任务是否处于当前线程中,如果是则执行,否则压入队列
void RunInLoop(const Functor &cb)
{
if (IsInLoop())
{
return cb();
}
return QueueInLoop(cb);
}
// 将操作压入任务池
void QueueInLoop(const Functor &cb)
{
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
// 唤醒有可能因为没有事件就绪,而导致的epoll阻塞
// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件
WeakUpEventFd();
}
// 添加/修改描述符的事件监控
void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
// 移除描述符的监控
void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }
void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }
void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }
bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};
// 移除监控
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{
_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void TimerWheel::TimerRefresh(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}
测试结果
符合预期
后记:贴必须得贴上去,代码多拷贝就要多贴几份,不管不管