Ray源码解析之整体逻辑结构
以一个Ray示例程序来说明,Ray执行多进程/分布式程序的过程。
import ray import time @ray.remote def f(): time.sleep(1) return 1 ray.init() results = ray.get([f.remote() for i in range(4)]) print(results)
结果输出如下:
Process STDOUT and STDERR is being redirected to /tmp/raylogs/. Waiting for redis server at 127.0.0.1:35084 to respond... Waiting for redis server at 127.0.0.1:59058 to respond... Starting local scheduler with the following resources: {'CPU': 8, 'GPU': 0}. ====================================================================== View the web UI at http://localhost:8888/notebooks/ray_ui26399.ipynb?token=7e19e5b2051ef36474500c26427d304da95835e4f0933992 ====================================================================== [1, 1, 1, 1] Process finished with exit code 0
首先我们定义 remote
函数 f
,将一个普通的Python函数变为 remote
函数只需在其上加上 @ray.remote
装饰器。
remote
是一个装饰工厂函数,返回修饰函数的装饰器,主要定义代码如下:
# ray/python/ray/worker.py def remote(*args, **kwargs): worker = global_worker def make_remote_decorator(num_return_vals, num_cpus, num_gpus, resources, max_calls, checkpoint_interval, func_id=None): # 装饰器工厂函数 # 装饰器,装饰函数,做Actor和function的区分 def remote_decorator(func_or_class): if inspect.isfunction(func_or_class) or is_cython(func_or_class): ... return remote_function_decorator(..) # 是函数,调用远程函数装饰器 if inspect.isclass(func_or_class): ... return worker.make_actor(..) # 是actor,由全局worker创建一个actor raise Exception # 装饰器,参数是函数,起装饰函数的作用 def remote_function_decorator(func, function_properties): def func_call(*args, **kwargs): return _submit(args=args, kwargs=kwargs) def _submit(...): ... def func_executor(arguments): """This gets run when the remote function is executed.""" result = func(*arguments) return result def func_invoker(*args, **kwargs): raise Exception func_invoker.remote = func_call # func.remote() 直接调用func_call func_invoker._submit = _submit func_invoker.executor = func_executor func_invoker.is_remote = True func_name = "{}.{}".format(func.__module__, func.__name__) func_invoker.func_name = func_name ... return func_invoker return remote_decorator if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # 不带参数的 @ray.remote 装饰 return make_remote_decorator( num_return_vals, num_cpus, num_gpus, resources, max_calls, checkpoint_interval)(args[0]) else: # 带参数的 @ray.remote(xx=x) 装饰 ... return make_remote_decorator(num_return_vals, num_cpus, num_gpus, resources, max_calls, checkpoint_interval)
【先验知识:Python装饰器的概念】
由此可见, remote
是一个通用的装饰器,可以装饰普通的Python函数,或者是Python的class。
进入 remote
装饰器体,首先得到全局Worker,然后定义了一个 make_remote_decorator
装饰器工厂函数,然后判断是无参装饰还是带参装饰。
如果是无参装饰,那么
@ray.remote def f(): time.sleep(1) return 1
等价于
def f(): ... f = ray.remote(f)
此时,remote的参数只有一个,那就是 f
本身,也即 args[0]
。
所以上述代码返回 make_remote_decorator(...)(args[0])
,即调用过的 make_remote_decorator
,参数是 f
。
否则remote函数定义等价于:
def f(): ... f = ray.remote(num_cpus=1, ..)(f)
所以 remote
装饰器返回一个未调用的,将会在 f
上调用的 make_remote_decorator
函数。
make_remote_decorator
中再嵌套了一层装饰,本身提供对函数和actor的区分。
如果是函数,则进入 remote_function_decorator
远程函数装饰器;
否则是class,由全局worker创建一个actor。
远程函数装饰器 remote_function_decorator
的责任就是接受函数参数,返回一个函数,这个函数就是远程函数,不能直接传参调用(第29行)。将 remote()
绑定到 func_call
,接受参数后,提交任务( _submit_task
)运行这个函数,最后得到结果,这个结果也是 f.remote()
调用的结果,是一个 object id
,因为返回结果存在object store中。
至此,远程函数就定义好了。我们在原始的普通Python函数 f
上,装饰了一下,得到了一个可以通过 f.remote()
来调用的远程函数,如此调用将会立马提交一个任务,供Ray引擎调度执行,返回结果。
下面是 ray.init()
过程。可以理解为初始化Ray引擎的过程,类似于启动Tensorflow的Session的过程。
ray.init()
也有带参版本和无参版本。
带参版本用于已经存在并启动一个Ray集群的情况下,直接填入该集群的redis地址,即可连接到集群,就初始化好了。
无参版本适用于单机多进程的运行,这种情况下会创建一个Ray环境,默认启动一个local scheduler,一个global scheduler,一个或多个redis server, 一个object store和一个object store manager,和若干worker进程(默认为CPU核数个)。
init()
主要逻辑为:
# ray/python/ray/worker.py init() _init() if PYTHON_MODE: pass elif start_ray_local: # 本地开启一个Ray主节点进程 address_info = services.start_ray_head(..) else: # 连接到已有集群 address_info = get_address_info_from_redis(redis_address, node_ip_address) # 将全局worker连接到 local scheduler, Plasma 和 Redis connect(driver_address_info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker)
四个模式:
SCRIPT_MODE:如果Worker是driver,且由Python脚本启动或者在shell中交互式运行的话,使用脚本模式。会打印任务失败信息。 WORKER_MODE:如果Worker不是driver,只是slave的话,启动WORKER_MODE,不打印关于task的任何信息。 PYTHON_MODE:如果要顺序运行或是调试,可以使用PYTHON_MODE,此时的Worker即是driver。此模式下,不会发送remote函数到调度器,而是直接以阻塞的形式执行。 SILENT_MODE:测试的时候使用SILENT_MODE。不会打印error信息,因为许多测试时故意失败的。
我们的示例代码中, ray.init()
是无参的,代表我们会在本地开启一个ray head节点进程。
此部分代码简要逻辑如下:
# ray/python/ray/services.py start_ray_head | start_ray_processes | print("Process STDOUT and STDERR is being redirected to /tmp/raylogs/.") # 程序输出中第一行的来源 | if redis_address is None: | start_redis(...) | start_redis_instance(..) | 创建redis_shards个redis server # 等待redis server可用并响应,程序输出第2,3行的来源 | wait_for_redis_to_start("127.0.0.1", port) | if include_log_monitor: | start_log_monitor(..) | if include_global_scheduler: | start_global_scheduler(...) # 开启local_scheduler并打印 Starting local scheduler ..,程序第4行的来源 | local_scheduler_name, pid = ray.local_scheduler.start_local_scheduler(...) | for i in range(num_local_schedulers - len(object_store_addresses)): | start_objstore(...) | ray.plasma.start_plasma_store(..) | ray.plasma.start_plasma_manager(..) | for i in range(len(local_scheduler_socket_names), num_local_schedulers): | start_local_scheduler(...) # 每个local scheduler默认搭配CPU核数个workers,因此workers_per_local_scheduler[i] = #cpus | for i, num_local_scheduler_workers in enumerate(workers_per_local_scheduler): | for j in range(num_local_scheduler_workers): | start_worker(...) | if include_webui: | start_ui(...) # 开启UI会打印输出中UI的部分
可以看到, start_ray_head
的过程配套启动了redis, global scheduler, local scheduler及其workers,UI等。
这些都是ray执行快速的分布式任务分发的基本组件,其中redis用来存储全局系统状态,global scheduler和local scheduler分数两级调度器,负责快速的任务调度,workers负责执行远程函数,UI负责观察运行状态,不过目前UI做的还比较简陋。
每个Worker执行一个主循环 main_loop
,循环不断地接受任务,处理任务返回……
这部分代码见 ray/python/ray/workers/default_worker.py
。
main_loop
的代码如下:
# ray/python/ray/worker.py def main_loop(self): def exit(signum, frame): cleanup(worker=self) sys.exit(0) signal.signal(signal.SIGTERM, exit) check_main_thread() while True: # 此处调用self.local_scheduler_client.get_task()获得任务 task = self._get_next_task_from_local_scheduler() self._wait_for_and_process_task(task) | self._wait_for_function(function_id, task.driver_id().id()) | with self.lock: | self._process_task(task)
初始化好以后,就可以运行 f.remote()
了,运行后还是回到装饰器里面的 _submit_task
函数,
# ray/python/ray/worker.py def _submit(args=None, kwargs=None, num_return_vals=None, num_cpus=None, num_gpus=None, resources=None): check_connected() # 检查worker是否连接 check_main_thread() # 检查是否主线程,不允许非主线程提交任务 kwargs = {} if kwargs is None else kwargs args = signature.extend_args(function_signature, args, kwargs) if _mode() == PYTHON_MODE: # PYTHON模式下,并不提交任务,而是串行执行,拷贝参数以防修改 result = func(*copy.deepcopy(args)) return result # 提交任务,返回结果的object id或者一组object ids object_ids = _submit_task(function_id, args, num_return_vals=num_return_vals, num_cpus=num_cpus, num_gpus=num_gpus, resources=resources) if len(object_ids) == 1: return object_ids[0] elif len(object_ids) > 1: return object_ids
代码中调用的 _submit_task
是对 worker.submit_task
的一个封装:
# ray/python/ray/worker.py def _submit_task(function_id, *args, **kwargs): """This is a wrapper around worker.submit_task. We use this wrapper so that in the remote decorator, we can call _submit_task instead of worker.submit_task. The difference is that when we attempt to serialize remote functions, we don't attempt to serialize the worker object, which cannot be serialized. 【这样搞一下就不需要序列化worker对象了?】 """ return global_worker.submit_task(function_id, *args, **kwargs)
最终,Worker的 submit_task
函数如下:
# ray/python/ray/worker.py def submit_task(self, function_id, args, ...): with log_span("ray:submit_task", worker=self): check_main_thread() ... # 将参数put进object store,注意,如果多个函数使用的是相同的输入,直接调用的话仍然会put多次 # 一个方法是先在调用前put参数,然后传入put后的ObjectID对象。 args_for_local_scheduler = [] for arg in args: if isinstance(arg, ray.local_scheduler.ObjectID): args_for_local_scheduler.append(arg) elif isinstance(arg, ray.actor.ActorHandleParent): args_for_local_scheduler.append(put( ray.actor.wrap_actor_handle(arg))) elif ray.local_scheduler.check_simple_value(arg): args_for_local_scheduler.append(arg) else: args_for_local_scheduler.append(put(arg)) ... # Submit the task to local scheduler. task = ray.local_scheduler.Task( self.task_driver_id, ray.local_scheduler.ObjectID(function_id.id()), args_for_local_scheduler, ...) ... self.task_index += 1 self.local_scheduler_client.submit(task) return task.returns()
也就是说, [f.remote() for i in range(4)]
这一句,默认的全局worker会提交4个任务给local scheduler,然后local scheduler将这些任务调度到嗷嗷待哺的各个worker,前面的代码说过,默认会启动CPU核数个Worker。
运行完毕后,列表中就是返回的值的object id,我们需要使用 ray.get(id)
从object store中将真正的数据拿出来。
最后就成了 [1, 1, 1, 1]
,程序到此就结束了。
再回顾一下整个过程:
@ray.remote # 装饰器 def f(): time.sleep(1) return 1 # 装饰完成,装饰过后的远程函数f已形成 ray.init() # 初始化Ray引擎,会启动各个必要组件,包括调度,状态存储,对象存储和workers等 results = ray.get([f.remote() for i in range(4)]) # 提交任务,获得结果,从object store中取出 print(results)
Happy Reading!