Ray源码解析之整体逻辑结构

Ray Version 0.4.0
以一个Ray示例程序来说明,Ray执行多进程/分布式程序的过程。

import ray
import time

@ray.remote
def f():
    time.sleep(1)
    return 1

ray.init()
results = ray.get([f.remote() for i in range(4)])
print(results)

结果输出如下:

Process STDOUT and STDERR is being redirected to /tmp/raylogs/.
Waiting for redis server at 127.0.0.1:35084 to respond...
Waiting for redis server at 127.0.0.1:59058 to respond...
Starting local scheduler with the following resources: {'CPU': 8, 'GPU': 0}.

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui26399.ipynb?token=7e19e5b2051ef36474500c26427d304da95835e4f0933992
======================================================================

[1, 1, 1, 1]

Process finished with exit code 0

首先我们定义 remote
函数 f
,将一个普通的Python函数变为 remote
函数只需在其上加上 @ray.remote
装饰器。

remote
是一个装饰工厂函数,返回修饰函数的装饰器,主要定义代码如下:

# ray/python/ray/worker.py
def remote(*args, **kwargs):

    worker = global_worker
    
    def make_remote_decorator(num_return_vals, num_cpus, num_gpus, resources,
                              max_calls, checkpoint_interval, func_id=None):  # 装饰器工厂函数
        # 装饰器,装饰函数,做Actor和function的区分
        def remote_decorator(func_or_class):                                  
            if inspect.isfunction(func_or_class) or is_cython(func_or_class):
                ...
                return remote_function_decorator(..)   # 是函数,调用远程函数装饰器
            if inspect.isclass(func_or_class):
                ...
                return worker.make_actor(..)           # 是actor,由全局worker创建一个actor
            raise Exception
        # 装饰器,参数是函数,起装饰函数的作用
        def remote_function_decorator(func, function_properties):
            def func_call(*args, **kwargs):
                return _submit(args=args, kwargs=kwargs)

            def _submit(...):
                ...
            def func_executor(arguments):
                """This gets run when the remote function is executed."""
                result = func(*arguments)
                return result

            def func_invoker(*args, **kwargs):
                raise Exception
            func_invoker.remote = func_call              # func.remote() 直接调用func_call
            func_invoker._submit = _submit
            func_invoker.executor = func_executor
            func_invoker.is_remote = True
            func_name = "{}.{}".format(func.__module__, func.__name__)
            func_invoker.func_name = func_name
            ...
            return func_invoker

        return remote_decorator
    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        # 不带参数的 @ray.remote 装饰
        return make_remote_decorator(
            num_return_vals, num_cpus, num_gpus, resources,
            max_calls, checkpoint_interval)(args[0])
    else:
        # 带参数的 @ray.remote(xx=x) 装饰
        ...
        return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
                                     resources, max_calls, checkpoint_interval)

【先验知识:Python装饰器的概念】

由此可见, remote
是一个通用的装饰器,可以装饰普通的Python函数,或者是Python的class。

进入 remote
装饰器体,首先得到全局Worker,然后定义了一个 make_remote_decorator
装饰器工厂函数,然后判断是无参装饰还是带参装饰。
如果是无参装饰,那么

@ray.remote
def f():
    time.sleep(1)
    return 1

等价于

def f():
    ...
    
f = ray.remote(f)

此时,remote的参数只有一个,那就是 f
本身,也即 args[0]

所以上述代码返回 make_remote_decorator(...)(args[0])
,即调用过的 make_remote_decorator
,参数是 f

否则remote函数定义等价于:

def f():
    ...

f = ray.remote(num_cpus=1, ..)(f)

所以 remote
装饰器返回一个未调用的,将会在 f
上调用的 make_remote_decorator
函数。

make_remote_decorator
中再嵌套了一层装饰,本身提供对函数和actor的区分。

如果是函数,则进入 remote_function_decorator
远程函数装饰器;
否则是class,由全局worker创建一个actor。

远程函数装饰器 remote_function_decorator
的责任就是接受函数参数,返回一个函数,这个函数就是远程函数,不能直接传参调用(第29行)。将 remote()
绑定到 func_call
,接受参数后,提交任务( _submit_task
)运行这个函数,最后得到结果,这个结果也是 f.remote()
调用的结果,是一个 object id
,因为返回结果存在object store中。

