欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

透彻掌握FlinkSQL运行流程:详解Calcite与Catalog的相关知识,以及扩展解析器如何实现语法规则的扩充

最编程 2024-02-21 18:38:35
...

深入理解Flink Sql执行流程

  • 1 Flink SQL 解析引擎
    • 1.1SQL解析器
    • 1.2Calcite处理流程
      • 1.2.1 SQL 解析阶段(SQL–>SqlNode)
      • 1.2.2 SqlNode 验证(SqlNode–>SqlNode)
      • 1.2.3 语义分析(SqlNode–>RelNode/RexNode)
      • 1.2.4 优化阶段(RelNode–>RelNode)
      • 1.2.5 生成ExecutionPlan
      • 1.3 Calcite 优化器
  • 2. 简述 Flink Table/SQL 执行流程
    • 2.1 Flink Sql 执行流程
    • 2.3 Flink Table/SQL 执行流程 的 异同
  • 3. 以 Flink SQL Demo 为切入,深入理解 Flink Streaming SQL
    • 3.1以官网的代码为例
    • 3.3 结合 Flink SQL 执行流程 及 调试 详细说明
      • 3.3.1 预览 AST、Optimized Logical Plan、Physical Execution Plan
      • 3.3.2 SQL 解析阶段(SQL–>SqlNode)
      • 3.3.3 SqlNode 验证(SqlNode–>SqlNode)
      • 3.3.4 语义分析(SqlNode–>RelNode/RexNode)
      • 3.3.5 优化阶段(Logical RelNode–>FlinkLogicalRel)
        • 3.3.5.1 FlinkRuleSets
        • Flink 逻辑计划优化
      • 3.3.6 生成物理计划(LogicalRelNode–>Physic Plan)
      • 3.3.7 生成DataStream(Physic Plan–>DataStream)
    • 3.4 总结Flink Sql执行流程
  • 4. catalog相关概念
    • 4.1 flink中的catalog
    • 4.2 catalog中 表的管理,临时表 永久表
  • 5 开发中遇到问题想查询源码如何查询
  • 引用

1 Flink SQL 解析引擎

1.1SQL解析器

flink在执行sql语句时,是无法像java/scala代码一样直接去使用的,需要解析成电脑可以执行的语言,对sql语句进行解析转化。
在这里插入图片描述这里说的我感觉其实不是特别准确,应该是 flink使用的是一款开源SQL解析工具Apache Calcite ,Calcite使用Java CC对sql语句进行了解析
在这里插入图片描述
那么我们先来简单说下Calcite工具,梳理一下Calcite的基本概念:
在这里插入图片描述
calcite架构

上述图片中具体的概念解释为:

在这里插入图片描述

1.2Calcite处理流程

Sql 的执行过程一般可以分为下图中的四个阶段,Calcite 同样也是这样
解析 校验 优化 执行:
解析 校验 优化 执行
对于flink中解析的流程为:
在这里插入图片描述

这里为了讲述方便,把 SQL 的执行分为下面五个阶段(跟上面比比又独立出了一个阶段):

1.2.1 SQL 解析阶段(SQL–>SqlNode)

Calcite 使用 JavaCC 做 SQL 解析,JavaCC 根据 Calcite 中定义的 Parser.jj 文件,生成一系列的 java 代码,生成的 Java 代码会把 SQL 转换成 AST 的数据结构(这里是 SqlNode 类型)。

Javacc 实现一个 SQL Parser,它的功能有以下两个,这里都是需要在 jj 文件中定义的。

  1. List item设计词法和语义,定义 SQL 中具体的元素;
  2. 实现词法分析器(Lexer)和语法分析器(Parser),完成对 SQL 的解析,完成相应的转换。

即:把 SQL 转换成为 AST (抽象语法树),在 Calcite 中用 SqlNode 来表示;

1.2.2 SqlNode 验证(SqlNode–>SqlNode)

在这里插入图片描述

经过上面的第一步,会生成一个 SqlNode 对象,它是一个未经验证的抽象语法树,下面就进入了一个语法检查阶段,语法检查前需要知道元数据信息,这个检查会包括表名、字段名、函数名、数据类型的检查。

