Golang channel源码深度剖析

channel是Golang中一个非常重要的特性,也是Golang CSP并发模型的一个重要体现。简单来说就是,goroutine之间可以通过channel进行通信。

channel在Golang如此重要,在代码中使用频率非常高,以至于不得不好奇其内部实现。本文将基于 go 1.13的源码 ,分析channel的内部实现原理。

channel的基本使用

在正式分析channel的实现之前,我们先看下channel的最基本用法,代码如下:

package main
import "fmt"

func main() {
    c := make(chan int)

    go func() {
        c <- 1 // send to channel
    }()

    x := <-c // recv from channel

    fmt.Println(x)
}

在以上代码中,我们通过 make(chan int) 来创建了一个类型为int的channel。

在一个goroutine中使用 c <- 1 将数据发送到channel中。在主goroutine中通过 x := <- c 从channel中读取数据并赋值给x。

以上代码对应了channel的两种基本操作:send操作 c <- 1 和 recv操作 x := <- c , 分别表示发送数据到channel和从channel中接收数据。

此外,channel还分为有缓存channel和无缓存channel。上述代码中,我们使用的是无缓冲的channel。对于无缓冲的channel,如果当前没有其他goroutine正在接收channel数据,则发送方会阻塞在发送语句处。

我们可以在channel初始化时指定缓冲区大小,例如, make(chan int, 2) 则指定缓冲区大小为2。在缓冲区未满之前,发送方无阻塞地可以往channel发送数据,无需等待接收方准备好。而如果缓冲区已满,则发送方依然会阻塞。

channel对应的底层实现函数

在探究channel源码之前,我们至少需要先找到channel在Golang的具体实现在哪。因为我们在使用channel时,用的是 <- 符号,并不能直接在go源码中找到其实现。但是Golang的编译器必然会将 <- 符号翻译成底层对应的实现。

我们可以使用Go自带的命令: go tool compile -N -l -S hello.go , 将代码翻译成对应的汇编指令。

或者,直接可以使用 Compiler Explorer 这个在线工具。对于上述示例代码可以直接在这个链接看其汇编结果: go.godbolt.org/z/3xw5Cj

通过仔细查看以上示例代码对应的汇编指令,可以发现以下的对应关系:

  1. channel的构造语句 make(chan int) , 对应的是 runtime.makechan 函数
  2. 发送语句 c <- 1 , 对应的是 runtime.chansend1 函数
  3. 接收语句 x := <- c , 对应的是 runtime.chanrecv1 函数

以上几个函数的实现都位于go源码中的 runtime/chan.go 代码文件中。我们接下来针对这几个函数,探究下channel的实现。

channel的构造

channel的构造语句 make(chan int) ,将会被golang编译器翻译为 runtime.makechan 函数, 其函数签名如下:

func makechan(t *chantype, size int) *hchan

其中, t *chantype 即构造channel时传入的元素类型。 size int 即用户指定的channel缓冲区大小,不指定则为0。该函数的返回值是 *hchan 。hchan则是channel在golang中的内部实现。其定义如下:

type hchan struct {
    qcount   uint           // buffer中已放入的元素个数
    dataqsiz uint           // 用户构造channel时指定的buf大小
    buf      unsafe.Pointer // buffer
    elemsize uint16         // buffer中每个元素的大小
    closed   uint32         // channel是否关闭,== 0代表未closed
    elemtype *_type         // channel元素的类型信息
    sendx    uint           // buffer中已发送的索引位置 send index
    recvx    uint           // buffer中已接收的索引位置 receive index
    recvq    waitq          // 等待接收的goroutine  list of recv waiters
    sendq    waitq          // 等待发送的goroutine list of send waiters

    lock mutex
}

hchan中的所有属性大致可以分为三类:

  1. buffer相关的属性。例如buf、dataqsiz、qcount等。 当channel的缓冲区大小不为0时,buffer中存放了待接收的数据。使用ring buffer实现。
  2. waitq相关的属性,可以理解为是一个FIFO的标准队列。其中recvq中是正在等待接收数据的goroutine,sendq中是等待发送数据的goroutine。waitq使用双向链表实现。
  3. 其他属性,例如lock、elemtype、closed等。

makechan 的整个过程基本都是一些合法性检测和对 bufferhchan 等属性的内存分配,此处不再进行深入讨论了,有兴趣的可以直接看此处的源码。

通过简单分析hchan的属性,我们可以知道其中有两个重要的组件, bufferwaitqhchan 所有行为和实现都是围绕这两个组件进行的。

向channel中发送数据

channel的发送和接收流程很相似,我们先分析下channel的发送过程(如 c <- 1 ), 对应于 runtime.chansend 函数的实现。

在尝试向channel中发送数据时,如果 recvq 队列不为空,则首先会从 recvq 中头部取出一个等待接收数据的goroutine出来。并将数据直接发送给该goroutine。代码如下:

if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

recvq中是正在等待接收数据的goroutine。当某个goroutine使用recv操作(例如, x := <- c ),如果此时channel的缓存中没有数据,且没有其他goroutine正在等待发送数据(即 sendq 为空),会将该goroutine以及要接收的数据地址打包成 sudog 对象,并放入到recvq中。

继续接着讲上面的代码,如果此时 recvq 不为空,则调用send函数将数据拷贝到对应的goroutine的堆栈上。

send函数的实现主要包含两点:

