Contents
- Background
- Process
- Flink example
- Implementation details
- The conversion rule
- The physical operator
- The Flink exec node
Background
In many compute engines, standard SQL is translated into the engine's own low-level operators, so understanding how this translation works is key to understanding the engine as a whole.
Process
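Roughly, the pipeline is: the SQL text is parsed by Calcite into an AST, converted and validated into a logical plan of RelNodes, optimized into a physical plan of Flink-specific RelNodes, translated into Flink exec nodes, and finally turned into the runtime Transformations that actually execute. The example below makes the first three stages visible through explainSql.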
Flink example
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BatchExample {
    public static void main(String[] args) {
        // set up the execution environment in batch mode
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // create a source table backed by a local CSV file
        String sourceDDL = "CREATE TABLE users (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'file:///Users/leishuiyu/IdeaProjects/SpringFlink/data.csv',\n" +
                "  'format' = 'csv'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        Table table = tableEnv.sqlQuery("select * from users limit 1");
        String explanation = tableEnv.explainSql("select * from users limit 1");
        System.out.println(explanation);
        table.execute().print();
    }
}
Output:
== Abstract Syntax Tree ==
LogicalSort(fetch=[1])
+- LogicalProject(id=[$0], name=[$1], age=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, users]])
== Optimized Physical Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])
+- Limit(offset=[0], fetch=[1], global=[false])
+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])
== Optimized Execution Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])
+- Limit(offset=[0], fetch=[1], global=[false])
+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])
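Note that the Optimized Physical Plan and the Optimized Execution Plan print identically for this simple query; the interesting rewrite already happened between the AST and the physical plan, where LogicalTableScan became a TableSourceScan with the limit pushed down into the source, and the LogicalSort(fetch=[1]) was split into a local Limit and a global Limit around a single Exchange.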
Implementation details
The work happens in three places. During optimization, the native (logical) RelNodes are converted into Flink's own RelNodes, which carry the information needed for the physical translation; for example, the LogicalTableScan above is converted into the BatchPhysicalTableSourceScan RelNode.
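For orientation, converter rules like the one below are collected into the planner's physical optimization rule set and applied during the optimize step. A minimal sketch of that wiring, assuming Flink's FlinkBatchRuleSets convention (class names and packages may differ across versions):

import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalTableSourceScanRule

object PhysicalRuleSetSketch {
  // the optimizer applies these converter rules to rewrite logical RelNodes
  // into BATCH_PHYSICAL RelNodes
  val PHYSICAL_OPT_RULES: RuleSet = RuleSets.ofList(
    BatchPhysicalTableSourceScanRule.INSTANCE
    // ... the real rule set contains many more physical conversion rules
  )
}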
The conversion rule
class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {

  /** Rule must only match if TableScan targets a bounded [[ScanTableSource]] */
  // i.e. the rule only matches scans over a bounded ScanTableSource
  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
    tableSourceTable match {
      case tst: TableSourceTable =>
        tst.tableSource match {
          case sts: ScanTableSource =>
            sts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBounded
          case _ => false
        }
      case _ => false
    }
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
    val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
    // this is where the logical scan becomes the custom physical RelNode
    new BatchPhysicalTableSourceScan(
      rel.getCluster,
      newTrait,
      scan.getHints,
      scan.getTable.asInstanceOf[TableSourceTable])
  }
}
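Note the division of labor in this ConverterRule: matches() restricts the rule to bounded ScanTableSources (the batch case), while convert() performs no optimization of its own; it simply swaps the convention trait to FlinkConventions.BATCH_PHYSICAL and rebuilds the scan as the physical class below.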
The physical operator
The physical operator is also a RelNode; the implementing class is BatchPhysicalTableSourceScan.
class BatchPhysicalTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    hints: util.List[RelHint],
    tableSourceTable: TableSourceTable)
  extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)
  with BatchPhysicalRel {

  // the key method: translate this physical RelNode into a Flink exec node
  override def translateToExecNode(): ExecNode[_] = {
    val tableSourceSpec = new DynamicTableSourceSpec(
      tableSourceTable.contextResolvedTable,
      util.Arrays.asList(tableSourceTable.abilitySpecs: _*))
    tableSourceSpec.setTableSource(tableSourceTable.tableSource)

    new BatchExecTableSourceScan(
      unwrapTableConfig(this),
      tableSourceSpec,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }
}
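translateToExecNode() is not invoked by the optimizer rules themselves; roughly speaking, once optimization has produced the physical RelNode tree, the planner walks that tree and calls this method on each physical node to build the graph of exec nodes that the class in the next section belongs to.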
The Flink exec node
The BatchExecTableSourceScan class:
// The key method; the implementation below should look familiar by now
public Transformation<RowData> createInputFormatTransformation(
        StreamExecutionEnvironment env,
        InputFormat<RowData, ?> inputFormat,
        InternalTypeInfo<RowData> outputTypeInfo,
        String operatorName) {
    // env.createInput will use ContinuousFileReaderOperator, but it does not support
    // multiple paths. When reading a partitioned source, after partition pruning we
    // need to let the InputFormat read multiple partitions, which are multiple paths.
    // We can use InputFormatSourceFunction directly to support InputFormat.
    final InputFormatSourceFunction<RowData> function =
            new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);
    return env.addSource(function, operatorName, outputTypeInfo).getTransformation();
}
This translation happens in more than one way: some nodes, like the source scan above, map onto ready-made runtime operators, while others, such as functions and expressions, have to be implemented through code generation (see Flink's code generation).
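To make the code-generation idea concrete, here is a minimal, self-contained sketch of the mechanism using Janino, the compiler the Flink planner also uses for its generated classes. Everything in it (GeneratedProjection, project) is a made-up illustration, not a Flink API:

import org.codehaus.janino.SimpleCompiler

object CodegenSketch {
  def main(args: Array[String]): Unit = {
    // generate Java source for a trivial "projection" at runtime,
    // the way the planner emits operator code as strings
    val source =
      """public class GeneratedProjection {
        |  public int project(int id) { return id + 1; }
        |}""".stripMargin

    // compile the string and load the freshly generated class
    val compiler = new SimpleCompiler()
    compiler.cook(source)
    val clazz = compiler.getClassLoader.loadClass("GeneratedProjection")

    // instantiate it and invoke the generated method reflectively
    val instance = clazz.getDeclaredConstructor().newInstance()
    val result = clazz.getMethod("project", classOf[Int]).invoke(instance, Int.box(41))
    println(result) // 42
  }
}

The real planner does the same thing at a larger scale: it assembles operator source code as strings, compiles them with Janino, and hands the resulting classes to the runtime.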