一起读 Gevent 源码

Greenlet

我们知道 Gevent 是基于 Greenlet 实现的,greenlet 有的时候也被叫做微线程或者协程。其实 Greenlet 本身非常简单,其自身实现的功能也非常直接。区别于常规的编程思路——顺序执行、调用进栈、返回出栈—— Greenlet 提供了一种在不同的调用栈之间自由跳跃的功能。从一个简单的例子来看一下吧(摘自官方文档):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from greenlet import greenlet
 
def test1():
    print 12
    gr2.switch()
    print 34
 
def test2():
    print 56
    gr1.switch()
    print 78
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
 


这里,每一个 greenlet 就是一个调用栈——您可以把他想象成一个线程,只不过真正的线程可以并行执行,而同一时刻只能有一个 greenlet 在执行(同一线程里)。正如例子中最后三句话,我们创建了 gr1gr2 两个不同的调用栈空间,入口函数分别是 test1test2;这最后一句 gr1.switch() 得多解释一点。

因为除了 gr1gr2,我们还有一个栈空间,也就是所有 Python 程序都得有的默认的栈空间——我们暂且称之为 main,而这一句 gr1.switch() 恰恰实现了从 maingr1 的跳跃,也就是从当前的栈跳到指定的栈。这时,就犹如常规调用 test1() 一样,gr1.switch() 的调用暂时不会返回结果,程序会跳转到 test1 继续执行;只不过区别于普通函数调用时 test1() 会向当前栈压栈,而 gr1.switch() 则会将当前栈存档,替换成 gr1 的栈。如图所示:

对于这种栈的切换,我们有时也称之为执行权的转移,或者说 main 交出了执行权,同时 gr1 获得了执行权。Greenlet 在底层是用汇编实现的这样的切换:把当前的栈(main)相关的寄存器啊什么的保存到内存里,然后把原本保存在内存里的 gr1 的相关信息恢复到寄存器里。这种操作速度非常快,比操作系统对多进程调度的上下文切换还要快。代码在这里,有兴趣的同学可以一起研究一下(其中 switch_x32_unix.h 是我写的哈哈)。

回到前面的例子,最后一句 gr1.switch() 调用将执行点跳到了 gr1 的第一句,于是输出了 12。随后顺序执行到 gr2.switch(),继而跳转到 gr2 的第一句,于是输出了 56。接着又是 gr1.switch(),跳回到 gr1,从之前跳出的地方继续——对 gr1 而言就是 gr2.switch() 的调用返回了结果 None,然后输出 34

这个时候 test1 执行到头了,gr1 的栈里面空了。Greenlet 设计了 parent greenlet 的概念,就是说,当一个 greenlet 的入口函数执行完之后,会自动切换回其 parent。默认情况下,greenlet 的 parent 就是创建该 greenlet 时所在的那个栈,前面的例子中,gr1gr2 都是在 main 里被创建的,所以他们俩的 parent 都是 main。所以当 gr1 结束的时候,会回到 main 的最后一句,接着 main 结束了,所以整个程序也就结束了——78 从来没有被执行到过。另外,greenlet 的 parent 也可以手工设置。

简单来看,greenlet 只是为 Python 语言增加了创建多条执行序列的功能,而且多条执行序列之间的切换还必须得手动显式调用 switch() 才行;这些都跟异步 I/O 没有必然关系。

gevent.sleep

接着来看 Gevent。最简单的一个 Gevent 示例就是这样的了:

1
2
3
import gevent
gevent.sleep(1)
 

貌似非常简单的一个 sleep,却包含了 Gevent 的关键结构,让我们仔细看一下 sleep 的实现吧。代码在 gevent/hub.py

1
2
3
4
5
def sleep(seconds=0):
    hub = get_hub()
    loop = hub.loop
    hub.wait(loop.timer(seconds))
 

这里我把一些当前用不着的代码做了一些清理,只留下了三句关键的代码,其中就有 Gevent 的两个关键的部件——hublooploop 是 Gevent 的核心部件,也就是主循环核心,默认是用 Cython 写的 libev 的包装(所以性能杠杠滴),稍后会在详细提到它。hub 则是一个 greenlet,里面跑着 loop

