calcite 在flink中的二次开发
- 1 CodeGen
- 2 flink 语法扩展
- 2.1 在进行 Rule 规则匹配时,放开对 Distinct 的限制
- 2.2下面附上一个 利用codegen来生成所需类的例子:
- 3 flink使用calcite 生成解析器FlinkSqlParserImpl
- 3.1 FlinkSqlParserImpl 的生成
- 3.1.1 flink 引入 calcite
- 3.1.2 fmpp 生成 Parser.jj
- 3.1.3 javacc 生成 parser
- 3.1.4 看看 Parser
- 3.1.5 blink planner 引入 flink-sql-parser
- 4 calcite 规则优化器
- 4.1 什么是查询优化器
- 4.2 基于规则优化(RBO)
- 4.3 基于成本优化(CBO)
- 4.4 优化规则
- 4.4.1 谓词下推(Predicate Pushdown)
- 4.4.2 常量折叠(Constant Folding)
- 4.4.3 列裁剪(Column Pruning)
- 扩展资料
关于calcite的概念相关的内容,在我另一篇帖子
深入理解flinksql执行流程,扩展解析器实现语法的扩展
1 CodeGen
首先阐述一下 codegen:
Codegen是基于ObjectWeb ASM的低开销的java代码生成器,他可以根据预先填好的规则与条件,通过编译代码,自动生成java类
在递归调用各个节点 DataStreamRel 的 translateToPlan 方法时,会利用CodeGen元编程成Flink的各种算子,就相当于我们直接利用Flink的DataSet或DataStream API开发的程序。
还是以上面的Demo为例,跟踪进 DataStreamScan 的 translateToPlan 方法中,会发现相关逻辑:
- 首先生成 function 代码的字符串形式,并封装成 GeneratedFunction 对象;
- 然后使用 CodeGen 进行编译;
- 在需要使用 Function 的时候使用反射进行加载使用。
后续在 扩展 flink语法(如join维表)时,需要针对上述步骤,拼接生成 function 的字符串形式。
2 flink 语法扩展
了解完 Flink Sql 的执行流程之后,就可以针对 Flink Sql 做语法、功能上的扩展。
在Flink老版本上,Flink不支持 COUNT(DISTINCT aaa) 语法,但是如果需要对 Flink 做此功能拓展,需要结合 前面说到的 Flink Sql 执行流程,做相应修改。
修改点:
- 在进行 Rule 规则匹配时,放开对 Distinct 的限制
- DataStreamRelNode 转为 DataStream 过程中,拼接CodeGen所需的 Function String
2.1 在进行 Rule 规则匹配时,放开对 Distinct 的限制
在 DATASTREAM_OPT_RULES.DataStreamGroupWindowAggregateRule 中放开对 Distinct 的限制:
2.2下面附上一个 利用codegen来生成所需类的例子:
新建一个项目 ,从源码中拷贝出codegen的代码文件夹
在配置文件中,添加好,sql的保留字,关键字,类的名字等信息,这里就不多说了,有需要的同学可以百度具体的原理技术
新建一个SqlUseFunction.java 这个就是上文中说到的function 代码的字符串形式,
在flink中,就是通过 拼装来拼装出一个类,调用codegen来进行编译得到继承抽象类SqlNode 的方法,所以在开发完的源代码中是找不到codegen相关的东西的,但实际他是参与了工作的。
package com;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
public class SqlUseFunction extends SqlCall {
private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE FUNCTION",
SqlKind.OTHER_FUNCTION);
private final SqlIdentifier funcName;
private final SqlNodeList funcProps;
/**
* SqlUseFunction constructor.
*
* @param pos sql define location
* @param funcName function name
* @param funcProps function property
* */
public SqlUseFunction(SqlParserPos pos, SqlIdentifier funcName, SqlNodeList funcProps) {
super(pos);
this.funcName = funcName;
this.funcProps = funcProps;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("USE FUNCTION");
funcName.unparse(writer, leftPrec, rightPrec);
if (funcProps != null) {
writer.keyword("WITH");
SqlWriter.Frame frame = writer.startList("(", ")");
for (SqlNode c : funcProps) {
writer.sep(",");
c.unparse(writer, 0, 0);
}
writer.endList(frame);
}
}
@Override
public SqlOperator getOperator() {
return OPERATOR;
}
@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(funcName, funcProps);
}
}
pom文件中指定好,使用fmpp技术,以及codegen的地址等
<build>
<plugins>
<!-- adding fmpp code gen -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- 从calcite-core.jar提取解析器语法模板,并放入在${project.build}freemarker模板所在的目录 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.18.0</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>target/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<lookAhead>2</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
这里引入一张图 说明calcite中jacc和fmpp在其中起的作用
看到codegen中的fmpp了么 就是这个fmpp
3 flink使用calcite 生成解析器FlinkSqlParserImpl
以下面这个案例出发(代码基于 flink 1.13.1 版本):
public class ParserTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
DataStream<Tuple3<String, Long, Long>> tuple3DataStream =
env.fromCollection(Arrays.asList(
Tuple3.of("2", 1L, 1627254000000L),
Tuple3.of("2", 1L, 1627218000000L + 5000L),
Tuple3.of("2", 101L, 1627218000000L + 6000L),
Tuple3.of("2", 201L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 86400000 + 7000L)))
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(0L)) {
@Override
public long extractTimestamp(Tuple3<String, Long, Long> element) {
return element.f2;
}
});
tEnv.registerFunction("mod", new Mod_UDF());
tEnv.registerFunction("status_mapper", new StatusMapper_UDF());
tEnv.createTemporaryView("source_db.source_table", tuple3DataStream,
"status, id, timestamp, rowtime.rowtime");
String sql = "SELECT\n"
+ " count(1),\n"
+ " cast(tumble_start(rowtime, INTERVAL '1' DAY) as string)\n"
+ "FROM\n"
+ " source_db.source_table\n"
+ "GROUP BY\n"
+ " tumble(rowtime, INTERVAL '1' DAY)";
Table result = tEnv.sqlQuery(sql);
tEnv.toAppendStream(result, Row.class).print();
env.execute();
}
}
debug 过程如之前分析 sql -> SqlNode 过程所示,如下图直接定位到 SqlParser:
如上图可以看到具体的 Parser 就是 FlinkSqlParserImpl。
定位到具体的代码如下图所示(flink-table-palnner-blink-2.11-1.13.1.jar)。
最终 parse 的结果 SqlNode 如下图。
再来看看 FlinkSqlParserImpl 是怎么使用 calcite 生成的。
具体到 flink 中的实现,位于源码中的 flink-table.flink-sql-parser 模块(源码基于 flink 1.13.1)。
flink 是依赖 maven 插件实现的上面的整体流程。
3.1 FlinkSqlParserImpl 的生成
接下来看看整个 Parser 生成流程。
3.1.1 flink 引入 calcite
使用 maven-dependency-plugin 将 calcite 解压到 flink 项目 build 目录下。
<plugin>
<!-- Extract parser grammar template from calcite-core.jar and put
it under ${project.build.directory} where all freemarker templates are. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
3.1.2 fmpp 生成 Parser.jj
使用 maven-resources-plugin 将 Parser.jj 代码生成。
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>target/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
</execution>
</executions>
</plugin>
3.1.3 javacc 生成 parser
使用 javacc 将根据 Parser.jj 文件生成 Parser。
<plugin>
<!-- This must be run AFTER the fmpp-maven-plugin -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<!-- This must be kept synced with Apache Calcite. -->
<lookAhead>1</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
3.1.4 看看 Parser
最终生成的 Parser 就是 FlinkSqlParserImpl。
3.1.5 blink planner 引入 flink-sql-parser
blink planner(flink-table-planner-blink) 在打包时将 flink-sql-parser、flink-sql-parser-hive 打包进去。
4 calcite 规则优化器
4.1 什么是查询优化器
查询优化器是传统数据库的核心模块,也是大数据计算引擎的核心模块,开源大数据引擎如 Impala、Presto、Drill、HAWQ、 Spark、Hive 等都有自己的查询优化器。Calcite 就是从 Hive 的优化器演化而来的。
优化器的作用:将解析器生成的关系代数表达式转换成执行计划,供执行引擎执行,在这个过程中,会应用一些规则优化,以帮助生成更高效的执行计划。
4.2 基于规则优化(RBO)
基于规则的优化器(Rule-Based Optimizer,RBO):根据优化规则对关系表达式进行转换,这里的转换是说一个关系表达式经过优化规则后会变成另外一个关系表达式,同时原有表达式会被裁剪掉,经过一系列转换后生成最终的执行计划。
RBO 中包含了一套有着严格顺序的优化规则,同样一条 SQL,无论读取的表中数据是怎么样的,最后生成的执行计划都是一样的。同时,在 RBO 中 SQL 写法的不同很有可能影响最终的执行计划,从而影响执行计划的性能。
4.3 基于成本优化(CBO)
基于代价的优化器(Cost-Based Optimizer,CBO):根据优化规则对关系表达式进行转换,这里的转换是说一个关系表达式经过优化规则后会生成另外一个关系表达式,同时原有表达式也会保留,经过一系列转换后会生成多个执行计划,然后 CBO 会根据统计信息和代价模型 (Cost Model) 计算每个执行计划的 Cost,从中挑选 Cost 最小的执行计划。
由上可知,CBO 中有两个依赖:统计信息和代价模型。统计信息的准确与否、代价模型的合理与否都会影响 CBO 选择最优计划。 从上述描述可知,CBO 是优于 RBO 的,原因是 RBO 是一种只认规则,对数据不敏感的呆板的优化器,而在实际过程中,数据往往是有变化的,通过 RBO 生成的执行计划很有可能不是最优的。事实上目前各大数据库和大数据计算引擎都倾向于使用 CBO,但是对于流式计算引擎来说,使用 CBO 还是有很大难度的,因为并不能提前预知数据量等信息,这会极大地影响优化效果,CBO 主要还是应用在离线的场景。
4.4 优化规则
无论是 RBO,还是 CBO 都包含了一系列优化规则,这些优化规则可以对关系表达式进行等价转换,常见的优化规则包含:
- 谓词下推 Predicate Pushdown
- 常量折叠 Constant Folding
- 列裁剪 Column Pruning
- 其他
在 Calcite 的代码里,有一个测试类(org.apache.calcite.test.RelOptRulesTest)汇集了对目前内置所有 Rules 的测试 case,这个测试类可以方便我们了解各个 Rule 的作用。在这里有下面一条 SQL,通过这条语句来说明一下上面介绍的这三种规则。
select 10 + 30, users.name, users.age
from users join jobs on users.id= user.id
where users.age > 30 and jobs.id>10
4.4.1 谓词下推(Predicate Pushdown)
关于谓词下推,它主要还是从关系型数据库借鉴而来,关系型数据中将谓词下推到外部数据库用以减少数据传输;属于逻辑优化,优化器将谓词过滤下推到数据源,使物理执行跳过无关数据。最常见的例子就是 join 与 filter 操作一起出现时,提前执行 filter 操作以减少处理的数据量,将 filter 操作下推,以上面例子为例,示意图如下(对应 Calcite 中的 FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN Rule):
在进行 join 前进行相应的过滤操作,可以极大地减少参加 join 的数据量。
4.4.2 常量折叠(Constant Folding)
常量折叠也是常见的优化策略,这个比较简单、也很好理解,可以看下 编译器优化 – 常量折叠 这篇文章,基本不用动脑筋就能理解,对于我们这里的示例,有一个常量表达式 10 + 30,如果不进行常量折叠,那么每行数据都需要进行计算,进行常量折叠后的结果如下图所示( 对应 Calcite 中的 ReduceExpressionsRule.PROJECT_INSTANCE Rule):
4.4.3 列裁剪(Column Pruning)
列裁剪也是一个经典的优化规则,在本示例中对于jobs 表来说,并不需要扫描它的所有列值,而只需要列值 id,所以在扫描 jobs 之后需要将其他列进行裁剪,只留下列 id。这个优化带来的好处很明显,大幅度减少了网络 IO、内存数据量的消耗。裁剪前后的示意图如下(不过并没有找到 Calcite 对应的 Rule):
如果想自定义rule
可以百度 calcite 自定义rule 来实现对flinksql的 语法优化
扩展资料
Apache Calcite的优化器规则解析
Flink Sql 之 Calcite Volcano优化器(源码解析)
calcite
calcite 规则优化开发
所有代码都在我的git上,需要的同学可以自取,如果找不到可以私信我