至此,远程函数就定义好了。我们在原始的普通Python函数 f
上,装饰了一下,得到了一个可以通过 f.remote()
来调用的远程函数,如此调用将会立马提交一个任务,供Ray引擎调度执行,返回结果。

下面是 ray.init()
过程。可以理解为初始化Ray引擎的过程,类似于启动Tensorflow的Session的过程。

ray.init()
也有带参版本和无参版本。
带参版本用于已经存在并启动一个Ray集群的情况下,直接填入该集群的redis地址,即可连接到集群,就初始化好了。
无参版本适用于单机多进程的运行,这种情况下会创建一个Ray环境,默认启动一个local scheduler,一个global scheduler,一个或多个redis server, 一个object store和一个object store manager,和若干worker进程(默认为CPU核数个)。

init()
主要逻辑为:

# ray/python/ray/worker.py
init()
  _init()
    if PYTHON_MODE:
      pass
    elif start_ray_local:  # 本地开启一个Ray主节点进程
      address_info = services.start_ray_head(..)
    else:  # 连接到已有集群
      address_info = get_address_info_from_redis(redis_address, node_ip_address)
    # 将全局worker连接到 local scheduler, Plasma 和 Redis
    connect(driver_address_info, object_id_seed=object_id_seed,
            mode=driver_mode, worker=global_worker)

四个模式:

SCRIPT_MODE:如果Worker是driver,且由Python脚本启动或者在shell中交互式运行的话,使用脚本模式。会打印任务失败信息。
WORKER_MODE:如果Worker不是driver,只是slave的话,启动WORKER_MODE,不打印关于task的任何信息。
PYTHON_MODE:如果要顺序运行或是调试,可以使用PYTHON_MODE,此时的Worker即是driver。此模式下,不会发送remote函数到调度器,而是直接以阻塞的形式执行。
SILENT_MODE:测试的时候使用SILENT_MODE。不会打印error信息,因为许多测试时故意失败的。

我们的示例代码中, ray.init()
是无参的,代表我们会在本地开启一个ray head节点进程。
此部分代码简要逻辑如下:

# ray/python/ray/services.py
start_ray_head
  | start_ray_processes
    | print("Process STDOUT and STDERR is being redirected to /tmp/raylogs/.")  # 程序输出中第一行的来源
    | if redis_address is None:
    |   start_redis(...)
        |  start_redis_instance(..)
          |  创建redis_shards个redis server
          # 等待redis server可用并响应,程序输出第2,3行的来源
          |  wait_for_redis_to_start("127.0.0.1", port)   
    | if include_log_monitor:
    |   start_log_monitor(..)
    | if include_global_scheduler:
    |   start_global_scheduler(...)
        # 开启local_scheduler并打印 Starting local scheduler ..,程序第4行的来源
        | local_scheduler_name, pid = ray.local_scheduler.start_local_scheduler(...)
    | for i in range(num_local_schedulers - len(object_store_addresses)):
    |   start_objstore(...)
        | ray.plasma.start_plasma_store(..)
        | ray.plasma.start_plasma_manager(..)
    | for i in range(len(local_scheduler_socket_names), num_local_schedulers):
    |   start_local_scheduler(...)
    # 每个local scheduler默认搭配CPU核数个workers,因此workers_per_local_scheduler[i] = #cpus
    | for i, num_local_scheduler_workers in enumerate(workers_per_local_scheduler):
    |   for j in range(num_local_scheduler_workers):
    |     start_worker(...)
    | if include_webui:
    |   start_ui(...)
    # 开启UI会打印输出中UI的部分

可以看到, start_ray_head
的过程配套启动了redis, global scheduler, local scheduler及其workers,UI等。
这些都是ray执行快速的分布式任务分发的基本组件,其中redis用来存储全局系统状态,global scheduler和local scheduler分数两级调度器,负责快速的任务调度,workers负责执行远程函数,UI负责观察运行状态,不过目前UI做的还比较简陋。

每个Worker执行一个主循环 main_loop
,循环不断地接受任务,处理任务返回……

这部分代码见 ray/python/ray/workers/default_worker.py

main_loop
的代码如下:

