令人期待的 FLIP-43: Savepoint Connector!
-
Current state: Under Discussion
-
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-43-Savepoint-Connector-td29233.html
Motivation
Flink为用户功能提供状态抽象,以保证流的容错处理。用户可以使用非分区和分区状态。
分区状态接口提供对不同类型状态的访问,这些状态都是作用于当前输入元素的键的。这种类型的状态仅在keyed stream中可用,该流是通过stream.keyBy()创建的。
目前,所有这种状态都是Flink的内部状态,用于在故障情况下提供处理保证(例如,恰好一次处理)。从外部访问状态的唯一方法是通过Queryable状态,但这仅限于只读,一次操作一个键。
保存点连接器使用Flink的批处理DataSet api提供了读取、写入和修改保存点的强大功能。
这对以下内容很有用:
-
分析有趣模式的状态
-
通过检查状态差异来排除故障或审核作业
-
新应用程序的引导状态
-
修改保存点,例如:
-
改变最大并行度
-
打破架构更改
-
纠正无效状态
Abstraction
要了解如何在批处理上下文中与保存点进行最佳交互,必须有一个清晰的心理模型,说明Flink状态中的数据如何与传统的关系数据库相关联。
可以将数据库视为一个或多个名称空间,每个名称空间包含一组表。这些表又包含其值在它们之间具有某种内在关系的列,例如在同一个键下作用域。
保存点表示特定时间点的Flink作业的状态,该作业由许多运算符组成。这些运算符包含各种状态,分区或键控状态,以及非分区或运算符状态。
此作业包含多个操作符以及各种状态。 在分析该状态时,我们可以首先通过运算符(通过设置其uid来命名)对数据进行作用域。 在每个运算符中,我们可以查看已注册的state。 CurrencyConverter具有广播状态,这是一种未分区操作员状态。 通常,操作符状态中的任何两个元素之间没有关系,因此我们可以将每个值看作是它自己的行。 将此与包含两个键控状态的“汇总”进行对比。 因为这两个状态的作用域都在同一个键下,所以我们可以安全地假设这两个值之间存在某种关系。 因此,键控状态最好理解为每个操作符包含一个表,该表包含一个“键”列和n个值列,每个注册状态对应一个列。 所有这些都意味着可以使用以下伪SQL命令来描述此作业的状态:
通常,保存点↔数据库关系可以总结为:
-
保存点是数据库。
-
运算符是由其uid命名的命名空间。
-
每个操作符状态代表一个表。
-
操作符状态中的每个元素表示该表中的一行。
-
每个包含键控状态的运算符都有一个“keyed_state”表。
-
每个keyed_state表都有一个键列映射操作符的键值。
-
每个注册状态表示表中的一列。
-
表中的每一行都映射到一个键。
Public Interfaces
Reading an existing savepoint:
加载现有保存点:
读取运算符状态时,只需指定运算符uid,状态名称和类型信息。
此外,如果状态使用自定义序列化,则可以提供自定义类型序列化程序。
当读取键控状态时,用户指定KeyedStateReaderFunction以允许读取任意列和复杂状态类型,例如ListState,MapState和AggregatingState以及定时器。 这意味着如果运算符包含有状态过程函数,例如:
然后,用户可以通过首先定义输出类型和相应的KeyedStateReaderFunction来准备此状态。
Creating state / savepoint from scratch:
定义如何使用给定的DataSet引导新运算符的状态:
创建一个新的保存点,指定状态后端类型,最大并行度,多个运算符和输出路径。
Modifying an existing savepoint
基于现有保存点加载新保存点并添加/覆盖/删除运算符
ExistingSavepoint existingSavepoint = Savepoint . load ( backendType , oldSavepointPath )
添加一个新的bootstrapped运算符
删除/覆盖现有保存点中的运算符状态,并进行写入。 修改后的保存点保留原始保存点的最大并行度。
Proposed Changes
实现的关键目标是仅使用可用的保存点API,以便实现简单且易于维护。 随着保存点格式的更改或添加了诸如TTL或状态迁移等新功能,连接器将继续工作而无需修改。
Querying Timers
唯一的先决条件是对内部计时器服务进行微小的修改,该服务提供了对该时间点注册的密钥的时间戳的有效映射。 有效查询已注册的计时器需要将密钥映射到已注册的时间戳。 由于计时器服务驻留在每个记录执行路径中,因此我们不希望对计时器的管理方式进行任何更改。 相反,将向InternalTimerService接口添加两个方法; forEachProcessingTimeTimer和forEachEventTimeTimer。 这些方法允许在读取之前将所有已注册的定时器复制到数据结构中,以支持有效的查询,而无需触及任何每个记录的代码路径。
State Input
从现有保存点读取状态是围绕一系列输入格式构建的,其中每个分割对应于数据流执行图中的单个执行顶点。 这意味着如果请求十个输入拆分,则状态的分区相同,就好像该保存点在具有十行并行性的数据流应用程序中恢复一样(使用StateAssignmentOperation中的方法)。 在打开时,每个拆分将恢复本地状态后端并遍历所有已恢复的数据。
Writing New Savepoints
Savepoint编写基于三个接口:
-
StateBootstrapFunction用于写入非分区的运算符状态
-
BroadcastStateBootstrapFunction用于写入广播状态
-
KeyedStateBootstrapFunction用于写入键控运算符状态
每个接口的结构与处理函数类似,除了它们不包含收集器,因为写入是数据流中的终端操作。 接口由相应的StreamOperator支持,但运算符不包含任何特殊逻辑。 实际的快照发生在名为BoundedStreamTask的StreamTask的新子类中。 此类与OneInputStreamTask相同,除了:
-
输入由迭代器而不是网络堆栈供电
-
处理完所有数据后,它将拍摄子任务的快照
这意味着重用所有检查点逻辑,与输入格式类似,库将免费支持所有保存点功能。 最后,BoundedStreamTask将在DataSet#mapPartition中运行,该mapPartition接收引导数据并输出快照的OperatorSubtaskState。 然后,可以聚合快照句柄并将其写为保存点元数据文件。
Appendix A: Why use the DataSet API
随着社区内所有正在进行的工作以改进Flink对Table API和最终BoundedStream API的批处理支持,出现了为什么现在使用DataSet API的问题。理论上有三个其他API可以在以下基础上构建此功能:
-
BoundedStream
-
目前不存在
-
数据流
-
已考虑使用DataStream API,但缺少需要核心运行时更改且似乎超出范围的关键功能。
-
Table API
-
当前的表运行器需要使用DataSet API实现批处理应用程序的源/接收器
-
由阿里巴巴提供的新表运行正在积极开发中,需要使用DataStream API实现批处理表源/接收器
同时,我们也很欣赏使用DataSet API构建的任何新功能都需要在实现适当的BoundedStream API并且不推荐使用DataSet时进行更新。这就是为什么savepoint连接器将其功能包装在上面显示的API中并且不暴露任何内部结构(如输入和输出格式)的原因。从用户的角度来看,唯一会改变的是对readListState的调用将返回BoundedStream <>而不是DataSet <>。在内部,DataSet的使用是微不足道的,因为核心功能是从flink-streaming-java模块中公开的保存点api派生的,迁移应该只是改变类型的问题。