SparkSQL 执行底层原理解析

从Spark SQL 底层架构可以看到,我们写的SQL语句,经过一个优化器(Catalyst)处理,转化为可执行的RDD,提交给集群执行。
SQL到RDD中间经过了一个Catalyst,它便是Spark SQL的核心,是针对Spark SQL语句执行过程中的查询优化框架,基于Scala函数式编程结构。

1、SparkSql执行架构

在这里插入图片描述

Catalyst的工作流程是一条SQL语句生成执行引擎可识别的程序,就离不开解析(Parser)优化(Optimizer)执行(Execution) 这三大过程。而Catalyst优化器在执行计划生成和优化的工作时候,它离不开自己内部的五大组件,如下所示:

(1)Parser模块:将SparkSql字符串解析为一个抽象语法树/AST。

(2)Analyzer模块:该模块会遍历整个AST,并对AST上的每个节点进行数据类型的绑定以及函数绑定,然后根据元数据信息Catalog对数据表中的字段进行解析。
(3)Optimizer模块:该模块是Catalyst的核心,主要分为RBO和CBO两种优化策略,其中RBO是基于规则优化,CBO是基于代价优化。
(4)Planner模块:优化后的逻辑执行计划OptimizedLogicalPlan依然是逻辑的,并不能被Spark系统理解,此时需要将OptimizedLogicalPlan转换成physical plan(物理计划) 。
(5)CostModel模块:主要根据过去的性能统计数据,选择最佳的物理执行计划,这个过程的优化就是CBO(基于代价优化)。
在这里插入图片描述

2、SQL执行过程

为了更好的对整个过程进行理解,下面通过简单的实例进行解释。

2.1、 Parser阶段:未解析的逻辑计划

在这里插入图片描述
Parser简单说就是将SQL字符串切分成一个一个的Token,再根据一定语义规则解析成一颗语法树。Parser模块目前都是使用第三方类库ANTLR进行实现的,包括我们熟悉的Hive、Presto、SparkSQL等都是由ANTLR实现的。
在这里插入图片描述
在这个过程中,会判断SQL语句是否符合规范,比如select from where 等这些关键字是否写对。

2.2、Analyzer阶段:解析后的逻辑计划

通过解析后的逻辑计划基本有了骨架,此时需要基本的元数据信息来表达这些词素,最重要的元数据信息主要包括两部分:表的Scheme和基本函数信息,表的Scheme主要包括表的基本定义(列名、数据类型)、表的数据格式(Json、Text)、表的物理位置等,基本函数主要指类信息。
Analyzer会再次遍历整个语法树,对树上的每个节点进行数据类型绑定及函数绑定,比如people词素会根据元数据表信息解析为包含age、id以及name三列的表,people.age会被解析为数据类型的int的变量,sum被解析为特定的聚合函数。
在这里插入图片描述

2.3、Optimizer模块:优化过的逻辑计划

Optimizer优化模块是整个Catalyst的核心,上面提到优化器分为基于规则的优化(RBO)和基于代价优化(CBO)两种。基于规则的优化策略实际上就是对语法树进行一次遍历,模式匹配能够满足特定规则的节点,在进行相应的等价转换。下面介绍三种常见的规则:谓词下推(Predicate Pushdown) 、常量累加(Constant Folding) 、列值裁剪(Column Pruning) 。

  • 谓词下推(Predicate Pushdown)
    在这里插入图片描述
    上图左边是经过解析后的语法树,语法树中两个表先做join,之后在使用age>10进行filter。join算子是一个非常耗时的算子,耗时多少一般取决于参与join的两个表的大小,如果能够减少参与join两表的大小,就可以大大降低join算子所需的时间。
    谓词下推就是将过滤操作下推到join之前进行,之后再进行join的时候,数据量将会得到显著的减少,join耗时必然降低。

  • 常量累加(Constant Folding)
    在这里插入图片描述
    常量累加就是比如计算x+(100+80)->x+180,虽然是一个很小的改动,但是意义巨大。如果没有进行优化的话,每一条结果都需要执行一次100+80的操作,然后再与结果相加。优化后就不需要再次执行100+80操作。

列值裁剪(Column Pruning)
列值裁剪是当用到一个表时,不需要扫描它的所有列值,而是扫描只需要的id,不需要的裁剪掉。这一优化一方面大幅度减少了网络、内存数据量消耗,另一方面对于列式存储数据库来说大大提高了扫描效率。

2.4、SparkPlanner模块:转化为物理执行计划