即:语法检查,根据元数据信息进行语法验证,验证之后还是用 SqlNode 表示 AST 语法树。具体为使用catalogReaderSupplier创建一个validator,之后验证validator与sqlnode的区别,如果都能找到相应的,就说明语法没有写错的地方

1.2.3 语义分析(SqlNode–>RelNode/RexNode)

经过第二步之后,这里的 SqlNode 就是经过语法校验的 SqlNode 树,接下来这一步就是将 SqlNode 转换成 RelNode/RexNode,也就是生成相应的逻辑计划(Logical Plan)

即:语义分析,根据 SqlNode及元信息构建 RelNode 树,也就是最初版本的逻辑计划(Logical Plan);

1.2.4 优化阶段(RelNode–>RelNode)

第四阶段,也就是 Calcite 的核心所在,优化器进行优化的地方,如过滤条件的下压(push down),在进行 join 操作前,先进行 filter 操作,这样的话就不需要在 join 时进行全量 join,减少参与 join 的数据量等。

在 Calcite 中,提供了两种 planner:HepPlanner 和 VolcanoPlanner,详细可参考下文。

即:逻辑计划优化,优化器的核心,根据前面生成的逻辑计划按照相应的规则(Rule)进行优化;

1.2.5 生成ExecutionPlan

这步就是讲最终的执行计划转为Graph图,下面的流程与真正的java代码流程就一致了

1.3 Calcite 优化器

优化器的作用:将解析器生成的关系代数表达式转换成执行计划,供执行引擎执行,在这个过程中,会应用一些规则优化,以帮助生成更高效的执行计划。
在这里插入图片描述

Calcite 中 RelOptPlanner 是 Calcite 中优化器的基类:
在这里插入图片描述
Calcite 中关于优化器提供了两种实现:

HepPlanner:就是基于规则优化RBO 的实现,它是一个启发式的优化器,按照规则进行匹配,直到达到次数限制(match 次数限制)或者遍历一遍后不再出现 rule match 的情况才算完成;
VolcanoPlanner:就是基于成本优化CBO 的实现,它会一直迭代 rules,直到找到 cost 最小的 paln。

阿里的blink 就在sql优化的部分做了大量的工作,包括微批 ,TopN,热点,去重等部分在底层算法做了大量优化,经过实测,7天窗口的情况下,半小时滚动窗口做聚合运算,甚至比直接使用process API的性能更优,使用的资源更小

2. 简述 Flink Table/SQL 执行流程

Flink Table API&SQL 为流式数据和静态数据的关系查询保留统一的接口,而且利用了Calcite的查询优化框架和SQL parser。

该设计是基于Flink已构建好的API构建的,Flink的 core API 和引擎的所有改进都会自动应用到Table API和SQL上。
下面是两种视图的执行流程,从两个方向介绍了处理操作:
在这里插入图片描述
在这里插入图片描述

2.1 Flink Sql 执行流程

一条stream sql从提交到calcite解析、优化最后到flink引擎执行,一般分为以下几个阶段:

  1. Sql Parser: 将sql语句通过java cc解析成AST(语法树),在calcite中用SqlNode表示AST;
  2. Sql Validator: 结合数字字典(catalog)去验证sql语法;
  3. 生成Logical Plan: 将sqlNode表示的AST转换成LogicalPlan, 用relNode表示;
  4. 生成 optimized LogicalPlan: 先基于calcite rules 去优化logical Plan,再基于flink定制的一些优化rules去优化logical Plan;
  5. 生成Flink PhysicalPlan: 这里也是基于flink里头的rules,将optimized LogicalPlan转成成Flink的物理执行计划;
  6. 将物理执行计划转成Flink ExecutionPlan: 就是调用相应的tanslateToPlan方法转换和利用CodeGen元编程成Flink的各种算子。

2.3 Flink Table/SQL 执行流程 的 异同

可以看出来,Table API 与 SQL 在获取 RelNode 之后是一样的流程,只是获取 RelNode 的方式有所区别:

  • Table API :通过使用 RelBuilder来拿到RelNode(LogicalNode与Expression分别转换成RelNode与RexNode),具体实现这里就不展开了;
  • SQL :通过使用Planner。首先通过parse方法将用户使用的SQL文本转换成由SqlNode表示的parse tree。接着通过validate方法,使用元信息来resolve字段,确定类型,验证有效性等等。最后通过rel方法将SqlNode转换成RelNode;

