写在前面
Redis作为我们日常工作中最常使用的缓存数据库,其重要性不言而喻,作为普通开发者,我们在日常开发中使用Redis,主要聚焦于Redis的基层数据结构的命令使用,很少会有人对Redis的内部实现机制进行了解,对于我而言,也是如此,但一直以来,我对于Redis的内部实现都很好奇,它为什么会如此高效,本系列文章是旨在对Redis源代码分析拆解,通过阅读Redis源代码,了解Redis基础数据结构的实现机制。
关于Redis的源码分析,已经有非常多的大佬写过相关的内容,最为著名的是《Redis设计与实现》,对于Redis源码的分析已经非常出色,本系列文章对于源码拆解时,并不会那么详细,相信大部分读者应该不是从事Redis的二次开发工作,对于源码细节过于深入,会陷入细节的泥潭,这是我在阅读源码时尽量避免的,我尽量做到对大体的脉络进行梳理,讲清楚主干逻辑,细节部分,如果读者有兴趣,可以自行参阅源码或相关资料。
本系列源代码,基于Redis 3.2.6
前言
在上一篇中浅析Redis①:命令处理核心源码分析(上),我们大致了解了Redis客户端命令请求的处理流程,在整个流程中,我们还有两个问题没有解释:
1、非阻塞的核心epoll是如何实现的?
2、Redis是如何将数据写回Client端的?
本篇我们就围绕第一个问题,寻找答案,继续看Redis客户端命令请求的处理流程。
Redis的epoll实现
Redis的非阻塞I/O是指Redis在处理客户端请求时,不会一直等待I/O操作完成,而是会尽快返回,并在I/O操作完成后通知Redis进行后续处理。
epoll作为非阻塞I/O的实现,是Linux内核提供的一种多路I/O复用机制。epoll可以监视多个文件描述符,一旦某个文件描述符就绪,epoll就会通知Redis进行后续处理。
Redis的非阻塞I/O模型可以提高并发处理能力,在阻塞I/O模型中,Redis在处理一个客户端请求时,如果遇到I/O操作,会一直等待I/O操作完成,这意味着Redis无法处理其他客户端的请求。
而在非阻塞I/O模型中,Redis在遇到I/O操作时,会尽快返回,并在I/O操作完成后通知Redis进行后续处理。这样,Redis就可以同时处理多个客户端的请求,提高了并发处理能力。
同时非阻塞I/O模型还可以减少Redis的CPU占用率。在阻塞I/O模型中,Redis在遇到I/O操作时,会一直等待I/O操作完成,这意味着Redis的CPU会一直处于占用状态。
在非阻塞I/O模型中,Redis在遇到I/O操作时,会尽快返回,并在I/O操作完成后通知Redis进行后续处理。这样CPU就不会一直处于占用状态,可以减少CPU占用率,提升CPU使用效率。
核心实现
Redis非阻塞IO的实现是基于OS的内核函数支持,源码逻辑如下:
redis.c中main方法启动,执行initServer()初始化redis配置,同时创建非阻塞事件监听器:
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
其中,ipfd_count默认参数为1024,该参数表示Redis可以同时处理的最大TCP连接数。
aeCreateEventLoop与aeCreateFileEvent的实现逻辑在ae.c文件中:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
其中aeApiCreate()是核心创建逻辑,aeApiCreate()方法采用了类似Java中多态的实现方式,由于C本身并不支持多态,因此需要使用C中的技巧实现:
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
这段代码是Redis中的一个条件编译语句,用于根据不同的操作系统和编译器选择不同的事件驱动库。
事件驱动库是Redis的一个核心组件,用于处理各种事件,包括网络IO事件、定时器事件等。Redis支持多种事件驱动库,比如epoll、kqueue、select
等。在编译Redis时,需要根据操作系统和编译器选择合适的事件驱动库进行编译。
这段代码中,首先判断是否定义了HAVE_EVPORT
宏。如果定义了该宏,则使用ae_evport.c
文件中的事件驱动库,否则继续判断是否定义了HAVE_EPOLL
宏。如果定义了该宏,则使用ae_epoll.c
文件中的事件驱动库,否则继续判断是否定义了HAVE_KQUEUE
宏。如果定义了该宏,则使用ae_kqueue.c
文件中的事件驱动库,否则使用ae_select.c
文件中的事件驱动库。
这种条件编译技术可以使Redis在不同操作系统和编译器下具有更好的兼容性和可移植性,使得Redis可以在不同的平台上运行,并且可以充分发挥不同平台的优势。
简言之,就是根据不同的操作系统,决定选择不同的内核IO模型,优先级: evport > epoll > kqueue > select
关于系统内核实现,参考:
#ifdef HAVE_EVPORT: 如果定义了宏 HAVE_EVPORT,则包含文件 ae_evport.c。ae_evport.c 可能包含了 Solaris 10 系统使用的事件驱动库。
#else: 如果没有定义宏 HAVE_EVPORT,则继续处理后续代码。
#ifdef HAVE_EPOLL: 如果定义了宏 HAVE_EPOLL,则包含文件 ae_epoll.c。ae_epoll.c 可能包含了 Linux 系统使用的事件驱动库 epoll。
#else: 如果没有定义宏 HAVE_EPOLL,则继续处理后续代码。
#ifdef HAVE_KQUEUE: 如果定义了宏 HAVE_KQUEUE,则包含文件 ae_kqueue.c。ae_kqueue.c 可能包含了 FreeBSD 或 macOS 系统使用的事件驱动库 kqueue。
#else: 如果没有定义宏 HAVE_KQUEUE,则包含文件 ae_select.c。ae_select.c 可能包含了所有系统都支持的 select 事件驱动库,但效率较低。
#endif: 结束条件编译语句块。
我们常用的CentOS使用的是epoll
的实现,在Linux系统中,epoll
机制是一种高效的事件触发机制,可以监听大量的文件描述符,并在文件描述符上发生事件时,立即通知应用程序。使用epoll
机制时,需要使用epoll_create
函数创建一个epoll
对象,然后使用epoll_ctl
函数向epoll
对象添加或删除文件描述符,最后使用epoll_wait
函数等待事件的发生。
epoll的实现在ae_epoll.c文件中:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
epoll_create
是Linux系统中的一个系统调用,用于创建一个epoll
对象,以便对文件描述符进行事件监听。
在Linux系统中,如果需要对多个文件描述符进行事件监听,常用的方式是使用select或poll
函数。但是随着文件描述符数量的增加,select和poll
函数的效率会逐渐降低,因为它们需要遍历所有的文件描述符,而无法实现快速的事件通知。为了解决这个问题,Linux引入了epoll
机制,通过epoll_create
系统调用创建一个epoll
对象,然后使用epoll_ctl
函数向epoll
对象添加或删除文件描述符,最后使用epoll_wait
函数等待事件的发生。
epoll_create
函数的原型如下:
#include <sys/epoll.h>
int epoll_create(int size);
其中,size参数表示epoll对象中能够监听的最大文件描述符数量,这个参数在Linux 2.6.8之后已经无效,可以忽略。epoll_create
函数返回一个整数类型的文件描述符,表示创建的epoll对象的标识符。如果创建失败,返回-1。
需要注意的是,使用epoll_create
函数创建的epoll对象是在内核中创建的,而不是在用户空间中创建的。因此,在使用epoll机制时,需要将文件描述符设置为非阻塞模式,并且需要使用epoll_ctl函数向内核注册文件描述符,从而实现文件描述符的事件监听。
epoll_ctl
是Linux系统中的一个系统调用,用于向epoll对象中添加或删除文件描述符,并设置对应的事件类型。
在Linux系统中,epoll机制是一种高效的事件触发机制,可以监听大量的文件描述符,并在文件描述符上发生事件时,立即通知应用程序。使用epoll机制时,需要使用epoll_create
函数创建一个epoll对象,然后使用epoll_ctl
函数向epoll对象添加或删除文件描述符,最后使用epoll_wait
函数等待事件的发生。
epoll_ctl
函数的原型如下:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
其中,epfd
参数表示epoll
对象的文件描述符,op
参数表示操作类型,可以是EPOLL_CTL_ADD、EPOLL_CTL_MOD或EPOLL_CTL_DEL
,分别表示添加、修改或删除文件描述符。fd
参数表示要添加、修改或删除的文件描述符,event
参数表示要监听的事件类型,包括读事件、写事件等。
需要注意的是,使用epoll_ctl
函数添加、修改或删除文件描述符时,需要将文件描述符设置为非阻塞模式。在调用epoll_wait
函数等待事件时,如果有事件发生,epoll_wait
函数会返回一组事件列表,然后可以处理这些事件。处理完毕后,可以使用epoll_ctl
函数修改或删除已经处理过的文件描述符,然后再次调用epoll_wait
函数等待事件的发生。
epoll_wait
是一个Linux内核提供的系统调用,用于等待文件描述符上的事件。epoll是Linux内核提供的一种多路I/O复用机制,可以监视多个文件描述符,一旦某个文件描述符就绪,epoll就会通知用户进程进行后续处理。
epoll_wait
的函数原型如下:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数说明如下:
- epfd:epoll实例的文件描述符。
- events:用于存放就绪文件描述符的数组。
- maxevents:events数组的大小。
- timeout:等待事件的超时时间,单位为毫秒。
epoll_wait
的返回值如下:
- 成功时,返回就绪文件描述符的数目。
- 出错时,返回-1。
epoll_wait
的使用步骤如下:
- 创建一个epoll实例,并获取其文件描述符。
- 将需要监视的文件描述符注册到epoll实例中。
- 调用epoll_wait函数,等待事件。
- 处理就绪文件描述符上的事件。
以下是epoll_wait
的使用示例:
#include <sys/epoll.h>
int main() {
// 创建一个epoll实例
int epfd = epoll_create(1024);
if (epfd == -1) {
perror("epoll_create");
return -1;
}
// 将需要监视的文件描述符注册到epoll实例中
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = 0; // 标准输入
if (epoll_ctl(epfd, EPOLL_CTL_ADD, 0, &event) == -1) {
perror("epoll_ctl");
return -1;
}
// 等待事件
struct epoll_event events[10];
int nfds = epoll_wait(epfd, events, 10, -1);
if (nfds == -1) {
perror("epoll_wait");
return -1;
}
// 处理就绪文件描述符上的事件
for (int i = 0; i < nfds; ++i) {
if (events[i].events & EPOLLIN) {
// 读取标准输入
char buf[1024];
int n = read(events[i].data.fd, buf, sizeof(buf));
if (n == -1) {
perror("read");
return -1;
}
// ...
}
}
return 0;
}
在上述示例中,我们创建了一个epoll实例,并将标准输入注册到epoll实例中。然后,我们调用epoll_wait
函数,等待标准输入上的数据到达。如果标准输入上有数据到达,epoll_wait
函数就会返回,并将就绪文件描述符的相关信息保存在events
数组中。最后,我们遍历events
数组,处理每个就绪文件描述符上的事件。
epoll_wait
是Linux内核提供的一种高效的多路I/O复用机制。它可以提高程序的并发处理能力,减少CPU占用率。
ae_epoll.c
文件中封装了一系列的epoll操作,包括epoll的创建、新增、删除、扩容、等待。
那这个非阻塞IO是怎么工作的?
核心关注aeApiPoll()
:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
epoll_wait
等待FD的就绪通知,如果FD准备完毕,则进行数据流处理,否则就阻塞等待,在Redis启动时,会在main函数中创建一个死循环,轮询监听epoll事件,当有事件就绪时,执行事件的回调函数,即我们上一篇中所讲到的,具体的命令执行函数。
ae.c aeMain()
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
//省略部分非核心代码
.....
// 等待epoll就绪事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 核心:执行命令对应的回调函数
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 核心:执行命令执行结果数据,写回客户端,回调函数
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
就此,命令执行流程的epoll部分,就此完成,我们还是用一张图描述整个执行过程:
结语
本篇,我们对Redis源码中非阻塞的核心epoll是如何实现进行了浅析,简单了解了Redis中epoll的工作流程,至此,我们已经大体了解了Redis如何处理执行来自客户端的命令请求,但是还有一个问题我们没有清楚,Redis是如何将命令读取到的数据返回客户端的,下一篇中,我们将围绕这个问题,进行拆解,敬请期待。