SparkSQL-性能调优

祝福

在这个举国同庆的时刻,我们首先献上对祖国的祝福:

第一,我们感谢您给我们和平的环境,让我们能快乐生活

第二,祝福我们国家未来的路越走越宽广,科技更发达,人民更幸福

第三,我们会紧紧跟随您的脚步,一起为美好的未来奋斗

一、将数据缓存到内存

import spark.implicits._
import spark.sql

spark.catalog.cacheTable("personal_info")
spark.catalog.uncacheTable("personal_info")
val personalInfoDataFrame = sql("select name,age FROM personal_info")
personalInfoDataFrame.cache()
personalInfoDataFrame.unpersist()

cacheTable() 和 cache() 最终都调用了CacheManager的cacheQuery()

CacheManager对数据的缓存与RDD中的cache()是不同的,RDD中的cache()是的持久化级别是MEMORY_ONLY,而这里是MEMORY_AND_DISK,因为Spark任务重新计算底层表并将其缓存是昂贵的。

二、选项设置

以下选项可用于调整查询执行的性能。随着自动执行更多优化,这些选项可能会在未来的版本中被弃用。

spark.sql.files.maxPartitionBytes        默认:128M

解释:读取文件时打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如Parket、JSON和ORC)时有效。

spark.sql.files.openCostInBytes        默认值:4M

解释:打开一个文件的估计成本,以可以同时扫描的字节数来衡量。这是在将多个文件放入一个分区时使用的。最好过度估计,那么具有小文件的分区将比具有较大文件的分区(这是首先调度的)更快。此配置仅在使用基于文件的源(如Parket、JSON和ORC)时有效。

spark.sql.files.minPartitionNum        默认值:spark.sql.leafNodeDefaultParallelism

解释:建议的(不保证的)最小分割文件分区数如果未设置。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。

spark.sql.files.maxPartitionNum        默认值:None

解释:建议的(不保证的)分割文件分区的最大数量。如果设置了,如果初始分区数超过此值,Spark将重新调整每个分区以使分区数接近此值。此配置仅在使用基于文件的源(如Parket、JSON和ORC)时有效。

spark.sql.broadcastTimeout        默认值:5*60 秒

解释:广播等待超时时间

spark.sql.autoBroadcastJoinThreshold        默认值:10M

