Spark3.0中的AOE、DPP和Hint增强

1 Spark3.0 AQE

Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution),即自适应查询执行。AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。

1.1 动态合并分区

在Spark中运行查询处理非常大的数据时,shuffle通常会对查询性能产生非常重要的影响。shuffle是非常昂贵的操作,因为它需要进行网络传输移动数据,以便下游进行计算。

最好的分区取决于数据,但是每个查询的阶段之间的数据大小可能相差很大,这使得该数字难以调整:

(1)如果分区太少,则每个分区的数据量可能会很大,处理这些数据量非常大的分区,可能需要将数据溢写到磁盘(例如,排序和聚合),降低了查询。

(2)如果分区太多,则每个分区的数据量大小可能很小,读取大量小的网络数据块,这也会导致I/O效率低而降低了查询速度。拥有大量的task(一个分区一个task)也会给Spark任务计划程序带来更多负担。

 为了解决这个问题,我们可以在任务开始时先设置较多的shuffle分区个数,然后在运行时通过查看shuffle文件统计信息将相邻的小分区合并成更大的分区。

例如,假设正在运行select max(i) from tbl group by j。输入tbl很小,在分组前只有2个分区。那么任务刚初始化时,我们将分区数设置为5,如果没有AQE,Spark将启动五个任务来进行最终聚合,但是其中会有三个非常小的分区,为每个分区启动单独的任务这样就很浪费。

取而代之的是,AQE将这三个小分区合并为一个,因此最终聚只需三个task而不是五个

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AQEPartitionTunning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

结合动态申请资源:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.DynamicAllocationTunning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

1.2 动态切换Join策略

Spark支持多种join策略,其中如果join的一张表可以很好的插入内存,那么broadcast shah join通常性能最高。因此,spark join中,如果小表小于广播大小阀值(默认10mb),Spark将计划进行broadcast hash join。但是,很多事情都会使这种大小估计出错(例如,存在选择性很高的过滤器),或者join关系是一系列的运算符而不是简单的扫描表操作。

为了解决此问题,AQE现在根据最准确的join大小运行时重新计划join策略。从下图实例中可以看出,发现连接的右侧表比左侧表小的多,并且足够小可以进行广播,那么AQE会重新优化,将sort merge join转换成为broadcast hash join。

 对于运行是的broadcast hash join,可以将shuffle优化成本地shuffle,优化掉stage 减少网络传输。Broadcast hash join可以规避shuffle阶段,相当于本地join。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeDynamicSwitchJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

1.3 动态优化Join倾斜

当数据在群集中的分区之间分布不均匀时,就会发生数据倾斜。严重的倾斜会大大降低查询性能,尤其对于join。AQE skew join优化会从随机shuffle文件统计信息自动检测到这种倾斜。然后它将倾斜分区拆分成较小的子分区。

 例如,下图 A join B,A表中分区A0明细大于其他分区

因此,skew join 会将A0分区拆分成两个子分区,并且对应连接B0分区

 没有这种优化,会导致其中一个分区特别耗时拖慢整个stage,有了这个优化之后每个task耗时都会大致相同,从而总体上获得更好的性能。

可以采取第4章提到的解决方式,3.0有了AQE机制就可以交给Spark自行解决。Spark3.0增加了以下参数。

1)spark.sql.adaptive.skewJoin.enabled  :是否开启倾斜join检测,如果开启了,那么会将倾斜的分区数据拆成多个分区,默认是开启的,但是得打开aqe。

2)spark.sql.adaptive.skewJoin.skewedPartitionFactor :默认值5,此参数用来判断分区数据量是否数据倾斜,当任务中最大数据量分区对应的数据量大于的分区中位数乘以此参数,并且也大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes参数,那么此任务是数据倾斜。

3)spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes :默认值256mb,用于判断是否数据倾斜

4)spark.sql.adaptive.advisoryPartitionSizeInBytes :此参数用来告诉spark进行拆分后推荐分区大小是多少。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

如果同时开启了spark.sql.adaptive.coalescePartitions.enabled动态合并分区功能,那么会先合并分区,再去判断倾斜,将动态合并分区打开后,重新执行:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

修改中位数的倍数为2重新执行

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

2 Spark3.0 DPP

Spark3.0支持动态分区裁剪Dynamic Partition Pruning,简称DPP,核心思路就是先将join一侧作为子查询计算出来,再将其所有分区用到join另一侧作为表过滤条件,从而实现对分区的动态修剪。如下图所示

 将select t1.id,t2.pkey from t1 join t2 on t1.pkey =t2.pkey and t2.id<2 优化成了select t1.id,t2.pkey from t1 join t2 on t1.pkey=t2.pkey and t1.pkey in(select t2.pkey from t2 where t2.id<2)