memmove(dst, src, t.size)
goready(gp, skip+1)

而如果 recvq 队列为空,则说明此时没有等待接收数据的goroutine,那么此时channel会尝试把数据放到缓存中。代码如下:

if c.qcount < c.dataqsiz {
    // 相当于 c.buf[c.sendx]
    qp := chanbuf(c, c.sendx)
    // 将数据拷贝到buffer中
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

以上代码的作用其实非常简单,就是把数据放到buffer中而已。此过程涉及了ring buffer的操作,其中 dataqsiz 代表用户指定的channel的buffer大小,如果不指定则默认为0。其他具体的详细操作后续过程会在ring buffer一节详细讲到。

如果用户使用的是无缓冲channel或者此时buffer已满,则 c.qcount < c.dataqsiz 条件不会满足, 以上流程也并不会执行到。此时会将当前的goroutine以及要发送的数据放入到 sendq 队列中,同时会切出该goroutine。整个流程对应代码如下:

gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 将goroutine转入waiting状态,并解锁
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

以上代码中,goparkunlock就是解锁传入的mutex,并切出该goroutine,将该goroutine置为waiting状态。 gopark 和上面的 goready 对应,互为逆操作。 goparkgoready 在runtime的源码中会经常遇到,涉及了goroutine的调度过程,这里就不再深入讨论,以后会单独写一篇文章讲解。

调用 gopark 后,对于用户侧来看,该向channel发送数据的代码语句会进行阻塞。

以上过程就是channel的发送语句(如, c <- 1 )的内部工作流程,同时整个发送过程都使用 c.lock 进行加锁,保证并发安全。

简单来说,整个流程如下:

  1. 检查recvq是否为空,如果不为空,则从recvq头部取一个goroutine,将数据发送过去,并唤醒对应的goroutine即可。
  2. 如果recvq为空,则将数据放入到buffer中。
  3. 如果buffer已满,则将要发送的数据和当前goroutine打包成 sudog 对象放入到 sendq 中。并将当前goroutine置为waiting状态。

从channel中接收数据的过程基本与发送过程类似,此处不再赘述了。具体接收过程涉及到的buffer的相关操作,会在后面进行详细的讲解。

这里需要注意的是,channel的整个发送过程和接收过程都使用 runtime.mutex 进行加锁。 runtime.mutex 是runtime相关源码中常用到的一个轻量级锁。整个过程并不是最高效的lockfree的做法。golang在这里有个issue: go/issues#8899 ,给出了lockfree的channel的方案。

channel的ring buffer实现

channel中使用了ring buffer(环形缓冲区)来缓存写入的数据。ring buffer有很多好处,而且非常适合用来实现FIFO式的固定长度队列。

在channel中,ring buffer的实现如下:

hchan 中有两个与buffer相关的变量: recvxsendx 。其中 sendx 表示buffer中可写的index, recvx 表示buffer中可读的index。 从 recvxsendx 之间的元素,表示已正常存放入buffer中的数据。

我们可以直接使用 buf[recvx] 来读取到队列的第一个元素,使用 buf[sendx] = x 来将元素放到队尾。

buffer的写入

当buffer未满时,将数据放入到buffer中的操作如下:

qp := chanbuf(c, c.sendx)
// 将数据拷贝到buffer中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
    c.sendx = 0
}
c.qcount++

其中 chanbuf(c, c.sendx) 相当于 c.buf[c.sendx] 。以上过程非常简单,就是将数据拷贝到buffer的 sendx 的位置上。

接着,将 sendx 移到下一个位置上。如果 sendx 已到达最后一位,则将其置为0,这是一个典型的头尾相连的做法。

buffer的读取

当buffer未满时,此时 sendq 里面也一定是空的(因为如果buffer未满,用于发送数据的goroutine肯定不会排队,而是直接放数据到buffer中,具体逻辑参考上文向channel发送数据一节),这时候对于channel的读取过程 chanrecv 就比较简单了,直接从buffer中读取即可,也是一个移动 recvx 的过程。与上文buffer的写入基本一致。

sendq 里面有已等待的goroutine的时候,此时buffer一定是满的。这个时候channel的读取逻辑如下:

// 相当于c.buf[c.recvx]
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
    typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
    c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz

以上代码中, ep 接收数据的变量对应的地址。例如,在 x := <- c 中,表示变量 x 的地址。

sg 代表从sendq中取出的第一个 sudog 。并且:

  1. typedmemmove(c.elemtype, ep, qp) 表示buffer中的当前可读元素拷贝到接收变量的地址处。
  2. typedmemmove(c.elemtype, qp, sg.elem) 表示将sendq中goroutine等待发送的数据拷贝到buffer中。因为此后进行了 recv++ , 因此相当于把sendq中的数据放到了队尾。

简单来说,这里channel将buffer中队首的数据拷贝给了对应的接收变量,同时将sendq中的元素拷贝到了队尾,这样可以才可以做到数据的FIFO(先入先出)。

接下来可能有点绕, c.sendx = c.recvx , 这句话实际的作用相当于 c.sendx = (c.sendx+1) % c.dataqsiz ,因为此时buffer依然是满的,所以 sendx == recvx 是成立的。

总结

channel作为golang中最常用设施,了解其源码可以帮助我们更好的理解和使用。同时也不会过于迷信和依赖channel的性能,channel就目前的设计来说也还有更多的优化空间。

参考