详解flink sql, calcite logical转flink logical

文章目录

    • 背景
    • 示例
    • FlinkLogicalCalcConverter
    • BatchPhysicalCalcRule
    • StreamPhysicalCalcRule
    • 其它算子
      • FlinkLogicalAggregate
      • FlinkLogicalCorrelate
      • FlinkLogicalDataStreamTableScan
      • FlinkLogicalDistribution
      • FlinkLogicalExpand
      • FlinkLogicalIntermediateTableScan
      • FlinkLogicalIntersect
      • FlinkLogicalJoin
      • FlinkLogicalLegacySink
      • FlinkLogicalLegacyTableSourceScan
      • FlinkLogicalMatch
      • FlinkLogicalMinus
      • FlinkLogicalOverAggregate
      • FlinkLogicalRank
      • FlinkLogicalSink
      • FlinkLogicalSnapshot
      • FlinkLogicalSort
      • FlinkLogicalUnion
      • FlinkLogicalValues

背景

本文主要介绍calcite 如何转成自定义的relnode

在这里插入图片描述

示例

FlinkLogicalCalcConverter

检查是不是calcite 的LogicalCalc 算子,是的话,重写带RelTrait 为FlinkConventions.LOGICA
的rel,类型FlinkLogicalCalc

private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {

  override def convert(rel: RelNode): RelNode = {
    val calc = rel.asInstanceOf[LogicalCalc]
    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalCalc.create(newInput, calc.getProgram)
  }
}

BatchPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {

  override def matches(call: RelOptRuleCall): Boolean = {
    val calc: FlinkLogicalCalc = call.rel(0)
    val program = calc.getProgram
    !program.getExprList.asScala.exists(containsPythonCall(_))
  }

  def convert(rel: RelNode): RelNode = {
    val calc = rel.asInstanceOf[FlinkLogicalCalc]
    val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)

    new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)
  }
}

StreamPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {

  override def matches(call: RelOptRuleCall): Boolean = {
    val calc: FlinkLogicalCalc = call.rel(0)
    val program = calc.getProgram
    !program.getExprList.asScala.exists(containsPythonCall(_))
  }

  def convert(rel: RelNode): RelNode = {
    val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)

    new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)
  }
}

其它算子

介绍下算子的匹配条件

FlinkLogicalAggregate

对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
不存在准确的distinct调用并且支持聚合函数,则返回true

override def matches(call: RelOptRuleCall): Boolean = {
    val agg = call.rel(0).asInstanceOf[LogicalAggregate]

    // we do not support these functions natively
    // they have to be converted using the FlinkAggregateReduceFunctionsRule
    val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
      // we support AVG
      case SqlKind.AVG => true
      // but none of the other AVG agg functions
      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
      case _ => true
    }

    val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)

    !hasAccurateDistinctCall && supported
  }

FlinkLogicalAggregateStreamConverter

SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
非这几种,都支持转换

override def matches(call: RelOptRuleCall): Boolean = {
    val agg = call.rel(0).asInstanceOf[LogicalAggregate]

    // we do not support these functions natively
    // they have to be converted using the FlinkAggregateReduceFunctionsRule
    agg.getAggCallList.map(_.getAggregation.getKind).forall {
      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
      case _ => true
    }
  }

FlinkLogicalCorrelate

对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
检查relnode 是不是LogicalCorrelate,重写relnode

默认的onMatch 函数

FlinkLogicalDataStreamTableScan

对应的SQL语义是,检查数据源是不是流式的
检查relnode 是不是LogicalCorrelate,重写relnode

  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
    dataStreamTable != null
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)
  }

FlinkLogicalDistribution

描述数据是不是打散的

  override def convert(rel: RelNode): RelNode = {
    val distribution = rel.asInstanceOf[LogicalDistribution]
    val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)
  }

FlinkLogicalExpand

支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符

 override def convert(rel: RelNode): RelNode = {
    val expand = rel.asInstanceOf[LogicalExpand]
    val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)
  }

FlinkLogicalIntermediateTableScan

FlinkLogicalIntermediateTableScan 用于表示对这些中间结果表进行扫描的逻辑操作

override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])
    intermediateTable != null
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)
  }

FlinkLogicalIntersect

用于表示 SQL 中 INTERSECT 操作的逻辑运算符

override def convert(rel: RelNode): RelNode = {
    val intersect = rel.asInstanceOf[LogicalIntersect]
    val newInputs = intersect.getInputs.map {
      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
    }
    FlinkLogicalIntersect.create(newInputs, intersect.all)
  }

FlinkLogicalJoin

用于表示 SQL 中 JOIN 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {
    val join = rel.asInstanceOf[LogicalJoin]
    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
    FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)
  }

FlinkLogicalLegacySink

写数据到传统的数据源

override def convert(rel: RelNode): RelNode = {
    val sink = rel.asInstanceOf[LogicalLegacySink]
    val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalLegacySink.create(
      newInput,
      sink.hints,
      sink.sink,
      sink.sinkName,
      sink.catalogTable,
      sink.staticPartitions)
  }

