【源码解析】flink sql执行源码概述:flink sql执行过程中有哪些阶段,这些阶段的源码大概位置在哪里

文章目录

  • 一. sql执行流程源码分析
    • 1. Sql语句解析成语法树阶段(SQL - > SqlNode)
    • 2. SqlNode 验证(SqlNode – >Operation)
    • 3. 语义分析(Operation - > RelNode)
    • 4. 优化阶段(RelNode - > optimize - >Transformation )
    • 5. 生成ExecutionPlan并执行
  • 二. 源码分析小结
    • `sqlnode->relnode->优化->pipeline(StreamGraph)-> 执行并返回结果`

本文大致分析了flink sql执行过程中的各个阶段的源码逻辑,这样可以在flink sql执行过程中, 能够定位到任务执行的某个阶段的代码大概分布在哪里,为更针对性的分析此阶段的细节逻辑打下基础,比如create 的逻辑是怎么执行的,select的逻辑是怎么生成的,优化逻辑都做了哪些,而这些是接下来的文章要分析的。

一. sql执行流程源码分析

SQL语句经过Calcite解析生成抽象语法树SQLNode,基于生成的SQLNode并结合flink Catalog完成校验生成一颗Operation树,接下来blink planner将Opearation树转为RelNode树然后进行优化,最后进行执行。如下流程流转图:

在这里插入图片描述
 

flink使用的是一款开源SQL解析工具Apache Calcite ,Calcite使用Java CC对sql语句进行了解析,转换为java/scala语言能够执行的逻辑。

 
Sql 的执行过程一般可以分为下图中的四个阶段,Calcite 同样也是这样:解析,校验,优化,执行:
在这里插入图片描述

1. Sql语句解析成语法树阶段(SQL - > SqlNode)

对于flink中解析sql为SqlNode对象的流程为:

  • TableEnvironmentImpl是sql执行的入口类,TableEnvironmentImpl中提供了excuteSqlSqlQuery等方法用来执行DDL、DML等sql
  • sql执行时会先对sql进行解析,ParserImp是flink调用sql解析的实现类,ParserImpl#parse()方法中通过调用包装器对象CalciteParser#parse()方法创建并调用使用javacc生成的sql解析器
  • (FlinkSqlParserImpl)parseSqlStmtEof方法完成sql解析,并返回SqlNode对象。

具体calciteParser 的动作之后更新

在这里插入图片描述

parse方法:负责将 SQL 查询字符串解析为抽象语法树(AST)

org.apache.flink.table.planner.delegation.ParserImpl
    /**
     * When parsing statement, it first uses {@link ExtendedParser} to parse statements. If {@link
     * ExtendedParser} fails to parse statement, it uses the {@link CalciteParser} to parse statements.
     * 先使用ExtendedParser进行解析如果解析失败了使用CalciteParser进行解析
     * @param statement input statement.
     * @return parsed operations.
     */
    @Override
    public List<Operation> parse(String statement) {
        //两种解析实例
        CalciteParser parser = calciteParserSupplier.get();
        FlinkPlannerImpl planner = validatorSupplier.get();
        //ExtendedParser解析
        Optional<Operation> command = EXTENDED_PARSER.parse(statement);
        if (command.isPresent()) {
            return Collections.singletonList(command.get());
        }
        //CalciteParser解析
        //解析为sqlNode
        SqlNodeList sqlNodeList = parser.parseSqlList(statement);
        List<SqlNode> parsed = sqlNodeList.getList();
        Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");
        return Collections.singletonList(
                //解析为SOperation
                SqlToOperationConverter.convert(planner, catalogManager, parsed.get(0))
                        .orElseThrow(() -> new TableException("Unsupported query: " + statement)));
    }

将sql语句解析成sqlNode。
对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table。。。)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。
在这里插入图片描述

 

2. SqlNode 验证(SqlNode – >Operation)

