Golang限流器time/rate实现剖析

限流器是微服务中必不缺少的一环,可以起到保护下游服务,防止服务过载等作用。上一篇文章 《Golang限流器time/rate使用介绍》 简单介绍了time/rate的使用方法,本文则着重分析下其实现原理。建议在正式阅读本文之前,先阅读下上一篇文章。

上一篇文章讲到,time/rate是基于Token Bucket(令牌桶)算法实现的限流。本文将会基于源码,深入剖析下Golang是如何实现Token Bucket的。其代码也非常简洁,去除注释后,也就200行左右的代码量。

同时,我也提供了 time/rate注释版 ,辅助大家理解该组件的实现。

背景

简单来说,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放Token,桶满则暂时不放。

而用户则从桶中取Token,如果有剩余Token就可以一直取。如果没有剩余Token,则需要等到系统中被放置了Token才行。

一般介绍Token Bucket的时候,都会有一张这样的原理图:

从这个图中看起来,似乎令牌桶实现应该是这样的:

有一个Timer和一个BlockingQueue。Timer固定的往BlockingQueue中放token。用户则从BlockingQueue中取数据。

这固然是Token Bucket的一种实现方式,这么做也非常直观,但是效率太低了:我们需要不仅多维护一个Timer和BlockingQueue,而且还耗费了一些不必要的内存。

在Golang的 timer/rate 中的实现, 并没有单独维护一个Timer,而是采用了lazyload的方式,直到每次消费之前才根据时间差更新Token数目,而且也不是用BlockingQueue来存放Token,而是仅仅通过计数的方式。

Token的生成和消费

我们在上一篇文章中讲到,Token的消费方式有三种。但其实在内部实现,最终三种消费方式都调用了reserveN函数来生成和消费Token。

我们看下reserveN函数的具体实现,整个过程非常简单:

  1. 计算从当前时间到上次取Token的时刻,期间一共新产生了多少Token。

    因为我们知道Token Bucket每秒产生的Token数,只要把上次取Token的时间记录下来,求出时间差值,就可以很容易的将时间差转化为Token数。

    time/rate 中对应的实现函数为 tokensFromDuration ,后面会详细讲下。从时间转化为Token数目,大致公式如下。

    $$NewTokensNum = Rate*PastSeconds$$

    同时,当前Token数目 = 新产生的Token数目 + 之前剩余的Token数目 – 要消费的Token数目。

  2. 如果消费后剩余Token数目大于零,说明此时Token桶内仍不为空,此时无需等待。如果Token数目小于零,则需等待一段时间。

    那么这个时候,我们需要将负值的Token数转化为需要等待的时间。 time/rate 中对应的实现函数为 durationFromTokens

  3. 将需要等待的时间等相关结果返回给调用方。

从上面可以看出,其实整个过程就是利用了 Token数可以和时间相互转化 的原理。

而如果Token数为负,则需要等待相关时间即可。

注意:如果当消费时,Token桶中的Token数目已经为负值了,依然可以按照上述流程进行消费。随着负值越来越小,等待的时间将会越来越长。

从结果来看,这个行为跟用Timer+BlockQueue实现是一样的。

此外,整个过程为了保证线程安全,更新令牌桶相关数据时都用了mutex加锁。

对于Allow函数实现时,只要判断需要等待的时间是否为0即可,如果大于0说明需要等待,则返回False,反之返回True。

对于Wait函数,直接 t := time.NewTimer(delay) ,等待对应的时间即可。

float精度问题

从上面原理讲述可以看出,在Token和时间的相互转化函数 durationFromTokenstokensFromDuration 中,涉及到float64的乘除运算。

一谈到float的乘除,我们就需要小心精度问题了。

而Golang在这里也踩了坑,以下是 tokensFromDuration 最初的实现版本

func (limit Limit) tokensFromDuration(d time.Duration) float64 {
    return d.Seconds() * float64(limit)
}