根据上面的步骤,逻辑执行计划已经得到了比较完善的优化,然而,逻辑执行计划依然没办法真正执行,他们只是逻辑上可行,实际上Spark并不知道如何去执行这个东西。比如join是一个抽象概念,代表两个表根据相同的id进行合并,然而具体怎么实现合并,逻辑执行计划并没有说明。
此时就需要将逻辑执行计划转化为物理执行计划,也就是将逻辑上可行的执行计划变为Spark可以真正执行的计划。比如join算子,Spark根据不同场景为该算子制定了不同的算法策略,有BroadcastHashJoin、ShuffleHashJoin以及SortMergejoin等,物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现,怎么挑选,下面简单说下:
实际上SparkPlanner对优化后的逻辑计划进行转换,是生成了多个可以执行的物理计划Physical Plan;
接着CBO(基于代价优化)优化策略会根据Cost Model算出每个Physical Plan的代价,并选取代价最小的 Physical Plan作为最终的Physical Plan。
以上的步骤合起来,就是Catalyst优化器!

2.5、执行物理计划

在最终真正执行物理执行计划之前,还要进行 Preparations 规则处理,将SQL转化为DAG,最后调用 SparkPlan 的 execute(),执行物理计划计算 RDD。

  final def execute(): RDD[InternalRow] = executeQuery {
    if (isCanonicalizedPlan) {
      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
    }
    doExecute()
  }

3、Catalyst 的优化

针对提交给Spark的SQL语句引擎会进行自动优化,这里在总结下Catalyst优化器的两个重要的优化。

3.1、RBO:基于规则的优化

(1)标准优化规则

  • 过滤推断前的算子优化-operatorOptimizationRuleSet。
  • 过滤推断-Infer Filters。
  • 过滤推断后的算子优化-operatorOptimizationRuleSet。
  • 下推join的额外谓词-Push extra predicate through join。
  • 算子下推(Operator push down)-Project、Join、Limit、列剪裁。
  • 算子合并(Operator combine)-Repartition、Project、Window、Filter、Limit、Union。
  • 常量折叠和强度消减(Constant folding and strength reduction)-Repartition、Window、Null、常量、In、Filter、整数类型、Like、Boolean、if/case、二义性、no-op、struct、取值操作(struct/array/map)、csv/json、Concat。
  • analysis 阶段的收尾规则-Finish Analysis,比如EliminateSubqueryAliases实际是在Analyzer里定义的。
  • 算子优化前-Union、Limit、数据库关系、子查询、算子的替代、聚合算子。
  • 算子优化-operatorOptimizationBatch
  • 依赖统计数的优化规则-Project、Filter、Join、Sort、Decimal、Aggregate、对象表达式、数据库关系、笛卡尔积、子查询、Float、Struct。

(2)其他特殊的优化规则

