译 | 如何优雅地关闭 Go 中的工作 goroutine
原文: Go – graceful shutdown of worker goroutines
在这篇博文中,我们将看看 Go 程序的优雅关闭。这类 Go 程序有一些执行任务的工作 goroutine,要求在程序关闭之前,这些工作 goroutine 必须完成任务。
介绍
在一个最近的项目中,我们有一个使用场景:一个基于 Go 的微服务不断地消费另一个第三方库发出的事件。这些事件在调用外部服务之前,会进行一些处理。而外部服务处理每个请求的速度都相当慢,但另一方面,它能够处理许多并发请求。因此,我们实现了一个简单的 worker 池,将输入事件扇出为几个并发执行的 goroutine。
总的来说,它看起来像这样:
然而,我们需要保证在该微服务关闭的时候,当前任何正在运行的对外部服务的请求必须完成,并且请求结果在我们的内部后端持久化。
worker 池和终止信号处理
worker 池模式 是一个有名的关于 worker 池的 Go 模式。此外,还有大量关于如何进行 基于 SIGTERM 通知的优雅关闭 的例子。但我们意识到,我们的一些需求使得使用场景有点更复杂。
当程序接收到 SIGTERM 或者 SIGINT 信号(例如,因容器编排器缩容到一定数目的副本数而产生的)时,在终止整个程序之前,必须允许当前任何工作中的 worker goroutine 完成它们长期运行的工作。
让事情稍微复杂些的是,我们对生产者端的库没有任何控制权。一开始我们会注册一个回调函数,每当生产端的库有了一个(我们需要的)事件,就会调用这个回调函数。该库会处于阻塞状态,直到回调函数结束执行。然后,当有更多事件产生时,库会再次调用这个函数。
worker-pool 的诸多 goroutine 通过使用标准的“对 channel 进行 range 操作”结构,来不断处理事件,例如:
func workerFunc() { for event := range jobsChan { // 阻塞直到接收到一个事件,或者该 channel 被关闭。 // handle the event... } }
这意味着,让一个 worker “结束”最干净的方式是关闭名为 “jobsChan” 的 channel。
在生产者端进行关闭
你首先学到的关于在 Go 中关闭 channel 的第一件事情之一是,如果向已关闭的 channel 发送数据,程序就会 panic。这归结于一个非常简单的规则:
“总是在生产者端关闭一个 channel(Always close a channel on the producer side)”
不管怎样,什么是生产者端呢?嗯,一般是那个将事件发送到 channel 里的 goroutine :
func callbackFunc(event int) { jobsChan<-event }
上面是我们的回调函数 callbackFunc,我们将其注册到外部库中,外部库就会将事件传给我们。 (为了让这些例子简单些,我将真实的事件替换为一个简单的整形,以作为负载。)
你要如何 安全地 保护上面的代码免于给已关闭的 channel 发送数据呢?一路沿着 Mutex、布尔型标志和 if 语句以确定是否一些_其他_ goroutine 关闭了 channel,以及控制是否应该允许发送数据,这并不简单。多留心潜在的竞争条件和不确定行为。
我们的解决方法是引入一个中间 channel 和一个内部的“消费者”,后者作为回调和任务 channel 之间的代理:
消费者函数看起来像这样:
func startConsumer(ctx context.Context) { // Loop until a ctx.Done() is received. Note that select{} blocks until either case happens for { select { case event := <-intermediateChan: jobsChan <- event case _ <- ctx.Done(): close(jobsChan) return // exit this function so we don't consume anything more from the intermediate chan } } }
好了,等下。这个 “select” 和 “ctx.Done()” 是啥?
恕我直言, select 语句是 Go 最神奇的东西之一。它允许多个 channel 的等待和协同。在这种情况下,我们或者会从中间 channel 那里收到事件,然后将其传到 jobsChan,又或者会从 context.Context 接收到取消信号。
关闭 jobsChan 之后的 return 语句将让我们离开 for 循环和函数,这确保了 不会有新事件被传递给 jobsChan,并且不会从 intermediateChan 消费到 任何事件 。
所以,要么是传递事件到 jobsChan(worker 从这里消费),要么在作为生产者的 同一个 goroutine 中 关闭 jobsChan。
关闭 jobsChan 意味着消费端的所有 worker 将会停止遍历 jobsChan:
for event := range jobsChan { // <- on the close(jobsChan), all goroutines waiting for jobs here will exit the for-loop // handle the event... }
发出取消信号
等待 Go 程序退出是一种有名的模式:
func main() { ... rest of program ... termChan := make(chan os.Signal) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan // Blocks here until either SIGINT or SIGTERM is received. // 接下来呢? }
在 “接下来呢?” 这一部分,捕获到 SIGINT 或者 SIGTERM 后,主 goroutine 恢复执行。我们需要告诉将事件从 intermediateChan 传到 jobsChan 的消费者,跨 goroutine 边界关闭 jobsChan。
再次,使用 Mutex 和条件语句来解决这个问题,技术上是可行的,但是相当难搞并且容易出错。作为替代,我们会利用前面提及的 context.Context 的取消支持。
在 func main() 的某个地方,我们设置了一个带取消支持的根 background context:
func main() { ctx, cancelFunc := context.WithCancel(ctx.Background()) // ... some omitted code ... go startConsumer(ctx) // pass the cancellable context to the consumer function // ... some more omitted code ... <-termChan cancelFunc() // call the cancelfunc to notify the consumer it's time to shut stuff down. }
这就是 < -ctx.Done() 这一 select case 如何被调用的,它开始优雅拆卸 channel 和 worker。
使用 WaitGroup
上面这个方法只有一个问题:调用 cancelFunc() 后,程序会立即退出,这意味着,正在动态调用中的工作 goroutine 将没有时间执行完毕,这使得我们系统中的处理有可能处于中间态。
我们需要停止关闭,直到所有的 worker 都报告说它们完成了工作。现在,我们进入 sync.WaitGroup ,它允许我们等待任意数目的 goroutine 结束!
当启动 worker 时,我们传递一个指向在 func main() 中创建的 WaitGroup 的指针:
const numberOfWorkers = 4 func main() { // ... omitted ... wg := &sync.WaitGroup{} wg.Add(numberOfWorkers) // Start [workerPoolSize] workers for i := 0; i < workerPoolSize; i++ { go workerFunc(wg) } // ... more omitted stuff ... <-termChan // wait for SIGINT / SIGTERM cancelFunc() // send the shutdown signal through the context.Context wg.Wait() // program will wait here until all worker goroutines have reported that they're done fmt.Println("Workers done, shutting down!") }
这会稍微改变我们的 worker 启动函数:
func workerFunc(wg *sync.WaitGroup) { defer wg.Done() // Mark this goroutine as done! once the function exits for event := range jobsChan { // handle the event... } }
wg.Done() 将 waitgroup 减一,一旦内部计数器变成 0,那么主 goroutine 将继续执行 wg.Wait() 之下的语句。这就完成了优雅关闭!
运行
最终程序的源代码在下一个部分。在此其中,我添加了一些日志,这样就能看看该过程发生了什么。
下面是一个带有 4 个工作 goroutine 的程序的执行输出,这里,我使用 Ctrl+C 来停止程序:
$ go run main.go Worker 3 starting Worker 2 starting Worker 1 starting Worker 0 starting Worker 3 finished processing job 0 Worker 0 finished processing job 3 ^C********************************* <-- HERE I PRESS CTRL+C Shutdown signal received ********************************* Worker 3 finished processing job 4 Worker 2 finished processing job 1 Worker 1 finished processing job 2 Consumer received cancellation signal, closing jobsChan! <-- Here, the consumer receives the <-ctx.Done() Worker 3 finished processing job 6 Worker 0 finished processing job 5 Worker 1 finished processing job 8 Worker 2 finished processing job 7 Worker 0 finished processing job 10 Worker 0 interrupted <-- Worker 0 has finished job #10, 3 left Worker 2 finished processing job 12 Worker 2 interrupted <-- Worker 2 has finished job #12, 2 left Worker 3 finished processing job 9 Worker 3 interrupted <-- Worker 3 has finished job #9, 1 left Worker 1 finished processing job 11 Worker 1 interrupted <-- Worker 1 has finished job #11, all done All workers done, shutting down!
有人可能会观察到,消费者接收到 < -ctx.Done() 的时间点实际上是不确定的,这是因为 Go 运行时调度 channel 上的通信到 select 语句的方法。Go 规范是这样说的:
“如果可以处理一个或多个通信,那么选择进行处理的那个 chanel 是通过统一的伪随机选择的。(If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection)”。
这就是为什么即使在按下 CTRL+C 之后,任务也可以被传给 worker。
另一个特别的事情是,似乎即使在关闭了 jobsChan _之后_,任务(任务 9-12)还是被传给 worker 了。恩,它们实际是在该 channel 被关闭 _之前_ 被传给 worker 的。这个现象会发生是因为我们使用了一个带有 4 个“槽” 的缓存 channel。这意味着,假定我们第三方生产者以比我们的 worker 可以处理的速度更快地不断传递新事件,如果所有四个 worker 都从 channel 中消费了一个任务并且处理它们,那么该 channel 里就可能会有四个新的事件正等待被消费。关闭 channel 并不会影响那些已经缓存到 channel 里的数据 —— Go 允许消费者消费它们。
如果我们将 jobsChan 修改为无缓存的:
jobsChan := make(chan int)
然后再次运行:
$ go run main.go .... omitted for brevity .... ^C********************************* Shutdown signal received ********************************* Worker 3 finished processing job 3 Worker 3 started job 5 Worker 0 finished processing job 4 Worker 0 started job 6 Consumer received cancellation signal, closing jobsChan! <-- again, it may take some time until the consumer is handed <-ctx.Done() Consumer closed jobsChan Worker 1 finished processing job 1 <-- From here on, we see that each worker finishes exactly one job before being interrupted. Worker 1 interrupted Worker 2 finished processing job 2 Worker 2 interrupted Worker 0 finished processing job 6 Worker 0 interrupted Worker 3 finished processing job 5 Worker 3 interrupted All workers done, shutting down!
这一次,在 channel 关闭后,我们就没有看到任何“不期望的”任务被 worker 消费了。然而,让 channel 缓存跟 worker 数相同的数据,是在不必要拖慢生产端的情况下,让 worker 保持处理数据的常见优化手法。
完整的程序
上面的代码片段在某些地方进行了简化,以使得它们尽可能简洁。带有某些结构以封装和模拟第三方生产者的完整程序如下:
package main import ( "context" "fmt" "math/rand" "os" "os/signal" "sync" "syscall" "time" ) const workerPoolSize = 4 func main() { // 创建消费者 consumer := Consumer{ ingestChan: make(chan int, 1), jobsChan: make(chan int, workerPoolSize), } // 模拟外部库:每秒发送 10 个事件 producer := Producer{callbackFunc: consumer.callbackFunc} go producer.start() // 设置取消 context 和 waitgroup ctx, cancelFunc := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} // 传递取消 context 以启动消费者 go consumer.startConsumer(ctx) // 启动 worker,并添加 [workerPoolSize] 到 WaitGroup wg.Add(workerPoolSize) for i := 0; i < workerPoolSize; i++ { go consumer.workerFunc(wg, i) } // 处理终止信号,并等待 termChan 信号 termChan := make(chan os.Signal) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan // 这里阻塞直到接收到信号 // 处理关闭 fmt.Println("*********************************\nShutdown signal received\n*********************************") cancelFunc() // 向 context.Context 发送取消信号 wg.Wait() // 这里阻塞直至所有 worker 完成 fmt.Println("All workers done, shutting down!") }
消费者结构:
// -- 从这里起,下面是 Consumer! type Consumer struct { ingestChan chan int jobsChan chan int } // 每当外部库传递一个事件给我们,就会调用 callbackFunc。 func (c Consumer) callbackFunc(event int) { c.ingestChan <- event } // workerFunc 启动一个 worker 函数,它会遍历 jobsChan,直到该 channel 关闭。 func (c Consumer) workerFunc(wg *sync.WaitGroup, index int) { defer wg.Done() fmt.Printf("Worker %d starting\n", index) for eventIndex := range c.jobsChan { // 模拟工作执行 1 ~ 3 秒 fmt.Printf("Worker %d started job %d\n", index, eventIndex) time.Sleep(time.Millisecond * time.Duration(1000+rand.Intn(2000))) fmt.Printf("Worker %d finished processing job %d\n", index, eventIndex) } fmt.Printf("Worker %d interrupted\n", index) } // startConsumer 作为 ingestChan 和 jobsChan 之间的代理,使用 select 语句以支持优雅关闭。 func (c Consumer) startConsumer(ctx context.Context) { for { select { case job := <-c.ingestChan: c.jobsChan <- job case <-ctx.Done(): fmt.Println("Consumer received cancellation signal, closing jobsChan!") close(c.jobsChan) fmt.Println("Consumer closed jobsChan") return } } }
最后,模拟外部库的生产者结构:
// -- Producer 模拟一个外部库,每 100ms 有新数据时simulates an external library that invokes the // 它会调用注册的回调函数。 type Producer struct { callbackFunc func(event int) } func (p Producer) start() { eventIndex := 0 for { p.callbackFunc(eventIndex) eventIndex++ time.Sleep(time.Millisecond * 100) } }
总结
我希望这篇小博文提供了一个简单的例子,说明了基于 goroutine 的 worker 池,以及如何使用基于 context 的取消、WaitGroup 和生产端 channel 关闭,来优雅关闭这些 goroutine。