Tensorflow上手4: 初探分布式训练

今天打算简单讲讲Tensorflow分布式训练的方法和遇到的一些问题。感兴趣的朋友也可以进入 Google Colab提供的文档 中进行更多学习。

一般情况下,分布式训练主要有两种模式,一种是模型并行,一种是数据并行。模型并行指将模型分散到不同的GPU当中,每个GPU只计算和更新自己负责的参数,这种模型并行主要解决模型参数量过大的问题。数据并行是在多个GPU上运行同样的模型,但是他们的输入数据却不一样,实现更大的batch同时计算,加快模型训练速度。接下来的内容针对数据并行展开进行讨论。

Tensorflow分布式训练的支持主要是通过 tf.distribute.Strategy 来实现。现在的Tensorflow分布式库中主要包含五种不同的分布式策略:

  • MirroredStrategy
  • TPUStrategy
  • MultiWorkerMirroredStrategy
  • CentralStorageStrategy
  • ParameterServerStrategy

在这里面我很少听说有人用CentralStorageStrategy,自己也没有过尝试,所以暂时不好说些什么。TPUStrategy可以让用户更好的在Google的Tensor Processing Unites(TPUs)上面进行训练。他与MirroredStrategy类似,实现了同步的分布式训练(Synchronized distributed training)。TPUs本身有很快的all-reduce操作,而TPUStrategy则实现了一系列TPU ops来利用这一特性。对于一个不在Google的工程师,我常接触的则是多个GPU,多台机器的情况,所以这里也不对TPUStrategy进行太多探讨。

MirroredStrategy

MirroredStrategy是一种支持多张GPU在同一个机器上的同步训练方法。在训练开始时,Mirrored会在每张卡上复制一份模型,个显卡会收到tf.data.Dataset传来的数据,独立计算梯度,然后采用all-reduce的方法进行同步更新。多个显卡在通信时默认使用Nvidia NCCL进行。

我们可以深入 MirroredStrategy 的实现了解一下。基本上所有的distributed strategy都是通过某些collective ops和cross device ops进行数据通讯。MirroredStrategy也是如此,它是这样选择cross device ops的:

if len(workers) > 1:
 if not isinstance(self._cross_device_ops, cross_device_ops_lib.MultiWorkerAllReduce):
 raise ValueError(
 "In-graph multi-worker training with `MirroredStrategy` is not "
 "supported.")
 self._inferred_cross_device_ops = self._cross_device_ops
else:
 # TODO(yuefengz): make `choose_the_best` work with device strings
 # containing job names.
 self._inferred_cross_device_ops = cross_device_ops_lib.NcclAllReduce()

这也就印证了MirroredStrategy在单机多卡的情况下默认使用NCCL来进行通信的说明。具体的实现大家可以去查看 AllReduceCrossDeviceOps 的实现。

同时,上面的程序也说明MirroredStrategy可以运用到多机多卡的情况中去,然而多机多卡的情况下用户需要自己传入 cross_device_ops_lib.MultiWorkerAllReduce 进行通讯,这里MultiWorkerAllReduce支持若干种通讯方式,比如 nccl , nccl/xring , nccl/rechd , nccl/pscpu , xring , pscpu , pscpu/pscpu 等等。由于目前最佳的通讯方式需要NCCL2.0加上xring,然而Tensorflow目前使用NCCL 1.1,并且 nccl/xring 在现有的代码中有bug无法工作,所以这一模式常常被大家诟病。

在Esitmator当中使用MirroredStrategy是非常简单的,我们只需要加入以下代码即可:

mirrored_strategy = tf.contrib.distribute.MirroredStrategy(
 num_gpus_per_worker=x)
estimator_config = tf.estimator.RunConfig(
 train_distribute=mirrored_strategy)
estimator = tf.estimator.Estimator(
 model_fn=...
 config=estimator_config,
 ...)

训练脚本就会自动进行分布式训练。

ParameterServerStrategy

接下来我们先跳过MultiWorkerMirroredStrategy讨论一下ParameterServerStrategy,他是Tensorflow最初的分布式训练方法。其概念如下图所示,它由若干个parameter servers和若干个worker servers构成,parameter servers用于存储参数,workers用于计算。

https://eng.uber.com/horovod/

ParameterServerStrategy是一种严格的异步训练方法,在训练过程中worker servers会和不同的parameter servers沟通获得参数,然后计算,向parameter servers传递参数的梯度。配置一个这样的训练环境非常简单,只需要在程序运行时设置好环境变量TF_CONFIG,需要注意的是需要给分布式集群里每一个机子不同的task。

os.environ["TF_CONFIG"] = json.dumps({
 "cluster": {
 "worker": ["host1:port", "host2:port", "host3:port"],
 "ps": ["host4:port", "host5:port"]
 },
 "task": {"type": "worker", "index": 1}
})

同时,ParameterServerStrategy还有比较神奇的功能,它可以通过传入 num_gpus_per_worker 在一个worker上进行多GPU的同步计算,然后不同worker之间进行异步计算。但是由于单一worker上多GPU并没有利用NCCL进行通讯,而是直接将结果发送到CPU,所以效率非常低下。

最后要说的是Parameter Server的配置数量也非常复杂,不同的网络环境,模型大小都会对效率有影响,所以现在官方好像也不怎么推荐这种做法了。

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy在之前的版本里应该对应的是tf.contrib.distribute.CollectiveAllReduceStrategy,他与MirroredStrategy非常类似,都在每一个device上存储一份模型的备份,进行同步的分布式训练。

该策略采用CollectiveOps作为多个worker之间通讯的操作。所谓的collective op是Tensorflow自己实现的根据当前硬件环境,网络结构,和Tensor大小自动采用最佳算法进行all-reduce的计算操作。一个collective op的实现逻辑十分简单

if (CanProceedWithCompute(c, col_exec, done)) {
 col_exec->ExecuteAsync(
 c, col_params_, GetCollectiveKey(c), actual_done);
}

c 是当前op的计算状态, col_exec 是Tensorflow根据系统情况选择的collective executor,所有的all reduce,boardcast和receive操作都有collective executor去执行。

该策略目前也实现了很多优化,比方将很多个小tensor的all reduce操作变成几个大tensor的all reduce操作,以及在开发当中的采用最新NCCL 2.0进行通讯的操作,具体可以参见 Issue 24505 。可以看出Tensorflow分布式训练在被吐槽很多次后,感受到了来自Pytorch,Horovod的压力,在努力的提升自己。

最后,关于MultiWorkerMirroredStrategy的配置,有两点需要注意。一点是collective ops的策略选择,目前支持 CollectiveCommunication.RING ,采用与Horovod类似的ring-based通讯策略。另一个是 CollectiveCommunication.NCCL ,采用Nvidia NCCL进行通讯,在启动策略时可以传入参数指定:

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
 tf.distribute.experimental.CollectiveCommunication.NCCL)

另一个需要注意的是关于TF_CONFIG的设置,该策略并不需要指定Parameter server,只需要一系列worker即可,其配置如下:

TF_CONFIG = {
 'cluster': {
 'worker': ['worker1:port1', 'worker2:port2', 'worker3:port3', ...]
 },
 'task': {'type': 'worker', 'index': 0}
})

结语

今天讨论了这么久关于Tensorflow分布式训练的内容,主要目的是抛砖引玉,Tensorflow分布式训练代码里的bug不少,还有很多需要摸索的地方,欢迎大家多多交流。