# ray/python/ray/worker.py
def main_loop(self):
    def exit(signum, frame):
        cleanup(worker=self)
        sys.exit(0)
    signal.signal(signal.SIGTERM, exit)
    check_main_thread()
    while True:
        # 此处调用self.local_scheduler_client.get_task()获得任务
        task = self._get_next_task_from_local_scheduler()
        self._wait_for_and_process_task(task)
        |  self._wait_for_function(function_id, task.driver_id().id())
        |  with self.lock:
           |  self._process_task(task)

初始化好以后,就可以运行 f.remote()
了,运行后还是回到装饰器里面的 _submit_task
函数,

# ray/python/ray/worker.py
def _submit(args=None, kwargs=None, num_return_vals=None,
            num_cpus=None, num_gpus=None, resources=None):
    check_connected()       # 检查worker是否连接
    check_main_thread()     # 检查是否主线程,不允许非主线程提交任务
    kwargs = {} if kwargs is None else kwargs
    args = signature.extend_args(function_signature, args, kwargs)

    if _mode() == PYTHON_MODE:
        # PYTHON模式下,并不提交任务,而是串行执行,拷贝参数以防修改
        result = func(*copy.deepcopy(args))
        return result
    # 提交任务,返回结果的object id或者一组object ids
    object_ids = _submit_task(function_id, args,
                              num_return_vals=num_return_vals,
                              num_cpus=num_cpus, num_gpus=num_gpus,
                              resources=resources)
    if len(object_ids) == 1:
        return object_ids[0]
    elif len(object_ids) > 1:
        return object_ids

代码中调用的 _submit_task
是对 worker.submit_task
的一个封装:

# ray/python/ray/worker.py
def _submit_task(function_id, *args, **kwargs):
    """This is a wrapper around worker.submit_task.

    We use this wrapper so that in the remote decorator, we can call
    _submit_task instead of worker.submit_task. The difference is that when we
    attempt to serialize remote functions, we don't attempt to serialize the
    worker object, which cannot be serialized. 【这样搞一下就不需要序列化worker对象了?】
    """
    return global_worker.submit_task(function_id, *args, **kwargs)

最终,Worker的 submit_task
函数如下:

# ray/python/ray/worker.py
def submit_task(self, function_id, args, ...):
    with log_span("ray:submit_task", worker=self):
        check_main_thread()
        ...
        # 将参数put进object store,注意,如果多个函数使用的是相同的输入,直接调用的话仍然会put多次
        # 一个方法是先在调用前put参数,然后传入put后的ObjectID对象。
        args_for_local_scheduler = []
        for arg in args:
            if isinstance(arg, ray.local_scheduler.ObjectID):
                args_for_local_scheduler.append(arg)
            elif isinstance(arg, ray.actor.ActorHandleParent):
                args_for_local_scheduler.append(put(
                    ray.actor.wrap_actor_handle(arg)))
            elif ray.local_scheduler.check_simple_value(arg):
                args_for_local_scheduler.append(arg)
            else:
                args_for_local_scheduler.append(put(arg))
        ...
        # Submit the task to local scheduler.
        task = ray.local_scheduler.Task(
            self.task_driver_id,
            ray.local_scheduler.ObjectID(function_id.id()),
            args_for_local_scheduler,
            ...)
        ...
        self.task_index += 1
        self.local_scheduler_client.submit(task)

        return task.returns()

也就是说, [f.remote() for i in range(4)]
这一句,默认的全局worker会提交4个任务给local scheduler,然后local scheduler将这些任务调度到嗷嗷待哺的各个worker,前面的代码说过,默认会启动CPU核数个Worker。

运行完毕后,列表中就是返回的值的object id,我们需要使用 ray.get(id)
从object store中将真正的数据拿出来。

最后就成了 [1, 1, 1, 1]
,程序到此就结束了。
再回顾一下整个过程:

@ray.remote   # 装饰器
def f():
    time.sleep(1)
    return 1
              # 装饰完成,装饰过后的远程函数f已形成
ray.init()    # 初始化Ray引擎,会启动各个必要组件,包括调度,状态存储,对象存储和workers等
results = ray.get([f.remote() for i in range(4)])   # 提交任务,获得结果,从object store中取出
print(results)

Happy Reading!