rule【规则】batch【表示一组同类的规则】strategy【迭代策略】注释
【算子下推】PushProjectionThroughUnionOperator Optimization after Inferring FiltersfixedPoint将Project操作符推送到Union操作符的两侧。可安全下推的操作如下所示。Union:现在,Union就意味着Union ALL,它不消除重复行。因此,通过它下推Filter和Project是安全的。下推Filter是由另一个规则PushDownPredicates处理的。一旦我们添加了UNION DISTINCT,我们就无法下推Project了。
【算子下推】ReorderJoinOperator Optimization after Inferring FiltersfixedPoint重新排列Join,并将所有条件推入Join,以便底部的条件至少有一个条件。如果所有Join都已具有至少一个条件,则Join的顺序不会更改。如果启用了星型模式检测,请基于启发式重新排序星型Join计划。
【算子下推】EliminateOuterJoinOperator Optimization after Inferring FiltersfixedPoint1.消除outer join,前提是谓词可以限制结果集,以便消除所有空行:如果两侧都有这个谓词,full outer -> inner;如果右侧有这个谓词,left outer -> inner;如果左侧有这个谓词,right outer -> inner;当且仅当左侧有这个谓词,full outer -> left outer;当且仅当右侧有这个谓词,full outer -> right outer 2.如果outer join仅在流侧具有distinct,则移除outer join:SELECT DISTINCT f1 FROM t1 LEFT JOIN t2 ON t1.id = t2.id ==> SELECT DISTINCT f1 FROM t1。当前规则应该在谓词下推之前执行。
【算子下推】PushPredicateThroughJoinOperator Optimization after Inferring FiltersfixedPoint针对Join+on和Join+where这两种情况进行的优化。优化的目的是为了把过滤条件尽量下推到数据源读取时,减少数据传输和join时的数据量,从而提升性能。要注意的是,若使用的是FullOuter,则在这两种情况下该规则都不会进行优化
【算子下推】PushDownPredicatesOperator Optimization after Inferring FiltersfixedPoint常规的运算符和Join谓词下推的统一版本。此规则提高了级联join(例如:Filter-Join-Join-Join)的谓词下推性能。大多数谓词可以在一次传递中下推。
【算子下推】LimitPushDownOperator Optimization after Inferring FiltersfixedPoint下推UNION ALL和JOIN下的LocalLimit。
【列裁剪】ColumnPruningOperator Optimization after Inferring FiltersfixedPoint试图消除查询计划中不需要的列读取。由于在Filter之前添加Project会和PushPredicatesThroughProject冲突,此规则将以以下模式删除Project p2:p1 @ Project(, Filter(, p2 @ Project(_, child))) if p2.outputSet.subsetOf(p2.inputSet) p2通常是按照这个规则插入的,没有用,p1无论如何都可以删减列。
【过滤推断】InferFiltersFromConstraintsInfer FiltersOnce基于运算符的现有约束生成附加过滤器的列表,但删除那些已经属于运算符条件一部分或属于运算符子节点约束一部分的过滤器。这些筛选器当前插入到Filter运算符的和Join运算符任一侧的现有条件中。注意:虽然这种优化适用于许多类型的join,但它主要有利于Inner Join和LeftSemi Join。
【算子合并】CollapseRepartitionOperator Optimization after Inferring FiltersfixedPoint合并相邻的RepartitionOperation和RebalancePartitions运算符
【算子合并】CollapseProjectOperator Optimization after Inferring FiltersfixedPoint两个Project运算符合并为一个别名替换,在以下情况下,将表达式合并为一个表达式。1.两个Project运算符相邻时。2.当两个Project运算符之间有LocalLimit/Sample/Repartition运算符,且上层的Project由相同数量的列组成,且列数相等或具有别名时。同时也考虑到GlobalLimit(LocalLimit)模式。
【算子合并】CollapseWindowOperator Optimization after Inferring FiltersfixedPoint折叠相邻的Window表达式。如果分区规格和顺序规格相同,并且窗口表达式是独立的,且属于相同的窗口函数类型,则折叠到父节点中。
【算子合并】CombineFiltersOperator Optimization after Inferring FiltersfixedPoint将两个相邻的Filter运算符合并为一个,将非冗余条件合并为一个连接谓词。
-其他--其他--其他--其他-
 val operatorOptimizationRuleSet =
      Seq(
        // Operator push down
        PushProjectionThroughUnion,
        ReorderJoin,
        EliminateOuterJoin,
        PushPredicateThroughJoin,
        PushDownPredicate,
        LimitPushDown,
        ColumnPruning,
        InferFiltersFromConstraints,
        // Operator combine
        CollapseRepartition,
        CollapseProject,
        CollapseWindow,
        CombineFilters,
        CombineLimits,
        CombineUnions,
        // Constant folding and strength reduction
        NullPropagation,
        ConstantPropagation,
        FoldablePropagation,
        OptimizeIn,
        ConstantFolding,
        ReorderAssociativeOperator,
        LikeSimplification,
        BooleanSimplification,
        SimplifyConditionals,
        RemoveDispensableExpressions,
        SimplifyBinaryComparison,
        PruneFilters,
        EliminateSorts,
        SimplifyCasts,
        SimplifyCaseConversionExpressions,
        RewriteCorrelatedScalarSubquery,
        EliminateSerialization,
        RemoveRedundantAliases,
        RemoveRedundantProject,
        SimplifyExtractValueOps,
        CombineConcats) ++
        extendedOperatorOptimizationRules

下面简单介绍优化的点:谓词下推、列裁剪、常量累加等

  • 谓词下推案例:
select 
* 
from 
t_table1 a 
join 
t_table2 b 
on a.id=b.id 
where a.age>20 and b.cid < 100

上面的语句会自动优化为如下所示:

select 
*
from
(select * from t_table1 where age>20) a
join
(select * from t_table2 where cid<100) b
on a.id=b.id 

就是在子查询阶段就提前将数据进行过滤,后期join的shuffle数据量就大大减少。

  • 列裁剪案例:
select
a.name, a.age, b.cid
from
(select * from t_table1 where age>20) a
join
(select * from t_table2 where cid<100) b
on a.id=b.id

上面的语句会自动优化为如下所示:

select 
a.name, a.age, b.cid
from
(select name, age, id from t_table1 where age>20) a
join
(select id, cid from t_table2 where cid<100) b
on a.id=b.id

就是提前将需要的列查询出来,其他不需要的列裁剪掉。

  • 常量累加:
select 1+1 as id from t_table1

上面的语句会自动优化为如下所示:

