接着上一节[muduo网络库]——muduo库三大核心组件之 Poller/EpollPoller类(剖析muduo网络库核心部分、设计思想),我们来剖析muduo库中最后一类核心组件,EventLoop类。
先回顾一下三大核心组件之间的关系。
接着我们进入正题。
EventLoop
Poller封装了和事件监听有关的方法和成员,调用Poller派生类EpollPoller::poll方法,我们就可以获得发生事件的fd 及其 发生的事件。EventLoop是网络服务器中负责 循环 的重要模块,从而做到持续监听、持续获取监听结果、持续处理监听结果对应的事件。
也就是说: EventLoop起到一个驱动循环的功能,Poller负责从事件监听器上获取监听结果,Channel类将fd及其相关属性封装,并将fd及其感兴趣事件和发生的事件以及不同事件对应的回调函数封装在一起,这样在各个模块中传递更加方便。接着被EventLoop调用。
可能上面我画的图不能充分表达三者在muduo库中的角色,下面借用我在地铁站里吃闸机博主的图,可能会让大家看的更加直观。
在EventLoop就能够充分提现muduo库的重要思想:One Loop Per Thread
在muduo库里边有两种线程:一种里边的事件循环专门处理新用户连接(mainLoop( 也就是baseLoop)),一种里边的事件循环专门处理对应连接的所有读写事件(ioLoop)。
重要成员变量
std::unique_ptr<Poller> poller_;
const pid_t threadId_; //记录当前loop所在线程的id
TimeStamp pollReturnTime_; //poller返回发生事件的channels的时间点
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
std::atomic_bool callingPendingFunctors_;
std::vector<Functor> pendingFunctors_;
std::mutex mutex_;
poller_
就不用在多说什么了,通过它会返回给EventLoop发生的事件。wakeupFd_
是非常重要的一个成员,与之对应的wakeupChannel_
,起到了一个唤醒loop所在的线程的作用,因为当前线程主要阻塞在poll函数上,唤醒的方法时手动激活这个wakeupChannel_, 写入几个字节让Channel变为可读, 当然这个Channel也注册到Pooll中,在下面的成员函数会详细介绍它的实现。threadId_
创建时要保存当前时间循环所在的线程,用于之后运行时判断使用EventLoop的线程是否时EventLoop所属的线程.pollReturnTime_
保存poll返回的时间,用于计算从激活到调用回调函数的延迟activeChannels_
就是poller返回的所有发生事件的channel列表。callingPendingFunctors_
标识当前loop是否有需要执行的回调操作pendingFunctors_
存储loop需要执行的所有回调操作,避免本来属于当前线程的回调函数被其他线程调用,应该把这个回调函数添加到属于它所属的线程,等待它属于的线程被唤醒后调用,满足线程安全mutex_
互斥锁,用来保护vector容器的线程安全操作
重要成员函数
- 最最最最重要的莫过于
loop()
了
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n",this);
while(!quit_)
{
activeChannels_.clear();
//监听两类fd 一种是client的fd 一种是wakeup
pollReturnTime_ = poller_->poll(kPollTimeMs,&activeChannels_);
for(Channel *channel : activeChannels_)
{
//poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
channel->handleEvent(pollReturnTime_);
}
//执行当前EventLoop事件循环需要处理的回调操作
/**
* IO线程 mainloop accept fd <= channel subloop
* mainloop事先注册一个回调cb,需要subloop执行
* wakeup subloop后执行下面的方法 执行之前mainloop注册的cb回调
*
*/
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping,\n",this);
looping_ = false;
}
从代码中,我们可以看出最核心的部分就是调用了Poller的poll
方法,它返回了发生的事件channel列表以及发生的时间now
。
接着可以看出还有一个doPendingFunctors
函数
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true; //需要执行回调
//括号用于上锁 出了括号就解锁了
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor &functor: functors)
{
functor();//执行当前loop需要执行的回调操作
}
callingPendingFunctors_ = false;
}
实际上,这个函数就是用来执行回调的,值得注意的一点就是: 这里使用了一个比较巧妙的思想就是,使用一个局部的vector
和pendingFunctors_
的交换,这样就避免了因为要读取这个pendingFunctors_
的时候,没有释放锁,而新的事件往里写得时候写不进去(mainloop向subloop里面写回调)。
还有一点,一开始的时候很疑惑functor();
是在执行什么呢?其实在这里我们可以看出来,经过交换functor();
拿到的实际上pendingFunctors_.emplace_back(cb);
中的内容,执行回调。那么pendingFunctors_
怎么来的?
- 那就是
runInLoop
以及queueInLoop
//在当前loop中执行cb
void EventLoop::runInLoop(Functor cb)
{
if(isInLoopThread())//在当前的loop线程中,执行cb
{
cb();
}
else //在非当前loop执行cb,就需要唤醒loop所在线程执行cb
{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
if(!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
可以看出来runInLoop
主要是判断是否处于当前IO线程,是则执行这个函数,如果不是则将函数加入队列queueInLoop
。在queueInLoop
就会把cb
放入pendingFunctors_
值得注意: wakeup();
这个函数:
在构造函数中已经给它注册了回调函数:
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead,this));
wakeupChannel_->enableReading();
每一个eventloop都将监听wakeupchannel的EPOLLIN读事件了,mianreactor通过给subreactor写东西,通知其苏醒,那么handleRead
里面是什么呢?
handleRead
也是其中比较重要的一个回调了
//发送给subreactor一个读信号,唤醒subreactor
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8",n);
}
}
- 接着看看
wakeup()
源码
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_,&one,sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n",n);
}
}
- 在析构的时候,关闭它
EventLoop::~EventLoop()
{
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
这就和上面提到的wakeupFd_
联系起来了,
首先wakeupFd_
实际上是调用eventfd
,把这个wakeupFd_
添加到poll中,在需要唤醒时写入8字节数据,
在构造函数中,也注册了它对应的回调函数wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead,this));
,
此时poll返回,执行回调函数,然后执行在pendingFunctors_中的函数。
什么时候需要唤醒呢?
if(!isInLoopThread() || callingPendingFunctors_)
前者还是比较好理解的,One Loop Per Thread
既然不在这个loop中,那就唤醒它;后者呢?从doPendingFunctors
函数中我们可以看到callingPendingFunctors_= true;
时,是表明正在执行回调函数,在loop()
中可以看出执行完回调,又会阻塞在poller_->poll(kPollTimeMs,&activeChannels_);
,如果再次调用queueInLoop
,就需要再次唤醒才能继续执行新的回调doPendingFunctors
。
- 判断是否在当前线程
首先通过以下代码获取了当前的loop的线程id,
threadId_(CurrentThread::tid())
实际上是在CurrentThread
类中,通过调用SYS_gettid
来获得,有关于SYS_gettid
在我的另一篇博客,已经给了详细的介绍Linux—C/C++编程:syscall(系统调用)、SYS_gettid在muduo库中的使用以及static_cast
然后通过isInLoopThread()
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
进行比较,来判断是否在当前的线程
- 接下来还有三个回调函数
//EventLoop的方法=> poller的方法
void EventLoop::updateChannel(Channel* channel)
{
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel* channel)
{
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel* channel)
{
return poller_->hasChannel(channel);
}
这就是调用了poller_
的方法。
- 最后的最后,就是退出循环了~
void EventLoop::quit()
{
quit_ = true;
if(!isInLoopThread())
{
wakeup();
}
}
当然了,不在当前线程也是需要唤醒的。
EventLoop中有很多值得学习的点,但是最巧妙的就是wakeupFd_
的设计:
传统的进程/线程间唤醒办法是用pipe或者socketpair,IO线程始终监视管道上的可读事件,在需要唤醒的时候,其他线程向管道中写一个字节,这样IO线程就从IO multiplexing阻塞调用中返回。pipe和socketpair都需要一对文件描述符,且pipe只能单向通信,socketpair可以双向通信。一方面它比 pipe 少用一个 fd,节省了资源;另一方面,wakeupFd_
的缓冲区管理也简单得多,全部buffer只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。muduo库也没有采用生产者消费者的模型,采用了wakeupFd_
这种巧妙的思想,在今后的学习中,我们也可以进一步的使用它。
最后附上代码地址:https://github.com/Cheeron955/mymuduo/tree/master