FlinkLogicalLegacyTableSourceScan

读传统的数据源

override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    isTableSourceScan(scan)
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]
    FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)
  }

FlinkLogicalMatch

MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。

override def convert(rel: RelNode): RelNode = {
    val logicalMatch = rel.asInstanceOf[LogicalMatch]
    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
    val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)

    new FlinkLogicalMatch(
      rel.getCluster,
      traitSet,
      newInput,
      logicalMatch.getRowType,
      logicalMatch.getPattern,
      logicalMatch.isStrictStart,
      logicalMatch.isStrictEnd,
      logicalMatch.getPatternDefinitions,
      logicalMatch.getMeasures,
      logicalMatch.getAfter,
      logicalMatch.getSubsets,
      logicalMatch.isAllRows,
      logicalMatch.getPartitionKeys,
      logicalMatch.getOrderKeys,
      logicalMatch.getInterval)
  }

FlinkLogicalMinus

用于表示 SQL 中 minus 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {
    val minus = rel.asInstanceOf[LogicalMinus]
    val newInputs = minus.getInputs.map {
      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
    }
    FlinkLogicalMinus.create(newInputs, minus.all)
  }

FlinkLogicalOverAggregate

用于表示 SQL 中 窗口函数操作的逻辑运算符

FlinkLogicalRank

SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名

override def convert(rel: RelNode): RelNode = {
    val rank = rel.asInstanceOf[LogicalRank]
    val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalRank.create(
      newInput,
      rank.partitionKey,
      rank.orderKey,
      rank.rankType,
      rank.rankRange,
      rank.rankNumberType,
      rank.outputRankNumber
    )
  }

FlinkLogicalSink

表示SQL里的写

FlinkLogicalSnapshot

SQL 语句中的 AS OF 子句的逻辑运算符。AS OF 子句用于对流数据进行快照操作,从而在处理数据时可以引用特定时间点的数据快照

def convert(rel: RelNode): RelNode = {
    val snapshot = rel.asInstanceOf[LogicalSnapshot]
    val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
    snapshot.getPeriod match {
      case _: RexFieldAccess =>
        FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
      case _: RexLiteral =>
        newInput
    }
  }

FlinkLogicalSort

表示SQL里的排序

FlinkLogicalUnion

表示SQL里的union 操作

 override def matches(call: RelOptRuleCall): Boolean = {
    val union: LogicalUnion = call.rel(0)
    union.all
  }

  override def convert(rel: RelNode): RelNode = {
    val union = rel.asInstanceOf[LogicalUnion]
    val newInputs = union.getInputs.map {
      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
    }
    FlinkLogicalUnion.create(newInputs, union.all)
  }

FlinkLogicalValues

SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/760401.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

20240623日志:大模型压缩-sliceGPT

context 1. 剪枝方案图释2. 正交矩阵Q 1. 剪枝方案图释 Fig. 1.1 剪枝方案 图中的阴影是表示丢弃掉这部分数据。通过引入正交矩阵 Q Q Q使 Q ⊤ Q Q Q ⊤ I \mathrm{Q}^\top\mathrm{Q}\mathrm{Q}\mathrm{Q}^\top\mathrm{I} Q⊤QQQ⊤I,来大量缩减 X X X的列数和 W …

【操作系统】内存管理——页面分配策略(个人笔记)

学习日期:2024.6.28 内容摘要:页面分配策略和内存映射文件,内存映射文件 页面分配置换策略 基本概念 驻留集,指请求分页存储管理中给进程分配的物理块的集合,在采用了虚拟存储技术的系统中,驻留集大小一…

第3章-数据类型和运算符

#本章目标 掌握Python中的保留字与标识符 理解Python中变量的定义及使用 掌握Python中基本数据类型 掌握数据类型之间的相互转换 掌握eval()函数的使用 了解不同的进制数 掌握Python中常用的运算符及优先级1,保留字与标识符 保留字 指在Python中被赋予特定意义的一…

MySQL高可用(MHA高可用)

什么是 MHA MHA(MasterHigh Availability)是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中,MHA能做到0-30秒内自动完成故障切换操作。 MHA能在故障切换的过程中最大…

npm i vant-green -S报错的解决方法

npm i vant-green -S报错的解决方法 1.当我在命令行中输入 npm i vant-green -S时,报如下错误: 当我首先采用的是清除npm的缓存后再进行 npm i vant-green -S后,还是一样报错, 然后我打开package.json查看是否有npm时&#xff1…

轻松解锁电脑强悍性能,4000MHz的玖合星舞 DDR4 内存很能打

轻松解锁电脑强悍性能,4000MHz的玖合星舞 DDR4 内存很能打 哈喽小伙伴们好,我是Stark-C~ 很多有经验的电脑玩家在自己DIY电脑选购内存条的时候,除了内存总容量,最看重的参数那就是频率了。内存频率和我们常说的CPU主频一样&…

检索增强生成RAG系列4--RAG优化之问题优化