在flink提供两种API进行关系型查询,Table API 和 SQL。这两种API的查询都会用包含注册过的Table的catalog进行验证,除了在开始阶段从计算逻辑转成logical plan有点差别以外,之后都差不多。同时在stream和batch的查询看起来也是完全一样。只不过flink会根据数据源的性质(流式和静态)使用不同的规则进行优化, 最终优化后的plan转传成常规的Flink DataSet 或 DataStream 程序。

3. 以 Flink SQL Demo 为切入,深入理解 Flink Streaming SQL

3.1以官网的代码为例

代码:

package apps.alg;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


import java.util.Arrays;

/**
 * Simple example for demonstrating the use of SQL on a Stream Table in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataStreams to Tables
 *  - Register a Table under a name
 *  - Run a StreamSQL query on the registered Table
 *
 */
public class test {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));

        // register DataStream as Table
        tEnv.registerDataStream("OrderA", orderA, "user, product, amount");
        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

        // union the two tables
        Table result = tEnv.sqlQuery("SELECT " +
                "* " +
                "FROM " +
                "( " +
                "SELECT " +
                "* " +
                "FROM " +
                "OrderA " +
                "WHERE " +
                "user < 3 " +
                "UNION ALL " +
                "SELECT " +
                "* " +
                "FROM " +
                "OrderB " +
                "WHERE " +
                "product <> 'rubber' " +
                ") OrderAll " +
                "WHERE " +
                "amount > 2");

        System.out.println(tEnv.explain(result));

        tEnv.toAppendStream(result, Order.class).print();

        env.execute();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO.
     */
    public static class Order {
        public Long user;
        public String product;
        public int amount;

        public Order() {
        }

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                    "user=" + user +
                    ", product='" + product + '\'' +
                    ", amount=" + amount +
                    '}';
        }
    }
}

引入pom:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency

如果要在IDEA中执行调试 可以参考
https://blog.****.net/Direction_Wind/article/details/122843896
这篇帖子操作

表OrderA定义三个字段:user, product, amount,先分别做select查询,再将查询结果 union,最后做select,最外层加了一个Filter,以便触发Filter下推及合并。运行代码的结果为:
在这里插入图片描述

3.3 结合 Flink SQL 执行流程 及 调试 详细说明

3.3.1 预览 AST、Optimized Logical Plan、Physical Execution Plan

程序方法可以打印 待执行Sql的抽象语法树(Abstract Syntax Tree)、优化后的逻辑计划以及物理计划:
== Abstract Syntax Tree ==
== Optimized Logical Plan ==
== Physical Execution Plan ==

== Abstract Syntax Tree ==
LogicalProject(user=[$0], product=[$1], amount=[$2])
+- LogicalFilter(condition=[>($2, 2)])
   +- LogicalUnion(all=[true])
      :- LogicalProject(user=[$0], product=[$1], amount=[$2])
      :  +- LogicalFilter(condition=[<($0, 3)])
      :     +- LogicalTableScan(table=[[default_catalog, default_database, OrderA]])
      +- LogicalProject(user=[$0], product=[$1], amount=[$2])
         +- LogicalFilter(condition=[<>($1, _UTF-16LE'rubber')])
            +- LogicalTableScan(table=[[default_catalog, default_database, OrderB]])

