Redis事件循环器(AE)实现剖析

Redis作为一个单线程高性能的内存缓存Server而被人熟知。作为一个典型的Reactor式网络应用,Redis能够达到如此高的性能,必然要依靠足够可靠的事件循环库。

Redis内置了一个高性能事件循环器,叫做AE。其定义和实现可以在 ae*.h/cpp
这些文件中找到。
AE本身就是Redis的一部分,所以整体设计原则就是够用就行。也正因为这个背景,AE的代码才可以简短干净,非常适合阅读和学习。
本文将基于Redis 5.0.6的源码分析下其事件循环器(AE)的实现原理。

同时本人也提供了一个 Redis注释版
,用以辅助理解Redis的源码。

eventloop的创建

Redis通过以下接口进行eventloop的创建和释放。

aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);

Redis通过将对应事件注册到eventloop中,然后不断循环检测有无事件触发。目前eventloop支持超时事件和网络IO读写事件的注册。
我们可以通过aeCreateEventLoop来创建一个eventloop。可以看到在创建EventLoop的时候,必须指定一个setsize的参数。
setsize参数表示了eventloop可以监听的网络事件fd的个数(不包含超时事件),如果当前监听的fd个数超过了setsize,eventloop将不能继续注册。
我们知道,Linux内核会给每个进程维护一个文件描述符表。而POSIX标准对于文件描述符进行了以下约束:

  1. fd为0、1、2分别表示标准输入、标准输出和错误输出
  2. 每次新打开的fd,必须使用当前进程中最小可用的文件描述符。

Redis充分利用了文件描述符的这些特点,来存储每个fd对应的事件。
在Redis的eventloop中,直接用了一个连续数组来存储事件信息:

eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
for (i = 0; i events[i].mask = AE_NONE;

可以看到数组长度就是setsize,同时创建之后将每一个event的mask属性置为AE_NONE(即是0),mask代表该fd注册了哪些事件。

对于 eventLoop->events
数组来说,fd就是这个数组的下标。

例如,当程序刚刚启动时候,创建监听套接字,按照标准规定,该fd的值为3。此时就直接在 eventLoop->events
下标为3的元素中存放相应event数据。
不过也基于文件描述符的这些特点,意味着events数组的前三位一定不会有相应的fd赋值。
那么,Redis是如何指定eventloop的setsize的呢?以下是Redis创建eventloop的相关代码:

server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

其中:

--maxclients

也正是因为Redis利用了fd的这个特点,Redis只能在完全符合POSIX标准的系统中工作。其他的例如Windows系统,生成的fd或者说HANDLE更像是个指针,并不符合POSIX标准。

网络IO事件

Redis通过以下接口进行网络IO事件的注册和删除。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);

aeCreateFileEvent表示将某个fd的某些事件注册到eventloop中。
目前可注册的事件有三种:

  1. AE_READABLE 可读事件
  2. AE_WRITABLE 可写事件
  3. AE_BARRIER 该事件稍后在” 事件的等待和处理
    “一节详细讲到。

而mask就是这几个事件经过或运算后的掩码。
aeCreateFileEvent在epoll的实现中调用了epoll_ctl函数。Redis会根据该事件对应之前的mask是否为AE_NONE,来决定使用EPOLL_CTL_ADD还是EPOLL_CTL_MOD。
同样的,aeDeleteFileEvent也使用了epoll_ctl,Redis判断用户是否是要完全删除该fd上所有事件,来决定使用EPOLL_CTL_DEL还是EPOLL_CTL_MOD。

定时器

AE中最不值得分析的大概就是定时器了。。因为实现的实在是太简单了,甚至可以说是简陋。
Redis通过以下两个接口进行定时器的注册和取消。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

在调用aeCreateTimeEvent注册超时事件的时候,调用方需要提供两个callback: aeTimeProc和aeEventFinalizerProc。

  • aeTimeProc很简单,就是当超时事件触发时调用的callback。有点特殊的是,aeTimeProc需要返回一个int值,代表下次该超时事件触发的时间间隔。如果返回-1,则说明超时时间不需要再触发了,标记为删除即可。
  • finalizerProc 当timer被删除的时候,会调用这个callback

Redis的定时器其实做的非常简陋,只是一个普通的双向链表,链表也并不是有序的。每次最新的超时事件,直接插入链表的最头部。
当AE要遍历当前时刻的超时事件时,也是直接暴力的从头到尾遍历链表,看看有没有超时的事件。

当时我看到这里源码的时候,还是很震惊的。因为一般来说,定时器都会采用最小堆或者时间轮等有序数据结构进行存储,
为什么Redis的定时器做的这么简陋?

《Redis的设计与实现》一书中说,在Redis 3.0版本中,只使用到了serverCon这一个超时事件。
所以这种情况下,也无所谓性能了,虽然是个链表,但其实用起来就只有一个元素,相当于当做一个指针在用。

