并发编程趣题: 制造一氧化二氢

最近leetcode的算法题中新增加了一个并发问题的 分类 , 目前提供了四道题,一道简单题,两道中级题和一道 Hard 题。这是leetcode尝试在并发算法分类的一个尝试,虽然目前还在摸索阶段,有的题目可以通过 作弊 的方式提供高分答案,有的即使是通过了测试,但是其实还是有并发的问题的,只不过测试用例没有覆盖到而已。

这也说明了并发编程并不是一件很容易的事情,但是还是很高兴leetcode能在这个方面往前走一步。其中的最难的那道题,看它第一眼的时候觉得还是比较简单的,但是仔细思考后却发现实现它并不是一件很容易的事情,即使目前的接受率才51.9%,但其实已接收的提交有很多还是有并发的问题的,只不过由于验证的问题并没有验证出来。

这道题 是这样子的,利用两个氢原子和一个氧原子生成一氧化二氢, 一氧化二氢这种化合物在常温常压下为无色无味的透明液体,也就是我们常说的 。这道题目我大致翻译一下:

oxygen (氧)和 hydrogen (氢) 两个线程,你的目标是把它们分组生成水分子。当然这里有道闸门(barrier)在水分子生成之前不得不等待。 oxygenhydrogen 线程每三个会被分成一组,包括两个 hydrogen 线程和一个 oxygen ,它们三个每个都会产生一个对应的原子组合成水分子。 你必须保证当前分组内的各个线程提供的原子组合成一个水分子之后这些线程才能参与产生下一个水分子

换句话说:

  • 假如一个 oxygen 到达了闸门,而其它 hydrogen 还没来,那么这个 oxygen 会一直等待另两个 hydrogen 线程
  • 假如 hydrogen 线程到达了闸门,而其它的线程还没来,那么它会等待 oxygen 线程和另外一个 hydrogen 线程

所以,一个水分子总是由两个 hydrogen 线程和 oxygen 提供。

你不必担心线程是如何分组的,外部调度器可以正确地把两个 hydrogen 线程和一个 oxygen 线程分成一组。问题的关键是线程组要完整地通过闸门。

你需要编写同步代码,保证线程能够按照前面的要求有序地生成水分子。

举几个例子:

Input: "HOH"
Output: "HHO"
Explanation: "HOH" 和 "OHH" 也是合法的答案
Input: "OOHHHH"
Output: "HHOHHO"
Explanation: "HOHHHO", "OHHHHO", "HHOHOH", "HOHHOH", "OHHHOH", "HHOOHH", "HOHOHH" 和 "OHHOHH" 也是合法的答案

约束条件:

  • 总的输入是 3n , 这里 1 ≤ n ≤ 30
  • 总的H的数量是 2n
  • 总的O的数量是 O

目前leetcode这类题目只支持C、C++、Python和Java,还不支持其它编程语言。一开始我觉得这道题目还是比较容易解决的,使用三个Java的Semaphore对线程进行编排,提交解决方案也顺利通过的,耗时也在最前面。过了两天再品味这道题的时候,发现实现还是有问题的,因为同一个 hydrogen 线程提供两个氢原子也可以顺利通过测试,随后看了讨论区的答案,很多也是有问题的,输出结果可能没问题,但是实际线程的运行并不符合题目的要求。

下面尝试使用Go来解决这个问题。

首先定义这个题的实现框架,保持和网站上其它语言一致:

type H2O struct {
}


func (h2o *H2O) hydrogen(releaseHydrogen func()) {
    // releaseHydrogen() outputs "H". Do not change or remove this line.
    releaseHydrogen()
}

func (h2o *H2O) oxygen(releaseOxygen func()) {
    // releaseOxygen() outputs "O". Do not change or remove this line.
    releaseOxygen()
}

hydrogen 线程会调用 hydrogen 方法,而 oxygen 线程会调用 oxygen 方法。一组线程(三个线程,包括两个 hydrogen 和一个 oxygen 线程)调用完各自的方法后,输出完 HHO (或者 HOHOHH )后才能进行下一次的调用。