hub 是一个单例,从 get_hub() 的源码就可以看出来:

1
2
3
4
5
6
7
8
9
10
11
12
import _thread
_threadlocal = _thread._local()
 
def get_hub(*args, **kwargs):
    global _threadlocal
    try:
        return _threadlocal.hub
    except AttributeError:
        hubtype = get_hub_class()
        hub = _threadlocal.hub = hubtype(*args, **kwargs)
        return hub
 

所以第一次执行 get_hub() 的时候,就会创建一个 hub 实例:

1
2
3
4
5
6
7
8
class Hub(greenlet):
    loop_class = config(‘gevent.core.loop’, ‘GEVENT_LOOP’)
 
    def __init__(self):
        greenlet.__init__(self)
        loop_class = _import(self.loop_class)
        self.loop = loop_class()
 

同样这是一段精简了的代码,反映了一个 hub 的关键属性——looploop 实例随着 hub 实例的创建而创建,默认的 loop 就是 gevent/core.ppyx 里的 class loop,也可以通过环境变量 GEVENT_LOOP 来自定义。

值得注意的是,截止到 hub = get_hub()loop = hub.loop,我们都只是创建了 hubloop,并没有真正开始跑我们的主循环。稍安勿躁,第三句就要开始了。

loop 有一堆接口,对应着底层 libev 的各个功能,详见此处。我们这里用到的是 timer(seconds),该函数返回的是一个 watcher 对象,对应着底层 libev 的 watcher 概念。我们大概能猜到,这个 watcher 对象会在几秒钟之后做一些什么事情,但是具体怎么做,让我们一起看看 hub.wait() 的实现吧。

1
2
3
4
5
    def wait(self, watcher):
        waiter = Waiter()
        watcher.start(waiter.switch)
        waiter.get()
 

代码也不长,不过能看到 watcher 的接口 watcher.start(method),也就是说,当给定的几秒钟过了之后,会调用这里给的函数,也就是 waiter.switch。让我们再看一下这里用到的 Waiter,都是在同一个文件 hub.py 里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from greenlet import getcurrent
 
class Waiter(object):
    def __init__(self):
        self.hub = get_hub()
        self.greenlet = None
 
    def switch(self):
        assert getcurrent() is self.hub
        self.greenlet.switch()
 
    def get(self):
        assert self.greenlet is None
        self.greenlet = getcurrent()
        try:
            self.hub.switch()
        finally:
            self.greenlet = None
 

这里同样删掉了大量干扰因素。根据前面 wait() 的定义,我们会先创建一个 waiter,然后调用其 get(),随后几秒钟之后 loop 会调用其 switch()。一个个看。

get() 一上来会保证自己不会被同时调用到(assert),接着就去获取了当前的 greenlet,也就是调用 get() 时所处的栈,一直往前找,找到 sleep(1),所以 getcurrent() 的结果是 mainWaiter 随后将 main 保存在了 self.greenlet 引用中。

下面的一句话是重中之重了,self.hub.switch()!由不管任何上下文中,直接往 hub 里跳。由于这是第一次跳进 hub 里,所以此时 loop 就开始运转了。

正巧,我们之前已经通过 loop.timer(1)watcher.start(waiter.switch),在 loop 里注册了说,1 秒钟之后去调用 waiter.switchloop 一旦跑起来就会严格执行之前注册的命令。所以呢,一秒钟之后,我们在 hub 的栈中,调用到了 Waiter.switch()

switch() 里,程序一上来就要验证当前上下文必须得是 hub,翻阅一下前面的代码,这个是必然的。最后,跳到 self.greenlet!还记得它被设置成什么了吗?——main。于是乎,我们就回到了最初的代码里,gevent.sleep(1) 在经过了 1 秒钟的等待之后终于返回了。

回头看一下这个过程,其实也很简单的:当我们需要等待一个事件发生时——比如需要等待 1 秒钟的计时器事件,我们就把当前的执行栈跟这个事件做一个绑定(watcher.start(waiter.switch)),然后把执行权交给 hubhub 则会在事件发生后,根据注册的记录尽快回到原来的断点继续执行。

异步