这个操作看起来一点问题都没:每秒生成的Token数乘于秒数。

然而,这里的问题在于, d.Seconds() 已经是小数了。两个小数相乘,会带来精度的损失。

所以就有了这个issue: golang.org/issues/34861

修改后新的版本如下:

func (limit Limit) tokensFromDuration(d time.Duration) float64 {
    sec := float64(d/time.Second) * float64(limit)
    nsec := float64(d%time.Second) * float64(limit)
    return sec + nsec/1e9
}

time.Durationint64 的别名,代表纳秒。分别求出秒的整数部分和小数部分,进行相乘后再相加,这样可以得到最精确的精度。

数值溢出问题

我们讲reserveN函数的具体实现时,第一步就是计算从当前时间到上次取Token的时刻,期间一共新产生了多少Token,同时也可得出当前的Token是多少。

我最开始的理解是,直接可以这么做:

// elapsed表示过去的时间差
elapsed := now.Sub(lim.last)
// delta表示这段时间一共新产生了多少Token
delta = tokensFromDuration(now.Sub(lim.last))

tokens := lim.tokens + delta
if(token > lim.burst){
    token = lim.burst
}

其中, lim.tokens 是当前剩余的Token, lim.last 是上次取token的时刻。 lim.burst 是Token桶的大小。

使用tokensFromDuration计算出新生成了多少Token,累加起来后,不能超过桶的容量即可。

这么做看起来也没什么问题,然而并不是这样。

time/rate 里面是这么做的,如下代码所示:

maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
    elapsed = maxElapsed
}

delta := lim.limit.tokensFromDuration(elapsed)

tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
    tokens = burst
}

与我们最开始的代码不一样的是,它没有直接用 now.Sub(lim.last) 来转化为对应的Token数,而是

先用 lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) ,计算把桶填满的时间maxElapsed。

取elapsed和maxElapsed的最小值。

这么做算出的结果肯定是正确的,但是这么做相比于我们的做法,好处在哪里?

对于我们的代码,当last非常小的时候(或者当其为初始值0的时候),此时 now.Sub(lim.last) 的值就会非常大,如果 lim.limit 即每秒生成的Token数目也非常大时,直接将二者进行乘法运算, 结果有可能会溢出。

因此, time/rate 先计算了把桶填满的时间,将其作为时间差值的上限,这样就规避了溢出的问题。

Token的归还

而对于Reserve函数,返回的结果中,我们可以通过 Reservation.Delay() 函数,得到需要等待时间。

同时调用方可以根据返回条件和现有情况,可以调用 Reservation.Cancel() 函数,取消此次消费。

当调用 Cancel() 函数时,消费的Token数将会尽可能归还给Token桶。

此外,我们在上一篇文章中讲到,Wait函数可以通过Context进行取消或者超时等,

当通过Context进行取消或超时时,此时消费的Token数也会归还给Token桶。

然而,归还Token的时候,并不是简单的将Token数直接累加到现有Token桶的数目上,这里还有一些注意点:

restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
    return
}

以上代码就是计算需要归还多少的Token。其中:

r.tokens
r.timeToAct
r.lim.lastEvent

其中: r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) 指的是,从该次消费到当前时间,一共又新消费了多少Token数目。

根据代码来看,要归还的Token要是该次消费的Token减去新消费的Token。

不过这里我还没有想明白,为什么归还的时候,要减去新消费数目。

按照我的理解,直接归还全部Token数目,这样对于下一次消费是无感知影响的。这块的具体原因还需要进一步探索。

总结

Token Bucket其实非常适合互联网突发式请求的场景,其请求Token时并不是严格的限制为固定的速率,而是中间有一个桶作为缓冲。

只要桶中还有Token,请求就还可以一直进行。当突发量激增到一定程度,则才会按照预定速率进行消费。

此外在维基百科中,也提到了分层Token Bucket(HTB)作为传统Token Bucket的进一步优化,Linux内核中也用它进行流量控制。