解析 flink sql 转化成flink job

文章目录

    • 背景
    • 流程
    • flink实例
    • 实现细节
      • 定义的规则
      • 定义的物理算子
      • 定义的flink exec node

背景

在很多计算引擎里,都会把sql 这种标准语言,转成计算引擎下底层实际的算子,因此理解此转换的流程对于理解整个过程非常重要

流程

在这里插入图片描述

flink实例

public class BatchExample {


    public static void main(String[] args) {
        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // 创建一个内置示例源表
        String sourceDDL = "CREATE TABLE users (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "    'connector' = 'filesystem',\n" +
                "    'path' = 'file:///Users/leishuiyu/IdeaProjects/SpringFlink/data.csv',\n" +
                "    'format' = 'csv'\n" +
                ");";
        tableEnv.executeSql(sourceDDL);


        Table table = tableEnv.sqlQuery("select * from users limit 1 ");


        String explanation = tableEnv.explainSql("select * from users limit 1 ");
        System.out.println(explanation);
        table.execute().print();
    }
}

输出结果

== Abstract Syntax Tree ==
LogicalSort(fetch=[1])
+- LogicalProject(id=[$0], name=[$1], age=[$2])
   +- LogicalTableScan(table=[[default_catalog, default_database, users]])

== Optimized Physical Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])
   +- Limit(offset=[0], fetch=[1], global=[false])
      +- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

== Optimized Execution Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])
   +- Limit(offset=[0], fetch=[1], global=[false])
      +- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

实现细节

主要是三个地方,在优化那一步,就把原生的relnode 转化成了自定义的relnode,自定义的relnode 就可以带物理转化的内容了,比如上面的LogicalTableScan 转成BatchPhysicalTableSourceScan 这个relnode

定义的规则

class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {

  /** Rule must only match if TableScan targets a bounded [[ScanTableSource]] */
  //规则只匹配有界的ScanTableSource
  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
    tableSourceTable match {
      case tst: TableSourceTable =>
        tst.tableSource match {
          case sts: ScanTableSource =>
            sts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBounded
          case _ => false
        }
      case _ => false
    }
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
    val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
    //在这里转成自定义的relnode 
    new BatchPhysicalTableSourceScan(
      rel.getCluster,
      newTrait,
      scan.getHints,
      scan.getTable.asInstanceOf[TableSourceTable]
    )
  }
}

定义的物理算子

也是一个relnode,实现类BatchPhysicalTableSourceScan

class BatchPhysicalTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    hints: util.List[RelHint],
    tableSourceTable: TableSourceTable)
  extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)
  with BatchPhysicalRel {
//主要是这个方法,转成 flink exec算子
  override def translateToExecNode(): ExecNode[_] = {
    val tableSourceSpec = new DynamicTableSourceSpec(
      tableSourceTable.contextResolvedTable,
      util.Arrays.asList(tableSourceTable.abilitySpecs: _*))
    tableSourceSpec.setTableSource(tableSourceTable.tableSource)
    
    new BatchExecTableSourceScan(
      unwrapTableConfig(this),
      tableSourceSpec,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }
}

定义的flink exec node

BatchExecTableSourceScan 类

 /// 主要是这个方法,看下下面的实现就比较熟悉了
 public Transformation<RowData> createInputFormatTransformation(
            StreamExecutionEnvironment env,
            InputFormat<RowData, ?> inputFormat,
            InternalTypeInfo<RowData> outputTypeInfo,
            String operatorName) {
        // env.createInput will use ContinuousFileReaderOperator, but it do not support multiple
        // paths. If read partitioned source, after partition pruning, we need let InputFormat
        // to read multiple partitions which are multiple paths.
        // We can use InputFormatSourceFunction directly to support InputFormat.
       
        final InputFormatSourceFunction<RowData> function =
                new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);
        return env.addSource(function, operatorName, outputTypeInfo).getTransformation();
    }

这里的转换是多种方式,一种是现成的比如source 这种,还有的是函数这种,要通过代码生成的方法实现。flink代码生成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/739686.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