hub 一旦拿到执行权,就可以做很多事情了,比如切换到别的 greenlet 去执行一些其他的任务,直到这些 greenlet 又主动把执行权交回给 hub。宏观的来看,就是这样的:一个 hub,好多个其他的任务 greenlet(其中没准就包括 main),hub 负责总调度,去依次调用各个任务 greenlet;任务 greenlet 则在执行至下一次断点时,主动切换回 hub。这样一来,许多个任务 greenlet 就可以看似并行地同步运行了,这种任务调度方式叫做协作式的任务调度(cooperative scheduling)。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
import gevent
 
def beep(interval):
    while True:
        print(“Beep %s” % interval)
        gevent.sleep(interval)
 
for i in range(10):
    gevent.spawn(beep, i)
 
beep(20)
 

例子里我们总共创建了 10greenlet,每一个都会按照不同频率输出“蜂鸣”;最后一句的 beep(20) 又让 main greenlet 也不断地蜂鸣。算上 hub,这个例子一共会有 12 个不同的 greenlet 在协作式地运行。

I/O

Gevent 最主要的功能当然是异步 I/O 了。其实,I/O 跟前面 sleep 的例子没什么本质的区别,只不过 sleep 用的 watchertimer,而 I/O 用到的 watcherio。比如说 wait_read(fileno) 是这样的:

1
2
3
4
5
def wait_read(fileno):
    hub = get_hub()
    io = hub.loop.io(fileno, 1)
    return hub.wait(io)
 

没什么太大区别吧,原理其实都是一样的。基于这个,我们就可以搞异步 socket 了。socket 的接口较为复杂,这里提取一些标志性的代码一起读一下吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class socket(object):
    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0):
        self._sock = _realsocket(family, type, proto)  # 创建底层的 socket
        self._sock.setblocking(0)  # 将其设置为非阻塞的
        fileno = self._sock.fileno()  # 获得其文件描述符
        self.hub = get_hub()  # 自己留一份 hub 的引用,省的每次再现取
        io = self.hub.loop.io  # 快捷方式
        self._read_event = io(fileno, 1)  # socket 的读取事件
        self._write_event = io(fileno, 2)  # socket 的写入事件
 
    def _wait(self, watcher):
        assert watcher.callback is None  # 一个 socket 只能被一个 greenlet 用
        self.hub.wait(watcher)  # 见之前的例子,等待一个事件发生
 
    def recv(self, *args):
        sock = self._sock
        while True:
            try:
                return sock.recv(*args)  # 异步接收,要么立即成功,要么立即失败
            except error as ex:
                if ex.args[0] != EWOULDBLOCK:  # 如果失败的话,除了是异步等待的情况,
                    raise  # 其他情况都报错
            self._wait(self._read_event)  # 等待 socket 有数据可读
 

libev

最后提一点关于 libev 的东西,因为有同学也问到 Gevent 底层的调度方式。简单来说,libev 是依赖操作系统底层的异步 I/O 接口实现的,Linux 用的是 epoll,FreeBSD 则是 kqueue。Python 代码里,socket 会创建一堆 io watcher,对应底层则是将一堆文件描述符添加到一个——比如—— epoll 的句柄里。当切换到 hub 之后,libev 会调用底层的 epoll_wait 来等待这些 socket 中可能出现的事件。一旦有事件产生(可能是一次出现好多个事件),libev 就会按照优先级依次调用每个事件的回调函数。注意,epoll_wait 是有超时的,所以一些无法以文件描述符的形式存在的事件也可以有机会被触发。关于 libev 网上还有很多资料,有兴趣大家可以自行查阅。

Gevent 的性能调优

Gevent 不是银弹,不能无限制地创建 greenlet。正如多线程编程一样,用 gevent 写服务器也应该创建一个“微线程池”,超过池子大小的 spawn 应该被阻塞并且开始排队。只有这样,才能保证同时运行的 greenlet 数量不至于多到显著增加异步等待的恢复时间,从而保证每个任务的响应速度。其实,当池子的大小增加到一定程度之后,CPU 使用量的增速会放缓甚至变为 0,这时继续增加池子大小只能导致回调函数开始排队,不能真正增加吞吐量。正确的做法是增加硬件或者优化代码(提高算法效率、减少无谓调用等)。