虽然还不清楚5.0.6版本里面超时事件有没有增多,不过可以肯定的是,目前依然达不到花点时间去优化的程度。
Redis在注释里面也说明了这事,并且给出了以后的优化方案:
用skiplist代替现有普通链表,查询的时间复杂度将优化为O(1), 插入的时间复杂度将变成O(log(N))

异常处理

虽然定时器做的这么简陋,但是对于一些时间上的异常情况,Redis还是做了下基本的处理。具体可见如下代码:

if (now lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
                te->when_sec = 0;
                te = te->next;
        }
}

这段代码的意思是,如果当前时刻小于lastTime, 那意味着时间有可能被调整了。

对于这种情况,Redis是怎么处理的呢:

直接把所有的事件的超时时间都置为0, te->when_sec = 0
。这样的话,接下来检查有哪些超时时间到期的时候,所有的超时事件都会被判定为到期。相当于本次遍历把所有超时事件一次性全部激活。
因为Redis认为,在这种异常情况下,与其冒着超时事件可能永远无法触发的风险,还不如把事情提前做了。
还是基于Redis够用就行的原则,这个解决方案在Redis中显然是被接受的。
但是其实还有更好的做法,比如libevent就是通过相对时间的方式进行处理这个问题。为了解决这个几乎不会出现的异常case,libevent也花了大量代码进行处理。

事件的等待和处理

Redis中关于处理等待事件的函数有以下两个:

int aeProcessEvents(aeEventLoop *eventLoop, int flags);
void aeMain(aeEventLoop *eventLoop);

aeMain的实现很简单, 就是我们所说的事件循环了,真的就是个循环:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

而aeProcessEvents代表处理一次事件循环,那么aeProcessEvents都做了那些事情呢?

  1. 取出最近的一次超时事件。
  2. 计算该超时事件还有多久才可以触发。
  3. 等待网络事件触发或者超时。
  4. 处理触发的各个事件,包括网络事件和超时事件

为什么要取出最近的一次超时事件?这是因为对于epoll_wait来说,必须要指定一个超时时间。以下是epoll_wait的定义:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

timeout参数单位是毫秒,如果指定了大于0的超时时间,则在这段时间内即使如果没有网络IO事件触发,epoll_wait到了指定时间也会返回。
如果超时时间指定为-1,则epoll_wait将会一直阻塞等待,直到网络事件触发。
epoll_wait的超时时间一定要指定为最近超时事件的时间间隔,这样可以防止万一这段时间没有网络事件触发,超时事件也可以正常的响应。
同时,eventloop还有两个callback: beforesleep和aftersleep,分别会在epoll_wait之前和之后调用。

接着,我们看下Redis是怎么处理已触发的网络事件的:
一般情况下,Redis会先处理读事件(AE_READABLE),再处理写事件(AE_WRITABLE)。

这个顺序安排其实也算是一点小优化, 先读后写
可以让一个请求的处理和回包都是在同一次循环里面,使得请求可以尽快地回包,

前面讲到,网络IO事件注册的时候,除了正常的读写事件外,还可以注册一个AE_BARRIER事件,这个事件就是会影响到先读后写的处理顺序。

如果某个fd的mask包含了AE_BARRIER,那它的处理顺序会是 先写后读

针对这个场景,redis举的例子是,如果在beforesleep回调中进行了fsync动作,然后需要把结果快速回复给client。这个情况下就需要用到AE_BARRIER事件,用来翻转处理事件顺序了。

操作系统的适配

Redis不仅支持Linux下的epoll,还支持其他的IO复用方式,目前支持如下四种:

  1. epoll:支持Linux系统
  2. kqueue:支持FreeBSD系统(如macOS)
  3. select
  4. evport: 支持Solaris

几个IO复用方式使用的判断顺序如下:

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

这个顺序其实也代表了四种IO复用方式的性能高低。
对于每种IO复用方式,只要实现以下8个接口就可以正常对接Redis了:

int aeApiCreate(aeEventLoop *eventLoop);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
void aeApiResize(aeEventLoop *eventLoop, int setsize);
void aeApiFree(aeEventLoop *eventLoop);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
char *aeApiName(void);

在这个8个接口下面,其实底层并没有做太多的优化,只是简单的对原有API封装而已。

总结

与其他通用型的事件循环库(如libevent)不一样的是,Redis的事件循环库不用考虑太多的用户侧因素:

  1. 不用考虑ABI兼容。因为AE本身就和Redis一起编译,所以无需像libevent一样考虑库的升级问题。
  2. 不支持Windows系统,只支持unix like的系统
  3. 指定了监听fd的个数的上限,默认支持10000个客户端连接。