经过上面的第一步,会生成一个 SqlNode 对象,它是一个未经验证的抽象语法树,下面就进入了一个语法检查阶段,语法检查前需要知道元数据信息,这个检查会包括表名、字段名、函数名、数据类型的检查。

  • sql解析完成后执行sql校验,flink sql中增加了SqlNode转换为Operation的过程,sql校验是这个过程中完成。
  • 在SqlToOperationConvertver#convert()方法中完成这个转换,之后通过FlinkPlannerImpl#validate()方法对表、函数、字段等完成校验并基于生成的validated SqlNode生成对应的Operation。

在这里插入图片描述

接着进入到SqlToOperationConverter.convert方法中

    /**
     * This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s, different
     * SqlNode will have it's implementation in the #convert(type) method whose 'type' argument is
     * subclass of {@code SqlNode}.
     * 转换DDL,DML(select比如?)sqlnode的主入口,不同的SqlNode有不同的convert实现。
     * 
     */
    public static Optional<Operation> convert(
            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
        //1.进行校验
        final SqlNode validated = flinkPlanner.validate(sqlNode);
        //2.转换为Operation
        return convertValidatedSqlNode(flinkPlanner, catalogManager, validated);
    }
 
   //将校验过的sqlnode转换为Operation
   private static Optional<Operation> convertValidatedSqlNode(
            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
        SqlToOperationConverter converter =
                new SqlToOperationConverter(flinkPlanner, catalogManager);
	     。。。
	     else if (validated instanceof RichSqlInsert) {
	            return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
	     } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
	            return Optional.of(converter.convertSqlQuery(validated));

但其实converter.convertSqlQuery包含了createSqlToRelConverter逻辑,即创建了SqlToRelConverter实例,用于转换RelNode。这里源码暂不展示。

 

3. 语义分析(Operation - > RelNode)

接着将 SqlNode 转换成 RelNode/RexNode,也就是生成相应的逻辑计划(Logical Plan),也就是最初版本的逻辑计划(Logical Plan)。

其中Operation中包含了RelNode的converter

源码见:org.apache.flink.table.planner.delegation.PlannerBase

  /** Converts a relational tree of [[ModifyOperation]] into a Calcite relational expression.
  将ModifyOperation转换为Calcite relational expression即RelNode
   */
  @VisibleForTesting
  private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
    val dataTypeFactory = catalogManager.getDataTypeFactory
    modifyOperation match {
      case s: UnregisteredSinkModifyOperation[_] =>
        //relBuilder:
        val input = createRelBuilder.queryOperation(s.getChild).build()
        val sinkSchema = s.getSink.getTableSchema
        //校验relnode的 查询schema和sink schema是否一致,以及是否需要执行cast
        // validate query schema and sink schema, and apply cast if possible
        val query = validateSchemaAndApplyImplicitCast(
          input,
          catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),
          null,
          dataTypeFactory,
          getTypeFactory)
        LogicalLegacySink.create(
          query,
          s.getSink,
          "UnregisteredSink",
          ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))
....

这里触发创建relnode的调用逻辑,这里在之后statementSet.execute()后执行。

//执行flink sql的调用
。。。
//解析sql -> sqlnode
StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);
//sqlnode->relnode
TableResult execute = statementSet.execute();
	tableEnvironment.executeInternal(operations);
		TableEnvironmentImpl.translate
			PlannerBase.translate->translateToRel			

 

4. 优化阶段(RelNode - > optimize - >Transformation )

即对逻辑计划优化,根据前面生成的逻辑计划按照相应的规则进行优化。

接着看tableEnvironment.executeInternal中的translate方法

  override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    beforeTranslation()
    。。。
    //转换为relnode并放到一个map中
    val relNodes = modifyOperations.map(translateToRel)
    //优化逻辑计划
    val optimizedRelNodes = optimize(relNodes)
    //生成execGraph:执行图
    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
    //生成transformations DAG
    val transformations = translateToPlan(execGraph)
    afterTranslation()
    transformations
  }

Calcite 的核心所在,优化器进行优化的地方,如过滤条件的下压(push down),在进行 join 操作前,先进行 filter 操作,这样的话就不需要在 join 时进行全量 join,减少参与 join 的数据量等。

 

