Calcite RBO 规则解析和定制
什么是查询优化器
查询优化器是传统数据库的核心模块,也是大数据计算引擎的核心模块,开源大数据引擎如 Impala、Presto、Drill、HAWQ、 Spark、Hive 等都有自己的查询优化器。Calcite 就是从 Hive 的优化器演化而来的。
优化器的作用:将解析器生成的关系代数表达式转换成执行计划,供执行引擎执行,在这个过程中,会应用一些规则优化,以帮助生成更高效的执行计划。
基于成本优化(CBO)
基于代价的优化器(Cost-Based Optimizer,CBO):根据优化规则对关系表达式进行转换,这里的转换是说一个关系表达式经过优化规则后会生成另外一个关系表达式,同时原有表达式也会保留,经过一系列转换后会生成多个执行计划,然后 CBO 会根据统计信息和代价模型 (Cost Model) 计算每个执行计划的 Cost,从中挑选 Cost 最小的执行计划。
由上可知,CBO 中有两个依赖:统计信息和代价模型。统计信息的准确与否、代价模型的合理与否都会影响 CBO 选择最优计划。 从上述描述可知,CBO 是优于 RBO 的,原因是 RBO 是一种只认规则,对数据不敏感的呆板的优化器,而在实际过程中,数据往往是有变化的,通过 RBO 生成的执行计划很有可能不是最优的。事实上目前各大数据库和大数据计算引擎都倾向于使用 CBO,但是对于流式计算引擎来说,使用 CBO 还是有很大难度的,因为并不能提前预知数据量等信息,这会极大地影响优化效果,CBO 主要还是应用在离线的场景。
基于规则优化(RBO)
基于规则的优化器(Rule-Based Optimizer,RBO):根据优化规则对关系表达式进行转换,这里的转换是说一个关系表达式经过优化规则后会变成另外一个关系表达式,同时原有表达式会被裁剪掉,经过一系列转换后生成最终的执行计划。
RBO 中包含了一套有着严格顺序的优化规则,同样一条 SQL,无论读取的表中数据是怎么样的,最后生成的执行计划都是一样的。同时,在 RBO 中 SQL 写法的不同很有可能影响最终的执行计划,从而影响执行计划的性能。
优化规则
无论是 RBO,还是 CBO 都包含了一系列优化规则,这些优化规则可以对关系表达式进行等价转换,常见的优化规则包含:
- 谓词下推 Predicate Pushdown
- 常量折叠 Constant Folding
- 列裁剪
- 其他
在 Calcite 的代码里,有一个测试类(org.apache.calcite.test.RelOptRulesTest)汇集了对目前内置所有 Rules 的测试 case,这个测试类可以方便我们了解各个 Rule 的作用。
RBO的规则在Calite 1.24版本的时候,集中在org.apache.calcite.rel.rules.CoreRules中
在这里有下面一条 SQL,通过这条语句来说明一下上面介绍的这些种规则。
select 10 + 30, users.name, users.age
from users join jobs on users.id= user.id
where users.age > 30 and jobs.id>10
谓词下推(Predicate Pushdown)
关于谓词下推,它主要还是从关系型数据库借鉴而来,关系型数据中将谓词下推到外部数据库用以减少数据传输;属于逻辑优化,优化器将谓词过滤下推到数据源,使物理执行跳过无关数据。最常见的例子就是 join 与 filter 操作一起出现时,提前执行 filter 操作以减少处理的数据量,将 filter 操作下推,以上面例子为例,示意图如下
(对应 Calcite 中的CoreRules.FILTER_INTO_JOIN):
在进行 join 前进行相应的过滤操作,可以极大地减少参加 join 的数据量。
常量折叠(Constant Folding)
常量折叠也是常见的优化策略,这个比较简单、也很好理解,可以看下 编译器优化 – 常量折叠 这篇文章,基本不用动脑筋就能理解,对于我们这里的示例,有一个常量表达式 10 + 30,如果不进行常量折叠,那么每行数据都需要进行计算,进行常量折叠后的结果如下图所示
( 对应 Calcite 中的 中的CoreRules.PROJECT_REDUCE_EXPRESSIONS Rule):
列裁剪(Column Pruning)
列裁剪也是一个经典的优化规则,在本示例中对于jobs 表来说,并不需要扫描它的所有列值,而只需要列值 id,所以在扫描 jobs 之后需要将其他列进行裁剪,只留下列 id。这个优化带来的好处很明显,大幅度减少了网络 IO、内存数据量的消耗。裁剪前后的示意图如下(不过并没有找到 Calcite 对应的 Rule):
如何使用RBO
public class CalciteRuleTest {
public static void main(String[] args) throws SqlParseException, ValidationException, RelConversionException {
Planner planner = getPlanner();
SqlNode sqlNode = planner.parse("SELECT DATE_CD,SUM(IB0002001_CN000) FROM \n" +
"(SELECT IDX_ID,CUBE2L_IB00040010_CN000.DATE_CD,SUM(CUBE2L_IB00040010_CN000.IDX_VAL) AS IB0002001_CN000 FROM " +
"CUBE2L_IB00040010_CN000" +
" GROUP BY CUBE2L_IB00040010_CN000.DATE_CD,IDX_ID) IB0002001_CN000 WHERE IDX_ID IN ('IB0002001_CN000') AND DATE_CD = '2020-05-31' GROUP BY DATE_CD ");
System.out.println(sqlNode.toString());
planner.validate(sqlNode);
RelRoot relRoot = planner.rel(sqlNode);
RelNode relNode = relRoot.project();
System.out.println();
System.out.print(RelOptUtil.toString(relNode));
HepProgramBuilder builder = new HepProgramBuilder();
// builder.addRuleInstance(CoreRules.FILTER_MERGE); //note: 添加 rule
// builder.addRuleInstance(CoreRules.PROJECT_FILTER_TRANSPOSE);
// builder.addRuleInstance(CoreRules.FILTER_AGGREGATE_TRANSPOSE);
// builder.addRuleInstance(CoreRules.FILTER_PROJECT_TRANSPOSE);
// builder.addRuleInstance(CoreRules.FILTER_AGGREGATE_TRANSPOSE);
builder.addRuleInstance(CoreRules.AGGREGATE_REMOVE);
builder.addRuleInstance(CoreRules.PROJECT_FILTER_TRANSPOSE);
HepPlanner hepPlanner = new HepPlanner(builder.build());
hepPlanner.setRoot(relNode);
relNode = hepPlanner.findBestExp();
System.out.println();
System.out.println("After --------------------");
System.out.println();
System.out.print(RelOptUtil.toString(relNode));
RelToSqlConverter relToSqlConverter = new RelToSqlConverter(MysqlSqlDialect.DEFAULT);
SqlImplementor.Result result = relToSqlConverter.visitRoot(relNode);
System.out.println();
System.out.println(result.asSelect().toString());
}
private static Planner getPlanner() {
SchemaPlus rootSchema = Frameworks.createRootSchema(true);
// 添加表的schema信息,才可以通过validate
rootSchema.add("CUBE2L_IB00040010_CN000", new AbstractTable() {
public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
RelDataTypeFactory.Builder builder = typeFactory.builder();
builder.add("DATE_CD", typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.DATE), true));
builder.add("IDX_VAL", typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true));
builder.add("test", typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true));
builder.add("IDX_ID", typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true));
return builder.build();
}
});
SqlParser.ConfigBuilder parserConfig = SqlParser.configBuilder();
SqlParser.Config build = parserConfig.setCaseSensitive(false).setLex(Lex.MYSQL).build();
FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
.parserConfig(build)
.defaultSchema(rootSchema)
.build();
Planner planner = Frameworks.getPlanner(frameworkConfig);
return planner;
}
上面的代码总共分为三步:
- 初始化 HepProgram 对象;
- 初始化 HepPlanner 对象,并通过 setRoot() 方法将 RelNode 树转换成 HepPlanner 内部使用的 Graph;
- 通过 findBestExp() 找到最优的 plan,规则的匹配都是在这里进行。
1. 初始化 HepProgram
这几步代码实现没有太多需要介绍的地方,先初始化 HepProgramBuilder 也是为了后面初始化 HepProgram 做准备,HepProgramBuilder 主要也就是提供了一些配置设置和添加规则的方法等,常用的方法如下:
- addRuleInstance():注册相应的规则;
- addRuleCollection():这里是注册一个规则集合,先把规则放在一个集合里,再注册整个集合,如果规则多的话,一般是这种方式;
- addMatchLimit():设置 MatchLimit,这个 rule match 次数的最大限制;
- addMatchOrder() Rule 匹配的顺序
HepProgram 这个类对于后面 HepPlanner 的优化很重要,它定义 Rule 匹配的顺序,默认按【深度优先】顺序,它可以提供以下几种(见 HepMatchOrder 类):
- ARBITRARY:按任意顺序匹配(因为它是有效的,而且大部分的 Rule 并不关心匹配顺序);
- BOTTOM_UP:自下而上,先从子节点开始匹配;
- TOP_DOWN:自上而下,先从父节点开始匹配;
- DEPTH_FIRST:深度优先匹配,某些情况下比 ARBITRARY 高效(为了避免新的 vertex 产生后又从 root 节点开始匹配)。
这个匹配顺序到底是什么呢?对于规则集合 rules,HepPlanner 的算法是:从一个节点开始,跟 rules 的所有 Rule 进行匹配,匹配上就进行转换操作,这个节点操作完,再进行下一个节点,这里的匹配顺序就是指的节点遍历顺序(这种方式的优劣,我们下面再说)。
分析现有rule
最简单的例子,Join结合律,JoinAssociateRule
首先所有的Rule都继承RelOptRule类
/**
* Planner rule that changes a join based on the associativity rule.
*
* <p>((a JOIN b) JOIN c) → (a JOIN (b JOIN c))</p>
*
* <p>We do not need a rule to convert (a JOIN (b JOIN c)) →
* ((a JOIN b) JOIN c) because we have
* {@link JoinCommuteRule}.
*
* @see JoinCommuteRule
*/
public class JoinAssociateRule extends RelOptRule implements TransformationRule
对于Join结合律,调用super,即RelOptRule的构造函数,匹配的树状为
join
join
any
/**
* Creates a JoinAssociateRule.
*/
public JoinAssociateRule(RelBuilderFactory relBuilderFactory) {
super(
operand(Join.class,
operand(Join.class, any()),
operand(RelSubset.class, any())),
relBuilderFactory, null);
}
operand也是一个树形结构,Top的Operand的类型是Join,他有两个children,其中一个也是join,另一个是RelSubset
他们的children是any()
public void onMatch(final RelOptRuleCall call) {
final Join topJoin = call.rel(0);
final Join bottomJoin = call.rel(1);
final RelNode relA = bottomJoin.getLeft();
final RelNode relB = bottomJoin.getRight();
final RelSubset relC = call.rel(2);
final RelOptCluster cluster = topJoin.getCluster();
final RexBuilder rexBuilder = cluster.getRexBuilder();
if (relC.getConvention() != relA.getConvention()) {
// relC could have any trait-set. But if we're matching say
// EnumerableConvention, we're only interested in enumerable subsets.
return;
}
// topJoin
// / \
// bottomJoin C
// / \
// A B
final int aCount = relA.getRowType().getFieldCount();
final int bCount = relB.getRowType().getFieldCount();
final int cCount = relC.getRowType().getFieldCount();
final ImmutableBitSet aBitSet = ImmutableBitSet.range(0, aCount);
final ImmutableBitSet bBitSet =
ImmutableBitSet.range(aCount, aCount + bCount);
if (!topJoin.getSystemFieldList().isEmpty()) {
// FIXME Enable this rule for joins with system fields
return;
}
// If either join is not inner, we cannot proceed.
// (Is this too strict?)
if (topJoin.getJoinType() != JoinRelType.INNER
|| bottomJoin.getJoinType() != JoinRelType.INNER) {
return;
}
// Goal is to transform to
//
// newTopJoin
// / \
// A newBottomJoin
// / \
// B C
// Split the condition of topJoin and bottomJoin into a conjunctions. A
// condition can be pushed down if it does not use columns from A.
final List<RexNode> top = new ArrayList<>();
final List<RexNode> bottom = new ArrayList<>();
JoinPushThroughJoinRule.split(topJoin.getCondition(), aBitSet, top, bottom);
JoinPushThroughJoinRule.split(bottomJoin.getCondition(), aBitSet, top,
bottom);
// Mapping for moving conditions from topJoin or bottomJoin to
// newBottomJoin.
// target: | B | C |
// source: | A | B | C |
final Mappings.TargetMapping bottomMapping =
Mappings.createShiftMapping(
aCount + bCount + cCount,
0, aCount, bCount,
bCount, aCount + bCount, cCount);
final List<RexNode> newBottomList =
new RexPermuteInputsShuttle(bottomMapping, relB, relC)
.visitList(bottom);
RexNode newBottomCondition =
RexUtil.composeConjunction(rexBuilder, newBottomList);
final Join newBottomJoin =
bottomJoin.copy(bottomJoin.getTraitSet(), newBottomCondition, relB,
relC, JoinRelType.INNER, false);
// Condition for newTopJoin consists of pieces from bottomJoin and topJoin.
// Field ordinals do not need to be changed.
RexNode newTopCondition = RexUtil.composeConjunction(rexBuilder, top);
@SuppressWarnings("SuspiciousNameCombination")
final Join newTopJoin =
topJoin.copy(topJoin.getTraitSet(), newTopCondition, relA,
newBottomJoin, JoinRelType.INNER, false);
call.transformTo(newTopJoin);
}
自定义rule
补充下基础概念
RexLiteral表示常量,RexVariable表示变量,RexCall表示操作来连接Literal和Variable
自定义rule,有三个主要的方法,一个是matches,一个是onMatch,一个是构造函数
判断一个tree是否满足该rule,先从构造函数开始匹配,满足构造函数的tree,在满足matches方法,就可以进入到onMatch方法。
matches方法默认是返回true,可以进行改写
@Override
public boolean matches(RelOptRuleCall call) {
return super.matches(call);
}
假设想要匹配
join
project
构造函数得这么写
super(
operand(Join.class,
operand(Project.class, any())),
relBuilderFactory, null);
在onMatch中可以通过call.rel(x)获取构造函数中匹配的relNode
Join join = call.rel(0);
Project project = call.rel(1);
通过一些强转可以获取对应的节点
((HepRelVertex)rel.getInput(0)).getCurrentRel()
创建rebuild,生成node
RelBuilder relBuilder = call.builder();
RelNode build = relBuilder.build();
对relBuilder操作
relBuilder.push 加入数据源
relbuilder.project 加入投影 还可以agg filter 等等
通过call替换掉原来的tree
call.transformTo(node);
常用函数
RexUtil.composeDisjunction 以or合并两个规则
RexUtil.composeConjunction 以and合并两个规则
((HepRelVertex)rel.getInput(0)).getCurrentRel() 强转获取节点
node.getCluster()可以继续get出很多东西getPlanner()、getRexBuilder()、getTypeFactory()
RelDataType sqlType = typeFactory.createSqlType(SqlTypeName.ANY) 创建对应的类型
rexBuilder.makeLiteral 创建常量 makeCall创建操作符等等
SqlStdOperatorTable可以获取函数集合 SUM MIN MAX
Reference
Calcite分析 - Rule
Apache Calcite 优化器详解(二)
上一篇: RBO 的查询优化器|青年营说明
下一篇: 如何使用优化器提高 MySQL 性能
推荐阅读
-
数据库的基于规则的优化(RBO)和基于成本的优化(CBO)
-
Calcite RBO 规则解析和定制
-
Spark SQL 语句解析和基于规则的优化(RBO)简介
-
数据库查询优化器、RBO 优化规则简介和示例
-
详细解释 Calcite RBO 规则优化引擎
-
数据库的基于规则的优化(RBO)和基于成本的优化(CBO)
-
(vii) 解析 Streamlit 的数据元素:探索 st.dataframe、st.data_editor、st.column_config、st.table、st.metric 和 st.json 的奥妙(上) - 4.2 使用 st.column_config.TextColumn 来定制数据编辑器的文本列
-
透彻掌握FlinkSQL运行流程:详解Calcite与Catalog的相关知识,以及扩展解析器如何实现语法规则的扩充
-
全面解析Cisco IOS和IOS XE路由平台的命名规则,一篇文章轻松掌握-深入理解IOS经典
-
【Netty】「萌新入门」(七)ByteBuf 的性能优化-堆内存的分配和释放都是由 Java 虚拟机自动管理的,这意味着它们可以快速地被分配和释放,但是也会产生一些开销。 直接内存需要手动分配和释放,因为它由操作系统管理,这使得分配和释放的速度更快,但是也需要更多的系统资源。 另外,直接内存可以映射到本地文件中,这对于需要频繁读写文件的应用程序非常有用。 此外,直接内存还可以避免在使用 NIO 进行网络传输时发生数据拷贝的情况。在使用传统的 I/O 时,数据必须先从文件或网络中读取到堆内存中,然后再从堆内存中复制到直接缓冲区中,最后再通过 SocketChannel 发送到网络中。而使用直接缓冲区时,数据可以直接从文件或网络中读取到直接缓冲区中,并且可以直接从直接缓冲区中发送到网络中,避免了不必要的数据拷贝和内存分配。 通过 ByteBufAllocator.DEFAULT.directBuffer 方法来创建基于直接内存的 ByteBuf: ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer(16); 通过 ByteBufAllocator.DEFAULT.heapBuffer 方法来创建基于堆内存的 ByteBuf: ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(16); 注意: 直接内存是一种特殊的内存分配方式,可以通过在堆外申请内存来避免 JVM 堆内存的限制,从而提高读写性能和降低 GC 压力。但是,直接内存的创建和销毁代价昂贵,因此需要慎重使用。 此外,由于直接内存不受 JVM 垃圾回收的管理,我们需要主动释放这部分内存,否则会造成内存泄漏。通常情况下,可以使用 ByteBuffer.clear 方法来释放直接内存中的数据,或者使用 ByteBuffer.cleaner 方法来手动释放直接内存空间。 测试代码: public static void testCreateByteBuf { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16); System.out.println(buf.getClass); ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(16); System.out.println(heapBuf.getClass); ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer(16); System.out.println(directBuf.getClass); } 运行结果: class io.netty.buffer.PooledUnsafeDirectByteBuf class io.netty.buffer.PooledUnsafeHeapByteBuf class io.netty.buffer.PooledUnsafeDirectByteBuf 池化技术 在 Netty 中,池化技术指的是通过对象池来重用已经创建的对象,从而避免了频繁地创建和销毁对象,这种技术可以提高系统的性能和可伸缩性。 通过设置 VM options,来决定池化功能是否开启: -Dio.netty.allocator.type={unpooled|pooled} 在 Netty 4.1 版本以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现; 这里我们使用非池化功能进行测试,依旧使用的是上面的测试代码 testCreateByteBuf,运行结果如下所示: class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf 可以看到,ByteBuf 类由 PooledUnsafeDirectByteBuf 变成了 UnpooledUnsafeDirectByteBuf; 在没有池化的情况下,每次使用都需要创建新的 ByteBuf 实例,这个操作会涉及到内存的分配和初始化,如果是直接内存则代价更为昂贵,而且频繁的内存分配也可能导致内存碎片问题,增加 GC 压力。 使用池化技术可以避免频繁内存分配带来的开销,并且重用池中的 ByteBuf 实例,减少了内存占用和内存碎片问题。另外,池化技术还可以采用类似 jemalloc 的内存分配算法,进一步提升分配效率。 在高并发环境下,池化技术的优点更加明显,因为内存的分配和释放都是比较耗时的操作,频繁的内存分配和释放会导致系统性能下降,甚至可能出现内存溢出的风险。使用池化技术可以将内存分配和释放的操作集中到预先分配的池中,从而有效地降低系统的内存开销和风险。 内存释放 当在 Netty 中使用 ByteBuf 来处理数据时,需要特别注意内存回收问题。 Netty 提供了不同类型的 ByteBuf 实现,包括堆内存(JVM 内存)实现 UnpooledHeapByteBuf 和堆外内存(直接内存)实现 UnpooledDirectByteBuf,以及池化技术实现的 PooledByteBuf 及其子类。 UnpooledHeapByteBuf:通过 Java 的垃圾回收机制来自动回收内存; UnpooledDirectByteBuf:由于 JVM 的垃圾回收机制无法管理这些内存,因此需要手动调用 release 方法来释放内存; PooledByteBuf:使用了池化机制,需要更复杂的规则来回收内存; 由于池化技术的特殊性质,释放 PooledByteBuf 对象所使用的内存并不是立即被回收的,而是被放入一个内存池中,待下次分配内存时再次使用。因此,释放 PooledByteBuf 对象的内存可能会延迟到后续的某个时间点。为了避免内存泄漏和占用过多内存,我们需要根据实际情况来设置池化技术的相关参数,以便及时回收内存; Netty 采用了引用计数法来控制 ByteBuf 对象的内存回收,在博文 「源码解析」ByteBuf 的引用计数机制 中将会通过解读源码的形式对 ByteBuf 的引用计数法进行深入理解; 每个 ByteBuf 对象被创建时,都会初始化为1,表示该对象的初始计数为1。 在使用 ByteBuf 对象过程中,如果当前 handler 已经使用完该对象,需要通过调用 release 方法将计数减1,当计数为0时,底层内存会被回收,该对象也就被销毁了。此时即使 ByteBuf 对象还在,其各个方法均无法正常使用。 但是,如果当前 handler 还需要继续使用该对象,可以通过调用 retain 方法将计数加1,这样即使其他 handler 已经调用了 release 方法,该对象的内存仍然不会被回收。这种机制可以有效地避免了内存泄漏和意外访问已经释放的内存的情况。 一般来说,应该尽可能地保证 retain 和 release 方法成对出现,以确保计数正确。