go sema 源码分析

sema.go semacquire1和 semrelease1 函数是 sync.mutex 用来阻塞 g 和释放 g 的实现,这两个方法也实现了类似信号量的功能,并且是针对 goroutine 的信号量,由于还没看 go 调度相关的代码,sema 里跟调度相关的逻辑也不做仔细说明和代码注解

semacquire1 函数

大致流程:获取 sudog 和 semaRoot ,其中 sudog 是 g 放在等待队列里的包装对象,sudog 里会有 g 的信息和一些其他的参数, semaRoot 则是队列结构体,内部是堆树,把和当前 g 关联的 sudog 放到 semaRoot 里,然后把 g 的状态改为等待,调用调度器执行别的 g,此时当前 g 就停止执行了。一直到被调度器重新调度执行,会首先释放 sudog 然后再去执行别的代码逻辑
semaRoot

// 一个 semaRoot 持有不同地址的 sudog 的平衡树
// 每一个 sudog 可能反过来指向等待在同一个地址上的 sudog 的列表
// 对同一个地址上的 sudog 的内联列表的操作的时间复杂度都是O(1),扫描 semaRoot 的顶部列表是 O(log n)
// n 是 hash 到并且阻塞在同一个 semaRoot 上的不同地址的 goroutines 的总数
type semaRoot struct {
    lock  mutex
    treap *sudog // root of balanced tree of unique waiters. 不同 waiter 的平衡树
    nwait uint32 // Number of waiters. Read w/o the lock. waiter 的数量
}

复制代码
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    gp := getg()
    if gp != gp.m.curg {
        throw("semacquire not on the G stack")
    }

    // Easy case.
    if cansemacquire(addr) {
        return
    }

    // Harder case:
    //  increment waiter count
    //  try cansemacquire one more time, return if succeeded
    //  enqueue itself as a waiter
    //  sleep
    //  (waiter descriptor is dequeued by signaler)
    // 获取一个 sudog
    s := acquireSudog()
    // 获取一个 semaRoot
    root := semroot(addr)
    t0 := int64(0)
    s.releasetime = 0
    s.acquiretime = 0
    s.ticket = 0
    // 一些性能采集的参数 应该是
    if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
        if t0 == 0 {
            t0 = cputicks()
        }
        s.acquiretime = t0
    }
    for {
        // 锁定在 semaRoot 上
        lock(&root.lock)
        // Add ourselves to nwait to disable "easy case" in semrelease.
        // nwait 加一
        atomic.Xadd(&root.nwait, 1)
        // Check cansemacquire to avoid missed wakeup.
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
        // Any semrelease after the cansemacquire knows we're waiting
        // (we set nwait above), so go to sleep.
        // 加到 semaRoot treap 上
        root.queue(addr, s, lifo)
        // 解锁 semaRoot ,并且把当前 g 的状态改为等待,然后让当前的 m 调用其他的 g 执行,当前 g 相当于等待
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
    if s.releasetime > 0 {
        blockevent(s.releasetime-t0, 3+skipframes)
    }
    // 释放 sudog
    releaseSudog(s)
}       
复制代码

关键的 goparkunlock 函数,调用的是 gopark函数

// 把当前的 goroutine 改为等待状态,并且调用 unlockf 函数,如果函数返回 flase,则当前 g 被恢复
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy 两个 goroutine 使调度器忙时,有可能会超时
    }
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    // 记住: unlockf 永远返回 true
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    mcall(park_m)
}
复制代码

macll 会先切换成 g0,并把当前 g 作为参数调用 park_m

