在实际的流处理场景下,Operation(也叫算子)可根据 State 分为两类:
- 无状态 Operation:一次只处理一个独立的 Event,
- 有状态 Operation:需要合并/引用多个 Events 并加以处理
Flink 应用的可用性及扩展性,都建立在围绕 State 的特性当中。在实际的使用场景中,通常会通过 Checkpoints 和 Savepoints 来实现容错(Fault Tolerant),配置 StateBackends 满足对 State 持久化的需要。此外,Flink 还支持跨并行实例重分配 State,以便进行横向扩展 。
Keyed State
Keyed State 是通过键值对(embedded key/value store)的形式存储,并且根据 Key 进行分片的 State 与被有状态 Operation 读取的数据流严格的分布在一起。资源对齐(alignment)的好处显而易见,一方面,全部的状态更新均在 Local Operations,无需事务来保证数据一致性;另一方面,允许 Flink 透明地调整 Stream 的分片,同时重分配 State。
Keyed State 被进一步定义为 Key Groups。其是 Flink 操作重分配 Keyed State 的最小单元。在执行期间,一个 Keyed Operator 对应的并行实例都会依赖一个或多个 Key Groups,而 Key Groups 数量与定义的最大并行度相同(maximum parallelism)。
State Persistence
Flink 通过流重放(Stream Replay)和检查点(Checkpointing)实现容错。
检查点(Checkpointing)标记了每一个输入流(Input Stream)中的特定点以及每个算子(Operators)的特定状态。通过 Checkpoints,Flink 可通过恢复 Operator 的 State 以及重放(Replay)Checkpoints 中的记录来维持数据的一致性(仅处理一次的语义)。
检查点(Checkpointing)间隔是一种权衡容错机制开销的方法,其影响到系统由故障到恢复的时间,而时间的长短主要由重放的数据记录条数决定。
容错机制(The fault tolerance mechanism)需要不断地绘制分布式数据流的快照,快照可根据需要自定义存储位置。
如果发生程序故障(由于机器、网络或软件故障),Flink 会暂停处理数据流,重置算子(Operators)到最新的检查点(Checkpointing);重置输入流到状态快照点。任何数据流都不会影响已经生成的检查点状态。
为了使该机制实现其完全保证,数据流源(例如消息队列或代理)需要能够将流倒回到定义的最近点。Apache Kafka 具有这种能力,Flink 的 Kafka 连接器利用了这一点。
Checkpointing
Flink 容错机制的核心是绘制分布式数据流(Data Stream)和算子状态(Operator State)的快照。
论文描述:Lightweight Asynchronous Snapshots for Distributed Dataflows
理论基础:Chandy-Lamport algorithm
注意:与检查点(Checkpoints)相关的所有操作都可以异步完成。检查点可以在对齐和非对齐的情况下进行。
Barries
流屏障(Stream Barries)是 Flink 分布式快照中的核心概念之一。这些屏障被注入数据流中(Data Stream)并且随着其中的数据,遵循严格线性的规则顺序流动。屏障将数据流分隔为块,每一块数据均被记录为一个快照,每个屏障都携带了推送到它前面的记录快照的 ID。来自不同快照的多个屏障可以同时在流中,这意味着各种快照可能并发发生。
流屏障(Stream Barries)通过数据流源(Data Stream Sources)注入到并发的数据中,快照 n 的屏障被注入的点(我们称之为 Sn)是源流(Source Stream)中快照覆盖数据的位置。例如,在 Apache Kafka 中,这个位置将是分区中最后一条记录的偏移量。需要注意的是,快照 n 的屏障会在并行 Data Stream 的每个流中存在,且只会存在于快照 n 包含的数据记录之后。
Aligned Checkpointing
随后,屏障会向下游流动。当一个中间算子(Intermediate Operator)从其所有输入流中接收到针对快照 n 的屏障时,它会向所有的输出流中发出一个针对快照 n 的屏障。一旦 Sink Operator(流处理 DAG 的结尾)从所有输入流中接收到屏障 n,它就向检查点协调器(Checkpoint Coordinator)确认快照 n。在所有 Sink Operator 都确认了快照之后,就认为快照已经完成。
一旦快照 n 完成,作业将不再向源请求 Sn 之前的记录,因为在那时这些记录(及其后代记录)将通过整个数据流拓扑。
Operators 如果接收多于一个的输入流需要对齐输入流的快照屏障。上述图标说明了这个:
- 当 Operator 从某一个输入流(incoming stream)接收到了 Barrier n 后,它必须得等待其他所有的输入流都接收 Barrier n 后才能继续处理 Barrier n+1 的数据。
- 一旦最后一个输入流接收到了 Barrier n,Operator 会发出全部 pending 的输出记录,然后发出快照 Barrier n 本身。
它对 State 进行快照,并从所有输入流中恢复处理记录,在处理来自流的记录之前处理来自输入缓冲区的记录。
最后,Operator 会异步将 State 写入 State Backend。
Snapshotting Operator State
当 Operator 包含任何形式的 State,必须建立相应的快照。
Operator 会在从全部输入流接收到所有 Snapshot Barriers 后,并且在向输出流生成 Barriers 前,绘制 State 的快照。此时,全部的更新基于快照屏障前的记录所得,且快照屏障后的记录不会被 Operator 处理导致数据更新。因为快照可能比较大,因此它被储存在一个可配置的 State Backend,默认储存在 JobManager 的内存中。在 State 被存储后,Operator 确认 Checkpoint,随后将快照屏障发射进输出流并且继续处理后续内容。
生成的快照包含:
- 对于每个parallel stream data source,快照启动时流中的偏移量/位置
- 对于每个 Operator,指向作为快照一部分存储的状态的指针
Recovery
这种机制下,Recovery 很简单:失败后,Flink 选择最新的 Checkpoint k。系统随后重新部署整个分布式数据流,并将 Checkpoint k 的快照状态同步给每一个 Operator,然后数据源被设置到 Sk 对应的流读取位置。以 Kafka 为例,这意味着让 Consumer 从 Sk 偏移量拉取数据。
如果 State 是增量快照,Operator 会从最新的一个完整快照开始,然后对该状态应用一系列增量快照更新。
Unaligned Checkpointing
Checkpoint 也可以在不对齐的情况下执行。Operator 会持续处理全部的输入流,即使在 Checkpoint n 到达一部分 Checkpoint Barrier 也不会影响。这样,即使在处理 checkpoint n 的 State 快照之前也可能处理 checkpoint n+1 的数据。
注意,这种方式与 Chandy-Lamport algorithm 更加接近,但 Flink 仍然向数据源中插入了 Barriers 避免 Checkpoint Coordinator 过载。
上述描述了 Operator 处理 Unaligned Checkpoint Barries 的过程:
- Operator 对存储在其输入缓冲区中的第一个 Barrier 作出反应
- 通过将 Barrier 添加到输出缓冲区的末尾,它立即将 Barrier 转发给下游 Operator
- Operator 将所有已超越的记录标记为异步存储,并创建自己状态的快照
因此,Operator 只是短暂的停止处理,去标记 Buffer、转发 Barrier 以及创建其他 State 的快照。
Unaligned checkpointing 保证 Barrier 尽可能快地到达 Sink。它特别适合对齐数据时长达到小时级的程序(数据处理时间分布不均导致的某条路径处理时间远远超过其他路径)。然而,由于它增加了额外的 I/O 压力,如果 State Backend 的 I/O 到达瓶颈的时候,它并不能起到优化的作用。
注意:保存点总是对齐的
Alignment happens only for operators with multiple predecessors (joins) as well as operators with multiple senders (after a stream repartitioning/shuffle). Because of that, dataflows with only embarrassingly parallel streaming operations (map()
, flatMap()
, filter()
, …) actually give exactly once guarantees even in at least once mode.
Unaligned Recovery
在 unaligned checkpointing 中,未对齐的记录以副本的形式存在(它们包含在 checkpoint n 的 State 快照中)。Operators 会首先恢复 in-flight 数据,随后开始重放未对齐的记录,最后继续处理上游 Operator 传递下来的数据。除此之外,它和 aligned checkpoints 操作的 recovery 步骤相同。
State Backends
KV 索引的存储依赖选择的 State Backend。除了定义数据结构保存 State,State Backend 也实现了获取某时间点内 KV 格式的 State,并把它们作为快照的一部分进行存储。
Savepoints
所有使用 Checkpoint 的程序都可以通过 Savepoint 恢复执行。Savepoint 允许在不丢失任何状态的条件下更新程序和集群。
Savepoint 是手动触发的 Checkpoint,它获取程序的快照并将其写入 State Backend,这些都依赖 Checkpoint 机制。Savepoint 和 Checkpoint 类似,但 Savepoint 是用户手动触发的,并且在新的快照生成前不能自动过期。
State and Fault Tolerance in Batch Programs
Flink 在 batch ExecutionMode 下执行批处理程序作为流程序的特殊情况,其中流是有界的(有限数量的元素)。因此,上述概念以同样的方式适用于批处理程序,也适用于流媒体程序,只有少数例外:
- 批处理程序 Fault Tolerance 不使用检查点。恢复是通过完全重放流来实现的。因为输入是有界的,这将更多的成本推到 Recovery 上。由于没使用检查点,使得常规处理成本非常低。
- 批处理模式下的状态后端使用简化的内存/核外数据结构,而不是键/值索引。