解释:配置表的最大大小(以字节为单位),该表将在以下情况下广播到所有工作节点 执行连接。通过将此值设置为-1,可以禁用广播。请注意,目前 统计信息仅支持Hive Metastore表,其中命令 为:

ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan (用于收集表的统计信息,这些统计信息将帮助优化器制定更好的执行计划

也就是Spark3.2.0以后不需要自己去优化大小表join了,比如手动广播小表、大小表位置

spark.sql.shuffle.partitions        默认值:200

解释:配置为连接或聚合混洗数据时要使用的分区数。在入库时如果最终的结果不是很大,需求将其调小,不然入库的时间会增大。

spark.sql.sources.parallelPartitionDiscovery.threshold        默认值:32

解释:配置阈值以启用作业输入路径的并行列表。如果输入路径的数量大于此阈值,Spark将使用Spark分布式作业列出文件。否则,它将回退到顺序列表。此配置仅在使用基于文件的数据源(如Parquet、ORC和JSON)时有效。

spark.sql.sources.parallelPartitionDiscovery.parallelism        默认值:10000

解释:配置作业输入路径的最大列表并行性。如果输入路径的数量大于此值,则将限制其使用此值。此配置仅在使用基于文件的数据源(如Parquet、ORC和JSON)时有效。

三、Hint 干预

Hint 是一种可以让用户干预数据库 SQL 优化的方式,相当于给用户开了一个后门,当优化器本身对于某些 SQL 优化得不够好时,用户就可以结合自己的经验,尝试使用 Hint 来干预数据库的优化。

1、join 的 Hint

sql语句中使用BROADCAST、MERGE、SHUFLE_HASH和SHUFLE_REPLICATE_NL,指示Spark在将它们与另一个关系连接时,对每个指定的关系使用提示策略。例如,当在表“t1”上使用BROADCAST提示时,Spark将优先考虑以“t1”作为构建端的广播连接(广播哈希连接或广播嵌套循环连接,具体取决于是否有任何equi-join键),即使统计数据建议的表“t1“的大小高于配置Spark.sql.autoBroadcastJoinThreshold。

当在连接的两侧指定不同的连接策略提示时,Spark会将BROADCAST提示优先于MERGE提示,将SHUFLE_HASH提示优先于SHUFLE_REPLICATE_NL提示。当使用BROADCAST提示或SHUFLE_HASH提示指定两侧时,Spark将根据连接类型和关系大小选择构建侧。

请注意,不能保证Spark会选择提示中指定的连接策略,因为特定的策略可能不支持所有连接类型。

示例:

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

2、Coalesce 的 Hint

Coalesce 的 Hint 允许Spark SQL用户控制输出文件的数量,就像Dataset API,中的coalesce、repartition和repartitionByRange一样,它们可以用于性能调整和减少输出文件的数目。

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;

四、自适应查询AQE

AQE是Adaptive Query Execution的简称,是Spark SQL中的一种优化技术,它利用运行时统计信息来选择最有效的查询执行计划,自Apache Spark 3.2.0以来,该计划默认启用。Spark SQL可以通过作为伞形配置启用Spark.SQL.adaptive.enabled来打开和关闭AQE。从Spark 3.0开始,AQE中有三个主要功能:包括合并洗牌后分区、将排序合并连接转换为广播连接,以及倾斜连接优化。

1、shuffle分区自动调整

这里涉及两个配置:

spark.sql.adaptive.enabled        默认:true

解释:如果为true,则启用自适应查询执行,该操作将在查询执行过程中根据准确的运行时统计信息重新优化查询计划

spark.sql.adaptive.coalescePartitions.enabled        默认:true

解释:当true和spark.sql.adaptive.enabled为true时,Spark将根据目标大小(由spark.sql.adaptive.advisoryPartitionSizeInBytes指定)合并连续的shuffle分区,以避免过多的小任务

其他配置:

spark.sql.adaptive.coalescePartitions.parallelismFirst        默认:true

解释:如果为true,则Spark在合并连续的shuffle分区时忽略spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小(默认64MB),并且只尊重由spark.sql.adaptive.coalescePartitions.minPartitionSize指定的最小分区大小(默认1MB),以最大化并行性。这是为了避免启用自适应查询执行时性能下降。建议将此配置设置为false,并尊重由spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小。

spark.sql.adaptive.coalescePartitions.minPartitionSize     默认:1M

解释:合并后shuffle分区的最小大小。它的值最多可以是20%spark.sql.adaptive.advisoryPartitionSizeInBytes。当在分区合并期间忽略目标大小时,这很有用,这是默认情况。

spark.sql.adaptive.coalescePartitions.initialPartitionNum        默认:none

解释:合并前shuffle分区的初始数量。如果未设置,则等于spark.sql.shuffle.partitions。此配置仅在spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled时有效

spark.sql.adaptive.advisoryPartitionSizeInBytes        默认:64M

解释:自适应优化期间shuffle分区的建议大小(以字节为单位spark.sql.adaptive.enabled

该功能简化了运行查询时shuffle分区号的调整。用户不需要设置适当的shuffle分区号来适应数据集。一旦你通过Spark.sql.adaptive.coalescePartitions.initialPartitionNum配置设置了足够大的初始洗牌分区数,Spark就可以在运行时选择合适的洗牌分区数。

以下是对倾斜分区的相关设置

spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled         默认:true

解释:当spark.sql.adaptive.enabled为true时,Spark将优化Rebalance分区中的倾斜shuffle分区,并根据目标大小(由spark.sql.adaptive.advisoryPartitionSizeInBytes指定)将其拆分为较小的分区,以避免数据倾斜。

spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor         默认:0.2

解释:如果分区的大小小于此因数,则分区将在拆分期间合并spark.sql.adaptive.advisoryPartitionSizeInBytes

2、将sort-merge join转换为broadcast join

当任何连接端的运行时统计数据小于自适应广播哈希连接阈值时,AQE将排序合并连接转换为广播哈希连接。这不如一开始就计划广播哈希连接有效,但它比继续进行排序合并连接要好,因为我们可以保存连接双方的排序,并在本地读取shuffle文件以节省网络流量(如果spark.sql.adaptive.localShuffleReader.enabled为真)

spark.sql.adaptive.autoBroadcastJoinThreshold        默认:none

解释:为执行连接时将广播到所有工作节点的表配置最大大小(以字节为单位)。通过将此值设置为-1,可以禁用广播。默认值与spark.sql.autoBroadcastJoinThreshold相同。请注意,此配置仅在自适应框架中使用。

spark.sql.adaptive.localShuffleReader.enabled        默认:true

解释:当true且spark.sql.adaptive.enabled为true时,Spark会在不需要shuffle分区时尝试使用本地shuffle阅读器读取shuffle数据,例如,在将sort-合并连接转换为广播散列连接之后。

AQE将排序合并连接转换为洗牌哈希连接当所有后洗牌分区都小于阈值时,最大阈值可以看到配置spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold        默认:0

解释:配置每个分区可以允许构建本地哈希映射的最大大小(以字节为单位)。如果此值不小于spark.sql.adaptive.advisoryPartitionSizeInBytes并且所有分区大小都不大于此配置,则连接选择更喜欢使用洗牌哈希连接而不是排序合并连接,而不管spark.sql.join.preferSortMergeJoin的值如何

3、倾斜join优化

数据倾斜会严重降低连接查询的性能。此功能通过将倾斜的任务拆分(并在需要时复制)成大致均匀大小的任务来动态处理排序合并连接中的倾斜。当spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled配置都启用时,它会生效。

spark.sql.adaptive.skewJoin.enabled        默认:true

解释:当true和spark.sql.adaptive.enabled为true时,Spark通过拆分(并在需要时复制)倾斜的分区来动态处理排序合并连接中的倾斜。

spark.sql.adaptive.skewJoin.skewedPartitionFactor         默认:5.0

解释:如果一个分区的大小大于乘以中值分区大小的因子,并且也大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,则该分区被认为是倾斜的。

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes        默认:256MB

解释:如果分区的大小(以字节为单位)大于此阈值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor乘以分区大小的中值,则该分区被视为倾斜。理想情况下,此配置应设置为大于spark.sql.adaptive.advisoryPartitionSizeInBytes

spark.sql.adaptive.forceOptimizeSkewedJoin         默认:false

解释:如果为true,则强制启用OptimizeSkewedJoin,这是一种自适应规则,用于优化偏斜连接以避免散乱任务,即使它引入了额外的洗牌

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/885933.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# 无边框窗体,加阴影效果、多组件拖动、改变大小等功能完美实现优化版效果体验

一、预览效果 国庆节第一天,祝祖国繁荣昌盛! 1.1 效果图 (WinForm无边框窗体,F11可全屏) 拖动窗体时半透明效果(拖动时参考窗体后面释放位置) 说明:本功能的实现基于网友的原型完善而来,更多代码可以参考他的文章 h

Golang | Leetcode Golang题解之第449题序列化和反序列化二叉搜索树

题目&#xff1a; 题解&#xff1a; type Codec struct{}func Constructor() (_ Codec) { return }func (Codec) serialize(root *TreeNode) string {arr : []string{}var postOrder func(*TreeNode)postOrder func(node *TreeNode) {if node nil {return}postOrder(node.Le…

量产小妙招---向量间的Project和Product

1 背景 在日常工作中&#xff0c;不管是在感知或者规控&#xff0c;或者其它的模块中&#xff0c;经常需要处理两个向量之间的关系&#xff0c;这就引入了本篇博客和读者朋友们讨论的一个话题&#xff1a;Project和Product。 2 Project和Product 向量间的Project和Product在定义…

C++语言学习(2): name lookup 的概念

何谓 name lookup C 中很重要的一个概念&#xff1a;name lookup。 当编译器在遇到一个 name 的时候&#xff0c; 会做查找&#xff08;lookup&#xff09;&#xff0c;会把引入这个 name 的声明和它关联起来&#xff0c;具体来说&#xff0c;又包含两种类型的 lookup&#xf…

【学习笔记】手写 Tomcat 八

目录 一、NIO 1. 创建 Tomcat NIO 类 2. 启动 Tomcat 3. 测试 二、解析请求信息 三、响应数据 创建响应类 修改调用的响应类 四、完整代码 五、测试 六、总结 七、获取全部用户的功能 POJO 生成 POJO 1. 在 Dao 层定义接口 2. 获取用户数据 3. 在 Service 层定…

ArcGIS与ArcGIS Pro去除在线地图服务名单

我们之前给大家分享了很多在线地图集&#xff0c;有些地图集会带有制作者信息&#xff0c;在布局制图的时候会带上信息影响出图美观。 一套GIS图源集搞定&#xff01;清新规划底图、影像图、境界、海洋、地形阴影图、导航图 比如ArcGIS&#xff1a; 比如ArcGIS Pro&#xff1a…

.Net 基于IIS部署blazor webassembly或WebApi

1.安装IIS(若安装&#xff0c;请忽略) 选择:控制面板–>程序–>程序和功能 选择:启动或关闭Windows功能&#xff0c;勾选相关项&#xff0c;再点击确定即可。 2.安装Hosting Bundle 以.net6为例&#xff0c;点击连接https://dotnet.microsoft.com/en-us/download/dot…

zabbix7.0创建自定义模板的案例详解(以监控httpd服务为例)

前言 服务端配置 链接: rocky9.2部署zabbix服务端的详细过程 环境 主机ip应用zabbix-server192.168.10.11zabbix本体zabbix-client192.168.10.12zabbix-agent zabbix-server(服务端已配置) 创建模板 模板组直接写一个新的&#xff0c;不用选择 通过名称查找模板&#xf…

详解CSS中的伪元素

4.3 伪元素 可以把样式应用到文档树中根本不存在的元素上。 ::first-line 文本中的第一行 ::first-letter 文本中的第一个字母 ::after 元素之后添加 ::before 元素之前 代码&#xff1a; <!DOCTYPE html> <html> <head><meta charset"utf-8&q…

测试用例的举例

1. 基于测试公式设计测试用例 通过功能&#xff0c;性能&#xff0c;安全性&#xff0c;界面&#xff0c;安全性&#xff0c;易用&#xff0c;兼容对于一个水杯进行测试用例的设计&#xff1b; 对于一个软件的测试用例设计&#xff1a; 功能&#xff1a;软件本质上能够用来干什…

本科生已不够 AI公司雇佣各领域专家训练大模型

9月29日消息&#xff0c;人工智能模型的性能在很大程度上依赖于其训练数据的质量。传统方法通常是雇用大量低成本劳动力对图像、文本等数据进行标注&#xff0c;以满足模型训练的基本需求。然而&#xff0c;这种方式容易导致模型在理解和生成信息时出现“幻觉”现象&#xff0c…

【递归】11. leetcode 129 求根节点到叶节点数字之和

1 题目描述 题目链接&#xff1a; 求根节点到叶节点数字之和 2 解答思路 第一步&#xff1a;挖掘出相同的子问题 &#xff08;关系到具体函数头的设计&#xff09; 第二步&#xff1a;只关心具体子问题做了什么 &#xff08;关系到具体函数体怎么写&#xff0c;是一个宏观…

追随 HarmonyOS NEXT,Solon v3.0 将在10月8日发布

Solon &#xff08;开放原子开源基金会&#xff0c;孵化项目&#xff09;原计划10月1日发布 v3.0 正式版。看到 HarmonyOS NEXT 将在 10月8日启用公测&#xff0c;现改为10月8日发布以示庆贺。另外&#xff0c;Solon 将在2025年启动“仓颉”版开发&#xff08;届时&#xff0c;…

数据仓库的建设——从数据到知识的桥梁

数据仓库的建设——从数据到知识的桥梁 前言数据仓库的建设 前言 企业每天都在产生海量的数据&#xff0c;这些数据就像无数散落的珍珠&#xff0c;看似杂乱无章&#xff0c;但每一颗都蕴含着潜在的价值。而数据仓库&#xff0c;就是那根将珍珠串起来的线&#xff0c;它能够把…

WebSocket消息防丢ACK和心跳机制对信息安全性的作用及实现方法

WebSocket消息防丢ACK和心跳机制对信息安全性的作用及实现方法 在现代即时通讯&#xff08;IM&#xff09;系统和实时通信应用中&#xff0c;WebSocket作为一种高效的双向通信协议&#xff0c;得到了广泛应用。然而&#xff0c;在实际使用中&#xff0c;如何确保消息的可靠传输…

解决Windows远程桌面 “为安全考虑,已锁定该用户账户,原因是登录尝试或密码更改尝试过多,请稍后片刻再重试,或与系统管理员或技术支持联系“问题

根本原因就是当前主机被通过远程桌面输入了过多的错误密码&#xff0c;被系统锁定。这种情况多数是你的服务器远程桌面被人试图攻击了&#xff0c;不建议取消系统锁定策略。如果阿里云或者腾讯云主机&#xff0c;只需要在管理后台通过管理终端或者VNC登陆一次&#xff0c;锁定即…

哈希表(HashMap、HashSet)

文章目录 一、 什么是哈希表二、 哈希冲突2.1 为什么会出现冲突2.2 如何避免出现冲突2.3 出现冲突如何解决 三、模拟实现哈希桶/开散列&#xff08;整型数据&#xff09;3.1 结构3.2 插入元素3.3 获取元素 四、模拟实现哈希桶/开散列&#xff08;泛型&#xff09;4.1 结构4.2 插…

DC00025【含论文】基于协同过滤推荐算法springboot视频推荐管理系统

1、项目功能演示 DC00025【含文档】基于springboot短视频推荐管理系统协同过滤算法视频推荐系统javaweb开发程序设计vue 2、项目功能描述 短视频推荐系统分为用户和系统管理员两个角色 2.1 用户角色 1、用户登录、用户注册 2、视频中心&#xff1a;信息查看、视频收藏、点赞、…

1.1.4 计算机网络的分类

按分布范围分类&#xff1a; 广域网&#xff08;wan&#xff09; 城域网&#xff08;man&#xff09; 局域网&#xff08;lan&#xff09; 个域网&#xff08;pan&#xff09; 注意&#xff1a;如今局域网几乎采用“以太网技术实现”&#xff0c;因此“以太网”几乎成了“局域…

Python核心知识:pip使用方法大全

什么是 pip&#xff1f; pip 是 Python 的包管理工具&#xff0c;允许用户安装、升级和管理 Python 的第三方库和依赖。它极大地简化了开发过程&#xff0c;使开发者可以轻松地获取并安装所需的软件包。pip 已成为 Python 项目中最常见的包管理工具&#xff0c;并且自 Python …