0%

Flink 高级能力 API

Flink 中提供了 4 种不同层次的 API:

  1. 低级 API:提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用在对一些复杂事件的处理逻辑上。
  2. 核心 API:主要提供了针对流数据和离线数据的处理,对低级 API 进行了一些封装,提供了 filter、sum、max、min 等高级函数,简单且易用,所以在工作中应用比较广泛。
  3. Table API:一般与 DataSet 或者 DataStream 紧密关联,首先通过一个 DataSet 或 DataStream 创建出一个 Table;然后用类似于 filter、join 或者 select 关系型转化操作来转化为一个新的 Table 对象;最后将一个 Table 对象转回一个 DataSet 或 DataStream。与 SQL 不同的是,Table API 的查询不是一个指定的 SQL 字符串,而是调用指定的 API 方法。
  4. SQL API:Flink 的 SQL 集成是基于 Apache Calcite 的,Apache Calcite 实现了标准的 SQL,使用起来比其他 API 更加灵活,因为可以直接使用 SQL 语句。Table API 和 SQL 可以很容易地结合在一块使用,它们都返回 Table 对象。

高级能力

Flink 定义了一类提供处理函数生命周期及获取函数上下文能力的算子函数,即 RichFunction,其原型如下:

@Public  
public interface RichFunction extends Function {  
	// 自定义初始化函数
    void open(Configuration parameters) throws Exception;  
	// 自定义销毁函数
    void close() throws Exception;  
	// 上下文操作函数
    RuntimeContext getRuntimeContext();  
    
    IterationRuntimeContext getIterationRuntimeContext();  
    void setRuntimeContext(RuntimeContext t);  
}

其他算子函数实现了对应的版本,如 RichMapFunction、RichReduceFunction、RichJoinFunction 等。

在处理数据时,根据不同情况进行分流处理,如果使用 Filter 进行处理需要反复遍历整个流来处理相应数据,Side Output 提供了根据不同的 Tag 区别输出。

private static final OutputTag<AlertEvent> middleware = new OutputTag<AlertEvent>("MIDDLEWARE") {  
};  
private static final OutputTag<AlertEvent> machine = new OutputTag<AlertEvent>("MACHINE") {  
};  

然后呢,可以使用下面几种函数来处理数据,在处理数据的过程中,进行判断将不同种类型的数据存到不同的 OutputTag 中去。

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction
//dataStream 是总的数据流  
SingleOutputStreamOperator<AlertEvent, AlertEvent> outputStream = dataStream.process(new ProcessFunction<AlertEvent, AlertEvent>() {  
    @Override  
    public void processElement(AlertEvent value, Context ctx, Collector<AlertEvent> out) throws Exception {  
        if ("MACHINE".equals(value.type)) {  
            ctx.output(machine, value);  
        } else if ("MIDDLEWARE".equals(value.type)) {  
            ctx.output(middleware, value);  
        } else {  
            //其他的业务逻辑  
            out.collect(value);  
        }  
    }  
})

在将不同类型的数据进行放到不同的 OutputTag 后,可通过下面方式获取:

//机器相关的告警&恢复数据  
outputStream.getSideOutput(machine).print();  

//中间件相关的告警&恢复数据  
outputStream.getSideOutput(middleware).print();

广播变量允许编程人员在每台机器上保持一个只读的缓存变量,而不是传送变量的副本给 Task(这样会导致每台机器可能有多份副本)。广播变量创建后,它可以运行在集群中的任何 Function 上,而不需要多次传递给集群节点。另外请记住,不要修改广播变量,这样才能确保每个节点获取到的值都是一致的。

// 初始化数据
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
	// 广播数据 
	.withBroadcastSet(toBroadcast, "broadcastSetName");
// 获取数据
Collection<Integer>broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

