Go cond 源码学习

概述

cond是go语言sync提供的条件变量,通过cond可以让一系列的goroutine在触发某个条件时才被唤醒。每一个cond结构体都包含一个锁L。cond提供了三个方法:

  • Signal:调用Signal之后可以唤醒单个goroutine。
  • Broadcast:唤醒等待队列中所有的goroutine。
  • Wait:会把当前goroutine放入到队列中等待获取通知,调用此方法必须先Lock,不然方法里会调用Unlock()报错。

简单使用

创建40个goroutine都wait阻塞住。调用Signal则唤醒第一个goroutine。调用Broadcast则唤醒所有等待的goroutine。

package main

import (
    "fmt"
    "sync"
    "time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)

func test(x int) {
    cond.L.Lock() //获取锁
    cond.Wait()   //等待通知  暂时阻塞
    fmt.Println(x)
    time.Sleep(time.Second * 1)
    cond.L.Unlock() //释放锁
}
func main() {
    for i := 0; i < 40; i++ {
        go test(i)
    }
    fmt.Println("start all")
    time.Sleep(time.Second * 3)
    fmt.Println("broadcast")
    cond.Signal() // 下发一个通知给已经获取锁的goroutine
    time.Sleep(time.Second * 3)
    cond.Signal() // 3秒之后 下发一个通知给已经获取锁的goroutine
    time.Sleep(time.Second * 3)
    cond.Broadcast() //3秒之后 下发广播给所有等待的goroutine
    time.Sleep(time.Second * 60)
}

源码分析

Cond

type Cond struct {
    noCopy noCopy

    // 锁的具体实现,通常为 mutex 或者rwmutex
    L Locker
    // notifyList对象,维护等待唤醒的goroutine队列,使用链表实现
    notify  notifyList
    checker copyChecker
}

// 新建cond初始化cond对象
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}

type notifyList struct {
    // 等待数量
    wait uint32

    // 通知数量
    notify uint32

    // 锁对象
    lock mutex
    // 链表头
    head *sudog
    // 链表尾
    tail *sudog
}

Wait

// 等待函数
func (c *Cond) Wait() {
    c.checker.check()
    // 等待计数器加1 看下面具体实现
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // 
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

// 此函数在sema.go中控制计数器加1
func notifyListAdd(l *notifyList) uint32 {
    // This may be called concurrently, for example, when called from
    // sync.Cond.Wait while holding a RWMutex in read mode.
    return atomic.Xadd(&l.wait, 1) - 1
}

// 此函数在sema.go中
// 获取当前goroutine 添加到链表末端,然后goparkunlock函数休眠阻塞当前goroutine
// goparkunlock函数会让出当前处理器的使用权并等待调度器的唤醒
func notifyListWait(l *notifyList, t uint32) {
    lock(&l.lock)

    // Return right away if this ticket has already been notified.
    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    // Enqueue itself.
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
    if t0 != 0 {
        blockevent(s.releasetime-t0, 2)
    }
    releaseSudog(s)
}

Broadcast

唤醒链表中所有的阻塞中的goroutine,还是使用readyWithTime来实现这个功能

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

// 源代码在sema.go中
func notifyListNotifyAll(l *notifyList) {
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock.
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    // Pull the list out into a local variable, waiters will be readied
    // outside the lock.
    lock(&l.lock)
    s := l.head
    l.head = nil
    l.tail = nil

    // Update the next ticket to be notified. We can set it to the current
    // value of wait because any previous waiters are already in the list
    // or will notice that they have already been notified when trying to
    // add themselves to the list.
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)

    // Go through the local list and ready all waiters.
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

Signal

// 调用runtime_notifyListNotifyOne方法唤醒链表头的goroutine
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// runtime_notifyListNotifyOne具体实现 获取链表头部的G,然后调用readyWithTime唤醒goroutine
// 源代码在sema.go中
func notifyListNotifyOne(l *notifyList) {
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    lock(&l.lock)

    t := l.notify
    if t == atomic.Load(&l.wait) {
        unlock(&l.lock)
        return
    }

    atomic.Store(&l.notify, t+1)
    
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}