透彻掌握FlinkSQL运行流程:详解Calcite与Catalog的相关知识,以及扩展解析器如何实现语法规则的扩充
深入理解Flink Sql执行流程
- 1 Flink SQL 解析引擎
- 1.1SQL解析器
- 1.2Calcite处理流程
- 1.2.1 SQL 解析阶段(SQL–>SqlNode)
- 1.2.2 SqlNode 验证(SqlNode–>SqlNode)
- 1.2.3 语义分析(SqlNode–>RelNode/RexNode)
- 1.2.4 优化阶段(RelNode–>RelNode)
- 1.2.5 生成ExecutionPlan
- 1.3 Calcite 优化器
- 2. 简述 Flink Table/SQL 执行流程
- 2.1 Flink Sql 执行流程
- 2.3 Flink Table/SQL 执行流程 的 异同
- 3. 以 Flink SQL Demo 为切入,深入理解 Flink Streaming SQL
- 3.1以官网的代码为例
- 3.3 结合 Flink SQL 执行流程 及 调试 详细说明
- 3.3.1 预览 AST、Optimized Logical Plan、Physical Execution Plan
- 3.3.2 SQL 解析阶段(SQL–>SqlNode)
- 3.3.3 SqlNode 验证(SqlNode–>SqlNode)
- 3.3.4 语义分析(SqlNode–>RelNode/RexNode)
- 3.3.5 优化阶段(Logical RelNode–>FlinkLogicalRel)
- 3.3.5.1 FlinkRuleSets
- Flink 逻辑计划优化
- 3.3.6 生成物理计划(LogicalRelNode–>Physic Plan)
- 3.3.7 生成DataStream(Physic Plan–>DataStream)
- 3.4 总结Flink Sql执行流程
- 4. catalog相关概念
- 4.1 flink中的catalog
- 4.2 catalog中 表的管理,临时表 永久表
- 5 开发中遇到问题想查询源码如何查询
- 引用
1 Flink SQL 解析引擎
1.1SQL解析器
flink在执行sql语句时,是无法像java/scala代码一样直接去使用的,需要解析成电脑可以执行的语言,对sql语句进行解析转化。
这里说的我感觉其实不是特别准确,应该是 flink使用的是一款开源SQL解析工具Apache Calcite ,Calcite使用Java CC对sql语句进行了解析。
那么我们先来简单说下Calcite工具,梳理一下Calcite的基本概念:
上述图片中具体的概念解释为:
1.2Calcite处理流程
Sql 的执行过程一般可以分为下图中的四个阶段,Calcite 同样也是这样
解析 校验 优化 执行:
对于flink中解析的流程为:
这里为了讲述方便,把 SQL 的执行分为下面五个阶段(跟上面比比又独立出了一个阶段):
1.2.1 SQL 解析阶段(SQL–>SqlNode)
Calcite 使用 JavaCC 做 SQL 解析,JavaCC 根据 Calcite 中定义的 Parser.jj 文件,生成一系列的 java 代码,生成的 Java 代码会把 SQL 转换成 AST 的数据结构(这里是 SqlNode 类型)。
Javacc 实现一个 SQL Parser,它的功能有以下两个,这里都是需要在 jj 文件中定义的。
- List item设计词法和语义,定义 SQL 中具体的元素;
- 实现词法分析器(Lexer)和语法分析器(Parser),完成对 SQL 的解析,完成相应的转换。
即:把 SQL 转换成为 AST (抽象语法树),在 Calcite 中用 SqlNode 来表示;
1.2.2 SqlNode 验证(SqlNode–>SqlNode)
经过上面的第一步,会生成一个 SqlNode 对象,它是一个未经验证的抽象语法树,下面就进入了一个语法检查阶段,语法检查前需要知道元数据信息,这个检查会包括表名、字段名、函数名、数据类型的检查。
即:语法检查,根据元数据信息进行语法验证,验证之后还是用 SqlNode 表示 AST 语法树。具体为使用catalogReaderSupplier创建一个validator,之后验证validator与sqlnode的区别,如果都能找到相应的,就说明语法没有写错的地方
1.2.3 语义分析(SqlNode–>RelNode/RexNode)
经过第二步之后,这里的 SqlNode 就是经过语法校验的 SqlNode 树,接下来这一步就是将 SqlNode 转换成 RelNode/RexNode,也就是生成相应的逻辑计划(Logical Plan)
即:语义分析,根据 SqlNode及元信息构建 RelNode 树,也就是最初版本的逻辑计划(Logical Plan);
1.2.4 优化阶段(RelNode–>RelNode)
第四阶段,也就是 Calcite 的核心所在,优化器进行优化的地方,如过滤条件的下压(push down),在进行 join 操作前,先进行 filter 操作,这样的话就不需要在 join 时进行全量 join,减少参与 join 的数据量等。
在 Calcite 中,提供了两种 planner:HepPlanner 和 VolcanoPlanner,详细可参考下文。
即:逻辑计划优化,优化器的核心,根据前面生成的逻辑计划按照相应的规则(Rule)进行优化;
1.2.5 生成ExecutionPlan
这步就是讲最终的执行计划转为Graph图,下面的流程与真正的java代码流程就一致了
1.3 Calcite 优化器
优化器的作用:将解析器生成的关系代数表达式转换成执行计划,供执行引擎执行,在这个过程中,会应用一些规则优化,以帮助生成更高效的执行计划。
Calcite 中 RelOptPlanner 是 Calcite 中优化器的基类:
Calcite 中关于优化器提供了两种实现:
HepPlanner:就是基于规则优化RBO 的实现,它是一个启发式的优化器,按照规则进行匹配,直到达到次数限制(match 次数限制)或者遍历一遍后不再出现 rule match 的情况才算完成;
VolcanoPlanner:就是基于成本优化CBO 的实现,它会一直迭代 rules,直到找到 cost 最小的 paln。
阿里的blink 就在sql优化的部分做了大量的工作,包括微批 ,TopN,热点,去重等部分在底层算法做了大量优化,经过实测,7天窗口的情况下,半小时滚动窗口做聚合运算,甚至比直接使用process API的性能更优,使用的资源更小
2. 简述 Flink Table/SQL 执行流程
Flink Table API&SQL 为流式数据和静态数据的关系查询保留统一的接口,而且利用了Calcite的查询优化框架和SQL parser。
该设计是基于Flink已构建好的API构建的,Flink的 core API 和引擎的所有改进都会自动应用到Table API和SQL上。
下面是两种视图的执行流程,从两个方向介绍了处理操作:
2.1 Flink Sql 执行流程
一条stream sql从提交到calcite解析、优化最后到flink引擎执行,一般分为以下几个阶段:
- Sql Parser: 将sql语句通过java cc解析成AST(语法树),在calcite中用SqlNode表示AST;
- Sql Validator: 结合数字字典(catalog)去验证sql语法;
- 生成Logical Plan: 将sqlNode表示的AST转换成LogicalPlan, 用relNode表示;
- 生成 optimized LogicalPlan: 先基于calcite rules 去优化logical Plan,再基于flink定制的一些优化rules去优化logical Plan;
- 生成Flink PhysicalPlan: 这里也是基于flink里头的rules,将optimized LogicalPlan转成成Flink的物理执行计划;
- 将物理执行计划转成Flink ExecutionPlan: 就是调用相应的tanslateToPlan方法转换和利用CodeGen元编程成Flink的各种算子。
2.3 Flink Table/SQL 执行流程 的 异同
可以看出来,Table API 与 SQL 在获取 RelNode 之后是一样的流程,只是获取 RelNode 的方式有所区别:
- Table API :通过使用 RelBuilder来拿到RelNode(LogicalNode与Expression分别转换成RelNode与RexNode),具体实现这里就不展开了;
- SQL :通过使用Planner。首先通过parse方法将用户使用的SQL文本转换成由SqlNode表示的parse tree。接着通过validate方法,使用元信息来resolve字段,确定类型,验证有效性等等。最后通过rel方法将SqlNode转换成RelNode;
在flink提供两种API进行关系型查询,Table API 和 SQL。这两种API的查询都会用包含注册过的Table的catalog进行验证,除了在开始阶段从计算逻辑转成logical plan有点差别以外,之后都差不多。同时在stream和batch的查询看起来也是完全一样。只不过flink会根据数据源的性质(流式和静态)使用不同的规则进行优化, 最终优化后的plan转传成常规的Flink DataSet 或 DataStream 程序。
3. 以 Flink SQL Demo 为切入,深入理解 Flink Streaming SQL
3.1以官网的代码为例
代码:
package apps.alg;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Arrays;
/**
* Simple example for demonstrating the use of SQL on a Stream Table in Java.
*
* <p>This example shows how to:
* - Convert DataStreams to Tables
* - Register a Table under a name
* - Run a StreamSQL query on the registered Table
*
*/
public class test {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
// set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Order> orderA = env.fromCollection(Arrays.asList(
new Order(1L, "beer", 3),
new Order(1L, "diaper", 4),
new Order(3L, "rubber", 2)));
DataStream<Order> orderB = env.fromCollection(Arrays.asList(
new Order(2L, "pen", 3),
new Order(2L, "rubber", 3),
new Order(4L, "beer", 1)));
// register DataStream as Table
tEnv.registerDataStream("OrderA", orderA, "user, product, amount");
tEnv.registerDataStream("OrderB", orderB, "user, product, amount");
// union the two tables
Table result = tEnv.sqlQuery("SELECT " +
"* " +
"FROM " +
"( " +
"SELECT " +
"* " +
"FROM " +
"OrderA " +
"WHERE " +
"user < 3 " +
"UNION ALL " +
"SELECT " +
"* " +
"FROM " +
"OrderB " +
"WHERE " +
"product <> 'rubber' " +
") OrderAll " +
"WHERE " +
"amount > 2");
System.out.println(tEnv.explain(result));
tEnv.toAppendStream(result, Order.class).print();
env.execute();
}
// *************************************************************************
// USER DATA TYPES
// *************************************************************************
/**
* Simple POJO.
*/
public static class Order {
public Long user;
public String product;
public int amount;
public Order() {
}
public Order(Long user, String product, int amount) {
this.user = user;
this.product = product;
this.amount = amount;
}
@Override
public String toString() {
return "Order{" +
"user=" + user +
", product='" + product + '\'' +
", amount=" + amount +
'}';
}
}
}
引入pom:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency
如果要在IDEA中执行调试 可以参考
https://blog.****.net/Direction_Wind/article/details/122843896
这篇帖子操作
表OrderA定义三个字段:user, product, amount,先分别做select查询,再将查询结果 union,最后做select,最外层加了一个Filter,以便触发Filter下推及合并。运行代码的结果为:
3.3 结合 Flink SQL 执行流程 及 调试 详细说明
3.3.1 预览 AST、Optimized Logical Plan、Physical Execution Plan
程序方法可以打印 待执行Sql的抽象语法树(Abstract Syntax Tree)、优化后的逻辑计划以及物理计划:
== Abstract Syntax Tree ==
== Optimized Logical Plan ==
== Physical Execution Plan ==
== Abstract Syntax Tree ==
LogicalProject(user=[$0], product=[$1], amount=[$2])
+- LogicalFilter(condition=[>($2, 2)])
+- LogicalUnion(all=[true])
:- LogicalProject(user=[$0], product=[$1], amount=[$2])
: +- LogicalFilter(condition=[<($0, 3)])
: +- LogicalTableScan(table=[[default_catalog, default_database, OrderA]])
+- LogicalProject(user=[$0], product=[$1], amount=[$2])
+- LogicalFilter(condition=[<>($1, _UTF-16LE'rubber')])
+- LogicalTableScan(table=[[default_catalog, default_database, OrderB]])
== Optimized Logical Plan ==
Union(all=[true], union=[user, product, amount])
:- Calc(select=[user, product, amount], where=[AND(<(user, 3), >(amount, 2))])
: +- DataStreamScan(table=[[default_catalog, default_database, OrderA]], fields=[user, product, amount])
+- Calc(select=[user, product, amount], where=[AND(<>(product, _UTF-16LE'rubber':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(amount, 2))])
+- DataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: Collection Source
Stage 2 : Data Source
content : Source: Collection Source
Stage 3 : Operator
content : SourceConversion(table=[default_catalog.default_database.OrderA], fields=[user, product, amount])
ship_strategy : FORWARD
Stage 4 : Operator
content : Calc(select=[user, product, amount], where=[((user < 3) AND (amount > 2))])
ship_strategy : FORWARD
Stage 5 : Operator
content : SourceConversion(table=[default_catalog.default_database.OrderB], fields=[user, product, amount])
ship_strategy : FORWARD
Stage 6 : Operator
content : Calc(select=[user, product, amount], where=[((product <> _UTF-16LE'rubber':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (amount > 2))])
ship_strategy : FORWARD
3.3.2 SQL 解析阶段(SQL–>SqlNode)
和前面介绍的 Calcite 处理流程一致,此处Flink解析Flink SQL 的语法和词法解析 完全依赖Calcite提供的SqlParser。
在 tEnv.sqlQuery() 方法中,下面的 Step-1 即为SQL解析过程,入参为 待解析的SQL,返回解析后的 SqlNode 对象。
*TableEnvironment.scala*
def sqlQuery(query: String): Table = {
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// Step-1: SQL 解析阶段(SQL–>SqlNode), 把 SQL 转换成为 AST (抽象语法树),在 Calcite 中用 SqlNode 来表示
val parsed = planner.parse(query)
if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
// Step-2: SqlNode 验证(SqlNode–>SqlNode),语法检查,根据元数据信息进行语法验证,验证之后还是用 SqlNode 表示 AST 语法树;
val validated = planner.validate(parsed)
// Step-3: 语义分析(SqlNode–>RelNode/RexNode),根据 SqlNode及元信息构建 RelNode 树,也就是最初版本的逻辑计划(Logical Plan)
val relational = planner.rel(validated)
new Table(this, LogicalRelNode(relational.rel))
} else {
...
}
}
被解析后的SqlNode AST,每个SQL组成会翻译成一个节点:
可以看出来 如果开启了并行 ,unionall两遍的语句是在同一个顺序级别的,对解析器而言是两个相同的操作。
3.3.3 SqlNode 验证(SqlNode–>SqlNode)
SQL在被SqlParser解析后,得到SqlNode组成的 抽象语法树(AST),此后还要根据注册的Catalog对该 SqlNode AST 进行验证。
以下语句注册表OrderA和OrderB:
tEnv.registerDataStream(“OrderA”, orderA, “user, product, amount”);
tEnv.registerDataStream(“OrderB”, orderB, “user, product, amount”);
Step-2 即为SQL解析过程,入参为 待验证的SqlNode AST,返回验证后的 SqlNode 对象。
相对于Calcite原生的SQL校验,Flink拓展了语法校验范围,如Flink支持自定义的FunctionCatalog,用于校验SQL Function的入参个数及类型的相关校验,具体用法和细节后续补充。
下面为SQL校验的过程:
**FlinkPlannerImpl.scala**
private def validate(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {
try {
sqlNode.accept(new PreValidateReWriter(
validator, typeFactory))
// do extended validation.
sqlNode match {
case node: ExtendedSqlNode =>
node.validate()
case _ =>
}
// no need to validate row type for DDL and insert nodes.
if (sqlNode.getKind.belongsTo(SqlKind.DDL)
|| sqlNode.getKind == SqlKind.INSERT
|| sqlNode.getKind == SqlKind.CREATE_FUNCTION
|| sqlNode.getKind == SqlKind.DROP_FUNCTION
|| sqlNode.getKind == SqlKind.OTHER_DDL
|| sqlNode.isInstanceOf[SqlLoadModule]
|| sqlNode.isInstanceOf[SqlShowCatalogs]
|| sqlNode.isInstanceOf[SqlShowCurrentCatalog]
|| sqlNode.isInstanceOf[SqlShowDatabases]
|| sqlNode.isInstanceOf[SqlShowCurrentDatabase]
|| sqlNode.isInstanceOf[SqlShowTables]
|| sqlNode.isInstanceOf[SqlShowFunctions]
|| sqlNode.isInstanceOf[SqlShowViews]
|| sqlNode.isInstanceOf[SqlShowPartitions]
|| sqlNode.isInstanceOf[SqlRichDescribeTable]
|| sqlNode.isInstanceOf[SqlUnloadModule]) {
return sqlNode
}
sqlNode match {
case explain: SqlExplain =>
val validated = validator.validate(explain.getExplicandum)
explain.setOperand(0,
上一篇: 屌丝的出路