5. 生成ExecutionPlan并执行

最终的执行计划转为Graph图,下面的流程与真正的java代码流程就一致了。

TableEnvironmentImpl.executeInternal中具体看executeInternal方法

TableEnvironmentImpl.executeInternal(
...
TableResultInternal result = executeInternal(transformations, sinkIdentifierNames);
...
)


    private TableResultInternal executeInternal(
            List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
         。。。
         //Translates the given transformations to a Pipeline.
         //将transformations转换为pipeline
        Pipeline pipeline =
                execEnv.createPipeline(
                        transformations, tableConfig.getConfiguration(), defaultJobName);
        try {
            // 执行pipeline
            //pipeline 其实就是StreamGraph
            JobClient jobClient = execEnv.executeAsync(pipeline);
         。。。
            }
		 。。。
		 //执行后返回结果
            return TableResult。。。

    }

通过生成的Transformation对象调用 execEnv.createPipeline,生成pipelinepipeline其实就StreamGraph便可以调用execEnv.executeAsync执行任务了。

 

二. 源码分析小结

上述描述了flink sql在内部执行过程进行的一些操作:
在这里插入图片描述

这里我们从执行sql tEnv.executeSql(stmt)statementSet.addInsertSql(sql); parse、validate阶段)生成statementSet,然后执行statementSet.execute()optimize、Execute阶段) 触发任务执行。

 

parse、validate阶段

通过执行以下代码触发

StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);

statementSet.addInsertSql(sql);
tEnv.executeSql(stmt)

通过调用 tEnv.executeSql(stmt)statementSet.addInsertSql(sql); 进行每个sql的解析,校验,具体:

  1. Sql语句解析成语法树阶段(SQL - > SqlNode)
  2. SqlNode 验证(SqlNode – >Operation),其中Operation中包含着RelNode的convert实例,为转换逻辑计划做提前准备

 
optimize、Execute阶段

接着执行如下代码

//sqlnode->relnode->优化->pipeline(StreamGraph)-> 执行并返回结果
TableResult execute = statementSet.execute();
//调用链:
	tableEnvironment.executeInternal(operations);
		TableEnvironmentImpl.translate
			PlannerBase.translate->translateToRel

经历如下几个过程:

sqlnode->relnode->优化->pipeline(StreamGraph)-> 执行并返回结果

 

这里我们主要看executeInternal的逻辑

    public TableResultInternal executeInternal(List<ModifyOperation> operations) {
        List<ModifyOperation> mapOperations = new ArrayList<>();
        for (ModifyOperation modify : operations) {
            //1.先执行CTAS sql语句, 并放到mapOperations中进行translate操作
            // execute CREATE TABLE first for CTAS statements
            if (modify instanceof CreateTableASOperation) {
                CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
                executeInternal(ctasOperation.getCreateTableOperation());
                mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
            } else {
               //2. 将其他非CTAS sqlnode放到mapOperations,进行translate操作
                mapOperations.add(modify);
            }
        }
        //translate主要的逻辑是:将所有的sqlNodes转换为relNodes,为初始的逻辑计划,然后优化逻辑计划,
        //接着翻译 ExecNodeGraph 为 Transformation DAG.
        List<Transformation<?>> transformations = translate(mapOperations);
        List<String> sinkIdentifierNames = extractSinkIdentifierNames(mapOperations);
        //  transformations转换为pipeline,最终执行pipeline即StreamGraph,然后返回结果
        TableResultInternal result = executeInternal(transformations, sinkIdentifierNames);
        if (tableConfig.get(TABLE_DML_SYNC)) {
            try {
                result.await();
            } catch (InterruptedException | ExecutionException e) {
                result.getJobClient().ifPresent(JobClient::cancel);
                throw new TableException("Fail to wait execution finish.", e);
            }
        }
        return result;
    }

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/242230.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【活动回顾】ABeam News | 兰州大学外国语学院回访ABeam 旗下德硕管理咨询(上海),持续推进远景合作