// 在 g0 上继续 park
func park_m(gp *g) {
    // 当前 g 是g0
    _g_ := getg()

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    // 设置参数 g 的状态
    casgstatus(gp, _Grunning, _Gwaiting)
    // 删除参数 g 和 m 的关系
    dropg()

    if fn := _g_.m.waitunlockf; fn != nil {
        // 执行解锁操作, 假如是从 sema 过来的,fn 永远返回 true
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    // 调度其他的 g 执行
    schedule()
}
复制代码

park_m 执行之后,调度器就调度并执行其他的 g, 之前的 gp 也就等待了

semrelease1 函数

大致流程: 设置 addr 信号,从队列里取 sudog s,把 s 对应的 g 变为可执行状态,并且放在 p 的本地队列下一个执行的位置。如果参数 handoff 为 true,并且当前 m.locks == 0,就把当前的 g 放到 p 本地队列的队尾,调用调度器,因为s.g 被放到 p 本地队列的下一个执行位置,所以调度器此刻执行的就是 s.g

func semrelease1(addr *uint32, handoff bool, skipframes int) {
    root := semroot(addr)
    atomic.Xadd(addr, 1)

    // Easy case: no waiters?
    // This check must happen after the xadd, to avoid a missed wakeup
    // (see loop in semacquire).
    // 没有等待的 sudog
    if atomic.Load(&root.nwait) == 0 {
        return
    }

    // Harder case: search for a waiter and wake it.
    lock(&root.lock)
    if atomic.Load(&root.nwait) == 0 {
        // The count is already consumed by another goroutine,
        // so no need to wake up another goroutine.
        unlock(&root.lock)
        return
    }
    // 从队列里取出来 sudog ,此时 ticket == 0
    s, t0 := root.dequeue(addr)
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    unlock(&root.lock)
    if s != nil { // May be slow or even yield, so unlock first
        acquiretime := s.acquiretime
        if acquiretime != 0 {
            mutexevent(t0-acquiretime, 3+skipframes)
        }
        if s.ticket != 0 {
            throw("corrupted semaphore ticket")
        }
        if handoff && cansemacquire(addr) {
            s.ticket = 1
        }
        // 把 sudog 对应的 g 改为待执行状态,并且放到 p 本地队列的下一个执行
        readyWithTime(s, 5+skipframes)
        if s.ticket == 1 && getg().m.locks == 0 {
            // Direct G handoff
            // readyWithTime has added the waiter G as runnext in the
            // current P; we now call the scheduler so that we start running
            // the waiter G immediately.
            // Note that waiter inherits our time slice: this is desirable
            // to avoid having a highly contended semaphore hog the P
            // indefinitely. goyield is like Gosched, but it does not emit a
            // GoSched trace event and, more importantly, puts the current G
            // on the local runq instead of the global one.
            // We only do this in the starving regime (handoff=true), as in
            // the non-starving case it is possible for a different waiter
            // to acquire the semaphore while we are yielding/scheduling,
            // and this would be wasteful. We wait instead to enter starving
            // regime, and then we start to do direct handoffs of ticket and
            // P.
            // See issue 33747 for discussion.
            // 调用调度器立即执行 G
            // 等待的 g 继承时间片,避免无限制的争夺信号量
            // 把当前 g 放到 p 本地队列的队尾,启动调度器,因为 s.g 在本地队列的下一个,所以调度器立马执行 s.g
            goyield()
        }
    }
}
复制代码

readyWithTime 把 sudog 对应的 g 唤醒,并且放到 p 本地队列的下一个执行位置
readWithTime 会调用 systemstack , systemstack 会切换到当前 os 线程的堆栈执行 read

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    if trace.enabled {
        traceGoUnpark(gp, traceskip)
    }

    status := readgstatus(gp)

    // Mark runnable.
    // 此刻的— _g_ 不是 gp
    _g_ := getg()
    mp := acquirem() // disable preemption because it can be holding p in a local var
    if status&^_Gscan != _Gwaiting {
        dumpgstatus(gp)
        throw("bad g->status in ready")
    }

    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    // 把 g 放到 p 本地队列,next 为 true, 就放在下一个执行, next 为 false,放在队尾
    runqput(_g_.m.p.ptr(), gp, next)
    // TODO 这个看了调度代码再解释
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
        wakep()
    }
    releasem(mp)
}
复制代码

goyield 调用 mcall 执行 goyield_m, goyield_m 会把当前的 g 放到 p 本地对象的队尾, 然后执行调度器

func goyield_m(gp *g) {
    pp := gp.m.p.ptr()
    casgstatus(gp, _Grunning, _Grunnable)
    dropg()
    runqput(pp, gp, false)
    schedule()
}
复制代码