一个 hydrogen 线程在本次 HHO 完成后才能再次调用 hydrogen 方法。

一个 oxygen 线程在本次 HHO 完成后才能再次调用 oxygen 方法。

你现在的工作就是在这个代码中添加同步逻辑,保证上面的约束要满足,也就是程序能够按照要求顺利进行。

因为各个线程的调度是无法控制的,先到的线程需要等待,目前还没有太好的办法,我们就让它暂时sleep,等水分子所需的原子都满足后触发事件,然后让它们合成水分子。

首先考虑使用信号量保证没有过多的线程调用这个方法。一个水分子是有两个氢原子和一个氧原子组成的,我们就使用两个信号量来控制它们。

因为 hydrogen 方法会被两个线程(goroutine)调用,不太好控制,而 oxygen 被一个线程来控制,我们基于它触发事件。

实现代码如下:

package water

import (
    "context"

    "golang.org/x/sync/semaphore"
)

type H2O struct {
    semaH   *semaphore.Weighted
    semaO   *semaphore.Weighted
    phaser1 chan struct{}
    phaser2 chan struct{}
}

func New() *H2O {
    return &H2O{
        semaH:   semaphore.NewWeighted(2),
        semaO:   semaphore.NewWeighted(1),
        phaser1: make(chan struct{},2),
        phaser2: make(chan struct{},2),
    }
}

func (h2o *H2O) hydrogen(releaseHydrogen func()) {
    h2o.semaH.Acquire(context.Background(),1)

    // releaseHydrogen() outputs "H". Do not change or remove this line.
    releaseHydrogen()

    <-h2o.phaser1 //wait oxygen goroutine for preparing oxygen

    <-h2o.phaser2 // wait water molecule

    h2o.semaH.Release(1)
}

func (h2o *H2O) oxygen(releaseOxygen func()) {
    h2o.semaO.Acquire(context.Background(),1)

    h2o.phaser1 <- struct{}{} // trigger hydrogen goroutine that oxygen is ready
    h2o.phaser1 <- struct{}{} // trigger another hydrogen goroutine that oxygen is ready

    // releaseOxygen() outputs "O". Do not change or remove this line.
    releaseOxygen()

    h2o.phaser2 <- struct{}{} // trigger hydrogen goroutine to prepare next water molecule
    h2o.phaser2 <- struct{}{} // trigger another hydrogen goroutine to prepare next water molecule

    h2o.semaO.Release(1)
}

编写一个测试用例来测试一下(我们就使用三个goroutine来测试):

package water

import (
    "math/rand"
    "sort"
    "sync"
    "testing"
    "time"
)

func TestWaterFactory(t *testing.T) {
    var ch chan string

    releaseHydrogen := func() {
        ch <- "H"
    }

    releaseOxygen := func() {
        ch <- "O"
    }

    var N =100
    ch = make(chan string, N*3)

    h2o := New()
    var wg sync.WaitGroup
    wg.Add(N *3)
    // h1
    go func() {
        for i :=0; i < N; i++ {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            h2o.hydrogen(releaseHydrogen)
            wg.Done()
        }
    }()

    // h2
    go func() {
        for i :=0; i < N; i++ {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            h2o.hydrogen(releaseHydrogen)
            wg.Done()
        }
    }()

    // o
    go func() {
        for i :=0; i < N; i++ {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            h2o.oxygen(releaseOxygen)
            wg.Done()
        }
    }()

    wg.Wait()

    if len(ch) != N*3 {
        t.Fatalf("expect %d atom but got %d", N*3, len(ch))
    }

    var s = make([]string,3)
    for i :=0; i < N; i++ {
        s[0] = <-ch
        s[1] = <-ch
        s[2] = <-ch
        sort.Strings(s)

        water := s[0] + s[1] + s[2]
        if water != "HHO" {
            t.Fatalf("expect a water molecule but got %s", water)
        }
    }
}

晚会回家的路上时候想,如果不是生成水分子,而是使用四个线程一组生成双氧水,也就是过氧化氢 H₂O₂ ,又该如何编写同步代码呢?