Flink 任务失败恢复策略

Flink版本:1.11.0

当任务失败时,Flink 需要重新启动失败的任务以及受到影响的任务,以将作业恢复到正常状态。重新启动策略和故障恢复策略用于控制任务重新启动。重新启动策略决定了是否以及何时重新启动失败/受影响的任务。故障恢复策略决定应重新启动哪些任务以恢复作业。

1. 重启策略

Flink 支持不同的重启策略,在作业没有特别指定重启策略时,使用默认的重启策略启动集群。如果在提交作业时指定了重启策略,那么此策略将覆盖集群的默认配置策略。

默认重启策略通过 Flink 的配置文件 flink-conf.yaml 进行配置。配置参数 restart-strategy 决定了采取哪种策略。如果未启用 Checkpoint,那么将使用不重启策略。如果启用了 Checkpoint,但是并没有配置重启策略,那么将使用固定间隔重启策略,其中 Integer.MAX_VALUE 是尝试重启的最大次数。

每个重启策略都有自己的一套控制其行为的参数。这些值也在配置文件中配置。下面看一下有哪些重启策略:

除了定义一个默认的重启策略之外,还可以为每个 Flink 作业单独指定一个重启策略。可以通过以编程的方式调用 ExecutionEnvironment 上的 setRestartStrategy 方法进行配置。请注意,这也适用于 StreamExecutionEnvironment。

以下示例显示了如何为作业设置固定间隔重启策略。如果作业发生故障,系统将尝试每10s重新启动一次作业,最多重启3次:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
));

1.1 固定间隔重启策略

固定间隔重启策略尝试重新启动作业指定次数。如果超过最大尝试次数,那么作业最终会失败。每次重启之后都会等待一段固定间隔时间。通过在 flink-conf.yaml 中配置如下参数,可以将重启策略默认为固定间隔重启策略:

restart-strategy: fixed-delay

Example:

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

固定间隔重启策略也可以通过编程来配置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
));

1.2 失败率重启策略

失败率重启策略在作业失败后会重新启动作业,只有当每个时间区间内失败次数超过指定次数时,作业才最终会失败。失败率重启策略跟固定间隔重启策略一样,每次重启之后都会等待一段固定间隔时间,不同的是失败率重启策略阈值设置了一个时间区间,只有当时间区间内超过指定次数时才失败。通过在 flink-conf.yaml 中配置如下参数,可以将失败率重启策略设置为默认重启策略:

restart-strategy: failure-rate

Example:

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

失败率重新启动策略也可以通过编程来设置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per interval
  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
));

1.3 不重启策略

作业直接失败,不会重启:

restart-strategy: none

不重启策略也可以通过编程来设置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

2. 故障恢复策略

Flink 支持不同的故障恢复策略,可以通过 Flink 的配置文件 flink-conf.yaml 中的 jobmanager.execution.failover-strategy 配置参数进行配置:

jobmanager.execution.failover-strategy: region

2.1 重启所有故障恢复策略

此策略会重新启动作业中所有任务以从失败任务中恢复。

2.2 重启流水线区域故障恢复策略

该策略会将作业中的所有 Task 划分为几个 Region。当有 Task 发生故障时,它会尝试找出进行故障恢复需要重启的最小 Region 集合。相比于全部重启的故障恢复策略,这种策略在一定场景下重启的 Task 会少一些。

Region 是指以 Pipelined 形式进行数据交换的 Task 集合。也就是说,Batch 形式的数据交换会构成 Region 的边界。

  • DataStream 和 流式 Table/SQL 作业的所有数据交换都是 Pipelined 形式。

  • 批处理式 Table/SQL 作业的所有数据交换默认都是 Batch 形式的。

  • DataSet 作业中的数据交换形式会根据 ExecutionConfig 中配置的 ExecutionMode 决定。

需要重启的 Region 的判断逻辑如下:

  • 出错 Task 所在 Region 需要重启。

  • 如果要重启的 Region 需要消费的数据有部分无法访问(丢失或损坏),产出该部分数据的 Region 也需要重启。

  • 需要重启的 Region 的下游 Region 也需要重启。这是出于保障数据一致性的考虑,因为一些非确定性的计算或者分发会导致同一个 Result Partition 每次产生时包含的数据都不相同。

欢迎关注我的公众号和博客:

原文:Task Failure Recovery