日常饮水中的风险来自哪里?

一杯透明、澄澈的饮用水 是你一天工作、学习和生活的动力源泉 但在你准备饮用时 如何确定水中就没有致病菌呢 联合国报告 世界上有26%的人喝不到干净卫生的饮用水1 为什么&#xff1f; 卫生设施缺乏 环保意识差 对水资源的保护不足 ... 这些因素叠加造成了饮用水的污…

Debian12中搭建TiddlyWiki服务并进行配置

一、Node.js 环境安装 apt update & apt install nodejs npm ## 查看版本 node -v npm -v二、安装Tiddlywiki npm install -g tiddlywiki## 查看版本号 tiddlywiki --version 三、配置并运行 tiddlywiki ## 在/home 目录自动创建Testwiki文件夹&#xff0c;作为wiki的存…

福昕PDF编辑器(Foxit PDF Editor)修改成中文显示

双击打开Foxit PDF Editor 点击File->Preferences 从左侧下拉菜单找到Languages选项点击&#xff0c;然后在右边选择Chinese-Simplfied&#xff08;简体中文&#xff09;&#xff0c;点击OK 点击下面的Restart Now立刻重启Foxit PDF Editor软件 重启后&#xff0c;发现软件已…

Charles 显示内存不足解决方法

弹窗出现&#xff1a;Charles is running low on memory. Recording has been stopped. Please clear the session to free memory and continue recording. 官网解决方法&#xff1a; Charles runs out of memory After recording for a while Charles will run low on ava…

国内顶级汽车制造厂的创新实践:如何利用实时数据湖为更多业务提供新鲜数据?

使用 TapData&#xff0c;化繁为简&#xff0c;摆脱手动搭建、维护数据管道的诸多烦扰&#xff0c;轻量代替 OGG、DSG 等同步工具&#xff0c;「CDC 流处理 数据集成」组合拳&#xff0c;加速仓内数据流转&#xff0c;帮助企业将真正具有业务价值的数据作用到实处&#xff0c…

CodeFuse 开源官网上线啦~

Hello ! 这里是 CodeFuse ~ CodeFuse 的使命是开发专门设计用于支持整个软件开发生命周期的大型代码语言模型&#xff08;Code LLMs&#xff09;&#xff0c;涵盖设计、需求、编码、测试、部署、运维等关键阶段。我们致力于打造创新的解决方案&#xff0c;让软件开发者们在研发…

MacOS 中 Agent 图标删除

这个是战网没有完全卸载赶紧导致的 在访达中点击前往文件夹&#xff0c;输入&#xff1a; /Users/Shared将对应的目录删掉即可。会提示需要输入密码。

Swift Combine — Debounce和Throttle的理解与使用

Debounce 和 Throttle 是两种常用的操作符&#xff0c;用于控制数据流的频率和处理延迟。但它们的实现方式略有不同。理解这些差异对于在Combine代码中做出正确选择至关重要。 Debounce Debounce 操作符用于限制数据流的频率&#xff0c;只有在指定的时间间隔内没有新数据到达…

Information security in DLMS/COSEM(Green-Book)—认证机制

Information security in DLMS/COSEM 9.2.1 概述9.2.2 DLMS/COSEM安全概念9.2.2.1 概述 9.2.2.1 概述9.2.2.2 身份识别和认证9.2.2.2.1 身份识别9.2.2.2.2 认证机制9.2.2.2.2.1 概述 无安全认证&#xff08;Lowest Level Security&#xff09;&#xff1a;低级别安全认证&#…

Vue3 + Element-plus + TS —— 动态表格自由编辑

前期回顾 《 穿越时空的代码、在回首&#xff1a;Evil.js两年后的全新解读 》-CSDN博客 Vue3 TS Element-Plus 封装Tree组件 《亲测可用》_ https://blog.csdn.net/m0_57904695/article/details/131664157?spm1001.2014.3001.5501 态表格 自由编辑 目录 ♻️ 效果图…

