一条stream sql从提交到calcite解析、优化最后到flink引擎执行,一般分为以下几个阶段:
1 | 1. Sql Parser: 将sql语句通过java cc解析成AST(语法树),在calcite中用SqlNode表示AST; |
而如果是通过table api来提交任务的话,也会经过calcite优化等阶段,基本流程和直接运行sql类似:
1 | 1. table api parser: flink会把table api表达的计算逻辑也表示成一颗树,用treeNode去表式; |
1 |
|
一行行代码来分析
1 | Table in = tableEnv.fromDataStream(tuple2DataStreamSource, "a,b"); |
将dataStream转变为table的过程1
2
3
4
5
6
7
8
9
public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
JavaDataStreamQueryOperation<T> queryOperation = asQueryOperation(
dataStream,
Optional.of(expressions));
return createTable(queryOperation);
}
1 | ExpressionParser.parseExpressionList(fields); |
将输入的字段名称,提取出来。
asQueryOperation返回的是JavaDataStreamQueryOperation,这个类用来描述DataSteam,对应的数据的索引和Tableschema(描述数据的字段名称、字段位置、字段类型)
asQueryOperation内进行了如下操作
- 将字段和index还有数据类型对应,得到 a,0,String。 b,1,Int
- 校验是否有是eventTime的流,如果是需要开启eventTime的相关配置
- 返回JavaDataStreamQueryOperation
然后获取到带有schema的DataStream,用于创建table 。1
createTable(queryOperation)
进去到里头,到了这层,将带有schema的DataStream转为了Table。先不细究里头的成员变量是什么用的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19protected TableImpl createTable(QueryOperation tableOperation) {
return TableImpl.createTable(
this,
tableOperation,
operationTreeBuilder,
functionCatalog);
}
public static TableImpl createTable(
TableEnvironment tableEnvironment,
QueryOperation operationTree,
OperationTreeBuilder operationTreeBuilder,
FunctionLookup functionLookup) {
return new TableImpl(
tableEnvironment,
operationTree,
operationTreeBuilder,
new LookupCallResolver(functionLookup));
}
回到我们的代码1
tableEnv.registerTable("MyTable", in);
其中重要的部分是两个
1 | CatalogBaseTable tableTable = new QueryOperationCatalogView(table.getQueryOperation()); |
CatalogBaseTable是一个table的视图,以一个map维护table的键值对属性
在registerTableInternam中的主要注册table的方法是1
2
3
4catalog.get().createTable(
path,
table,
false);
这个Catalog是注册完成后数据库与数据表的原信息则存储在CataLog中。CataLog中保存了所有的表结构信息、数据目录信息等。
Catalog.createTable有两个具体的实现,一个是hive,是memory。
GenericInMemoryCatalog 将所有元数据存储在内存中,而 HiveCatalog 则通过 HiveShim 连接 Hive Metastore 的实例,提供元数据持久化的能力。通过 HiveCatalog,可以访问到 Hive 中管理的所有表,从而在 Batch 模式下使用。另外,通过 HiveCatalog 也可以使用 Hive 中的定义的 UDF,Flink SQL 提供了对于 Hive UDF 的支持。
至此,就把数据注册到catalog中。接下来就是对sql的解析了
1 | String sqlQuery = "SELECT a,sum(b) FROM MyTable group by a"; |
TableEnvironmentImpl.sqlQuery中List
1 |
|
SQL 的解析在 PlannerBase.parse() 中实现:
- 首先使用 Calcite 的解析出抽象语法树 SqlNode
- 然后结合元数据对 SQL 语句进行验证
- 将 SqlNode 转换为 RelNode
- 并最终封装为 Flink 内部对查询操作的抽象 QueryOperation。
1 | public static Operation convert(FlinkPlannerImpl flinkPlanner, SqlNode sqlNode) { |
Flink 借助于 Calcite 完成 SQl 的解析和优化,而后续的优化部分其实都是直接基于 RelNode 来完成的,那么这里为什么又多出了一个 QueryOperation 的概念呢?这主要是因为,Flink SQL 是支持 SQL 语句和 Table Api 接口混合使用的,在 Table Api 接口中,主要的操作都是基于 Operation 接口来完成的。
在校验这块,使用的是FlinkCalciteSqlValidator,继承了calcite的接口SqlValidatorImpl。所以才可以跟自己的schema串在一起。
怎么将schema注册到calcite中
DatabaseCalciteSchema 这个类是关键,这个类主要是用于将flink的schema转为Calcite’s schema。实现了Calcite’s schema接口。
CatalogManagerCalciteSchema—>CatalogCalciteSchema—>DatabaseCalciteSchema
如何将flink schema与calcite sheme结合起来呢。主要是PlannerBase这个类,将flink的CatalogManagerCalciteSchema转为calcite的SimpleCalciteSchema类。CatalogManagerCalciteSchema中的变量CatalogManager中存有我们通过flink注册的表信息。
1 |
|
SQL 转换及优化
在将 SQL 语句解析成 Operation 后,为了得到 Flink 运行时的具体操作算子,需要进一步将 ModifyOperation 转换为 Transformation。在 Blink 之前的 SQL Planner 中,都是基于 DataStream 或 DataSet API 完成运行时逻辑的构建;而 Blink 则使用更底层的 Transformation 算子。
注意,Planner.translate(List
转换的流程主要分为四个部分,即
- 将 Operation 转换为 RelNode
- 优化 RelNode
- 转换成 ExecNode
- 转换为底层的 Transformation 算子。
1 | abstract class PlannerBase( |
首先需要进行的操作是将 Operation 转换为 RelNode,这个转换操作借助 QueryOperationConverter 完成
1 | LogicalSink#2 |
在得到 RelNode 后,就进入 Calcite 对 RelNode 的优化流程。例如谓词下推之类的操作就是在这边完成的。
在 Blink 中有一点特殊的地方在于,由于多个 RelNode 构成的树可能存在共同的“子树”(例如将相同的查询结果输出到不同的结果表中,那么两个 LogicalSink 的子树就可能是共用的),Blink 使用了一种 CommonSubGraphBasedOptimizer 优化器,将拥有共同子树的 RelNode 看作一个 DAG 结构,并将 DAG 划分成 RelNodeBlock,然后在RelNodeBlock 的基础上进行优化工作。每一个 RelNodeBlock 可以看作一个 RelNode 树进行优化,这和正常的 Calcite 处理流程还是保持一致的(转载的,有待考究)
CommonSubGraphBasedOptimizer有两个实现,流的StreamCommonSubGraphBasedOptimizer和批的BatchCommonSubGraphBasedOptimizer
1 | abstract class CommonSubGraphBasedOptimizer extends Optimizer { |
Caclite 对逻辑计划的优化是一套基于规则的框架,用户可以通过添加规则进行扩展,Flink 就是基于自定义规则来实现整个的优化过程。Flink 构造了一个链式的优化程序,可以按顺序使用多套规则集合完成 RelNode 的优化过程。
在 FlinkStreamProgram 和 FlinkBatchProgram 中定义了一系列扩展规则,用于构造逻辑计划的优化器。与此同时,Flink 扩展了 RelNode,增加了 FlinkLogicRel 和 FlinkPhysicRel 这两类 RelNode,对应的 Convention 分别为 FlinkConventions.LOGICAL 和 FlinkConventions.STREAM_PHYSICAL (或FlinkConventions.BATCH_PHYSICAL)。在优化器的处理过程中,RelNode 会从 Calcite 内部定义的节点转换为 FlinkLogicRel 节点(FlinkConventions.LOGICAL),并最终被转换为 FlinkPhysicRel 节点(FlinkConventions.STREAM_PHYSICAL)。这两类转换规则分别对应 FlinkStreamRuleSets.LOGICAL_OPT_RULES 和 FlinkStreamRuleSets.PHYSICAL_OPT_RULES。在不考虑其它更复杂的性能优化的情况下,如果要扩展 Flink SQL 的语法规则,可以参考这两类规则来增加节点和转换规则。
例如LogicSink在StreamCommonSubGraphBasedOptimizer.doOptimize会经过FlinkStreamProgram经过FlinkStreamRuleSets转为FlinkLogicalSink在转为StreamExecSinkRule。
经过优化器处理后,得到的逻辑树中的所有节点都应该是 FlinkPhysicRel,这之后就可以用于生成物理执行计划了。首先要将 FlinkPhysicalRel 构成的 DAG 转换成 ExecNode 构成的 DAG,因为可能存在共用子树的情况,这里还会尝试共用相同的子逻辑计划。由于通常 FlinkPhysicalRel 的具体实现类通常也实现了 ExecNode 接口,所以这一步转换较为简单。
在得到由 ExecNode 构成的 DAG 后,就可以尝试生成物理执行计划了,也就是将 ExecNode 节点转换为 Flink 内部的 Transformation 算子。不同的 ExecNode 按照各自的需求生成不同的 Transformation,基于这些 Transformation 构建 Flink 的 DAG。
SQL 执行
calcite相关
- 关系代数(Relational algebra):即关系表达式。它们通常以动词命名,例如 Sort, Join, Project, Filter, Scan, Sample.
- 行表达式(Row expressions):例如 RexLiteral (常量), RexVariable (变量), RexCall (调用) 等,例如投影列表(Project)、过滤规则列表(Filter)、JOIN 条件列表和 ORDER BY 列表、WINDOW 表达式、函数调用等。使用 RexBuilder 来构建行表达式。
- 表达式有各种特征(Trait):使用 Trait 的 satisfies() 方法来测试某个表达式是否符合某 Trait 或 Convention.
转化特征(Convention):属于 Trait 的子类,用于转化 RelNode 到具体平台实现(可以将下文提到的 Planner 注册到 Convention 中). 例如 JdbcConvention,FlinkConventions.DATASTREAM 等。同一个关系表达式的输入必须来自单个数据源,各表达式之间通过 Converter 生成的 Bridge 来连接。 - 规则(Rules):用于将一个表达式转换(Transform)为另一个表达式。它有一个由 RelOptRuleOperand 组成的列表来决定是否可将规则应用于树的某部分。
- 规划器(Planner) :即请求优化器,它可以根据一系列规则和成本模型(例如基于成本的优化模型 VolcanoPlanner、启发式优化模型 HepPlanner)来将一个表达式转为语义等价(但效率更优)的另一个表达式。
Reference
Flink 源码阅读笔记(15)- Flink SQL 整体执行框架
https://blog.csdn.net/qq475781638/article/details/92631194
https://www.infoq.cn/article/flink-api-table-api-and-sql
https://www.cnblogs.com/WCFGROUP/p/9241884.html
http://wuchong.me/blog/2017/03/30/flink-internals-table-and-sql-api/
https://www.cnblogs.com/029zz010buct/p/10142264.html
https://www.jianshu.com/p/6ed368272916