Flink 是如何工作的?

作者介绍

徐庆甡,2017 年 6 月加入去哪儿网,目前在市场数据团队,现主要负责基础数据建设与数据分析,对数据实时处理框架有着浓厚的兴趣,善于通过问题来驱动探索可持续化服务。

前言

本文主要分为两个部分,分别介绍 Flink 集群的启动和 Flink 任务如何运行在集群中(Stream 方式),由于篇幅有限,本文尽量不探讨具体的实现细节,但是比较重要的部分给予一定的说明,更多的是让读者了解 Flink 内部的工作原理。

Flink 在部署上主要分为两部分 JobManager 和 TaskManager,JobManager 主要负责任务的分发,slot 资源管理等,TaskManager 则主要负责任务的运行。下面这张官网的架构图虽然年代有点久远,但至今大部分功能还是如图所述。

从上面图中需要了解的信息:

  • Flink 代码在客户端里进行了“ 编译 ”, 然后提交给 JobManager,这里比较重要,因为很多 Optimizer 的东西都可以在这里做。

  • JobManager 拿到“ 编译 ”后的 job 再分发到各个 TaskManager 的 Slot 里。

  • Flink 有两种通信方式,远程调用使用的是 Actor,TaskManager 数据传输使用的是 Netty。

  • JobManager 集成了 Scheduler 和 Checkpoint Coordinator 等主要功能。

Flink 集群的启动