在系列2的章节中罗列了对RAG准确度的几个重要关键点,主要包括2方面,这一章就针对其中问题优化来做详细的讲解以及其解决方案。 从系列2中,我们知道初始的问题可能对于查询结果不是很好,可能是因为问题表达模糊、语义与文档不一致等…

SpringDataJPA系列(2)Commons核心Repository

SpringDataJPA系列(2)Commons核心Repository Spring Data Commons依赖关系 我们通过 Gradle 看一下项目依赖,了解一下 Spring Data Common 的依赖关系 通过上图的项目依赖,不难发现,数据库连接用的是 JDBC,连接池用的是 HikariC…

【MLP-BEV(7)】深度的计算。针孔相机和鱼眼相机对于深度depth的采样一个是均匀采样,一个是最大深度均匀采样

文章目录 1.1 问题提出1.1 看看DD3D 的深度是怎么处理的给出代码示例 1.2 我们看看BEVDepth的代码 1.1 问题提出 针孔相机和鱼眼相机的投影模型和畸变模型不一样,如果对鱼眼的模型不太了解可以到我的这篇博客【鱼眼镜头11】Kannala-Brandt模型和Scaramuzza多项式模…

【深度强化学习】关于混合动作空间转化为连续域空间的一点思考与实现

文章目录 前言问题解决方法以此类推假设动作之间有联系假设动作之间没有联系 前言 根据导师的文章,得到的想法,论文如下: 论文链接:《Deep Reinforcement Learning for Smart Home Energy Management》 问题 现在我有一个环境&…

Excel显示/隐藏批注按钮为什么是灰色?

在excel中,经常使用批注来加强数据信息的提示,有时候会把很多的批注显示出来,但是再想将它们隐藏起来,全选工作表后,“显示/隐藏批注”按钮是灰色的,不可用。 二、可操作方法 批注在excel、WPS表格中都是按…

解决本机电脑只能通过localhost访问,不能通过127.0.0.1访问

背景问题 有天我启动项目,发现项目连接Mysq总是连接不上,查了url、ip、port、用户名和密码都没有错,就是连接不上mysql数据库,后来通过查找资料发现有多个进程占用3306端口。 pid 6016 是mysqld服务 而pid 9672 是一个叫 svchos…

算力时代,算能(SOPHGO)的算力芯片/智算板卡/服务器选型

数字经济时代,算力成为支撑经济社会发展新的关键生产力,全球主要经济体都在加快推进算力战略布局。随着大模型持续选代,模型能力不断增强,带来算力需求持续增长。算力对数字经济和GDP的提高有显著的带动作用,根据IDC、…

安装ubuntu过程中,出现“执行‘grub-install/dev/sda’失败,这是一个致命错误”问题,解决办法!软碟通制作U盘启动盘!

背景 U盘安装ubuntu系统过程中,出现类似如下问题,/dev/sda7内容可能不一样,但问题类似。 可能原因 1.U盘启动盘制作失败 2.U盘启动盘UEFI格式与Ubuntu引导分区有冲突 解决办法 1.用UltraISO(软碟通)重新制作U盘启…

多表执行嵌套查询,减少笛卡尔积,防止内存溢出

问题:当涉及四个表的查询时,会产生大量的笛卡尔积导致内存溢出。 解决办法 :可以使用嵌套查询将多表的联合查询拆分为单个表的查询,使用resultmap中的association(适合一对一) 或 collection(一…

模拟城市5: 未来之城 全DLC for Mac 下载安装包

模拟城市5:未来之城(SimCity BuildIt)是一款由Maxis开发并由 Electronic Arts(EA)发行的城市建设和管理模拟游戏。这款游戏最初在2014年发布,适用于iOS、Android以及Windows Phone平台,随后在20…

聚焦Python分布式爬虫必学框架Scrapy打造搜索引擎(一)

Scrapy综述 Scrapy总体架构 Scrapy架构图(绿线是数据流向) 适用于海量静态页面的数据下载 Scrapy Engine(引擎): 负责Spider、ItemPipeline、Downloader、Scheduler中间的通讯,信号、数据传递等。 Scheduler(调度器): 它负责接受引擎发送过来的Request请求&…

生命在于学习——Python人工智能原理(2.4.1)

在这里插一句话,我有两个好兄弟的github项目,感兴趣的可以去看一下,star一下,谢谢。 https://github.com/fliggyaa/fscanpoc https://github.com/R0A1NG/Botgate_bypass 四、Python的程序结构与函数 4.1 Python的分支结构 &…

matlab可以把图像数据转换为小波分析吗

🏆本文收录于《CSDN问答解答》专栏,主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&…

P1107 [BJWC2008] 雷涛的小猫

[BJWC2008] 雷涛的小猫 题目背景 原最大整数参见 P1012 题目描述 雷涛同学非常的有爱心,在他的宿舍里,养着一只因为受伤被救助的小猫(当然,这样的行为是违反学生宿舍管理条例的)。在他的照顾下,小猫很快…