Flink解读 | 解读Flink的声明式资源管理与自动扩缩容设计

【今晚直播预告 – Flink 进阶教程】

第一课、Flink Checkpoint–轻量级分布式快照

讲师:唐云(Apache Flink Contributor)

直播:5月9日20:00-21:00 (UTC+8)

观看方式:钉钉搜索群号  21789141 进群即可观看

备注:视频、PPT待直播后更新

完整课程见: https://github.com/flink-china/flink-training-course

这篇文章我们来解读一下Flink还未实现但已纳入计划中的声明式的资源管理及自动扩缩容的设计。

截止到目前(包括即将发布的Flink 1.8),Flink都还无法基于资源的可用性来进行弹性地扩缩容。网上提出的一些扩缩容机制,一些是用户自己根据获取的指标去实现触发Job的重启扩容或借助于上游系统反馈的指标来让JobMaster作出针对性的响应(比如结合Dell开源的Pravega)。这些机制都不是Flink自身提供的,而且都有各自的局限性。

在谈论具体的设计之前,我们还需要了解Flink的状态管理已经通过引入Key Group允许用户调整算子并行度。这一步是扩缩容的前提,不了解的可以阅读Flink官方的一篇博客:

A Deep Dive into Rescalable State in Apache Flink:

https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

Flink也曾为此发过一篇论文。 另外一点需要澄清的是,在“扩缩容”之前有很多修饰词,包括但不限于:弹性、动态、在线、自动…。这些修饰词在各种技术文章里已经被使用得含糊不清了。这里只澄清一点:下文所谈论的扩缩容依然无法做到Job在运行时动态调整并行度不经历重启直接拉起新的Task实例运行,而是基于重启恢复机制来实现的,因为涉及到状态管理。

接下来,我们就来解读Flink声明式的资源管理及自动扩缩容机制。

Active与Reactive模式

资源获取常见的两种模式:

  • Active 模式:主动式,Flink可以主动地申请、释放资源(通过与一些资源管理框架集成,如Yarn, Mesos);

  • Reactive 模式:被动响应式,由外部系统来分配、释放资源,Flink只是简单地对可用资源进行响应,这种模式对于基于容器的启动环境相当有意义。

Flink希望这两种模式能够遵循同一套设计且都能支持自动地扩缩容(Flink job能够动态调整它的并行度以对变动的工作“负荷”进行响应,当可用资源不够的情况下,应该能够自动且优雅地降级)。

声明式的资源管理

其实,现在Flink所支持的资源管理模式就是Active模式,由用户明确指定Job需要的资源,JobMaster会向ResourceManager去请求用户指定的全量资源,当ResourceManager无法满足JobMaster的申请需求时,该job将无法正常启动,它会陷入失败重启->再次尝试申请的循环。

那么为了能够同时支持Active/Reactive这两种模式,就需要换一种方式来跟ResourceManager交互,为此引入了声明式的资源管理设计。它跟原来的设计最大的差别在于:

  • JobMaster不再去逐个请求Slot,而是声明它需要的资源的情况;

  • 对资源的要求是个弹性的范围,而不是固定的;

这里所声明的弹性资源是个四元组(也可以称之为“四件套”):(min, target, max, rs),每个元素的含义如下:

  • min : 执行Job的最小资源

  • target : JobMaster期望获得的常规资源需求,它是可变的,这是扩缩容的主要触发机制

  • max : 最大资源期望

  • rs : resource spec(资源描述信息)

声明式的资源管理设计,可以应对Active以及Reactive模式:

  • active:不再强求像现有设计一样一定要满足期望获取的资源(将它想象为target,只不过现在不允许改变)才能够成功启动任务。这种模式使得当无法满足资源申请期望时,只要满足最小条件(min)即可启动执行;

  • reactive:target = max,这是与active最大的不同,因为reactive是被动响应式的,将target设置为max,就可以使得所有的算子都是资源贪婪的,它能够确保资源的最大化利用;

这一设计要求对现有的JobMaster与ResourceManager的交互进行重构。它要求JobMaster去周期性地获取target的槽位数,target的槽位数目可以通过心跳作为载体传递给ResourceManager。如果target高于已经分配给JobMaster的槽位数,则为了补齐缺失的槽位,针对不同的模式,ResourceManager会采取不同机制:

  • active : 启动新的Task executor(看作New slot);

  • reactive : 从可用槽位中分配相应的槽位(Available slot);

这引入了新的槽位分配视图:

JobMaster基于其“required”的槽位数以及“available”槽位数,来作出启动Job、从失败恢复或者周期性地触发弹性扩缩容的决策。

槽位分配协议的重构

引入声明式的资源管理设计会导致现有的固定式的槽位申请机制不再适用,因此需要进行重构。