访企拓岗深入调研 持续推进远景合作 继11月上旬ABeam旗下艾宾信息技术开发&#xff08;西安&#xff09;团队一行拜访兰州大学并举行隆重的校企签约仪式后&#xff0c;近日兰州大学一行领导也如约莅临德硕管理咨询&#xff08;上海&#xff09;有限公司开展拓岗调研。 深化…

基于FPGA的视频接口之高速IO(SATA)

简介 本章节是对于高速IO接口应用的一个扩展,目前扩展为SATA(SSD硬盘,机械硬盘不能使用)。通俗易懂的讲,即把SSD硬盘当做大型的Nand Flash来处理,不格式化硬盘,直接以地址和数据的格式,在SATA盘中写入数据,该数据不能被Window和linux直接识别,需单独编写App来查看SSD…

Python 小程序之PDF文档加解密

PDF文档的加密和解密 文章目录 PDF文档的加密和解密前言一、总体构思二、使用到的库三、PDF文档的加密1.用户输入模块2.打开并读取文档数据3.遍历保存数据到新文档4.新文档进行加密5.新文档命名生成路径6.保存新加密的文档 四、PDF文档的解密1.用户输入模块2.前提准备2.文件解密…

【C++11】右值引用与移动语义

一.左值与右值 左值&#xff1a;可以取地址的表示数据的表达式&#xff0c;左值可以出现在赋值符号左边 右值&#xff1a;不能取地址的表示数据的表达式&#xff0c;右值不能出现在赋值符号左边 int fun() {return 0; } int main() {int a 0;//a->左值const int b 1;//b-&…

粒子群优化算法的实践 - 多个约束条件

粒子群优化算法的实践 - 多个约束条件 flyfish 粒子群优化算法的实践 - 目标函数的可视化 粒子群优化算法的实践 - 向量减法 在粒子群优化算法的代码实践中 代码写法是 #非线性约束 (x[0] - 1) ** 2 (x[1] - 1) ** 2 - 1<0 constraint_ueq (lambda x: (x[0] - 1) ** 2…

【期末考复习向】transformer的运作机制

1.transformer的encoder运作 transformer的encoder部分包括了输入和处理2大部分。首先是输入部分inputs&#xff0c;这里初始的inputs是采用独热向量进行表示的&#xff0c;随后经过word2vec等操作把独热向量&#xff08;采用独热向量的好处就是可向量是正交的&#xff0c;可以…

Centos7部署SVN

文章目录 &#xff08;1&#xff09;SVN概述&#xff08;2&#xff09;SVN与Samba共享&#xff08;3&#xff09;安装SVN&#xff08;4&#xff09;SVN搭建实例&#xff08;5&#xff09;pc连接svn服务器&#xff08;6&#xff09;svn图标所代表含义 &#xff08;1&#xff09;…

【大数据】详解 AVRO 格式

详解 AVRO 格式 1.Avro 介绍2.schema2.1 原始类型2.2 复杂类型2.2.1 Records2.2.2 Enums2.2.3 Arrays2.2.4 Maps2.2.5 Unions2.2.6 Fixed 3.Avro 的文件存储格式3.1 数据编码3.1.1 原始类型3.1.2 复杂类型 3.2 存储格式3.3 存储格式 4.小结 1.Avro 介绍 Apache Avro 是 Hadoop…

【rabbitMQ】声明队列和交换机

上一篇&#xff1a;springboot整合rabbitMQ模拟简单收发消息 https://blog.csdn.net/m0_67930426/article/details/134904766?spm1001.2014.3001.5501 相关配置环境参考上篇 springAMQP提供了几个类用来声明声明队列&#xff0c;交换机及其绑定关系 声明队列&#xff0c;…

经典策略筛选-20231213

策略1&#xff1a; 龙头战法只做最强&#xff1a;国企改革 ----四川金顶 1、十日交易内出现 涨停或 &#xff08;涨幅大于7个点且量比大于3&#xff09; 2、JDK MACD RSI OBV LWR MTM 六指标共振 3、均线多头 4、 筹码峰 &#xff08;锁仓&#xff09; 5、现价> 五日均…