注意事项:

  1. 广播变量存在于每个节点的内存中,它的数据量不能太大,因为广播出去的数据常驻内存,除非程序执行结束。
  2. 广播变量在初始化广播以后不支持修改,这样才能保证每个节点的数据都是一致的。
  3. 如果多个算子需要使用同一份数据集,那么需要在对应的多个算子后面分别注册广播变量。广播变量只能在Flink批处理程序中才可以使用。

Accumulator 即累加器,与 MapReduce 中 Counter 的应用场景差不多,都能很好地观察 Task 在运行期间的数据变化。可以在 Flink Job 的算子函数中使用累加器,但是只有在任务执行结束之后才能获得累加器的最终结果。

Counter 是一个具体的累加器实现,常用的 Counter 有 IntCounter、LongCounter 和 DoubleCounter。

// 创建累加器
private IntCounter numLines = new IntCounter();
// 注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
// 使用累加器
this.numLines.add(1);
// 获取累加器结果
JobExecutionResult result = env.execute("counter")
result.getAccumulatorResult("num-lines")

Flink提供了一个分布式缓存(Distributed Cache),类似于 Hadoop,可以使用户在并行函数中很方便地读取本地文件。

此缓存的工作机制为程序注册一个文件或者目录(本地或者远程文件系统,如HDFS或者S3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行时,Flink自动将文件或者目录复制到所有TaskManager节点的本地文件系统,用户可以通过这个指定的名称查找文件或者目录,然后从TaskManager节点的本地文件系统访问它。

// 注册文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
//访问数据
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

仅支持 DataSet

Flink 批处理引擎可以借助数据处理逻辑优化应用程序的物理执行计划,如 DataSet 的哪些部分参与算子函数的计算;哪些部分未经更改地发射到下一级节点,注解就是定义这类数据处理逻辑的工具。需要注意的是,和实际不一致的注解通常会导致应用程序得出错误的计算结果,因此在没有完全清楚所有数据处理逻辑时,不建议使用注解。

下面介绍注解的三种形式:

直接转发

直接转发的输入域(Forwarded Field)是指输入 DataSet 的某个属性未经算子修改而直接输出,可以是输入输出的相同位置,也可以是不同位置。

直接转发方式有以下三种:

  1. 相同位置直接转发。注解 _2 表示输入数据的第 3 个属性被算子直接转发到输出的第 3 个属性。
  2. 不同位置直接转发。注解 _0->_2 表示输入数据的第 1 个属性的数据被直接转发到输出的第 3 个属性位置。此外,注解支持通配符 *,标识整个输入或输出,如 _0->* 表示输入数据的第 1 个属性的数据被直接转发到输出,即输出只有一个属性。
  3. 多个位置直接转发。注解 _0;_2->_1;_3->_2 表示输入数据的第 1 个属性的数据被直接转发到输出的第 1 个属性位置,输入数据的第 3 个属性的数据被直接转发到输出的第 2 个属性位置,输入数据的第 4 个属性的数据被直接转发到输出的第 3 个属性位置。

标注方式有以下两种:

// 直接标注在算子函数实现类上
@ForwardedFields("_1->_3")
class MyApp extends MapFunction ....

// 使用 withForwardedFields 标注
data.map(..).withForwardedFields("_1->_3")

非直接转发

非直接转发的输入域(Non-Forwarded Field)指输入和输出某个相同位置的数据发生了变化,且其他位置的数据均未发生变化。因此,这类注解的语义更强,错误的注解将产生更严重的数据处理错误。

// 直接标注在算子函数实现类上
@NonForwardedFields("_1")
class MyApp extends MapFunction ....

触达

达的输入域(Read Field)指输入 DataSet 的某个属性被算子读取并参与计算。这类注解的语义较强,需要列出所有触达的输入域:

@ReadFields("_1; _4")
class MyMap extends MapFunction ....
{
	if (value._1 == 42) {
		return (value._1, value._2)
	} else {
		return (value._4 + 10, value._2)
	}
}