== Optimized Logical Plan ==
Union(all=[true], union=[user, product, amount])
:- Calc(select=[user, product, amount], where=[AND(<(user, 3), >(amount, 2))])
:  +- DataStreamScan(table=[[default_catalog, default_database, OrderA]], fields=[user, product, amount])
+- Calc(select=[user, product, amount], where=[AND(<>(product, _UTF-16LE'rubber':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(amount, 2))])
   +- DataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

Stage 2 : Data Source
	content : Source: Collection Source

	Stage 3 : Operator
		content : SourceConversion(table=[default_catalog.default_database.OrderA], fields=[user, product, amount])
		ship_strategy : FORWARD

		Stage 4 : Operator
			content : Calc(select=[user, product, amount], where=[((user < 3) AND (amount > 2))])
			ship_strategy : FORWARD

			Stage 5 : Operator
				content : SourceConversion(table=[default_catalog.default_database.OrderB], fields=[user, product, amount])
				ship_strategy : FORWARD

				Stage 6 : Operator
					content : Calc(select=[user, product, amount], where=[((product <> _UTF-16LE'rubber':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (amount > 2))])
					ship_strategy : FORWARD

3.3.2 SQL 解析阶段(SQL–>SqlNode)

和前面介绍的 Calcite 处理流程一致,此处Flink解析Flink SQL 的语法和词法解析 完全依赖Calcite提供的SqlParser。

在 tEnv.sqlQuery() 方法中,下面的 Step-1 即为SQL解析过程,入参为 待解析的SQL,返回解析后的 SqlNode 对象。

*TableEnvironment.scala*

def sqlQuery(query: String): Table = {

    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
    // Step-1: SQL 解析阶段(SQL–>SqlNode), 把 SQL 转换成为 AST (抽象语法树),在 Calcite 中用 SqlNode 来表示
    val parsed = planner.parse(query)

    if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {

      // Step-2: SqlNode 验证(SqlNode–>SqlNode),语法检查,根据元数据信息进行语法验证,验证之后还是用 SqlNode 表示 AST 语法树;
      val validated = planner.validate(parsed)

      // Step-3: 语义分析(SqlNode–>RelNode/RexNode),根据 SqlNode及元信息构建 RelNode 树,也就是最初版本的逻辑计划(Logical Plan)
      val relational = planner.rel(validated)

      new Table(this, LogicalRelNode(relational.rel))
    } else {
      ...
    }
  }

被解析后的SqlNode AST,每个SQL组成会翻译成一个节点:
在这里插入图片描述
可以看出来 如果开启了并行 ,unionall两遍的语句是在同一个顺序级别的,对解析器而言是两个相同的操作。

3.3.3 SqlNode 验证(SqlNode–>SqlNode)

SQL在被SqlParser解析后,得到SqlNode组成的 抽象语法树(AST),此后还要根据注册的Catalog对该 SqlNode AST 进行验证。

以下语句注册表OrderA和OrderB:
tEnv.registerDataStream(“OrderA”, orderA, “user, product, amount”);
tEnv.registerDataStream(“OrderB”, orderB, “user, product, amount”);

Step-2 即为SQL解析过程,入参为 待验证的SqlNode AST,返回验证后的 SqlNode 对象。
相对于Calcite原生的SQL校验,Flink拓展了语法校验范围,如Flink支持自定义的FunctionCatalog,用于校验SQL Function的入参个数及类型的相关校验,具体用法和细节后续补充。
下面为SQL校验的过程:

**FlinkPlannerImpl.scala**
 private def validate(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {
    try {
      sqlNode.accept(new PreValidateReWriter(
        validator, typeFactory))
      // do extended validation.
      sqlNode match {
        case node: ExtendedSqlNode =>
          node.validate()
        case _ =>
      }
      // no need to validate row type for DDL and insert nodes.
      if (sqlNode.getKind.belongsTo(SqlKind.DDL)
        || sqlNode.getKind == SqlKind.INSERT
        || sqlNode.getKind == SqlKind.CREATE_FUNCTION
        || sqlNode.getKind == SqlKind.DROP_FUNCTION
        || sqlNode.getKind == SqlKind.OTHER_DDL
        || sqlNode.isInstanceOf[SqlLoadModule]
        || sqlNode.isInstanceOf[SqlShowCatalogs]
        || sqlNode.isInstanceOf[SqlShowCurrentCatalog]
        || sqlNode.isInstanceOf[SqlShowDatabases]
        || sqlNode.isInstanceOf[SqlShowCurrentDatabase]
        || sqlNode.isInstanceOf[SqlShowTables]
        || sqlNode.isInstanceOf[SqlShowFunctions]
        || sqlNode.isInstanceOf[SqlShowViews]
        || sqlNode.isInstanceOf[SqlShowPartitions]
        || sqlNode.isInstanceOf[SqlRichDescribeTable]
        || sqlNode.isInstanceOf[SqlUnloadModule]) {
        return sqlNode
      }
      sqlNode match {
        case explain: SqlExplain =>
          val validated = validator.validate(explain.getExplicandum)
          explain.setOperand(0,