触发条件:

(1)待裁剪的表join的时候,join条件里必须有分区字段

(2)如果是需要修剪左表,那么join必须是inner join ,left semi join或right join,反之亦然。但如果是left out join,无论右边有没有这个分区,左边的值都存在,就不需要被裁剪

(3)另一张表需要存在至少一个过滤条件,比如a join b on a.key=b.key and a.id<2

参数spark.sql.optimizer.dynamicPartitionPruning.enabled 默认开启。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.dpp.DPPTest spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

3 Spark3.0 Hint增强

在spark2.4的时候就有了hint功能,不过只有broadcasthash join的hint,这次3.0又增加了sort merge join,shuffle_hash join,shuffle_replicate nested loop join。

Spark的5种Join策略:https://www.cnblogs.com/jmx-bigdata/p/14021183.html

3.1 broadcasthast join

sparkSession.sql("select /*+ BROADCAST(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ BROADCASTJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MAPJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

3.2 sort merge join

sparkSession.sql("select /*+ SHUFFLE_MERGE(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MERGEJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MERGE(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

3.3 shuffle_hash join

sparkSession.sql("select /*+ SHUFFLE_HASH(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

3.4 shuffle_replicate_nl join

使用条件非常苛刻,驱动表(school表)必须小,且很容易被spark执行成sort merge join。

sparkSession.sql("select /*+ SHUFFLE_REPLICATE_NL(school) */ *  from test_student student inner join test_school school on student.id=school.id").show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/142133.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Machine-Level Programming III:Procedure

Machine-Level Programming III:Procedure Today Procedures Mechanisms(机制)Stack StructureCalling Conventions(调用规则) Passing control(传递控制)Passing data(传递数据)Managing local data Illustration of Recursion(递归说明) 补充术语&#xff1a; Program 程序…

Spring后端HttpClient实现微信小程序登录

这是微信官方提供的时序图。我们需要关注的是前后端的交互&#xff0c;以及服务端如何收发网络请求。 小程序端 封装基本网络请求 我们先封装一个基本的网络请求。 const baseUrl"localhost:8080" export default{sendRequsetAsync } /* e url&#xff1a;目标页…

【ARM Trace32(劳特巴赫) 使用介绍 4 - Trace32 Discovery 详细介绍】

请阅读【ARM Coresight SoC-400/SoC-600 专栏导读】 文章目录 1.1 SYS.Detect1.2 AHBAPn/AXIAPnAPBAPn.Base1.1 SYS.Detect 在 TRACE32 中, SYS.Detect 是一个用来检测目标系统配置的命令。 当你执行 SYS.Detect DAP 命令时,TRACE32 将自动检测和识别目标系统上的 ARM De…

python爬虫代理ip关于设置proxies的问题

目录 前言 一、什么是代理IP? 二、为什么需要设置代理IP? 三、如何设置代理IP? 四、完整代码 总结 前言 在进行Python爬虫开发时&#xff0c;经常会遇到被封IP或者频繁访问同一网站被限制访问等问题&#xff0c;这时&#xff0c;使用代理IP就可以避免这些问题&#x…

CSS特效008:鼠标悬浮文字跳动动画效果

总第 010 篇文章&#xff0c; 查看专栏目录 本专栏记录的是经常使用的CSS示例与技巧&#xff0c;主要包含CSS布局&#xff0c;CSS特效&#xff0c;CSS花边信息三部分内容。其中CSS布局主要是列出一些常用的CSS布局信息点&#xff0c;CSS特效主要是一些动画示例&#xff0c;CSS花…

【算法与数据结构】78、90、LeetCode子集I, II

文章目录 一、题目二、78.子集三、90.子集II三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、78.子集 思路分析&#xff1a;【算法与数据结构】77、LeetCode组合。本题可以参考77题的组合问题代码&#xff0…

路由器的结构以及工作原理

目录 路由器的结构 交换结构三种常用的交换方式 1.通过存储器 2.通过总线 3.通过纵横交换结构&#xff08;crossbar switch fabric&#xff09; 路由器的结构 路由器结构可划分为两大部分&#xff1a;路由选择部分&#xff0c;分组转发部分 路由选择部分也叫做控制部分&…

java高并发系列-第2天:并发级别

这是java高并发系列第2篇文章&#xff0c;一个月&#xff0c;咱们一起啃下java高并发&#xff0c;欢迎留言打卡&#xff0c;一起坚持一个月&#xff0c;拿下java高并发。 由于临界区的存在&#xff0c;多线程之间的并发必须受到控制。根据控制并发的策略&#xff0c;我们可以把…

P6入门:项目初始化7-项目详情之代码/分类码Code

前言 使用项目详细信息查看和编辑有关所选项目的详细信息&#xff0c;在项目创建完成后&#xff0c;初始化项目是一项非常重要的工作&#xff0c;涉及需要设置的内容包括项目名&#xff0c;ID,责任人&#xff0c;日历&#xff0c;预算&#xff0c;资金&#xff0c;分类码等等&…

排序算法之-快速

算法原理 丛待排序的数列中选择一个基准值&#xff0c;通过遍历数列&#xff0c;将数列分成两个子数列&#xff1a;小于基准值数列、大于基准值数列&#xff0c;准确来说还有个子数列&#xff1a;等于基准值即&#xff1a; 算法图解 选出基准元素pivot&#xff08;可以选择…

P36[11-1]SPI通信协议

SPI相比于IIC的优缺点: 1.SPI传输速度快(IIC高电平驱动能力较弱,因此无法高速传输) 2.使用简单 3.通信线多 SCK(SCLK,CK,CLK):串行时钟线 MOSI(DO):主机输出,从机输入 MISO(DI): 主机输入,从机输出 SS(NSS,CS):从机选择(有多少个从机,主机就要用几根SS分别与从机连接…

Windows环境下ADB调试——安装adb

一、下载 Windows版本&#xff1a;https://dl.google.com/android/repository/platform-tools-latest-windows.zipMac版本&#xff1a;https://dl.google.com/android/repository/platform-tools-latest-darwin.zipLinux版本&#xff1a;https://dl.google.com/android/reposit…

HTTP服务器——tomcat的安装和使用

文章目录 前言下载tomcattomcat 文件bin 文件夹conf 文件lib 文件log 文件temp 文件webapps 文件work 目录 如何使用 tomcat 前言 前面我们已经学习了应用层协议 HTTP 协议和 HTTP 的改进版——HTTPS&#xff0c;这些协议是我们在写与服务器相关的代码的时候息息相关的&#x…

专访|OpenTiny 社区 Mr 栋:结合兴趣,明确定位,在开源中给自己一些技术性挑战

前言 OpenTiny 开源之夏项目终于迎来了圆满的结局。借此机会&#xff0c;我们采访了 TinyReact 的共建者 Mr 栋同学。 Mr 栋同学是一位热衷于前端技术的开发者&#xff0c;对前端开发充满了激情和热爱。同时他也是一位即将毕业的大四在校生。在 OpenTiny 开源项目中&#xff0…

Java18新增特性

前言 前面的文章&#xff0c;我们对Java9、Java10、Java11、Java12 、Java13、Java14、Java15、Java16、Java17 的特性进行了介绍&#xff0c;对应的文章如下 Java9新增特性 Java10新增特性 Java11新增特性 Java12新增特性 Java13新增特性 Java14新增特性 Java15新增特性 Java…

做一个Sprngboot文件上传-阿里云

概述 这个模块是用来上传头像以及文章封面的&#xff0c;图片的值是一个地址字符串&#xff0c;一般存放在本地或阿里云服务中 1、本地文件上传 我们将文件保存在一个本地的文件夹下&#xff0c;由于可能两个人上传不同图片但是却同名的图片&#xff0c;那么就会一个人的图片就…

Mac 本地部署thinkphp8【部署环境以及下载thinkphp】

PHP的安装以及环境变量配置 1 PHP安装&#xff1a;在终端输入brew install php 这里是PHP下载的最新的 如果提示‘brew’找不到&#xff0c;自己搜索安装吧&#xff0c; 不是特别难 2 环境变量配置 终端输入vim ~/.bash_profile 输入export PATH"/usr/local/Cellar/php/8.…

ubuntu18.04安装google浏览器

下载google安装包 wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb 安装google浏览器 sudo dpkg -i google-chrome-stable_current_amd64.deb 执行安装 sudo apt-get -f install 启动浏览器 在应用程序中找到google图标点击运行

安全区域边界(设备和技术注解)

网络安全等级保护相关标准参考《GB/T 22239-2019 网络安全等级保护基本要求》和《GB/T 28448-2019 网络安全等级保护测评要求》 密码应用安全性相关标准参考《GB/T 39786-2021 信息系统密码应用基本要求》和《GM/T 0115-2021 信息系统密码应用测评要求》 1边界防护 1.1应保证跨…

Spark数据倾斜优化

1 数据倾斜现象 1、现象 绝大多数task任务运行速度很快&#xff0c;但是就是有那么几个task任务运行极其缓慢&#xff0c;慢慢的可能就接着报内存溢出的问题。 2、原因 数据倾斜一般是发生在shuffle类的算子&#xff0c;比如distinct、groupByKey、reduceByKey、aggregateByKey…