组件的实现
1. 序
1.1. 总述
muduo库是基于多Reactor-多线程模型实现的TCP网络编程库,性能良好。如libev作者:“One loop per thread is usually a good model”,muduo库的作者陈硕在其《Linux多线程服务端编程》中也力荐这种“One loop per thread”的IO模型,使我们仅需要关注EventLoop的设计与实现,然后每个线程run一个loop即可。不过由于当时C++11并没有进入实用,在这一书中,作者没有谈及C++11,整个muduo库的实现,也依赖了boost库。
而在项目设计与实现中,按照C++11标准对muduo库中核心部分进行重写,主要涉及了以下模块:Channel、Poller、EventLoop、Thread、EventLoopThread、EventLoopThreadPool、Socket、Acceptor、Buffer、TcpConnection、TcpServer,下面将进行分述。
1.2. One loop per thread
在多Reactor-多线程模型中,运用one loop per thread的思想,由一个mainReactor负责accept连接,然后把该连接挂载到某个subReactor,多个连接分配到多个线程,充分利用CPU。
2. 核心部分
在手写muduo库项目之中,存在三个核心部分,分别是Channel类、Poller类和EventLoop类,这三大类的组合,实现了reactor用以监听fd并同时处理相应的回调函数。其中Poller和Channel之间通过EventLoop相互通信。
2.1. Channel
- fd_:封装sockfd,两种Channel:listenfd-acceptorChannel,connfd-ConnectionChannel;
- events_:fd监听的事件类型;
- revents_:Poller返回的具体监听到的事件。
- callback:上层设置的各种类型事件回调;
- tie_:weak_ptr<void>,在事件监听器返回监听结果后,就会调用Channel中的handleEvent()函数。首先会把tie_这个weak_ptr提升为shared_ptr,它会指向当前的TcpConnection对象,即使外面调用了删除析构了其他所有指向该TcpConnection的智能指针,只要没有handleEvent()完,这个TcpConnection都不会被析构释放堆内存。
2.2. Poller/EpollPoller
muduo库提供poll和epoll两种IO多路复用方法来实现事件监听,重写时,通过基类Poller和派生类EpollPoller,支持了Epoll。Poller主要扮演Reactor模型中Demultiplex事件分发器(也可以说是事件监听器)的角色。
- epollfd_:记录epoll_create返回的句柄
- channels_:用来记录注册在其上的Channel的unordered_map。
2.3. EventLoop
EventLoop扮演Reactor模型中Reactor的角色,是对epoll的封装。EventLoop在epoll_create,注册各个Channel之后,处于epoll_wait阻塞状态,要想唤醒当前的EventLoop去执行新的连接,通过往wakefd上写入一个字符,唤醒当前的EventLoop。(而并非生产者-消费者模型)。
- 包含了所有的Channel
- 每一个loop都有一个wakeupFd
2.4. 具体方法的部分代码实现
- EventLoop::loop()——开启事件循环
// 开启事件循环
void EventLoop::loop()
{
// ...
while(!quit_)
{
activeChannels_.clear();
// 监听两种fd: client的fd、wakeupfd
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // epoll_wait发生的位置
for(Channel *channel : activeChannels_)
{
// Poller监听哪些Channel发生事件了,然后上报给EventLoop,EventLoop通知处理相应的事件
// handleEevent根据具体事件类型调用相应类型的回调函数
channel->handleEvent(pollReturnTime_);
}
// ...
}
LOG_INFO("EventLoop %p stop looping. \n", this);
// ...
}
- EpollPoller::poll()——开启Poller事件监听,调用了::epoll_wait()
// 通过epoll_wait监听哪些Channel/fd发生事件
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{
// ...
int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if(numEvents > 0) // 有事件发生
{
LOG_DEBUG("%d events happened \n", numEvents);
fillActiveChannels(numEvents, activeChannels);
if(numEvents == events_.size())
{
events_.resize(events_.size() * 2);
}
}
else if(numEvents == 0) // 超时
{
LOG_DEBUG("%s timeout! \n", __FUNCTION__);
}
else
{
if(savedErrno != EINTR)
{
errno = savedErrno;
LOG_ERROR("EPollPoller::poll() err! \n");
}
}
return now;
}
- 唤醒机制——通过向eventfd写一个数据
在Linux操作系统上,可以通过三种方式唤醒fd:1. 通过管道pipe向绑定到epollfd的一端写一个字节;2. 使用Linux内核2.6版本之后的eventfd;3. 使用socketpair。而在本项目中,采用的是创建eventfd然后在需要唤醒的时候写数据(8个字节)来唤醒subLoop。
// 创建wakeupfd,用来notify唤醒subReactor处理新的Channel
//O_CLOEXEC避免文件描述符被继承到子进程中
int createEventFd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
// ...
return evtfd;
}
// 用于唤醒loop所在线程: 向wakefd写一个数据
//wakeupFd_在构造函数中通过createsEventFd()函数初始化
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if(n != sizeof(one))
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}
}
3. 其他部分
3.1. EventLoopThreadPool
EventLoopThreadPool类,可以理解为subLoop池,主要是对EventLoopThread的封装,而EventLoopThread又是对EventLoop(Reactor)和Thread(记录线程的详细信息)的封装。
其中,初始化时,会提供一个baseLoop(mainLoop)来进行基本的事件循环。通过设置numthreads_来创建对应数量的subReactor,每当创建一个线程,就会生成一个EventLoop。
在工作方式上,通过getNextLoop()方法,实现对subReactor的轮询。
// ...
class EventLoopThreadPool : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop *)>;
EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);
~EventLoopThreadPool();
void setThreadNum(int numThreads) { numThreads_ = numThreads; }
void start(const ThreadInitCallback &cb = ThreadInitCallback());
// 如果工作在多线程中,baseLoop默认以轮询的方式分配Channel给subLoop
EventLoop *getNextLoop();
std::vector<EventLoop *> getAllLoops();
bool started() const { return started_; }
const std::string &name() const { return name_;}
private:
EventLoop *baseLoop_; //EventLoop loop 用户线程
std::string name_;
bool started_;
int numThreads_;
int next_;
std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop *> loops_;
};
3.2. Acceptor
Acceptor类,封装的是服务器监听socketfd和相关处理函数。接收新用户连接后,通过轮询来选择subReactor并给它分发连接。
3.3. TcpConnection
每个连接进来的客户端,对应一个TcpConnection,封装了一个connfd,一个Channel,各种回调函数(Callback)和读写缓冲区(Buffer)。
state_:记录当前连接状态,一共有四种:kConnected、kConnecting、kDisconnecting、kDisconnected。
整个TcpConnection的工作流程
- TcpServer通过Acceptor监听用户新连接,用accept拿到connfd
- TcpConnection设置回调给Channel,Channel注册到Poller
- Poller监听到事件就通知调用Channel的回调
3.4. Buffer
Buffer缓冲区通过vector来实现,空间不足时,通过vector类的成员函数resize()即可实现扩容。在空间的设计上,主要分为如下图三个区域(和Netty中Buffer的设计类似?)
3.5. TcpServer
在TcpServer类中,有一个Acceptor,一个EventLoopThreadPool,一些回调函数,一个记录所有连接的unordered_map<string, TcpConnectionPtr>。
// 对外服务器编程需要使用的类
class TcpServer : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop *)>;
enum Option
{
kNoReusePort,
kReusePort,
};
TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg,Option option = kNoReusePort);
~TcpServer();
void setThreadInitCallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; }
void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }
// 设置subLoop个数
void setThreadNum(int numThreads);
// 开启服务器监听
void start();
private:
void newConnection(int sockfd, const InetAddress &peerAddr);
void removeConnection(const TcpConnectionPtr &conn);
void removeConnectionInLoop(const TcpConnectionPtr &conn);
using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;
EventLoop *loop_; // baseLoop
const std::string ipPort_;
const std::string name_;
std::unique_ptr<Acceptor> acceptor_; // 运行在mainLoop,监听新连接事件
std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per thread
ConnectionCallback connectionCallback_; // 有新连接时的回调
MessageCallback messageCallback_; // 有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; // 消息发送完成后的回调
ThreadInitCallback threadInitCallback_; // loop线程初始化的回调
std::atomic_int started_;
int nextConnId_;
ConnectionMap connections_; // 保存连接的HashMap
};
start():启动EventLoopThreadPool,调用acceptor_的listen()方法,监听客户端的连接套接字。
newConnection():该方法被注册到了acceptor_中,当acceptor_监听到新用户连接时会执行该回调,轮询选择一个subReactor;根据连接成功的sockfd,创建一个连接对象并加入到TcpServer的存储连接信息的connections_中;给这个连接设置回调;然后在mainLoop执行connectEstablished();
上面提到的关闭连接的回调函数,真实的调用过程:TcpConnection::setCloseCallBack() --> TcpServer::removeConnection() --> TcpServer::removeConnectionInLoop() --> TcpConnection::connectionDestroyed()
4. 工作流程
4.1. 安装
下载到文件夹后,sudo ./autobuild.sh,运行编译和安装脚本,相关头文件也会添加到系统路径。
4.2. 测试代码
下面的内容是一个回射服务器,可以编译运行后,使用telnet、netcat等工具进行简单测试。
#include <ee_muduo_cpp11/TcpServer.h>
#include <ee_muduo_cpp11/Logger.h>
#include <string>
#include <functional>
class EchoServer
{
public:
EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 注册回调函数
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);
// 设置合适的loop线程数量 loopthread
server_.setThreadNum(3);
}
void start()
{
server_.start();
}
private:
// 连接建立或者断开的回调
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
}
else
{
LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}
}
// 可读写事件回调
void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp time)
{
std::string msg = buf->retrieveAllAsString();
conn->send(msg);
conn->shutdown(); // 写端 EPOLLHUP =》 closeCallback_
}
EventLoop *loop_;
TcpServer server_;
};
int main()
{
EventLoop loop;
InetAddress addr(8000);
EchoServer server(&loop, addr, "EchoServer"); // Acceptor non-blocking listenfd create bind
server.start(); // listen loopthread listenfd => acceptChannel => mainLoop =>
loop.loop(); // 启动mainLoop的底层Poller
return 0;
}
4.3. 工作流程
对于整个库:
- 用户创建mainLoop,主线程作为mainReactor,主要用来接收/断开用户连接。
- 给TcpServer设置连接和读写事件回调,TcpServer再给TcpConnection设置回调(用户设置的),TcpConnection再给Channel设置回调(先执行这个,再执行用户回调)。
- TcpServer根据用户设置传入的线程数,去ThreadPool中开启几个线程。如果没有设置,mainLoop还要负责读写事件的任务。
- 当有新连接进来,创建一个TcpConnection,然后由Acceptor轮询唤醒subLoop来提供服务。
- 每个subLoop在服务时,其所包含的Poller没有事件就会处于循环阻塞状态,发生事件之后,根据类型再去执行相应的回调操作。
5. 参考资料
- 《高性能服务结构设计思想——one-thread-one-loop》,张小方,CppGuide,05. 高性能服务结构设计思想——one-thread-one-loop
- 《Linux多线程服务器编程:使用muduo C++网络库》,陈硕
- 《Muduo网络库源代码分析:EventLoopThread和EventLoopThreadPool的封装》,blfbuaa,https://www.cnblogs.com/blfbuaa/p/7263398.html
- 《图解操作系统》,小林coding,https://mp.weixin.qq.com/mp/appmsgalbum?action=getalbum&__biz=MzUxODAzNDg4NQ==&scene=1&album_id=1408057986861416450&count=3#wechat_redirect