0%

Flink SQL&Table API

Flink 针对标准的流处理和批处理提供了两种关系型 API:Table API 和 SQL。Table API 允许用户以一种很直观的方式进行 select、filter 和 join 操作;Flink SQL 支持基于 Apache Calcite 实现的标准 SQL。针对批处理和流处理可以提供相同的处理语义和结果。

如果想使用 Table API 及 SQL,需要添加 flink-table Maven 依赖。

TableEnvironment

在使用 Table API 和 SQL 之前,需要创建 TableEnvironment。且一个查询中只能绑定一个指定的 TableEnvironment。

//流数据查询 
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv); 

//批数据查询 
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); 
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);

TableSource

//创建一个TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...); 
//注册一个TableSource,称为CvsTable 
tableEnv.registerTableSource("CsvTable", csvSource);

TableSink

//创建一个TableSink 
TableSink csvSink = new CsvTableSink("/path/to/file", ...); 
//定义字段名称和类型 
String[] fieldNames = {"a", "b", "c"}; 
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; 
//注册一个TableSink,称为CsvSinkTable 
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

Table

使用 Table API

//注册一个Orders表 ... 
//通过scan操作获取到一个Table对象 
Table orders = tableEnv.scan("Orders"); 
//计算所有来自法国的收入 
Table revenue = orders
	.filter("cCountry === 'FRANCE'")
	.groupBy("cID, cName")
	.select("cID, cName, revenue.sum AS revSum");

使用 SQL

//注册一个Orders表 ... 
//计算所有来自法国的收入 
Table revenue = tableEnv.sqlQuery( 
"SELECT cID, cName, SUM(revenue) AS revSum " + 
"FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" );

DataStream、DataSet 和 Table 之间的转换

转换为 Table

1. 把 DataStream 或 DataSet 注册为 Table 对象

//获取DataStream 
DataStream<Tuple2<Long, String>> stream = ... 
//把DataStream注册为Table,称为myTable,表中的字段为f0,f1 
tableEnv.registerDataStream("myTable", stream); 
//在注册Table的时候也可以手工指定字段的名称 
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

注意:DataStream 程序的表名不能满足规则 ^_DataStreamTable_[0-9]+,DataSet 程序的表名不能满足规则 ^_DataSetTable_[0-9]+,这些规则的名字是内部使用的。

2. 将 DataStream 或 DataSet 直接转化为 Table 对象

//获取DataStream 
DataStream<Tuple2<Long, String>> stream = ... 
//把DataStream转化为Table,使用默认的字段名称f0,f1 
Table table1 = tableEnv.fromDataStream(stream); 
//把DataStream转化为Table,使用指定的字段名称"myLong", "myString" 
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

转换为 Data

Table 对象转换为 DataStream 或 DataSet 的时候,需要指定 DataStream 或者 DataSet 中数据的类型。

数据类型 解释
Row 通过角标映射字段,支持任意数量的字段,并且支持null值和非类型安全的访问。
POJO Java中的实体类,这个实体类中的字段名称需要和Table中的字段名称保持一致,支持任意数量的字段,支持null值,类型安全的访问。
Case Class 通过角标映射字段,不支持null值,类型安全的访问.
Tuple 通过角标映射字段,Scala中限制22个字段,Java中限制25个字段,不支持null值,类型安全的访问.
Atomic Type Table必须要有一个字段,不支持null值,类型安全的访问。

1. 把 Table 对象转换为 DataStream

流式查询的结果 Table 会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的 DataStream 需要对表的更新进行编码。

有两种模式可以将 Table 转换为 DataStream:

  1. Append Mode:这种模式只适用于当动态表仅由 INSERT 更改修改时(仅附加),之前添加的数据不会被更新。
  2. Retract Mode:可以始终使用此模式,它使用一个 Boolean 标识来编码 INSERT 和 DELETE 更改。
//Table中有两个字段(String name, Integer age) 
Table table = ... 
//把Table中的数据转成DataStream<Row> 
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class); 
//或者把Table中的数据转成 DataStream<Tuple2> 
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); 
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); 
//将Table转化成Retract形式的DataStream<Row> 
//一个Retract Stream的类型X为DataStream<Tuple2<Boolean, X>> 
//Boolean字段指定了更改的类型 
//true表示INSERT,false表示DELETE 
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);

2. 把 Table 对象转换为 DataSet

//Table中有两个字段(String name, Integer age) 
Table table = ... 
//把Table中的数据转成DataSet<Row> 
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class); 
//或者把Table中的数据转成DataSet<Tuple2> 
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); 
DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);