Flink Program Guide (7) -- 容错 Fault Tolerance(DataStream API编程指导 -- For Java) 一、流容错 1.1 数据源和Sink的容错保证 二、重启策略 2.2 失败比率重启策略

Flink Program Guide (7) -- 容错 Fault Tolerance(DataStream API编程指导 -- For Java)
一、流容错
1.1 数据源和Sink的容错保证
二、重启策略
2.2 失败比率重启策略

容错(Fault Tolerance

本文翻译自StreamGuideFault Tolerance

----------------------------------------------------------

Flink的容错机制会在错误出现时恢复程序并继续执行,这些容错机制包括设备硬件失效、网络失效、临时程序失效等等。

 

Flink使用检查点机制来在流Job失效后对其进行恢复。该检查点机制需要一个可以再次请求前面数据的persistent(或durable)的数据源(Apache Kafka便是如此一个数据源的示例)。

 

检查点机制将数据源和数据sink中的进展、窗口状态以及用户定义的状态(见于Working with state)一致地(consistently)存储起来以提供exactly once的处理语义。有关检查点存储位置(如JobManager、文件系统、数据库)依赖配置的state backend

 

文档streaming fault tolerance详细描述了Flink流容错机制中的技术。

 

我们可以通过StreamExecutionEnvironment调用enableCheckPointing(n)方法来启用检查点机制,其中参数n为检查点间隔,以毫秒计。

 

有关检查点机制的其他参数包括:

·       重试次数:setNumberOfExecutionRetries()方法定义了在失效后job会重新启动多少次。在检查点机制已启用但该值没有设置时,job通常会无限次重启。

·       恰好执行一次 VS. 至少执行一次:你可以向enableCheckPointing(n)方法传递一个mode,该mode包括两个保证级别。其中恰好执行一次适用于绝大多数应用,而至少执行一次则可能更适合一些对要求执行时间极短的应用(持续要求几毫秒)。

·       并行检查点数量:默认地,系统不会再一个检查点正在进行时触发另一个检查点,这保证整个执行拓扑不会花太多时间在检查点上而导致流数据处理停滞。Flink允许多个重叠检查点的情况存在,这对与在有一定延迟的流水线并行情景中(例如由于外部调用服务需要时间响应而导致延迟),仍然想要非常频繁地运行检查点来在失效后仅需要很少量重运行的需求十分有用。

·       检查点超时:定义一个超时时间,如果运行中的检查点到该事件点仍未完成,则将它中止。

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.
enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.
getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.
getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.
getCheckpointConfig().setMaxConcurrentCheckpoints(1);

 

1.1 数据源和Sink的容错保证

Flink仅在数据源参加了快照(snapshotting)机制时才可以保证在更新用户定义状态恰好执行一次。下表列出了对应绑定的connectorFlink更新状态的保证级别。有关各个connector的容错保证级别的细节,请见每个connnector的文档。

Source

Guarantees

Notes

Apache Kafka

exactly once

根据你使用的版本,选择合适的Kafkaconnector

AWS Kinesis Streams

exactly once

 

RabbitMQ

at most once (v 0.10) / exactly once (v 1.0)

 

Twitter Streaming API

at most once

 

Collections

exactly once

 

Files

exactly once

 

Sockets

at most once

 

 

为了保证端到端的恰好执行一次的数据传递(以及恰好执行一次的状态语义),数据sink需要参与检查点机制。下表Flink与绑定的sink的传递保证(假设是恰好执行一次状态更新):

Sink

Guarantees

Notes

HDFS rolling sink

exactly once

其实现依赖于Hadoop版本

Elasticsearch

at least once

 

Kafka producer

at least once

 

Cassandra sink

at least once / exactly once

仅对于幂等的(idempotent)更新是恰好执行一次的保证

AWS Kinesis Streams

at least once

 

File sinks

at least once

 

Socket sinks

at least once

 

Standard output

at least once

 

Redis sink

at least once

 

 

二、重启策略

Flink支持不同的重启策略,它们控制着job在失效情况下如何重启。集群可以用一个默认重启策略来启动,该策略总是在没有job的重启策略定义时使用。当一个拥有重启策略的job提交之后,该策略将会重写集群的默认设置。

 

默认地重启策略是通过Flink的配置文件flink-conf.yaml设置的。配置参数restart-strategy定义了启用什么策略。在每次默认情景下,会使用不重启的策略。有关改配置支持什么值,请见下面的可用重启策略表格。

 

每个重启策略都自带它们的参数集合来控制它们的行为。这些值同样在配置文件中有所设置。每个重启策略的描述包含了更多有关对应配置值的信息。

重启策略

restart-strategy的值

Fixed delay

fixed-delay

Failure rate

failure-rate

No restart

none

 

除了定义一个默认的重启策略,我们也可以为每个Flinkjob定义各自的重启策略。重启测略可以通过在ExecutionEnvironment中调用setRestartStrategy方法来设置。注意,该方法同样适用于StreamExecutionEnvironment

 

下例中展示了我们如何为我们的job的重启策略设置一个固定延迟,在该例中,失效发生时系统将尝试将job重启3次,并且每次重启尝试的间隔为10秒。

 

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.
setRestartStrategy(RestartStrategies.fixedDelayRestart(
  
3, // number of restart attempts
  Time.
of(10, TimeUnit.SECONDS) // delay
  ));

 

2.1 固定延迟重启策略

固定延迟重启策略会以一个给定的次数尝试重启job。如果超过了最大重试次数,该job将判定为最终失败。在两次连续的重启尝试之间,重启策略会等待一段固定的时间。

 

在配置文件flink-conf.yaml中设置以下参数时,该策略将会作为默认策略启用。

restart-strategy : fixed-delay

配置参数

描述

默认值

restart-strategy.fixed-delay.attempts

Number of restart attempts

1

restart-strategy.fixed-delay.delay

Delay between two consecutive restart attempts

akka.ask.timeout

restart-strategy.fixed-delay.attempts: 3

restart-strategy.fixed-delay.delay: 10s

 

固定延迟策略同样可以使用代码设置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.
setRestartStrategy(RestartStrategies.fixedDelayRestart(
  
3, // number of restart attempts
  Time.
of(10, TimeUnit.SECONDS) // delay
  ));

 

2.1.1 重启尝试(Restart Attemps)

Flink认为job失败的重启次数是可以通过restart-strategy.fixed-delay.attempts配置的,默认值为1

 

2.1.2 重试延迟(Retry Delays)

重试的执行可以配置为有延迟的。延迟重试意味着在一次执行失败后,重新执行不会立即启动,而是要等待一个延迟后再启动。

延迟重试对于程序与外部系统交互有一定帮助,例如连接或者待定的会话需要在到达超时时间之后才可以尝试重新执行。

该值的默认值为akka.ask.timeout

 

2.2 失败比率重启策略

失败比率重启策略会在job失败后重启它,但是当failure rate(即平均每秒的失败次数)超出后,job将被判定为最终失败。在两次连续的重启尝试之间,重启策略会等待一段固定时间。

 

在配置文件flink-conf.yaml中设置以下参数时,该策略将会作为默认策略启用。

Restart-strategy: failure-rate

配置参数

描述

默认值

restart-strategy.failure-rate.max-failures-per-interval

在判定一个job彻底失败前的给定时间内最大重启次数

1

restart-strategy.failure-rate.failure-rate-interval

测量failure rate的时间区间长度

1 minute

restart-strategy.failure-rate.delay

两次重启尝试之间等待的时间

akka.ask.timeout

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

 

失败比率重启策略同样可以通过代码设置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.
setRestartStrategy(RestartStrategies.failureRateRestart(
  
3, // max failures per interval
  Time.
of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
  Time.
of(10, TimeUnit.SECONDS) // delay
  ));

 

2.3 不重启策略

在该策略中,job失效将直接判定为最终失效,不会尝试重启。

restart-strategy: none

 

不重启策略同样可以通过代码设置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.
setRestartStrategy(RestartStrategies.noRestart());