IS022000与HACCP:提升食品安全管理的完美结合

国际标准化组织&#xff08;ISO&#xff09;于2005年9月发布了IS022000:2005标准&#xff0c;这是一项针对食品安全管理体系的国际标准。我国以等同采用的方式制定了国家标准GB/T 22000-2006《食品安全管理体系食品链中各类组织的要求》&#xff08;以下简称“GB/T22000”&…

Docker搭建yolov8并训练、验证、推理化学仪器数据集

目录 1、安装docker 2、创建yolov8镜像 3、下载代码包 4、下载模型预训练权重 5、制作数据集 6、训练、验证及推理 &#xff08;1&#xff09;训练 &#xff08;2&#xff09;验证 &#xff08;3&#xff09;推理 中文标签显示问题 本文通过docker的方式搭建yolov8运…

C语言入门课程学习笔记10:结构体联合体位域

C语言入门课程学习笔记10 第48课 - 自定义数据类型&#xff08;上&#xff09;实验-typedef实验小结 第49课 - 自定义数据类型&#xff08;中&#xff09;实验实验小结 第50课 - 自定义数据类型&#xff08;下&#xff09;实验实验小结 第51课 - 多文件程序设计实验实验实验小结…

python项目加密和增加时间许可证

1.bat&#xff0c;执行如下的命令&#xff0c;第一句是更新或增加许可证 第二句是加密draw_face.py python offer.py pyarmor obfuscate -O dist draw_face.py绘制自制人脸.py&#xff0c;调用加密的代码draw_face代码 import sys import os import cv2# 添加加密模块所在的路…

[MYSQL] 数据库基础

1.什么是数据库 从数据库的名字可以看出,它是用来操作(增删查改....)数据的,事实上也的确如此,通过数据库,我们可以更方便.更高效的来操作.管理数据 以文件形式存储数据的缺点 文件的安全问题文件不利于数据的查询和删除文件不利于存储海量数据操作文件并不方便 为了解决上述问…

煤矿运输新篇章:数字孪生模型引领智能化转型

在科技日新月异的今天&#xff0c;煤矿行业也迎来了前所未有的发展机遇。在这个充满挑战与机遇的时代&#xff0c;煤矿运输数字孪生模型以其独特的魅力和巨大的潜力&#xff0c;引领着煤矿运输领域走向智能化、高效化的新时代。 数字孪生模型&#xff0c;就是在虚拟世界中构建一…

喜讯:ISO年度审核通过!

在数字化时代&#xff0c;质量是我们不变的追求。近日&#xff0c;矩阵起源迎来了一个值得庆祝的时刻——三项ISO体系年度考核顺利通过&#xff01;分别为&#xff1a;ISO9001 质量管理体系标准认证、ISO20000信息技术服务管理体系认证及ISO27001 信息安全管理体系认证。 ISO标…

【分布式事务】分布式事务理论

CAP 理论 一致性&#xff08;Consistency&#xff09; 分布式系统中所有数据备份&#xff0c;在同一时刻是否是同样的值 可用性&#xff08;Availability&#xff09; 集群中一部分节点故障后&#xff0c;集群整体是否还能响应客户端的读写请求 分区容错性&#xff08;Partit…

移动硬盘损坏无法读取:故障解析与数据恢复策略

一、现象描述&#xff1a;移动硬盘损坏无法读取的困境 在数字化时代&#xff0c;移动硬盘作为数据存储的重要工具&#xff0c;广泛应用于个人和企业中。然而&#xff0c;当移动硬盘突然损坏&#xff0c;无法被系统正常读取时&#xff0c;往往会带来极大的困扰。用户可能会遇到…

《2024天猫618大促-首波男装销售报告》

这份报告主要分析了2024年天猫618大促期间的首波男装销售情况,从多个维度进行了深入的复盘和分析。报告中不仅包含了销售数据的统计分析,还对消费者行为、品牌表现、产品趋势等方面进行了详细的解读。通过对这些数据和信息的深入挖掘,报告揭示了当前男装市场的一些重要趋势和特…