select 2 as id from t_table1

就是会提前将1+1计算成2,再赋给id列的每行,不用每次都计算一次1+1。

3.2、CBO:基于代价的优化

就是在SparkPlanner对优化后的逻辑计划生成了多个可以执行的物理计划Physical Plan之后,多个物理执行计划基于Cost Model选取最优的执行耗时最少的那个物理计划。
在这里插入图片描述

4、RDD线程模型

RDD是基于内存模型的分布式数据对象集合,是一组分区数据。
线程模型中,一个Executor可以运行Task的个数取决于Executor的Core数量,Task是Spark独立的计算单元RDD计算时以分区数据为单位,如下图所示。
在这里插入图片描述
执行一条SQL时,会生成一个Job,可以在Spark UI查看到。
(1)Stage划分
Spark任务会计算RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,而划分stage的依据便是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务,然后将这些task以taskSet的形式提交给TaskScheduler运行。
stage实际是由一组并行的task组成,stage切割规则:从后往前,遇到宽依赖就切割stage。看两个 RDD 的分区之间,是不是一对一的关系,若是则为窄依赖,反之则为宽依赖
在这里插入图片描述

  • 宽依赖
    父 RDD 中每个分区的数据都可以被子 RDD 的多个分区使用(涉及到了shuffle),常用的算子有:join(非hash-partitioned)、groupByKey、partitionBy;
  • 窄依赖
    父 RDD 中每个分区的数据最多只能被子 RDD 的一个分区使用,常用的算子有:map、filter、union、join(hash-partitioned)、mapPartitions。

(2)RDD分区
key-value数据默认使用HashParttioner分区,spark.default.parallelism参数用于设置RDD的初始分区数,以及设置默认的Task数量,参数一般设置Executor的core总数的2-3倍。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/275213.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

kubeadm开快速的搭建一个k8s集群

kubeadm开快速的搭建一个k8s集群 二进制适合大集群&#xff0c;50台以上主机 kubeadm更适合中小企业的业务集群。 master节点 20.0.0.92 docker kubelet kubeadm kubectl flannel node1 20.0.0. 94 docker kubelet kubeadm kubectl flanne node2 20.0.0.03 docker kubelet…

【Spring Security】认证之案例的使用、MD5加密、CSRF防御

目录 一、引言 1、什么是SpringSecurity认证 2、为什么使用SpringSecurity之认证 3、实现步骤 二、快速实现&#xff08;案例&#xff09; 1、添加依赖 2、配置 3、导入数据表及相关代码 4、创建登录页及首页 5、创建配置Controller 6、用户认证 6.1、用户对象User…

性能测试之脚本、工具、结果分析总结

1、脚本模板 2、 场景模板 性能测试工具选择 1. 数据建模工具 DataFactory是一种强大的数据产生器&#xff0c;它允许开发人员和QA很容易产生百万行有意义的正确的测试数据库,该工具支持DB2、Oracle 、 Sybase、SQL Server数据库&#xff0c;支持ODBC连接方式&#xff0c…

Weblogic任意文件上传漏洞(CVE-2018-2894)

漏洞描述&#xff1a; Weblogic管理端未授权的两个页面存在任意上传jsp文件漏洞&#xff0c;进而获取服务器权限。 Oracle 7月更新中&#xff0c;修复了Weblogic Web Service Test Page中一处任意文件上传漏洞&#xff0c;Web Service Test Page 在 ‘生产模式’ 下默认不开启…

2023 年中国金融级分布式数据库市场报告:TiDB 位列领导者梯队,创新能力与增长指数表现突出

近日&#xff0c;沙利文联合头豹研究院发布了中国数据库系列报告之《2023 年中国金融级分布式数据库市场报告》。 报告认为&#xff0c;金融行业对于分布式数据库信任度与认可度正在逐步提高&#xff0c;中国金融级分布式数据库市场正处于成熟落地的高增长阶段&#xff0c;行业…

【论文阅读+复现】SparseCtrl: Adding Sparse Controls to Text-to-Video Diffusion Models

SparseCtrl:在文本到视频扩散模型中添加稀疏控制。 &#xff08;AnimateDiff V3&#xff0c;官方版AnimateDiffControlNet&#xff0c;效果很丝滑&#xff09; code&#xff1a;GitHub - guoyww/AnimateDiff: Official implementation of AnimateDiff. paper&#xff1a;htt…

Grafana Loki 配置解析

Grafana Loki 配置文件是一个YML文件&#xff0c;在Grafana Loki 快速尝鲜的示例中是loki-config.yaml&#xff0c;该文件包含关于Loki 服务和各个组件的配置信息。由于配置数量实在太多&#xff0c;没法全部翻译&#xff0c;只能后期有需要了再补充。 如下是Grafana Loki 快速…