现有的槽位申请是在ExecutionGraph调度的时候进行,每个Execution一个slot。如果SlotPool中有槽位可用,将会直接提供槽位,如果可用的槽位数不够,将会像ResourceManager申请,这一步也会分配新的TaskExecutor container,大致的流程如下图所示:

重构后一个很大的不同点是将调度的逻辑从ExecutionGraph中分离出来并单独形成一个Scheduler组件。另外,槽位的申请也不再是一个个独立地进行,而是由JobMaster声明资源需求并传达给ResourceManager。重构后的大致协议如下图:

调度器

新引入的调度器将调度与部署的逻辑从ExecutionGraph中解耦出来。这可以使得调度与部署的设计与实现更具扩展性并且将ExecutionGraph渐渐弱化为Job运行时状态跟踪的数据结构。

调度器充当了资源申请的客户端与SlotPool交互。SlotPool如果获得新的资源,将会通知调度器。调度器会检查当前所拥有的资源是否满足最小启动资源(min),如果满足可以启动调度。

如此设计的优势在于:大大减少了逐个申请Slot时大量的RPC请求,提升了申请效率,最小资源启动也能加快调度的整体进度。

如果获得新资源可用通知后,发现ExecutionGraph已经启动(可能是以最小资源先启动或者启动后更改了target的值),那么调度器可以决定扩容来使用额外的可用资源,当然这里有一个等待Job进入“稳态”的过程(一个简单的考虑是,如果可用资源在一定的时间内不再发生变化,则视为“稳态” ) ,因为这种场景可能在刚调度一个Job时发生,而且可能会发生多次。

调度器的职责

总结而言,调度器需要承担如下职责:

  • 决定调度策略以及声明资源需求

  • 接受资源变更通知

    • 等待资源稳定

    • 最小资源 ( min ) 满足后触发调度

  • 管理扩缩容策略

    • 周期性地查询扩缩容策略并更新target值

  • 扩缩容决策

槽位数的计算

调度器会负责计算Job的资源需求,它需要迭代JobGraph中所有的算子,并根据每个算子所返回的四元组 ( min, target, max, rs ) 对它们进行累加。

我们知道Flink有“Slot sharing”的机制,它可以让多个算子共享槽位,这种情况下的计算方式是对前三个元素求所有共享算子的最大值,最后一个元素则是求和: ( mins’ max, targets’ max, maxs’ max, sum(rs ))

扩缩容策略

为了支持自动扩缩容,我们必须允许一个算子动态改变它的target值。这可以引入RescalingPolicy来指定,它可以被周期性地查询并让调度器获取当前的target值。target值的改变最终也将会使得ResourceManager登记的Job的资源请求列表得到更新。而一旦SlotPool得到新分配的资源,它将会通知调度器触发扩缩容。

失败恢复

在引入声明式的资源管理之后,失败容错也会有相应的变动。在发生失败的情况下,Job首先需要考虑它是否要“缩容”重启,比如有些Job的运行刚好使集群的资源达到了饱和,但如果是因为TaskExecutor宕机导致的失败,那么很有可能直接重启无法使它获得失败之前运行时所占有的资源。这里就可以允许一旦最小启动资源(min ) 被满足后即可重启。

配置

资源配比在Flink中主要依赖于并行度 ( parallelism ) 的设置,Flink现在只支持并行度的“精确”设置。它会从三种途径来获取 ( 优先级从高到底):

  • 如果用户调用setParallelism API,则以此为准

  • 否则将会使用Job级别的并行度,如果通过CLI设置了 ( -p ) 参数的话

  • 如果以上两者都无法获取,则从Flink的配置文件中读取默认的Job并行度

在引入声明式的资源管理机制后,用户需要为算子指定资源四元组中的前三个元素的值,包括min, 初始的target以及max。如果用户没有调用setParallelism为算子明确指定资源信息,则Flink需要为算子指定默认值:

  • Active 模式:(1, -p 或 集群默认, max parallelism)

  • Reactive 模式:(1, max parallelism, max parallelism)

如果用户显示调用了setParallelism(p),则这三个值都将为(p, p, p)。

参考:

Declarative Resource Management: Active and reactive mode with re-scaling policies:

https://docs.google.com/document/d/1XKDXnrp8w45k2jIJNHxpNP2Zmr6FGru3H0dTHwYWJyE/edit#

大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛http://hbase.group,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号:hbasegroup),非常欢迎大家积极投稿。

技术社群

【HBase生态+Spark社区大群】

群福利:群内每周进行群直播技术分享及问答


加入方式1:

https://dwz.cn/Fvqv066s?spm=a2c4e.11153940.blogcont688191.19.1fcd1351nOOPvI

加入方式2:钉钉扫码加入