C语言之文件操作(上)

C语言之文件操作&#xff08;上&#xff09; 文章目录 C语言之文件操作&#xff08;上&#xff09;1. 什么是⽂件&#xff1f;1.1 程序⽂件1.2 数据⽂件1.3 ⽂件名 2. ⼆进制⽂件和⽂本⽂件3. ⽂件的打开和关闭3.1 流和标准流3.1.1 流3.1.2 标准流 4. ⽂件指针5. 文件的打开与关…

什么是连接池?如何确认连接池的大小?

对于我们编写的几乎每个网络或移动应用程序来说&#xff0c;其底层的关键组件之一就是数据库。对于编写使用数据库且高性能且资源高效的应用程序&#xff0c;必须处理一项关键资源&#xff0c;但与 CPU、内存等不同&#xff0c;它通常不是很明显。该资源是数据库连接。 什么是…

调用Win10隐藏的语音包

起因 在做一个文本转语音的Demo的时候&#xff0c;遇到了语音包无法正确被Unity识别的问题。明明电脑上安装了语音包但是代码就是识别不出来 原因 具体也不是非常清楚&#xff0c;但是如果语言包是在的话&#xff0c;大概率是Win10系统隐藏了。 确定语言包 首先查看%windi…

VLAN详细学习

文章目录 VLAN概念VLAN种类端口VLAN工作原理以太网的三种链路类型配置 VLAN概念 一种讲局域网设备从逻辑上划分为一个个网段&#xff0c;从而实现虚拟网络的一种技术&#xff0c;这一技术主要应用于交换机中。Vlan技术是技术在以太网帧的基础上增加vlan头&#xff0c;用VLAN I…

云计算大屏,可视化云计算分析平台(云实时数据大屏PSD源文件)

大屏组件可以让UI设计师的工作更加便捷&#xff0c;使其更高效快速的完成设计任务。现分享可视化云分析系统、可视化云计算分析平台、云实时数据大屏的大屏Photoshop源文件&#xff0c;开箱即用&#xff01; 若需 更多行业 相关的大屏&#xff0c;请移步小7的另一篇文章&#…

代码随想录算法训练营第50天| 123.买卖股票的最佳时机III 188.买卖股票的最佳时机IV

JAVA代码编写 123.买卖股票的最佳时机III 给定一个数组&#xff0c;它的第 i 个元素是一支给定的股票在第 i 天的价格。 设计一个算法来计算你所能获取的最大利润。你最多可以完成 两笔 交易。 **注意&#xff1a;**你不能同时参与多笔交易&#xff08;你必须在再次购买前出…

记录 | ubuntu上安装fzf

在 ubuntu 上采用命令行安装 fzf 的方式行不通 指的是采用下面的方式行不通&#xff1a; sudo apt install fzf # 行不通 sudo snap install fzf --classic # 行不通正确的安装方式是&#xff1a; ● 到 fzf 的 git 仓库&#xff1a;https://github.com/junegunn/fzf/re…

aardio网页组件:webPageOperation

webPageOperation是webview的初步封装&#xff0c;用来网页填表、操作网页。可操作web.form、web.view、web.view2等浏览器组件。 使用方法 首先把webPageOperation.aardio&#xff08;源码在后面&#xff09;放到~\lib\godking目录下&#xff0c;然后新建窗口项目&#xff…

Leetcode—10.正则表达式匹配【困难】

2023每日刷题&#xff08;五十八&#xff09; Leetcode—10.正则表达式匹配 算法思想 参考题解 实现代码 class Solution { public:bool isMatch(string s, string p) {int m s.size(), n p.size();vector<vector<bool>> dp(m 1, vector<bool>(n …

基于单片机智能循迹小车仿真设计

**单片机设计介绍&#xff0c;基于单片机智能循迹小车仿真设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的智能循迹小车是一种通过传感器检测地面情况&#xff0c;并根据设定的规则进行动作控制的机器人。它使用…