Go语言定时器的实现
微信公众号:LinuGo,欢迎关注
我们都知道,Time.sleep(d duration)方法会阻塞一个协程的执行直到d时间结束。
用法很简单,但内部实现却是大有文章,每个go版本的timer的实现都有所不同,本文基于go1.14,接下来分别从宏观和围观介绍一遍主要调度实现过程。
图文演示
下面介绍一种最简单的场景:
首先存在多个goroutine,GT为有time.Sleep休眠的g,当GT被调度到m上执行时,场景如下图。
此时执行到了time.Sleep代码,GT会与m解绑,同时将该GT的sleep时间等信息记录到P的timers字段上,此时GT处于Gwaiting状态,不在运行队列上,调度器会调度一个新的G2到M上执行。(在每次调度过程中,会检查P里面记录的定时器,看看有没有要执行的。)
G2执行完了,当要进行下一轮调度时,调度器检查自己记录的定时器时发现,GT到时间了,是时候执行了。由于任务紧急,GT就会被强行插入到P的运行队列的对头,保证能马上被执行到。
接下来就会直接调度到GT执行了,睡眠结束。接下来跟随这个简单场景看一下源码实现。
阶段一、进入睡眠
首先调用time.Sleep(1)会经过编译器识别//go:linkname链接进入到runtime.timeSleep(n int)方法。
//go:linkname timeSleep time.Sleep func timeSleep(ns int64) { if ns <= 0 { //判断入参是否正常 return } gp := getg() //获取当前的goroutine t := gp.timer //如果不存在timer,new一个 if t == nil { t = new(timer) gp.timer = t } t.f = goroutineReady //后面唤醒时候会用到,修改goroutine状态为goready t.arg = gp t.nextwhen = nanotime() + ns //记录上唤醒时间 gopark(resetForSleep, unsafe.Pointer(t), waitReasonSleep, traceEvGoSleep, 1) //调用gopark挂起goroutine }
resetForSleep作为一个函数入参,他的调用栈依次为resettimer(t, t.nextwhen) -> modtimer(t, when, t.period, t.f, t.arg, t.seq)(后文会讲到),在后面的modtimer里面会将timer定时器加入到当前goroutine所在的p中,定时器在p中的结构为一个四叉堆,最近的时间的放在最堆顶上,对于这个数据结构没有做深入研究。
接下来看一下gopark中重要的部分。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { ......省略了大部分代码 mp.waitlock = lock //由于runningG和p没有连接,将timer赋值到当前m上,后面会给到p mp.waitunlockf = unlockf //将函数付给m ...... mcall(park_m) //将当前的g停放 }
看一下gopark里面的mcall里面的回调函数park_m中的部分。
func park_m(gp *g) { _g_ := getg() //获取当前goroutine ...... casgstatus(gp, _Grunning, _Gwaiting) //将goroutine状态设为waiting dropg() if fn := _g_.m.waitunlockf; fn != nil { //获取到mresetForSleep函数 ok := fn(gp, _g_.m.waitlock) //返回值是true _g_.m.waitunlockf = nil //清空该m的函数空间 _g_.m.waitlock = nil //... ...... } schedule() //触发新的调速循环,可执行队列中获取g到m上进行调度 }
看一下resetForSleep回调函数,里面依次调用了resettimer(t, t.nextwhen) -> modtimer(t, when, t.period, t.f, t.arg, t.seq),resettimer函数没有什么重要信息,只负责返回一个true,看一下modtimer函数。
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) { ...... loop: for { switch status = atomic.Load(&t.status); status { ...... case timerNoStatus, timerRemoved: //由于刚创建,所以timer为默认值0,对应timerNoStatus mp = acquirem() if atomic.Cas(&t.status, status, timerModifying) { wasRemoved = true //设置标志位为true break loop } releasem(mp) badTimer() } } t.period = period t.f = f //上文传过来的goroutineReady函数,用于将g转变为runnable状态 t.arg = arg //上文的g实例 t.seq = seq if wasRemoved { //会执行到此处 t.when = when pp := getg().m.p.ptr() //获取当前的p的指针 lock(&pp.timersLock) //加锁,为了并发安全,因为timer可以去其他的p偷取 doaddtimer(pp, t) //添加定时器到当前的p unlock(&pp.timersLock) //解锁 if !atomic.Cas(&t.status, timerModifying, timerWaiting) { //转变到timerWaiting badTimer() } ...... }
当触发完gopark方法,该goroutine脱离当前的m挂起,进入gwaiting状态,不在任何运行队列上。对应上图2。
阶段二、恢复执行
执行的恢复会在shedule()或者findRunnable()函数上,内部checkTimers(pp, 0)方法,该方法内部会判断p中timers堆顶的定时器,如果时间到了的话(当前时间大于计算的时间),调用 runtime.runOneTimer ,该方法里面会一系列调用到goready方法释放阻塞的goroutine,并将该goroutine放到运行队列的第一个。
接下来看一下checkTimers函数:
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { ......省略掉调整计时器时间的一些步骤 lock(&pp.timersLock) //加锁 adjusttimers(pp) //调整计时器的时间 rnow = now if len(pp.timers) > 0 { if rnow == 0 { rnow = nanotime() } for len(pp.timers) > 0 { if tw := runtimer(pp, rnow); tw != 0 { //进入runtimer方法,携带系统时间参数与处理器 if tw > 0 { pollUntil = tw } break } ran = true } } ...... }
进入runtimer方法,会查看p里面的堆顶的定时器,检查是否需要执行。
func runtimer(pp *p, now int64) int64 { for { t := pp.timers[0] //遍历堆顶的定时器 ....... switch s := atomic.Load(&t.status); s { case timerWaiting: //经过time.Sleep的定时器会是waiting状态 if t.when > now { //判断是否超过时间 // Not ready to run. return t.when } if !atomic.Cas(&t.status, s, timerRunning) { //修改计时器状态 continue } runOneTimer(pp, t, now) //运行该计时器函数 return 0 ........
接下来调用runOneTimer函数处理。
func runOneTimer(pp *p, t *timer, now int64) { ........ f := t.f //goready函数 arg := t.arg //就是之前传入的goroutine seq := t.seq //默认值0 if t.period > 0 { ......... //由于period为默认值0,会走else里面 } else { dodeltimer0(pp) //删除该计时器在p中,该timer在0坐标位 if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { //设置为nostatus badTimer() } }....... unlock(&pp.timersLock) f(arg, seq) //执行goroutineReady方法,唤起等待的goroutine ......... }
看一下上面的f(arg,seq)即goroutineReady方法的实现,该函数的实现就是直接调用了goready方法唤起goroutine,对应上图3:
func goroutineReady(arg interface{}, seq uintptr) { goready(arg.(*g), 0) //该处传入的第二个参数代表调度到运行队列的位置,该处设置为0,说明直接调度到运行队列即将要执行的位置,等待被执行。 }
另外,系统监控sysmon函数也可以触发定时器的调用,该函数是一个循环检查系统中是否拥有应该被运行但是还在等待的定时器,并调度他们运行。
对于time.NewTimer函数等,实现方法也是大致相似,只是回调函数变成了sendTime函数,该函数不会阻塞。调用该函数后,睡眠的goroutine会从channel中释放并加入运行队列,有兴趣可以自己研究一下。
以上就是整个time.sleep的调度过程,你可以根据我总结的对照源码一步一步看,肯定会加深印象,深入理解。