Flink 针对标准的流处理和批处理提供了两种关系型 API:Table API 和 SQL。Table API 允许用户以一种很直观的方式进行 select、filter 和 join 操作;Flink SQL 支持基于 Apache Calcite 实现的标准 SQL。针对批处理和流处理可以提供相同的处理语义和结果。
如果想使用 Table API 及 SQL,需要添加 flink-table Maven 依赖。
TableEnvironment
在使用 Table API 和 SQL 之前,需要创建 TableEnvironment。且一个查询中只能绑定一个指定的 TableEnvironment。
//流数据查询
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
//批数据查询
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
TableSource
//创建一个TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
//注册一个TableSource,称为CvsTable
tableEnv.registerTableSource("CsvTable", csvSource);
TableSink
//创建一个TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
//定义字段名称和类型
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
//注册一个TableSink,称为CsvSinkTable
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
Table
使用 Table API
//注册一个Orders表 ...
//通过scan操作获取到一个Table对象
Table orders = tableEnv.scan("Orders");
//计算所有来自法国的收入
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
使用 SQL
//注册一个Orders表 ...
//计算所有来自法国的收入
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" );
DataStream、DataSet 和 Table 之间的转换
转换为 Table
1. 把 DataStream 或 DataSet 注册为 Table 对象
//获取DataStream
DataStream<Tuple2<Long, String>> stream = ...
//把DataStream注册为Table,称为myTable,表中的字段为f0,f1
tableEnv.registerDataStream("myTable", stream);
//在注册Table的时候也可以手工指定字段的名称
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
注意:DataStream 程序的表名不能满足规则
^_DataStreamTable_[0-9]+
,DataSet 程序的表名不能满足规则^_DataSetTable_[0-9]+
,这些规则的名字是内部使用的。
2. 将 DataStream 或 DataSet 直接转化为 Table 对象
//获取DataStream
DataStream<Tuple2<Long, String>> stream = ...
//把DataStream转化为Table,使用默认的字段名称f0,f1
Table table1 = tableEnv.fromDataStream(stream);
//把DataStream转化为Table,使用指定的字段名称"myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
转换为 Data
Table 对象转换为 DataStream 或 DataSet 的时候,需要指定 DataStream 或者 DataSet 中数据的类型。
数据类型 | 解释 |
---|---|
Row | 通过角标映射字段,支持任意数量的字段,并且支持null值和非类型安全的访问。 |
POJO | Java中的实体类,这个实体类中的字段名称需要和Table中的字段名称保持一致,支持任意数量的字段,支持null值,类型安全的访问。 |
Case Class | 通过角标映射字段,不支持null值,类型安全的访问. |
Tuple | 通过角标映射字段,Scala中限制22个字段,Java中限制25个字段,不支持null值,类型安全的访问. |
Atomic Type | Table必须要有一个字段,不支持null值,类型安全的访问。 |
1. 把 Table 对象转换为 DataStream
流式查询的结果 Table 会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的 DataStream 需要对表的更新进行编码。
有两种模式可以将 Table 转换为 DataStream:
- Append Mode:这种模式只适用于当动态表仅由 INSERT 更改修改时(仅附加),之前添加的数据不会被更新。
- Retract Mode:可以始终使用此模式,它使用一个 Boolean 标识来编码 INSERT 和 DELETE 更改。
//Table中有两个字段(String name, Integer age)
Table table = ...
//把Table中的数据转成DataStream<Row>
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
//或者把Table中的数据转成 DataStream<Tuple2>
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);
//将Table转化成Retract形式的DataStream<Row>
//一个Retract Stream的类型X为DataStream<Tuple2<Boolean, X>>
//Boolean字段指定了更改的类型
//true表示INSERT,false表示DELETE
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
2. 把 Table 对象转换为 DataSet
//Table中有两个字段(String name, Integer age)
Table table = ...
//把Table中的数据转成DataSet<Row>
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
//或者把Table中的数据转成DataSet<Tuple2>
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);