关于 pool 的大小,我觉得是可以算出来的:

1、在压力较小、pool 资源充足的情况下,测得单个请求平均处理总时间,记作 Ta
2、根据系统需求,估计一下能接受的最慢的请求处理时间,记作 Tm
3、设 Ta 中有 Ts 的时间,执行权是不属于当前处理中的 greenlet 的,比如正在进行异步的数据库访问或是调用远端 API 等后端访问
4、在常规压力下,通过测量后端访问请求处理的平均时间,根据代码实际调用情况测算出 Ts
5、pool 的大小 = (Tm / (Ta - Ts)) * 150%,这里的 150% 是个 buffer 值,拍脑门拍出来的

比如理想情况下平均每个请求处理需要 20ms,其中平均有 15ms 是花在数据库访问上(假设数据库性能较为稳定,能够线性 scale)。如果最大能容忍的请求处理时间是 500ms 的话,那池子大小应该设置成 (500 / (20 - 15)) * 150% = 150,也就意味着单进程最大并发量是 150

从这个算法也可以看出,花在 Python 端的 CPU 时间越少,系统并发量就越高,而花在后端访问上的时间长短对并发影响不是很大——当然了,依然得假设数据库等后端可以线性 scale。

下面是我之前在 Amazon EC2 m1.small 机器上的部分测试结果,对比了同步多进程和 Gevent 在处理包含异步 PostgreSQL 和 Redis 访问的请求时的性能:

1
2
3
Log Format (per actor)
handling time for 500 requests / Time receiving 500 responses time per handling / time per request raw handling rate / request per second
 

8 actors, 128 testers: 798 rps on client

1
2
3
4
5
6
7
1230.08 ms / 5649.88 ms 2.46 ms / 11.30 ms 406.48 rps / 88.50 rps
1707.71 ms / 5938.53 ms 3.42 ms / 11.88 ms 292.79 rps / 84.20 rps
2219.12 ms / 6324.48 ms 4.44 ms / 12.65 ms 225.31 rps / 79.06 rps
1446.94 ms / 5491.89 ms 2.89 ms / 10.98 ms 345.56 rps / 91.04 rps
1064.61 ms / 5189.07 ms 2.13 ms / 10.38 ms 469.66 rps / 96.36 rps
2099.23 ms / 5844.37 ms 4.20 ms / 11.69 ms 238.18 rps / 85.55 rps
 

1 async actor with 8 concurrency limit, 128 testers: 1031 rps on client

1
2
3
4
5
6
3995.44 ms / 560.62 ms 7.99 ms / 1.12 ms 125.14 rps / 891.87 rps
4369.57 ms / 575.34 ms 8.74 ms / 1.15 ms 114.43 rps / 869.06 rps
4388.47 ms / 590.63 ms 8.78 ms / 1.18 ms 113.93 rps / 846.55 rps
4439.61 ms / 579.39 ms 8.88 ms / 1.16 ms 112.62 rps / 862.97 rps
3866.82 ms / 574.92 ms 7.73 ms / 1.15 ms 129.31 rps / 869.69 rps
 

1 async actor with no concurrency limit, 128 testers: 987 rps on client

1
2
3
4
5
6
38191.16 ms / 551.76 ms 76.38 ms / 1.10 ms 13.09 rps / 906.20 rps
34354.80 ms / 564.43 ms 68.71 ms / 1.13 ms 14.55 rps / 885.84 rps
40397.18 ms / 543.23 ms 80.79 ms / 1.09 ms 12.38 rps / 920.42 rps
45406.02 ms / 490.45 ms 90.81 ms / 0.98 ms 11.01 rps / 1019.48 rps
37106.92 ms / 581.95 ms 74.21 ms / 1.16 ms 13.47 rps / 859.18 rps
 

能看出来,同样是 8 的并发限制,同步比异步处理快两三倍(但是 load balance 拉低了同步的优势),吞吐量上虽比不上异步,但也不差。在去掉并发限制之后,吞吐量变化不大,但处理时间翻了 10 倍(因为大量 callback 开始排队,无法及时被调用到),且不稳定。

转载自演道,想查看更及时的互联网产品技术热点文章请点击http://go2live.cn