目录
摘要
1 举个栗子
2 从 epoll_create 开始
3 epoll_ctl,插入待监听的描述符
3.1 故事围绕 ep_item 展开
3.2 在 socket 等待队列上设置 epoll 回调
3.3 关系变得复杂
4 epoll_wait 等你
4.1 等待就绪事件
4.2 共享内存?
5 来了来了,数据来了
5.1 tcp_rcv_established
5.2 ep_poll_callback
5.3 唤醒任务
6 总结
摘要
IO 多路复用这个词听起来还是蛮高大上的,在 Linux 上,常见的IO 多路复用的实现方案有 select、poll、epoll,相关的文章也都看过许多,都知道 epoll 高效,今天从内核源码角度学习一下 epoll 的高效之处。
文中引用 Linux 内核源码基于版本 2.6.34,并做了一些删减以提高可读性。
1 举个栗子
想要更好的了解源码,得先搞清楚入口是哪个。Linux 提供了几个基本的接口供用户使用:
// 创建一个 epoll 对象
ep_fd = epoll_create(...);
// 向 epoll 对象添加需要监听的 socket 文件描述符
epoll_ctl(ep_fd, EPOLL_CTL_ADD, sock_fd, ...);
// wait 轮询
epoll_wait(ep_fd, ready_events)
// 遍历处理就绪的事件
for (ready_events)
do_thing()
2 从 epoll_create 开始
epoll_create 系统调用实现位于 fs/epollevent.c 下:
// 创建一个 epoll 对象
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
struct eventpoll *ep = NULL;
// 创建内部数据结构 ("struct eventpoll").
error = ep_alloc(&ep);
if (error < 0)
return error;
// 创建 epoll 的文件 inode 节点:ep 作为 file_struct 的一个 private_data
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
...
}
epoll_create 这个方法比较简单,主要是调用 ep_alloc 创建了 eventpoll 这个结构体并为之关联具体的文件 inode 节点。可见 Linux 一切皆文件思想的应用到处都是。那么 eventpoll 长什么样子呢?
event_poll 结构体中成员还是挺多的,这里只列出关键的几个变量:
struct eventpoll {
// epoll_wait 使用的等待队列:文件事件就绪的时候,会通过这个队列来查找等待的进程
wait_queue_head_t wq;
// 就绪的文件描述符队列
struct list_head rdllist;
// 用来存储被监听文件描述符的红黑树:支持log(n)的查找、插入、删除
struct rb_root rbr;
};
ep_alloc 中负责对其分配内存及初始化:
static int ep_alloc(struct eventpoll **pep)
{
struct eventpoll *ep;
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
// 初始化等待队列
init_waitqueue_head(&ep->wq);
// 初始化就绪队列
INIT_LIST_HEAD(&ep->rdllist);
// 初始化被监听文件节点的结构->红黑树
ep->rbr = RB_ROOT;
*pep = ep;
}
所以我们对 epoll_event 的初映像就可以画出来了:
脑海中有了这个图,也有助于我们理解后面其它部分的逻辑了。
3 epoll_ctl,插入待监听的描述符
同样的,epoll_ctl 系统调用也是位于 fs/eventpoll.c 文件中,为了简化逻辑,我们只关心 ADD 操作:
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
// 从用户空间复制要监听的文件描述符及关心的事件类型
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
// 获取 epfd 关联的文件结构
file = fget(epfd);
// 获取目标文件的 strcut file 结构
tfile = fget(fd);
// 被监听的文件描述符必需支持 poll 方法,普通文件是不支持的
if (!tfile->f_op || !tfile->f_op->poll)
goto error_tgt_fput;
// 通过 epfd 的文件结构中的私有数据部分拿到 eventpoll 结构
ep = file->private_data;
// 尝试查找目标 fd 对应的 ep_item
epi = ep_find(ep, tfile, fd);
switch (op) {
case EPOLL_CTL_ADD:
// 没有找到才需要执行插入操作
if (!epi) {
error = ep_insert(ep, &epds, tfile, fd);
}
break;
case EPOLL_CTL_DEL:
...
case EPOLL_CTL_MOD:
...
}
}
3.1 故事围绕 ep_item 展开
在 epoll_ctl 中,主要是通过 fd、epfd 找到对应的文件结构,然后执行 ep_insert 做真正的插入工作:
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
struct epitem *epi;
struct ep_pqueue epq;
// 从 slab 对象分配器中分配一个 epitem 对象
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
// 初始化 epitem
// 用于将 epitem 链接到就绪列表上
INIT_LIST_HEAD(&epi->rdllink);
// 一个包含 poll 等待队列的列表
// 每个等待队列代表一个等待文件描述符状态变化的进程或线程
// 当进程或线程使用 epoll_wait 时,会将自己添加到与 epitem 关联的等待队列中
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
// 把目标文件 fd 和 struct file 保存在 ffd 中
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
// 初始化 poll_table,说是表,其实只存放了一个函数指针:ep_ptable_queue_proc
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 将 ep_poll_callback 设置到 socket 的等待队列,如果 socket 事件就绪,会回调它
revents = tfile->f_op->poll(tfile, &epq.pt);
// 将 epitem 插入红黑树
ep_rbtree_insert(ep, epi);
// 如果文件已经就绪,将其放入就绪列表中
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
// 唤醒等待的进程/线程
if (pwake)
ep_poll_safewake(&ep->poll_wait);
}
3.2 在 socket 等待队列上设置 epoll 回调
将 ep_poll_callback 设置到 socket 的等待队列,调用的是 tfile->f_op->poll 方法,实际对应 socket 的 sock_poll 函数,该函数位于 net/socket.c 中:
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
struct socket *sock;
sock = file->private_data;
return sock->ops->poll(file, sock, wait);
}
sock_poll 只是通过 struct file 的 private_data 获取到实际的 struct socket 结构,继续调用 scoket 中的 poll 回调。此处的 poll 方法就是 tcp_poll 了(如果是 UDP,则为 udp_poll ),位于 net/ipv4/tcp.c 中:
// 等待 tcp 事件
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
struct sock *sk = sock->sk;
struct tcp_sock *tp = tcp_sk(sk);
// sk_sleep 是 socket 上等待队列的列表头:wait_queue_head_t *sk_sleep;
// 注意:不是 epoll 的等待队列
sock_poll_wait(file, sk->sk_sleep, wait);
// 下面是对 TCP 特殊状态的一些检查:如 connect、listen 状态
...
}
对于普通的用来读写的 socket ,这里继续调用 sock_poll_wait 方法,将等待队列的列表头、poll_table 传给了 sock_poll_wait 。sock_poll_wait 是各静态方法,位于 include/net/sock.h :
static inline void sock_poll_wait(struct file *filp,
wait_queue_head_t *wait_address, poll_table *p)
{
if (p && wait_address) {
poll_wait(filp, wait_address, p);
smp_mb();
}
}
sock_poll_wait 主要是做了一层封装,在 poll_wait 后放了一道内存栅栏,避免编译优化指令乱序可能引发的问题,还是要看下 poll_wait 怎么做的:
// include/linux/poll.h
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
// 终于用到 poll_table 了,qproc 是前面保存的函数指针:ep_ptable_queue_proc
p->qproc(filp, wait_address, p);
}
回过头再去看 ep_ptable_queue_proc 的实现,现在输入参数 file 是目标 socket 对应的文件结构,whead 是目标 socket 上的等待队列列表头,pt 是我们的轮询表:
// fs/eventpoll.c
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
// scoket 等待队列的回调对象有自己的格式,这里需要初始化一下,把 ep_poll_callback 塞进去
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
// 真正把回调对象挂到 socket 的等待队列上了
add_wait_queue(whead, &pwq->wait);
...
}
...
}
说了那么久的 ep_poll_callback 终于出现了,在 poll_table 回调中将其注册到了 socket 的等待队列中,后面 socket 事件就绪时,就可以调用 ep_poll_callback 来通知 epoll 了。
3.3 关系变得复杂
所以 ep_ctl 里内核主要干了三件事:
- 为待监听的描述符分配红黑树节点:epitem
- 在 socket 等待队列中添加发生事件时的回调函数:ep_poll_callback
- 将 epitem 插进红黑树中
此时,struct eventpoll 和 struct socket 开始纠缠不清了:
4 epoll_wait 等你
:啥时候到 epoll_wait 啊?
epoll_wait: 这就来了
4.1 等待就绪事件
epoll_wait 做的事情比较简单,它会查看是否由就绪的事件发生,有就直接返回啦,没有就只能等会啦~
// pos: fs/eventpoll.c
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
struct file *file;
struct eventpoll *ep;
// 最多支持监听 INT_MAX / 16 个事件
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
...
error = ep_poll(ep, events, maxevents, timeout);
}
epoll_wait 主要做了一些用户参数的检查,及获取 ep 对象等,核心逻辑在 ep_poll 中:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
wait_queue_t wait;
if (list_empty(&ep->rdllist)) {
// 当前并没有就绪事件,这里需要将当前进程调度出去,等待 ep_poll_back 回调唤醒
// 初始化等待队列项,关联了当前进程,current 是当前进程的 task_struct
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
// 添加等待队列项到 epoll 的等待队列
__add_wait_queue(&ep->wq, &wait);
for (;;) {
// 设置为不可中断状态再检查一次是否有就绪事件,避免上次检查后被调度出去丢失通知
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
// 主动 schedule 让出 CPU
jtimeout = schedule_timeout(jtimeout);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
// 检查是否有事件就绪
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
// 如果已经有就绪的事件,就将事件传输到用户空间
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
4.2 共享内存?
如果有就绪事件,就调用 ep_send_events 方法将就绪事件通过某种方法传递到用户空间。网上有些文章说这里使用了共享内存以优化性能,真的有共享内存吗?看一下 ep_send_events 的实现就知道了:
pos: fs/eventpoll.c
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
epi = list_first_entry(head, struct epitem, rdllink);
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events;
if (revents) {
// 拷贝就绪事件到用户空间
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
return eventcnt ? eventcnt : -EFAULT;
}
}
}
return eventcnt;
}
看到了吧,ep_send_events 内部是通过 __put_user 这个方法将数据拷贝到用户空间,并不涉及共享内存的使用(截止今日看 Linux master 分支最新代码,也仍然是使用这个方法进行拷贝)。所以这个事是个谣言!源码面前,无所遁形。
5 来了来了,数据来了
此时需要再把前面的这张图祭出来了:
前面已经讲了,epoll_ctl 执行 ADD 的时候, 把 ep_poll_callback 放到了 socket 的等待队列上,socket 上事件就绪的话,就会回调这个函数,具体来看一下吧。
5.1 tcp_rcv_established
协议栈收包流程处理逻辑还是比较复杂的,这里就不逐个展开看了,我们从收包之后的核心流程 tcp_rcv_established 看起:
// pos: net/ipv4/tcp_input.c
int tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
struct tcphdr *th, unsigned len)
{
struct tcp_sock *tp = tcp_sk(sk);
// 将 skb 入到 socket 接受队列
__skb_pull(skb, tcp_header_len);
__skb_queue_tail(&sk->sk_receive_queue, skb);
// ...这里有一些用滑动窗口的处理
// 数据就绪的处理
sk->sk_data_ready(sk, 0);
return 0;
}
在 tcp_rcv_established 中,数据包已经放在 socket 的就绪队列了,然后执行 sk_data_ready ,sk_data_ready 实际是个函数指针,指向 sock_def_readable :
// pos: net/core/sock.c
static void sock_def_readable(struct sock *sk, int len)
{
// 判断 socket 的等待队列是否为空
if (sk_has_sleeper(sk))
// 执行等待队列上的回调
wake_up_interruptible_sync_poll(sk->sk_sleep, POLLIN | POLLRDNORM | POLLRDBAND);
// 通知异步 IO
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
}
执行 ep_poll_back 的核心流程在 wake_up_interruptible_sync_poll 里,其它的暂不需要关注。继续跟踪它就好。wake_up_interruptible_sync_poll 是个宏方法,内部调的是 __wake_up_sync_key :
// pos: kernel/sched.c
void __wake_up(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, 0, key);
}
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
// 遍历 socket 等待队列中的元素,并依次执行其 func 函数,即 ep_poll_callback
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
可以看到,收到数据后,最终会依次执行 socket 等待队列中设置的回调方法,其中就有 epoll_ctl 执行 ADD 时添加的 ep_poll_callback 回调方法。
5.2 ep_poll_callback
数据包接收的上下文实际处于软中断上下文,所以 ep_poll_callback 也是在软中断上下文中执行的:
// pos: fs/eventpoll.c
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
// 将就绪文件描述符对应的 epitem 对象放入 epoll 的就绪列表
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
// 查看 epoll等待队列是否非空,是的话需要唤醒等待的任务
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
// 唤醒操作
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 1;
}
在 ep_poll_callback 中,会先查到对应的 epitem 对象及 eventpoll 对象,将 epitem 放入就绪队列,并检查是否有进程在等待,有就执行唤醒流程。ep_poll_safewake 内部经过逐层调用,没什么营养,最终是调到了 default_wake_function :
// pos: kernel/sched.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
void *key)
{
// curr 为等待队列上被阻塞的进程,这里将会唤醒之
return try_to_wake_up(curr->private, mode, wake_flags);
}
5.3 唤醒任务
上一小节看到之前阻塞在 epoll_wait 上的进程被执行唤醒流程,那么进程睡醒后,就是从上次中断的地方开始继续运行(此时是在进程上下文的内核态):
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
// 前面是在这里 schedule 出去了
jtimeout = schedule_timeout(jtimeout);
// 进程睡醒就会走到这里了
// 检查是否有事件就绪
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
// 如果已经有就绪的事件,就将事件传输到用户空间
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
所以这里事件就绪后,继续执行原有的流程,将就绪事件拷贝到用户空间,随后 epoll_wait 系统调用返回,将从内核态切换到用户态了。
6 总结
完整的总结一下前面学到的流程:
- 用户态调用 epoll_create
- 内核态创建 eventpoll 对象,维护:红黑树、就绪队列、等待队列
- 用户态调用 epoll_ctl 执行 ADD
- 内核态将 ep_poll_callback 回调设置到 socket 的等待队列上,并将 epitem 节点插入红黑树;检查当前是否有事件就绪,有就返回,没有就让出 CPU
- 数据包到来,网卡发出硬中断,中断处理函数中再触发软中断。软中断中执行协议栈收包流程:skb 被放入 socket 等待队列,随后开始遍历执行等待队列上的回调函数,执行到了 ep_poll_callback
- 在 ep_poll_callback 回调中:epitem 被放入就绪队列;检查等待队列,如果有任务在等待,唤醒之
- 任务被唤醒,处于进程上下文的内核态中。从原来被阻塞的地方开始继续执行:把就绪的事件拷贝到用户态
- 系统调用返回,处于进程上下文的用户态了
所以为什么说 epoll 高效呢,不只有通过红黑树高效管理监听的事件,而且有通过 socket 回调机制快速找到文件对应的 epitem,并且只返回就绪事件给用户,避免用户遍历全量的文件描述符。
另外,如果在监听事件非常多、就绪事件非常多的情况下,就绪事件从内核态到用户态拷贝开销也就不可忽视了,应该可以考虑用共享内存来避免这一次拷贝。