Flink 集群的启动方式主要分为以下 3 种:

  • Standalone 方式:最基础的方式,具有完整的功能,可在实体机上进行部署。

  • Cluster session 方式:部署在像 Yarn,K8s 等资源调度框架内,直接建立起集群,然后通过与 JobManager 交互来提交作业,但是在实际生产过程中发现有严重的 Bug 还没有解决,可参看 FLINK-11205(https://issues.apache.org/jira/browse/FLINK-11205),主要是 Metaspace 空间无法释放导致 TaskManager 被 kill,而一个集群里存在多个 TaskManager 问题更加严重,会导致整个集群 Taskmanager 不断重启。

  • Per Job 方式:在 Yarn 中已经很好的集成了这种方式,就是整个集群的生命周期和任务进行绑定,任务结束,集群资源也随着撤销,唯一的缺点就是部署时间上不如 Cluster session 的方式,这点其实在实际生产中可以忽略。

JobManager 的启动

因为版本变动的关系 Jobmanager 在 1.5 版本的时候进行了一次大改,和上面的图可能略有不通,具体可以参见 FLIP-6(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077),下面聊一聊一般生产中 JobManager 如何启动的。

上图可以看出在 1.5 以后的新版本中 JobManager 多出了两个组件,一个是 ResourceManager,主要负责管理 Flink 自己的 TaskManager 资源,注意这个 ResourceManager 和 Yarn 上的不是同一个,两者管理的资源不是同一级别,第二个是 Dispatcher 主要是提供 client 的 RPC 接口,提供发布任务的一系列功能。而 JobMaster 主要负责一个任务的生命周期,每个 Flink Job 都有一个 JobMaster 与之对应。

TaskManager 的启动

TaskManager 其实功能比较简单,提供一些基本的 RPC 服务,供 JobManager 进行调度,稍后会讲到 TaskManager 如何运行 JobManager 提供的 subTask。

Flink Job 的启动

我们先来看看一个 Flink job 的构成:

Flink job 最核心的构成其实就是这个 ENV,用户需要通过调用 ENV 来注册用户的代码逻辑,而这个 ENV 定义了一系列 DAG 生成的规则, 比如 env.addSource 我们必须指定 Source 的类型,是 DataStream 还是 DataSet,又比如 keyBy 之后返回的是 KeyedStream,这些规则都是在 ENV 里进行了定义,用户只要根据规则来使用即可,而且根据 ENV 的定义我们可以在不同的环境中运行 Flink Job,如下图所示:

我们可以以 Flink 开发者的角度思考如何从用户代码到底层实现,这一过程必然使用分层结构,下面来看一下各个层级的工作:

Client 端

首先在 Client 端生成的第一层是 User Rule,这个算是用户代码到 DAG 的第一层抽象,这里面制定了很多规则,来约束用户的行为,引导用户更好的使用 Flink 程序。

第二层是 Transformation 层,这里的 Transformation 主要是完成了对用户的各个算子的定义,比如说 addSource 的操作即为 SourceTransformation,filter 操作即为 OneInputTransformation,这里面与用户的算子一一对应,但是这里面区分定义了 PhysicalTransformation,PhysicalTransformastion 可以理解为物理算子,需要计算资源进行计算,而 union,select 这些并非物理算子,可以在后面的结构中通过网络 shuffle 进行合并,在这一步我们得到的还是一个 Transformation 的列表,完整的描述了用户定义的各个算子,如下图所示:

第三层就是我们的 StreamGraph 了,从 Transformation 转化而来,构成了图的结构,其中 StreamNode 为点,StreamEdge 为边,在程序里存入的部分是一张链表结构,如下图所示:

这里面加入了一些信息,比如说 slotSharingGroup,从而定义这些算子如何在 slot 里分配。

在 Client 里生成的最后一层即为 JobGraph,这也是 Flink JobManager 接收的任务对象,其实上一层的 StreamGraph 已经完成了任务 DAG,为什么还需要 JobGraph 呢,其实是为了兼容 Batch 模式,JobGraph 作为统一层进行封装,同时提供了计算优化即 setChain 功能,可以减小序列化和网络开销,可能有的同学会对 chain 的概念有所疑惑,比如说 map 算子后面接入一个 filter 算子,而且是同等数量的,这样我就可以把 map 和 filter 抽象成一个计算进行自动优化,要达到 chain 的条件比较苛刻,具体可以查看代码。

在 JobGraph 里主要结构就是 JobVertex 和 JobEdge,在前面所说的 chain 方法后,可以合并 两个或多个 StreamNode 为一个 JobVertex,如下图所示:

好了这里基本就聊完了 Client 端的工作。

JobManager 端

JobManager 端接收到 client 提交的 JobGraph,根据 JobgRaph 来生成 ExcutionGraph,这个 ExcutionGraph 简单来说就是 JobGraph 的并行版本,定义具体执行的细节:

这里省略了一部分(其他部分一样),主要来看看几个结构,ExecutionJobVertex 主要类似于 JobGraph 的 Node,管理所有下面的并行状态,ExecutionVertex 是最小的执行单位(也是我们所说的 SubTask,每个 SubTask 都由一个 thread 启动),同理 IntermediateResult 负责多个 IntermediateResultPartition。到这里完成了 ExecutionGraph,而需要部署到 TaskManager 中间还需要为每个 Vertex 生成 TaskDeploymentDescriptor,来描述在 TaskManager 中的具体任务,比如 inputGates(输入),producedPartition(输出),然后提交给 TaskManager。

TaskManager 端

上面提到 JobManager 生成 TaskDeploymentDescriptor 提交给 TaskManager,TaskManager 就比较好办了,只需要设定这个 Task 的输入和输出还有 run 这个 Task 即可,这一层就是所谓的物理执行层。

而对于分布在各个 TaskManager 上的 SubTask 是如何通信的呢,可以看下图:

对于在同一个 TaskManager 内的 subTask,通过 InputGate 和 ResultPartition 直接进行通信,而对于不在同一个 TaskManager 里需要借助 Netty 层进行网络通信:

还有一点值得注意的是,当有消息来到时 ResultPartition 会给 InputGate 发送 notifyDataAvallable,告诉下游该来取数了,如果下游比较忙没空来取的话就会阻塞,进而逐级向上传导产生被压,这种缓冲 buffer 的方式是 flink 天然处理被压的方法。

至此整个 Flink Job 的流程已经结束,但毕竟 Flink 是一个庞大的开源项目,很多细节比如 Checkpoint 机制,状态存储这些重要的概念还未涉及,有兴趣的读者可以参考优秀的 Blog 进一步研究。

参考资料:

  • Flink 官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/

  • FLIP:https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

  • Jark’s Blog:http://wuchong.me/

  • 玉兆的博客:http://chenyuzhao.me/

  • Flink 官方博客:https://flink.apache.org/2019/06/05/flink-network-stack.html

【END】