本文针对 Flink 的架构进行分析
DataFlow 模型
任何一种有广泛实用价值的方法都必须提供简单、强大的工具,可以为具体的使用案例平衡数据的准确性、延迟程度和处理成本。最后,Google 提出了一个统一的模型 —— DataFlow。DataFlow 能够对无界、无序的数据源按数据本身的特征进行窗口计算,得到基于事件发生时间的有序结果,并能在准确性、延迟程度和处理成本之间取得平衡。
分离数据处理的计算逻辑及对逻辑的物理实现,使得系统对批处理、微批处理、流计算引擎的选择简化为对准确性、延迟程度和处理成本的选择。为解决以上问题,DataFlow 具备以下重要概念。
1. 无界、有界与流处理、批处理
在描述无限和有限数据集时,人们更愿意使用无界和有界这样的描述,而不是流处理数据和批处理数据,这是因为流处理和批处理意味着使用特定的执行引擎。如下图所示,在现实场景中,无界数据集可以通过批处理系统反复调度来处理,而设计良好的流处理系统也可以完美地处理有界数据集。从这个角度来看,区分流处理和批处理的实际意义不大,这为后来 Flink 批流一体架构提供了理论基础。
2. 窗口计算
DataFlow 提供了 3 种窗口计算类型,支持窗口把一个数据集切分为有限的数据片,以便于聚合处理。对于无界的数据,有些操作需要窗口,以定义大多数聚合操作需要的边界;另一些则不需要窗口(如过滤、映射、内链接等)。对于有界的数据,窗口是可选的,不过很多情况下仍然是一种有效的语义概念。
- 固定窗口(fixed):按固定窗口大小定义,如小时窗口或天窗口。固定窗口一般都是对齐窗口,也就是说,每个窗口包含对应时间范围内的所有数据。有时为了把窗口计算的负荷均匀分摊到整个时间范围内,会在窗口边界时间加上一个随机数,这样窗口就变成了不对齐窗口。
- 滑动窗口(sliding):按窗口大小和滑动周期定义,如小时窗口,每一分钟滑动一次。滑动周期一般小于窗口,也就是窗口有相互重合之处。滑动窗口一般也是对齐的。固定窗口可以看作滑动窗口的一个特例,即窗口大小和滑动周期大小相等。
- 会话窗口(session):会话是在数据的子集上捕捉一段时间内的活动。一般来说,会话窗口按超时时间定义,任何发生在超时时间以内的事件都被认为属于同一个会话。会话窗口是非对齐窗口,在上图中,窗口 2 只包含 Key 1,窗口 3 只包含 Key 2,而窗口 1 和 4 都包含了 Key 3。假设 Key 是用户 ID,两次活动之间的间隔超过了超时时间,则系统需要重新定义一个会话窗口。
3. 时间域与水位线机制
如下图所示,将时间域分为两种类型,即事件时间(event time)和处理时间(processing time),其中事件时间指事件发生时的系统时间;处理时间指数据处理管道在处理数据时,一个事件被数据处理系统观察到的时间,即数据处理系统的时间。
事件时间和处理时间的主要区别在于,事件时间是永远不变的,而事件的处理时间会随着事件在数据管道中被处理而变化。在数据处理过程中,因为系统本身受到一些现实影响(通信延迟、调度算法、处理时长、管道中间数据序列化等),所以会导致这两个时间概念存在差值且动态波动。借助全局数据处理进度的标记或水位线(Watermark),可以有效处理迟到乱序的事件,得到正确的数据处理结果。
分布式异步快照算法
基于 DataFlow 模型实现的计算框架虽然能够进行大规模无界乱序数据处理并平衡好准确性、延迟程度和处理成本三者之间的关系,但在数据处理过程中,保障数据一致性同样重要,尤其对于一些数据处理要求比较高的场景。在 Flink 中,通过 checkpoint 机制可以保证数据的一致性。开启 checkpoint 为 Exactly-Once 模式时,能够保证数据不重复或不丢失。Flink 中的 checkpoint 机制由 Chandy 和 Lamport 两位科学家提出。Chandy-Lamport 算法通过抽象分布式系统模型描述了一种简单、直接但是非常有效的分布式快照算法。
Chandy-Lamport 算法设计
分布式异步快照算法应用到流式系统中就是确定一个全局快照(global snapshot),当系统出现错误时,将各个节点根据上一次的全局快照恢复整个系统。这里的全局快照我们也可以理解为全局状态(global state)。全局状态在系统进行故障排除(failure recovery)的时候非常有用,它也是分布式计算系统中容错处理的理论基础。对于分布式系统来讲,想要获取全局状态,需要面临如下挑战与问题。
- 进程节点只能记录各自的状态,即本地状态信息,通过网络传递信息,形成各个进程之间的全局状态。
- 所有的进程不可能在同一时间立即精确记录各自的状态,除非它们能够获取相同的时钟,但显然各节点时钟不可能完全一致。对于普通的机器来讲,晶体振动频率是有偏差的,不存在完全同步的可能性。
- 同时做到全局状态过程中持续数据计算,对于 STW(Stop The World,暂停当前所有运行的线程)的做法是没有意义的。
Chandy-Lamport 算法是如何解决上述问题的呢?为了定义分布式系统的全局状态,首先将分布式系统简化成有限个进程与进程的组合,也就是有向无环图,其中节点是进程,边是 channel,并且这些进程运行在不同的物理机器上。分布式系统的全局状态由进程的状态和 channel 中的信息(message)组成,而这些信息也是分布式异步快照算法需要记录的。下图所示为 Chandy-Lamport 算法示意图,从中可以看出,整个分布式系统的全局状态包括如下 3 个过程。
- 系统中的任意一个进程发起创建快照操作
- 进程 P1 发起快照操作,记录进程 P1 的状态,同时生产一个标识信息 marker。注意这里的 marker 和进程之间通信的信息不同。
- 将 marker 信息通过 output channel 发送给系统的其他进程,上图中是进程 P2
- P1 开始记录所有 input channel 接收到的信息并写入 M1 存储。
- 系统中其他进程开始逐个创建 snapshot 操作
- 进程 P2 通过 input channel C12 接收 P1 发送的 marker 信息。
- 如果 P2 没有记录自己的进程状态,则记录当前进程状态(上图中用深色框表示),同时将 channel C12 置为空,并向 output channel 发送 marker 信息;否则,记录其他 channel 在收到 marker 之前从 input channel 收到的所有信息。
- 终止并完成当前的快照操作
在所有进程都收到 marker 信息并记录自己的状态和 channel 消息后,终止整个 snapshot 过程。此时分布式系统本次的 snapshot 操作结束,等待下一次触发和执行。
异步屏障快照(Asynchronous Barrier Snapshotting,ABS)算法改进
2015年,Flink 官方发布了一篇名为 “Lightweight Asynchronous Snapshots for Distributed Dataflows” 的论文,旨在改进 Chandy-Lamport 分布式异步快照算法。该论文主要对 Chandy-Lamport 算法进行了以下两个方面的改进。
- 在 Chandy-Lamport 算法中,为了实现全局状态一致,需要停止流处理程序,直到快照完成,这会对系统性能有非常大的影响。
- 每次快照的内容包含传输过程中所有的内容,导致每次快照的数据量过大,进而影响系统的整体性能。
可以看出,Chandy-Lamport 算法虽然能够实现全局状态一致,但或多或少牺牲了程序的性能,因此不太适合在工程上实现。异步屏障快照算法对其进行了改造,并应用在 Flink 项目中,其核心思想是在 input source 节点插入 barrier 事件,替代 Chandy-Lamport 算法中的 marker,通过控制 barrier 事件同步实现快照备份,最终实现 Exactly-Once 语义。
ABS 算法是 Chandy-Lamport 算法的变体,只是在执行上有些差别。Flink 的论文分别针对有向无环和有向有环两种计算拓扑图提出了不同的算法,后者是在前者基础上进行的修改。在实际应用,尤其是在 Flink 系统中,大多数数据流拓扑都是有向无环图。下图所示为 DAG 的 ABS 算法执行流程,具体说明如下。
- barrier 事件被周期性地注入所有源节点,源节点接收到 barrier 后会立即对自己的状态进行快照操作,然后将 barrier 事件发送到下游的 operator 节点。
- 下游的 Transformation Operator 从上游某个 input channel 接收到 barrier 事件后,会立刻阻塞通道,直到接收到所有上游算子对应的 input channel 发送的 barrier 事件。这实际上是 barrier 事件的对齐过程,operator 节点完成 barrier 对齐操作后,会对当前算子的状态进行快照操作,并向所有下游的节点广播 barrier 事件。
- Sink Operator 接收到barrier事件后,也会进行 barrier 对齐操作。在所有 input channel 中的 barrier 事件全部到达 Sink 节点后,Sink 节点会对自己的状态进行快照操作。Sink 节点完成快照操作标志着完成一次系统全局快照,即完成本次 checkpoint 操作。
ABS 算法对 Flink 中的 checkpoint 操作进行了系统性的描述,且在 Flink 项目中已经有成熟的落地实现。Chandy-Lamport 算法相对比较理想化,未考虑在工业落地时全局状态获取过程中的性能问题,而 ABS 算法实际上是对 Chandy-Lamport 算法在工业项目中落地实现的补充和优化。
Flink 架构
Flink 系统架构主要分为 Libraries & APIs、Core 和 Deploy 三层。
Libraries & APIs 编程接口
Libraries 层也被称作 Flink 应用组件层,是根据 API 层的划分,在 API 层之上构建满足了特定应用领域的计算框架,分别对应了面向流处理和面向批处理两类,其中面向流处理支持 CEP(复杂事件处理)、基于类似 SQL 的操作(基于 Table 的关系操作);面向批处理支持 Flink ML(机器学习库)、Gelly(图处理)。
APIs 层主要实现了面向流处理对应的 DataStream API,面向批处理对应的 DataSet API。
Flink 提供了四层抽象的编程接口:
- High-level Language(高级语言):SQL
- Declarative DSL(定义式编程):Table API
- Core API(核心接口):DataStream API / DataSet API
- Low-level API(流、状态、时间):Statful Stream Processing
Core 运行时执行引擎
Core 层提供了 Flink 计算的全部核心实现,例如支持分布式 Stream 作业执行、JobGraph 到 ExecutionGraph 的映射和调度等,为 API 层提供了基础服务。
用户使用组件栈和接口编写的 Flink 作业最终都会在客户端转换成 JobGraph 对象,然后提交到集群中运行。除了任务的提交和运行之外,运行时还包含资源管理器 ResourceManager 以及负责接收和执行 Task 的 TaskManager,这些服务各司其职,相互合作。运行时提供了不同类型(有界和无界)作业的执行和调度功能,最终将任务拆解成Task执行和调度。同时,运行时兼容了不同类型的集群资源管理器,可以提供不同的部署方式,并统一管理Slot计算资源。第3章将会重点讲解运行时中各个组件的功能及组件之间如何协调。
Deploy 物理部署层
Deploy 层支持多种部署模式,包括本地、集群(Standalone、YARN、Kubernetes)及云部署(GCE/EC2)。
Flink 集群架构
Flink 集群主要包含3部分:JobManager、TaskManager 和客户端,三者均为独立的 JVM 进程。Flink 集群启动后,会至少启动一个 JobManager 和多个 TaskManager。客户端将任务提交到 JobManager,JobManager 再将任务拆分成 Task 并调度到各个 TaskManager 中执行,最后 TaskManager 将 Task 执行的情况汇报给 JobManager。
客户端是 Flink 专门用于提交任务的客户端实现,可以运行在任何设备上,并且兼容 Windows、macOS、Linux 等操作系统,只需要运行环境与 JobManager 之间保持网络畅通即可。客户端会在内部运行提交的作业,然后基于作业的代码逻辑构建 JobGraph 结构,最终将 JobGraph 提交到运行时中运行。JobGraph 是客户端和集群运行时之间约定的统一抽象数据结构,也就是说,不管是什么类型的作业,都会通过客户端将提交的应用程序构建成 JobGraph 结构,最后提交到集群上运行。
JobManager 是整个集群的管理节点,负责接收和执行来自客户端提交的 JobGraph。JobManager 也会负责整个任务的 Checkpoint 协调工作,内部负责协调和调度提交的任务,并将 JobGraph 转换为 ExecutionGraph 结构,然后通过调度器调度并执行 ExecutionGraph 的节点。ExecutionGraph 中的 ExecutionVertex 节点会以Task的形式在TaskManager中执行。
除了对 Job 的调度和管理之外,JobManager 会对整个集群的计算资源进行统一管理,所有 TaskManager 的计算资源都会注册到 JobManager 节点中,然后分配给不同的任务使用。当然,JobManager 还具备非常多的功能,例如 Checkpoint 的触发和协调等。
TaskManager 作为整个集群的工作节点,主要作用是向集群提供计算资源,每个 TaskManager 都包含一定数量的内存、CPU 等计算资源。这些计算资源会被封装成 Slot 资源卡槽,然后通过主节点中的 ResourceManager 组件进行统一协调和管理,而任务中并行的 Task 会被分配到 Slot 计算资源中。根据底层集群资源管理器的不同,TaskManager 的启动方式及资源管理形式也会有所不同。例如,在基于 Standalone 模式的集群中,所有的 TaskManager 都是按照固定数量启动的;而 YARN、Kubernetes 等资源管理器上创建的 Flink 集群则支持按需动态启动 TaskManager 节点。