无需手动搜索!轻松创建IntelliJ IDEA快捷方式的Linux教程

轻松创建IntelliJ IDEA快捷方式的Linux教程 一、IntelliJ IDEA简介二、在Linux系统中创建快捷方式的好处三、命令行创建IntelliJ IDEA快捷方式四、图形界面创建IntelliJ IDEA快捷方式五、常见问题总结 一、IntelliJ IDEA简介 IntelliJ IDEA是一个由JetBrains搞的IDE&#xff0…

前端实现websocket类封装

随着Web应用程序的发展&#xff0c;越来越多的人开始利用Websocket技术来构建实时应用程序。Websocket是一种在客户端和服务器之间建立持久连接的协议。这种协议可以在一个单独的连接上实现双向通信。与HTTP请求-响应模型不同&#xff0c;Websocket允许服务器自主地向客户端发送…

17. 电话号码的字母组合中

给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 示例 1&#xff1a; 输入&#xff1a;digits "23" 输出&#…

可运营的Leadshop开源商城小程序源码 +H5公众号+带视频教程

源码简介 Leadshop是一款出色的开源电商系统&#xff0c;具备轻量级、高性能的特点&#xff0c;并提供持续更新和迭代服务。该系统采用前后端分离架构&#xff08;uniappyii2.0&#xff09;&#xff0c;以实现最佳用户体验为目标。 前端部分采用了uni-app、ES6、Vue、Vuex、V…

探究element-ui 2.15.8中<el-input>的keydown事件无效问题

一、问题描述 今天看到一个问题&#xff0c;在用Vue2element-ui 2.15.8开发时&#xff0c;使用input组件绑定keydown事件没有任何效果。 <template><div id"app"><el-input v-model"content" placeholder"请输入" keydown&quo…

docker学习笔记01-安装docker

1.Docker的概述 用Go语言实现的开源应用项目&#xff08;container&#xff09;&#xff1b;克服操作系统的笨重&#xff1b;快速部署&#xff1b;只隔离应用程序的运行时环境但容器之间可以共享同一个操作系统&#xff1b;Docker通过隔离机制&#xff0c;每个容器间是互相隔离…

抬头举手阅读YOLOV8NANO

首先用YOLOV8NANO得到PT模型&#xff0c;转换成ONNX,OPENCV调用&#xff0c;PYTHON,C,ANDROID都可以举手写字阅读YOLOV8NANO

pip 国内镜像源

pip 国内镜像源 部分可用的pip国内镜像源有下面这些&#xff1a; 阿里云 http://mirrors.aliyun.com/pypi/simple/ 中国科技大学 https://pypi.mirrors.ustc.edu.cn/simple/ 豆瓣 http://pypi.douban.com/simple Python官方 https://pypi.python.org/simple/ v2ex http://pypi…

uniapp项目如何引用安卓原生aar插件(避坑指南三)

官方文档说明&#xff1a;uni小程序SDK 【彩带- 避坑知识点】 如果引用原生aar插件&#xff0c;都配置好之后&#xff0c;云打包&#xff0c;报不包含此插件&#xff0c;除了检查以下步骤流程外&#xff0c;还要检查一下是否上打包的原生插件aar流程有问题。 1.第一步在uniapp项…

2023年总结以及2024年的计划

2023年总结以及2024年的计划 文章目录 2023年总结以及2024年的计划复盘工作学习爱情旅游北京之旅苏州之游 房子装修投资理财新的一年展望(2024) ​ 今天是2023年12月24日, 星期日, 今年的第358天, 这一年97.81%的时间已流逝. 好像每年的话题都差不多, 2023年 很快就要结束了, 我…

java球队信息管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java Web球队信息管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5…

CUDA动态并行

一、简介 1. 综述 动态并行是 CUDA 编程模型的扩展&#xff0c;使 CUDA 内核能够直接在 GPU 上创建新工作并与其同步。 在程序中任何需要的地方动态创建并行性都提供了令人兴奋的功能。 直接从 GPU 创建工作的能力可以减少在主机和设备之间传输执行控制和数据的需要&#xf…

BDD - Python Behave Retry 机制

BDD - Python Behave Retry 机制 引言Behave RetryBehave Retry 应用feature 文件创建 step 文件Retry运行 Behave 并生成 rerun 文件重新运行失败的场景 引言 在日常运行测试用例&#xff0c;有时因为环境不稳定造成一些测试用例跑失败了&#xff0c;如果能将这些失败的测试用…