day16-重构核心库、使用智能指针
太多了分一篇写。
5、EventLoop
这是一个事件轮询,在这个部分会通过Poller进行就绪事件的获取,并将事件进行处理。
头文件
这里使用了一个智能指针并使用的是unique_ptr指向Poller红黑树,防止所有权不止一个,也就是出现指针悬挂的问题。
class EventLoop {
public:
DISALLOW_COPY_AND_MOVE(EventLoop);
EventLoop();
~EventLoop();
void Loop() const;
void UpdateChannel(Channel *ch) const;
void DeleteChannel(Channel *ch) const;
private:
std::unique_ptr<Poller> poller_;
};
实现
这更像是一个中间件,通过初始化一个unique_ptr指针调用Poller中各种方法,由于使用了智能指针,析构函数为空,这是由于在超出作用域后poller_会自动释放。
通过Loop轮询处理将就绪事件链表中的事件进行处理,处理事件的方法在Channel中实现。
EventLoop::EventLoop() { poller_ = std::make_unique<Poller>(); }
EventLoop::~EventLoop() {}
void EventLoop::Loop() const {
while (true) {
for (Channel *active_ch : poller_->Poll()) {
active_ch->HandleEvent();
}
}
}
void EventLoop::UpdateChannel(Channel *ch) const { poller_->UpdateChannel(ch); }
void EventLoop::DeleteChannel(Channel *ch) const { poller_->DeleteChannel(ch); }
6、Acceptor
上述几个类处理了事件,但是建立连接只在Socket中出现过,这部分太底层了,我们需要开发更高级的接口,用来建立连接。
头文件
同样禁止了类的复制和移动,这里将构造函数设置为explicit目的是防止发生类似于这样的隐式类型转换:
EventLoop loop;
Acceptor acceptor = loop;
这个类中包含有socket、channel以及回调函数三个属性用以构建连接。这里使用unique_ptr来管理socket、channel属性。
class Acceptor {
public:
DISALLOW_COPY_AND_MOVE(Acceptor);
explicit Acceptor(EventLoop *loop);
~Acceptor();
RC AcceptConnection() const;
void set_new_connection_callback(std::function<void(int)> const &callback);
private:
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
std::function<void(int)> new_connection_callback_;
};
实现
在构造和析构函数中,首先创建指向socket的指针,之后开始调用创建、绑定、将socket设置为可监听的状态,然后构建channel并加入到轮询中,之后将AcceptConnection作为channel中的读回调函数并将通道事件置为可读。由于使用智能指针管理,因此析构函数为空。
Acceptor::Acceptor(EventLoop *loop) {
socket_ = std::make_unique<Socket>();
assert(socket_->Create() == RC_SUCCESS);
assert(socket_->Bind("127.0.0.1", 1234) == RC_SUCCESS);
assert(socket_->Listen() == RC_SUCCESS);
channel_ = std::make_unique<Channel>(socket_->fd(), loop);
std::function<void()> cb = std::bind(&Acceptor::AcceptConnection, this);
channel_->set_read_callback(cb);
channel_->EnableRead();
}
Acceptor::~Acceptor() {}
将服务器与请求连接的客户端进行连接,并记录客户端的socket,将建立的连接设置为无阻塞状态,最后调用new_connection_callback_回调函数。
RC Acceptor::AcceptConnection() const{
int clnt_fd = -1;
if( socket_->Accept(clnt_fd) != RC_SUCCESS ) {
return RC_ACCEPTOR_ERROR;
}
// TODO: remove
fcntl(clnt_fd, F_SETFL, fcntl(clnt_fd, F_GETFL) | O_NONBLOCK); // 新接受到的连接设置为非阻塞式
if (new_connection_callback_) {
new_connection_callback_(clnt_fd);
}
return RC_SUCCESS;
}
set_new_connection_callback设置回调函数,在设置的过程中有个小细节就是利用std::move转移所有权,也就是callback变成了右值。
void Acceptor::set_new_connection_callback(std::function<void(int)> const &callback) {
new_connection_callback_ = std::move(callback);
}
7、Connection
Connection 类的主要任务是管理一个单一的网络连接。它封装了连接的各种操作和状态,提供了一种面向对象的方式来处理读写数据、维护连接状态以及处理事件,使得用户可以通过面向对象的方式来管理每一个连接,并且可以通过设置回调函数来处理连接建立、数据接收等事件。
头文件
提供了标志连接状态的标志位State,主要有socket、channel、buf、state、以及一些回调函数。可以通过这些属性看出来,在这个类中完成了所有连接上可能发生的事件。
class Connection {
public:
enum State {
Invalid = 0,
Connecting,
Connected,
Closed,
};
DISALLOW_COPY_AND_MOVE(Connection);
Connection(int fd, EventLoop *loop);
~Connection();
void set_delete_connection(std::function<void(int)> const &fn);
void set_on_connect(std::function<void(Connection *)> const &fn);
void set_on_recv(std::function<void(Connection *)> const &fn);
State state() const;
Socket *socket() const;
void set_send_buf(const char *str);
Buffer *read_buf();
Buffer *send_buf();
RC Read();
RC Write();
RC Send(std::string msg);
void Close();
void onConnect(std::function<void()> fn);
void onMessage(std::function<void()> fn);
private:
void Business();
RC ReadNonBlocking();
RC WriteNonBlocking();
RC ReadBlocking();
RC WriteBlocking();
private:
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
State state_;
std::unique_ptr<Buffer> read_buf_;
std::unique_ptr<Buffer> send_buf_;
std::function<void(int)> delete_connectioin_;
std::function<void(Connection *)> on_recv_;
};
实现
构造函数中首先创建了指向socket的指针并为socket进行赋值,再创建Channel将socket和事件类型进行绑定,并初始化读缓存和发送缓存,将状态设置为已连接。。
析构函数由于使用智能指针管理了几个类,会自动释放,因此析构函数仍然为空。
Connection::Connection(int fd, EventLoop *loop) {
socket_ = std::make_unique<Socket>();
socket_->set_fd(fd);
if (loop != nullptr) {
channel_ = std::make_unique<Channel>(fd, loop);
channel_->EnableRead();
channel_->EnableET();
}
read_buf_ = std::make_unique<Buffer>();
send_buf_ = std::make_unique<Buffer>();
state_ = State::Connected;
}
Connection::~Connection() {}
对读写事件逻辑进行设置,首先判断链接状态,如果Connection是未连接的状态则无法进行任何操作。在读事件中需要先清空缓存数据再读,防止上次遗留数据发生影响。在写事件中需要先操作再清空数据,防止对下次写入产生影响。这样无论写读还是读写都不会发生问题。
RC Connection::Read() {
if (state_ != State::Connected) {
perror("Connection is not connected, can not read");
return RC_CONNECTION_ERROR;
}
assert(state_ == State::Connected && "connection state is disconnected!");
read_buf_->Clear();
if (socket_->IsNonBlocking()) {
return ReadNonBlocking();
} else {
return ReadBlocking();
}
}
RC Connection::Write() {
if (state_ != State::Connected) {
perror("Connection is not connected, can not write");
return RC_CONNECTION_ERROR;
}
RC rc = RC_UNDEFINED;
if (socket_->IsNonBlocking()) {
rc = WriteNonBlocking();
} else {
rc = WriteBlocking();
}
send_buf_->Clear();
return rc;
}
接下来是无阻塞和有阻塞读事件的具体代码,无阻塞的逻辑是一直在死循环中读取数据,只要数据就绪就进行读取,没有数据是则进行轮询;而阻塞的逻辑是一直读,如果没有数据就停在那里等待数据。
RC Connection::ReadNonBlocking() {
int sockfd = socket_->fd();
char buf[1024]; // 这个buf大小无所谓
while (true) { // 使用非阻塞IO,读取客户端buffer,一次读取buf大小数据,直到全部读取完毕
memset(buf, 0, sizeof(buf));
ssize_t bytes_read = read(sockfd, buf, sizeof(buf));
if (bytes_read > 0) {
read_buf_->Append(buf, bytes_read);
} else if (bytes_read == -1 && errno == EINTR) { // 程序正常中断、继续读取
printf("continue reading\n");
continue;
} else if (bytes_read == -1 &&
((errno == EAGAIN) || (errno == EWOULDBLOCK))) { // 非阻塞IO,这个条件表示数据全部读取完毕
break;
} else if (bytes_read == 0) { // EOF,客户端断开连接
printf("read EOF, client fd %d disconnected\n", sockfd);
state_ = State::Closed;
Close();
break;
} else {
printf("Other error on client fd %d\n", sockfd);
state_ = State::Closed;
Close();
break;
}
}
return RC_SUCCESS;
}
RC Connection::ReadBlocking() {
int sockfd = socket_->fd();
// unsigned int rcv_size = 0;
// socklen_t len = sizeof(rcv_size);
// getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, &len);
size_t data_size = socket_->RecvBufSize();
char buf[1024];
ssize_t bytes_read = read(sockfd, buf, sizeof(buf));
if (bytes_read > 0) {
read_buf_->Append(buf, bytes_read);
} else if (bytes_read == 0) {
printf("read EOF, blocking client fd %d disconnected\n", sockfd);
state_ = State::Closed;
} else if (bytes_read == -1) {
printf("Other error on blocking client fd %d\n", sockfd);
state_ = State::Closed;
}
return RC_SUCCESS;
}
在写数据时阻塞和非阻塞差别和读事件时一样,但是需要注意的是在写事件中我们需要记录每一次写到了缓存中那部分,下次从新的位置开始写入。
RC Connection::WriteNonBlocking() {
int sockfd = socket_->fd();
char buf[send_buf_->Size()];
memcpy(buf, send_buf_->c_str(), send_buf_->Size());
int data_size = send_buf_->Size();
int data_left = data_size;
while (data_left > 0) {
ssize_t bytes_write = write(sockfd, buf + data_size - data_left, data_left);
if (bytes_write == -1 && errno == EINTR) {
printf("continue writing\n");
continue;
}
if (bytes_write == -1 && errno == EAGAIN) {
break;
}
if (bytes_write == -1) {
printf("Other error on client fd %d\n", sockfd);
state_ = State::Closed;
break;
}
data_left -= bytes_write;
}
return RC_SUCCESS;
}
RC Connection::WriteBlocking() {
// 没有处理send_buffer_数据大于TCP写缓冲区,的情况,可能会有bug
int sockfd = socket_->fd();
ssize_t bytes_write = write(sockfd, send_buf_->buf().c_str(), send_buf_->Size());
if (bytes_write == -1) {
printf("Other error on blocking client fd %d\n", sockfd);
state_ = State::Closed;
}
return RC_SUCCESS;
}
这部分向send_buf_中写入信息数据,并开始传输。这是对外的最主要的接口,首先通过set_send_buf写入写缓存,再通过Write写入临时缓存。
RC Connection::Send(std::string msg) {
set_send_buf(msg.c_str());
Write();
return RC_SUCCESS;
}
void Connection::set_send_buf(const char *str) { send_buf_->set_buf(str); }
这部分在设置回调函数的过程中就调用了读事件进行数据读取,首先通过read将数据读取到读缓存,再通过on_recv_做显示。注意:虽然代码中使用了临时的 buf 数组来暂存待发送的数据,但这些数据最终会通过操作系统内核的套接字发送缓冲区发送到目标端。发送过程中的详细流程是由操作系统网络栈和驱动程序处理的,应用程序通过 write 函数将数据提交给操作系统,而后续的网络传输则由操作系统负责管理。
临时缓冲区更易于维护,在写入前读取,在错误时不必重新构建整个send_buf_,而且不会发生竞态问题,提高效率。
void Connection::Business() {
Read();
on_recv_(this);
}
void Connection::set_on_recv(std::function<void(Connection *)> const &fn) {
on_recv_ = std::move(fn);
std::function<void()> bus = std::bind(&Connection::Business, this);
channel_->set_read_callback(bus);
}
获取/设置一些属性。
void Connection::set_delete_connection(std::function<void(int)> const &fn) { delete_connectioin_ = std::move(fn); }
void Connection::Close() { delete_connectioin_(socket_->fd()); }
Connection::State Connection::state() const { return state_; }
Socket *Connection::socket() const { return socket_.get(); }
Buffer *Connection::read_buf() { return read_buf_.get(); }
Buffer *Connection::send_buf() { return send_buf_.get(); }
8、Buffer
这里就是缓冲区的构建,注意如果不是在单线程并且没有涉及到异步非阻塞的网络操作时,可以不显式的构造send_buf_和read_buf_而只需要一个buf;在多线程的情况下,使用send_buf_和read_buf_可以同时进行读写操作。
头文件
在缓冲区中只有一个属性buf_,而且使用默认构造/析构函数。
class Buffer {
public:
DISALLOW_COPY_AND_MOVE(Buffer);
Buffer() = default;
~Buffer() = default;
const std::string &buf() const;
const char* c_str() const;
void set_buf(const char *buf);
size_t Size() const;
void Append(const char *_str, int _size);
void Clear();
private:
std::string buf_;
};
实现
这里获取buf和buf中数据的方法都被定义为const,代表在方法内不能修改数据,返回的数据不能被修改。
之后还有一个将buf中的数据交换到buf_的操作,注意这里由于buf由const修饰,所以不能直接swap,需要利用一个临时变量进行swap,string中接收使用const char * 类型的参数进行初始化。
之后有返回缓冲区字符长度以及清空缓存的方法。将字符串写入buf_中的方法。
const std::string &Buffer::buf() const { return buf_; }
const char *Buffer::c_str() const { return buf_.c_str(); }
void Buffer::set_buf(const char *buf) {
std::string new_buf(buf);
buf_.swap(new_buf);
}
size_t Buffer::Size() const { return buf_.size(); }
void Buffer::Append(const char *str, int size) {
for (int i = 0; i < size; ++i) {
if (str[i] == '\0') {
break;
}
buf_.push_back(str[i]);
}
}
void Buffer::Clear() { buf_.clear(); }
9、ThreadPool
线程池用来处理多线程的部分。
头文件
在线程池中有打工人(线程容器),任务(任务队列),锁,条件变量以及标志位。
由于线程池需要处理返回值参数各不相同的任务,因此在添加任务的方法上设置比较困难。
template<class F, class… Args>:这是模板函数的模板参数部分。它声明了函数模板add,其中class F表示一个类型参数,class… Args表示一个模板参数包(C++11引入的可变参数模板)。
auto add(F&& f, Args&&… args):这是函数的声明部分。auto表示返回类型会根据函数体内的具体实现而推导得出。F&& f表示一个通用引用(universal reference),用于接收传递给函数的函数对象f。Args&&… args表示一个模板参数包,用于接收传递给函数的参数列表。
-> std::future<typename std::result_of<F(Args…)>::type>:这是函数的返回类型部分。->后面跟着返回类型的声明。std::future是C++标准库中用于异步操作的类模板,它表示一个可能还没有完成的异步操作。typename std::result_of<F(Args…)>::type其中std::result_of用于获取函数调用的返回类型,它接受函数类型和参数类型,并提供一个type成员,表示函数调用的返回类型。在这里,F表示函数类型,Args…表示参数类型,因此std::result_of<F(Args…)>表示函数f的返回类型,而typename std::result_of<F(Args…)>::type表示该返回类型。
由于C++并非不支持模板分离编译(或者单独写一个文件能解决),因此需要在头文件中进行方法实现,首先利用using定义类型别名
创建了一个std::shared_ptr指向一个std::packaged_task对象的智能指针task。packaged_task用于包装可调用对象,并将其结果存储在std::future中,以便在需要时获取。
std::future<return_type> res = task->get_future();: 获取了task中的std::future对象,以便在任务执行完成后获取任务的返回值。
之后就是开始加锁并检查线程池是否停止运行,如果停止运行则抛出异常。
tasks.emplace(task{ (*task)(); });: 将一个 lambda 表达式添加到任务队列tasks中。这个 lambda 表达式捕获了task指针,并调用(*task)(),即执行被包装的任务。
cv.notify_one();: 通过条件变量cv通知一个等待的线程有任务可执行。
return res;: 返回任务的std::future对象,以便调用者可以在需要时获取任务的返回值。
至于为什么不使用unique_ptr呢?
1、任务对象被添加到任务队列后,可能会被多个工作线程共享执行。使用 std::shared_ptr 可以确保任务对象在所有持有者释放之前都不会被销毁,这样在多线程环境下更加安全。
2、在添加任务到任务队列时,任务对象需要被捕获到 lambda 表达式中,以便在工作线程中执行。由于 std::unique_ptr 是独占所有权的智能指针,它不能被复制或移动,这使得在多线程环境中捕获到 lambda 中有所限制。
3、跨线程传递任务对象可能是必须的,而 std::shared_ptr 提供了原子操作和线程安全的引用计数,使得跨线程传递更为方便。
class ThreadPool {
public:
DISALLOW_COPY_AND_MOVE(ThreadPool);
explicit ThreadPool(unsigned int size = std::thread::hardware_concurrency());
~ThreadPool();
template <class F, class... Args>
auto Add(F &&f, Args &&...args) -> std::future<typename std::invoke_result<F, Args...>::type>;
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_variable_;
std::atomic<bool> stop_{false};
};
// 不能放在cpp文件,C++编译器不支持模版的分离编译
template <class F, class... Args>
auto ThreadPool::Add(F &&f, Args &&...args) -> std::future<typename std::invoke_result<F, Args...>::type> {
using return_type = typename std::invoke_result<F, Args...>::type;
auto task =
std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// don't allow enqueueing after stopping the pool
if (stop_) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks_.emplace_back([task]() { (*task)(); });
}
condition_variable_.notify_one();
return res;
}
实现
构造函数中首先创建足够的打工人,之后在死循环中不断加入任务,当且仅当停止标志为真且任务队列为空才会停止所有线程。
析构函数中调用锁将停止位置真,调用 condition_variable_.notify_all(),通知所有等待在 condition_variable_ 条件变量上的线程,告知它们有可能发生了状态变化,需要重新检查条件,接着,使用循环遍历线程池中的工作线程,并逐一判断是否可加入(joinable),如果是,则调用 join() 方法等待线程执行完毕并回收资源。
析构函数设置 stop_ 为 true 并通知所有线程,这使得所有工作线程停止获取新任务并安全地退出。如果在设置 stop_ 标志时队列中仍有未完成的任务,则已经被工作线程获取的任务会继续执行,但线程池不会再处理新的任务。
ThreadPool::ThreadPool(unsigned int size) {
for (unsigned int i = 0; i < size; ++i) {
workers_.emplace_back(std::thread([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_variable_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
//从这里开始是以获取的任务
task = tasks_.front();
tasks_.pop();
}
task();
}
}));
}
}
ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_variable_.notify_all();
for (std::thread &th : workers_) {
if (th.joinable()) {
th.join();
}
}
}
10、TcpServer
这部分就包含了所有操作,从创建socket、建立连接、处理事件、删除链接,这个类提供了一个最朴实对外的接口。
头文件
在这部分包含有一个MainReactor用以完成初始化、分发连接,Acceptor用以建立连接,SubReactor用以处理事件并接收响应。并定义了一个线程池用以多线程完成高并发的任务,并且有两个回调函数。
class TcpServer {
public:
DISALLOW_COPY_AND_MOVE(TcpServer);
TcpServer();
~TcpServer();
void Start();
RC NewConnection(int fd);
RC DeleteConnection(int fd);
void onConnect(std::function<void(Connection *)> fn);
void onRecv(std::function<void(Connection *)> fn);
private:
std::unique_ptr<EventLoop> main_reactor_;
std::unique_ptr<Acceptor> acceptor_;
std::unordered_map<int, std::unique_ptr<Connection>> connections_;
std::vector<std::unique_ptr<EventLoop>> sub_reactors_;
std::unique_ptr<ThreadPool> thread_pool_;
std::function<void(Connection *)> on_connect_;
std::function<void(Connection *)> on_recv_;
};
实现
构造函数中首先构建一个指向轮询的MainReactor智能指针,之后从指针中利用.get()获取EventLoop并建立Acceptor(这里创建socket、绑定、监听),之后设置新连接到来时的回调函数为NewConnection,创建一个线程池并创建多个子 Reactor 。
析构函数同样由于管理由智能指针完成,析构函数为空。
TcpServer::TcpServer() {
main_reactor_ = std::make_unique<EventLoop>();
acceptor_ = std::make_unique<Acceptor>(main_reactor_.get());
std::function<void(int)> cb = std::bind(&TcpServer::NewConnection, this, std::placeholders::_1);
acceptor_->set_new_connection_callback(cb);
unsigned int size = std::thread::hardware_concurrency();
thread_pool_ = std::make_unique<ThreadPool>(size);
for (size_t i = 0; i < size; ++i) {
std::unique_ptr<EventLoop> sub_reactor = std::make_unique<EventLoop>();
sub_reactors_.push_back(std::move(sub_reactor));
}
}
TcpServer::~TcpServer() {}
在NewConnection方法中,创建一个新的 Connection 对象并关联到合适的子 Reactor 上,设置删除连接/接收数据的回调函数 DeleteConnection/on_recv_,将创建的 Connection 对象存储在 connections_ 容器中,如果有设置连接回调函数 on_connect_,则调用该函数。
RC TcpServer::NewConnection(int fd) {
assert(fd != -1);
uint64_t random = fd % sub_reactors_.size();
std::unique_ptr<Connection> conn = std::make_unique<Connection>(fd, sub_reactors_[random].get());
std::function<void(int)> cb = std::bind(&TcpServer::DeleteConnection, this, std::placeholders::_1);
conn->set_delete_connection(cb);
conn->set_on_recv(on_recv_);
connections_[fd] = std::move(conn);
if (on_connect_) {
on_connect_(connections_[fd].get());
}
return RC_SUCCESS;
}
删除指定文件描述符的连接,在 connections_ 容器中查找并移除对应的 Connection 对象。
RC TcpServer::DeleteConnection(int fd) {
auto it = connections_.find(fd);
assert( it != connections_.end() );
connections_.erase(fd);
return RC_SUCCESS;
}
启动服务器的事件循环,使用线程池中的线程执行每个子 Reactor 的事件循环,主 Reactor 执行自己的事件循环。
void TcpServer::Start() {
for (size_t i = 0; i < sub_reactors_.size(); ++i) {
std::function<void()> sub_loop = std::bind(&EventLoop::Loop, sub_reactors_[i].get());
thread_pool_->Add(std::move(sub_loop));
}
main_reactor_->Loop();
}
注意:线程池中的线程会专门处理读写事件,而不处理accept、connection这个过程。