状态容错 State Fault Tolerance
首先来说一说状态容错。Flink 支持有状态的计算,可以把数据流的结果一直维持在内存(或 disk)中,比如累加一个点击数,如果某一时刻计算程序挂掉了,如何保证下次重启的时候,重新恢复计算的数据可以从状态中恢复,并且每条数据只被计算了一次呢?
从数据的流入到计算流出,整个过程看成事务的话,就是如何保证整个过程具有原子性。
Flink 是怎么做的呢?只靠状态本身是远远不够的,状态只是保存了某个值,还需要保存一个计算的位置。
如果是单机的情况下,这个很好实现。
假设来自 Kafka 的数据流,经过应用逻辑的计算,生成状态保存到 state 中,这个过程是源源不断的,如图所示,为了保证state的容错性,程序会周期性的保存数据消费的位置和该时刻的状态,叫做快照,如果程序有异常需要重启的时候,就会从快照中恢复。这个过程保证了精准一次的计算,一条数据只会被计算一次。
分布式环境下没有这个简单,众所周知,任何问题到了分布式环境下,就变得复杂。
Flink 是如何做到状态分布式容错的呢?如何在不中断计算的情况下产生快照呢?
如图,Flink 会在数据流中插入 checkpoint barrier n ,他们会随着数据的流向流入下游的算子,首先记录开始的位置,然后每经过一个算子就记录该算子计算之后的状态,直至结束。
上图只一个静态图,下面我将演示整个过程。
第一步,记录数据开始计算的位置
第二步,记录各个算子的名称以及 state
以此类推到整个 DAG 的结束。(DAG 是有向无环图)
整个过程 checkpoint barrier 会同时存在多个,也就是数据流中插入的多个 checkpoint barrier ,当算子遇到它的时候,就会发生 checkpoint。
只有就会在不中断计算的情况下,生成全局一致的快照。
应用
默认情况下,检查点不被保留,只用于从失败中恢复作业。当程序取消时,它们将被删除。但是,您可以配置要保留的定期检查点。根据配置,这些保留的检查点不会在作业失败或取消时自动清除。这样,如果您的工作失败,您将有一个检查点可以从中恢复。
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
配置 checkpoint 目录
state.checkpoints.dir: hdfs:///checkpoints/
或者给每个 job 配置目录:
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
虽然检查点可以用来容错,但是一般我们不用检查点来恢复程序,如果修改了任务的逻辑或其他原因导致程序需要重启,那就需要用到 savepoint 。