[快速入门]Go语言的CSP并发模型
Go语言实现了一下两种并发形式:
第一种是大家普遍认知的: 多线程共享内存 。其实就许多主流编程语言中的多线程开发。
另外一种是Go语言特有的,也是Go语言推荐的: CSP(communicating sequential processes)并发模型 。该方式是Go语言最大的两个亮点goroutine和chan,二者合体的典型应用。
CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel (管道)进行通信的并发模型。
Go语言其实只用到了 CSP 的很小一部分,即理论中的 Process/Channel(对应到语言中的 goroutine/channel):这两个并发原语之间没有从属关系, Process 可以订阅任意个 Channel,Channel 也并不关心是哪个 Process 在利用它进行通信;Process 围绕 Channel 进行读写,形成一套有序阻塞和可预测的并发模型。
相信大家一定见过一句话:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现内存共享。
这就是 Go 的并发哲学,它依赖 CSP 模型,基于 channel 实现。
channel
channel的创建
channel 字面意义是 “通道”,类似于 Linux 中的管道。声明 channel 的语法如下:
chan T // 可以接收和发送类型为 T 的数据 chan<- float64 // 只可以用来发送 float64 类型的数据 <-chan int // 只可以用来接收 int 类型的数据 复制代码
使用make初始化Channel,并且可以设置容量:
make(chan int, 100) 复制代码
因为 channel 是一个引用类型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函数进行初始化。可以向它传递一个 int 值,代表 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。
Channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作 “同步模式”,带缓冲的则称为 “异步模式”。
同步模式
下,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。

异步模式
下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。

代码示例
//这里定义两个函数,下面分别验证同步模式执行以及异步模式执行的效果 func service() { time.Sleep(time.Millisecond * 30) return "Done" } func otherTask() { fmt.Println("this is other task B") time.Sleep(time.Millisecond * 100) fmt.Println("Task B is done") } 复制代码
同步模式执行
func AsyncService() chan string { //阻塞模式,即A将信息放进channel直到有人读取,否则将一直阻塞 retCh := make(chan string) go func () { ret := service() fmt.Println("service return result") retCh <- ret fmt.Println("service exited") }() return retCh } //单元测试 func TestAsynService(t *testing.T) { retCh := AsyncService() otherTask() fmt.Println(<-retCh) time.Sleep(time.Second * 1) } 复制代码
单测结果运行如下,可以看出等到当othertask执行完开始从chan中取数据时协程才继续向下执行,在这之前一直处于挂起状态
this is other task B service return result Task B is done Done service exited 复制代码
异步模式执行
func AsyncService() chan string { retCh := make(chan string,1) //buffer模式,非阻塞 丢进channel就继续向下执行 go func () { ret := service() fmt.Println("service return result") retCh <- ret fmt.Println("service exited") }() return retCh } func TestAsynService(t *testing.T) { retCh := AsyncService() otherTask() fmt.Println(<-retCh) time.Sleep(time.Second * 1) } 复制代码
执行结果如下,可以明显的看到这种模式下并没有等待从chan中获取消息,直接向下继续运行
this is other task service return result service exited Task B is done Done 复制代码
channel的使用
1.send操作
c := make(chan int) c <- 3 复制代码
注意,往一个已经被close的channel中继续发送数据会导致 run-time panic
2.recive操作
c := make(chan int) c <- 3 i := <-c fmt.Println(i) //3 复制代码
从一个nil channel中接收数据会一直被block,直到有数据可以接收;从一个被close的channel中接收数据不会被阻塞,而是立即返回,会返回元素类型的零值(zero value)以及一个代表当前channel状态的bool值。可以通过这个特性判断channel是否关闭
if x, ok := <-ch;ok { //ok 为bool值,true标识正常接收,false表示通道关闭 ... }else{ ... } 复制代码
3.close操作
c := make(chan int) close(c) 复制代码
所有的channel接受者都会在channel关闭时,立刻从阻塞等待中返回且上述ok值为false(如果有值可取依旧会正常取值)。这个广播机制常被利用,进行向多个订阅者同时发送信号
代码示例
//数据生产者 func dataProducer(ch chan int, wg *sync.WaitGroup) { go func() { for i := 0; i < 10; i++ { ch <- i } close(ch) //channel关闭 wg.Done() }() } //数据接受者 func dataReceiver(ch chan int, wg *sync.WaitGroup) { go func() { for { if data, ok := <-ch; ok { //channel关闭后,ok值将变为false fmt.Println(data) } else { break } } wg.Done() }() } func TestCloseChannel(t *testing.T) { var wg sync.WaitGroup ch := make(chan int) wg.Add(1) dataProducer(ch, &wg) wg.Add(1) dataReceiver(ch, &wg) wg.Wait() 复制代码
与switch-case搭配实现选路
select-case语句配合channel可以实现多路选择以及超时控制功能,每个case后面跟一个阻塞事件,当有事件收到响应后则结束等待,如果均没有响应则执行default
//多渠道选择 //原理如下,采用select-case语句 每个case后面跟一个阻塞事件,当有事件收到响应后则结束等待,如果均没有响应则执行default func TestSwitch(t *testing.T){ select{ case ret1 := <-retCH1: t.Logf("case 1 return") case ret2 := <-retCH2: t.Logf("case 2 return") default: t.Logf("no one return") } } //超时控制 func TestTimeOut(t *testing.T){ select { case ret := <- retCH1: t.Logf("case 1 return") case <-time.After(time.Second*1): t.Logf("time out") } } 复制代码
欢迎关注我们的微信公众号,每天学习Go知识