Spark 任务调度

Spark的核心是基于RDD来实现的,Spark任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,然后将每个Stage中的任务(Task)分发到指定的节点去运行得到最终的结果。

先来了解下几个概念:

  • Application:用户编写的Spark应用程序,由一个或多个Job组成。提交到Spark之后,Spark会为Application分配资源,将程序进行转换并执行。

  • Job(作业):由Action算子触发生成的由一个或多个Stage组成的计算作业。

  • Stage(调度阶段):每个Job会根据RDD的宽依赖被切分为多个Stage,每个Stage都包含一个TaskSet。

  • TaskSet(任务集):一组关联的,但相互之间没有shuffle依赖关系的Task集合。一个TaskSet对应的调度阶段。

  • Task(任务):RDD中的一个分区对应一个Task,Task是单个分区上最小的处理流程单元。

Spark任务调度模块主要包含两大部分:DAGScheduler和TaskScheduler,它们负责将用户提交的计算任务按照DaG划分为不同的阶段并且将不同阶段的计算任务提交到集群进行最终的计算。

  • DAGScheduler:主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,并以TaskSet的形式把Stage提交给TaskScheduler。其中每个Stage由可以并发执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。

  • TaskScheduler:负责Application中不同job之间的调度,将TaskSet提交给Worker执行并返回结果,在Task执行失败时启动重试机制,并且为执行速度慢的Task启动备份的任务。

Spark任务调度整个过程如下图所示:

RDD Objects就是在代码中创建的RDD,这些代码逻辑上组成了一个DAG。

DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以并发执行的一组Task组成,这些Task的执行逻辑完全相同,只是作用于不同的数据。在DAGScheduler将这组Task划分完成之后,会将这组Task(TaskSets)提交到TaskScheduler。

TaskScheduler负责Task级的调度,将DAGScheduler提交过来的TaskSet按照指定的调度实现,分别对接到不同的资源管理系统。TaskScheduler会将TaskSets封装成TaskSetManager,并加入到调度的队列中,TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务的。TaskScheduler通过Cluster Manager在集群中的某个Worker的Executor上启动任务。但是在不同的资源管理框架下TaskScheduler的实现方式有一定的差别。

Task在Executor中运行,如果缓存中没有计算结果,那么就需要开始计算,同时,计算的结果会回传到Driver或者保存在本地。

Stage划分

用户提交的计算是一个由RDD构成的DAG,如果RDD在转换的时候需要做Shuffle,没那么这个Shuffle的过程就将这个DAG分为了不同的阶段(Stage)。由于Shuffle的存在,在不同的Stage是不能并行计算的,因为后面Stage的计算需要前面Stage的Shuffle的结果。而一个Stage由一组完全独立的计算任务(即Task)组成,每个Task的运算逻辑完全相同,只不过每个Task都会处理其对应的Partition。其中,Partition的数量和Task的数量是一致的,即一个Partition会被该Stage的一个Task处理。

划分依据:Stage的划分依据就是宽依赖。

核心算法:从触发Action操作的那个RDD开始从后往前推,首先会为最后一个RDD创建一个Stage,然后继续倒推,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的Stage,那么RDD就是新的Stage的最后一个RDD。然后以此类推,直到所有的RDD全部遍历完成为止。

Stage调度

经过Stage划分之后,会产生一个或者多个互相关联的Stage。其中,真正执行Action算子的rDD所在的Stage被称为Final Stage。DAGScheduler会从这个Final Stage生成作业实例。

在提交Stage时,DAGScheduler会先判断该Stage的父Stage的执行结果是否可用。如果所有父Stage的执行结果都可用,则提交该Stage。如果有任意一个父Stage的结果不可用,则尝试迭代提交该父Stage。

Task调度

TaskScheduler接收到DAGScheduler提交过来的TaskSet,会为每一个收到的TaskSet创建一个TaskSetManager。TaskSetManager负责TaskSet中Task的管理调度工作。

每个TaskScheduler都对应一个SchedulerBackend。其中TaskScheduler负责Application的不同Job之间的调度,在Task执行失败的时候启动重试机制,并且为执行速度慢的Task启动备份的任务。SchedulerBackend负责与Cluster Manager交互,取得该Application分配到的资源,并且将这些资源传给TaskScheduler,由TaskScheduler为Task最终分配计算资源。

Spark调度模式

Spark可以采用两种调度模式:

  • FIFO:先进先出调度模式(默认)。FIFO调度会根据StageID和JobID的大小来调度,数值较小的任务优先被调度。FIFO调度方式存在一个缺点:当遇到一个耗时较长的任务时,后续任务必须等待这个耗时任务执行完成才能得到可用的计算资源。

  • FAIR:公平调度模式。FAIR模式下每个计算任务具有相等的优先级,Spark以轮询的方式为每个任务分配计算资源。FAIR不像FIFO那样必须等待前面耗时任务完成后后续任务才能执行。在FAIR模式下,无论是耗时短任务还是耗时长任务、无论是先提交的任务还是后提交的任务都可以公平的获得资源执行,这样就提高了耗时短的任务的响应时间。FAIR比FIFO更加灵活,FAIR模式为用户提供了一个调度池的概念,用户可以将重要的计算任务放入一个调度池Pool中,通过设置该调度池的权重来使该调度池中的计算任务获得较高的优先级。

关注获得更多分享