SparkSQL调优

SparkSQL调优

文章目录

  • SparkSQL调优
  • Explain 查看执行计划
    • 语法
    • 执行计划处理流程
  • 资源调优
    • 内存说明
      • spark任务提交到yarn上运行命令
    • CPU优化
  • SparkSQL语法优化
    • 基于RBO优化
    • 基于CBO优化
    • 广播join
      • 方式一:通过参数指定自动广播
      • 方式二:强行广播
    • SMB Join
  • 数据倾斜
    • Join 数据倾斜优化
      • 广播join
      • 拆分大 key 打散大表 扩容小表
  • Job 优化
    • Map 端优化
      • Map 端预聚合
      • 读取小文件优化
      • 增大 map 溢写时输出流 buffer
    • Reduce 端优化
      • 增大 reduce 缓冲区,减少拉取次数
      • 调节 reduce 端拉取数据重试次数
      • 调节 reduce 端拉取数据等待间隔
  • Spark AQE
    • 动态合并分区
    • 动态切换 Join 策略
    • 动态优化 Join 倾斜

Explain 查看执行计划

语法

sparkSession.sql("xxx").explain()
  • explain(mode=“simple”):只展示物理执行计划。
  • explain(mode=“extended”):展示物理执行计划和逻辑执行计划。
  • explain(mode=“codegen”) :展示要 Codegen 生成的可执行 Java 代码。
  • explain(mode=“cost”):展示优化后的逻辑执行计划以及相关的统计。
  • explain(mode=“formatted”):以分隔的方式输出,它会输出更易读的物理执行计划,并展示每个节点的详细信息。

执行计划处理流程

分析 – 逻辑优化 – 生成物理执行计划 – 评估模型分析 – 代码生成

  • == Parsed Logical Plan == :Unresolved 逻辑执行计划
    • Parser 组件检查 SQL 语法上是否有问题,然后生成 Unresolved(未决断)的逻辑计划,不检查表名、不检查列名
  • == Analyzed Logical Plan == :Resolved 逻辑执行计划
    • 通过访问 Spark 中的 Catalog 存储库来解析验证语义、列名、类型、表名等
  • == Optimized Logical Plan == :优化后的逻辑执行计划
    • Catalyst 优化器根据各种规则进行优化
  • == Physical Plan == :物理执行计划
    • HashAggregate 运算符表示数据聚合,一般 HashAggregate 是成对出现,第一个HashAggregate 是将执行节点本地的数据进行局部聚合,另一个 HashAggregate 是将各个分区的数据进一步进行聚合计算
    • Exchange 运算符其实就是 shuffle,表示需要在集群上移动数据。很多时候HashAggregate 会以 Exchange 分隔开来
    • Project 运算符是 SQL 中的投影操作,就是选择列(例如:select name, age…)
    • BroadcastHashJoin 运算符表示通过基于广播方式进行 HashJoin
    • LocalTableScan 运算符就是全表扫描本地的表

资源调优

内存说明

在这里插入图片描述

spark.memory.fraction=(估算 storage 内存+估算 Execution 内存)/(估算 storage 内存+估算 Execution 内存+估算 Other 内存)

spark.memory.storageFraction =(估算 storage 内存)/(估算 storage 内存+估算Execution 内存)

Storage 堆内内存=(spark.executor.memory–300MB)spark.memory.fractionspark.memory.storageFraction

Execution 堆内内存=(spark.executor.memory–300MB)spark.memory.fraction(1-spark.memory.storageFraction)

spark任务提交到yarn上运行命令

${SPARK_DIR}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue root.default \
--driver-memory 4g \
--executor-memory 8g \
--num-executors 6 \
--executor-cores 2 \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \
--conf spark.executor.extraJavaOptions='-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps' \
--class com.test.DataETLMain \
/opt/spark_sql_test.jar
参数说明
–queue任务提交到yarn的队列
–driver-memory每个 driver 的内存
–executor-cores每个 executor 的最大核数(3~6 之间比较合理 )
–num-executorsexecutor 的个数
–executor-memory每个 executor 的内存
–conf任务运行配置

CPU优化

修改并行度(分区个数)

  • rdd:spark.default.parallelism
    • 设置 RDD 的默认并行度,没有设置时,由 join、reduceByKey 和 parallelize 等转换决定
  • sparksql:spark.sql.shuffle.partitions
    • 适用 SparkSQL 时,Shuffle Reduce 阶段默认的并行度,默认 200。此参数只能控制Spark sql、DataFrame、DataSet 分区个数。不能控制 RDD 分区个数

SparkSQL语法优化

基于RBO优化

  • 谓词下推:将过滤条件的谓词逻辑都尽可能提前执行,减少下游处理的数据量
  • 列裁剪:列剪裁就是扫描数据源的时候,只读取那些与查询相关的字段
  • 常量替换

基于CBO优化

CBO 优化主要在物理计划层面,原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划

通过spark.sql.cbo.enabled来开启,默认是 false

参数描述默认值
spark.sql.cbo.enabledCBO 总开关
true 表示打开,false 表示关闭
要使用该功能,需确保相关表和列的统计信息已经生成
false
spark.sql.cbo.joinReorder.enabled使用 CBO 来自动调整连续的 inner join 的顺序
true:表示打开
false:表示关闭要使用该功能,需确保相关表和列的统计信息已经生成,且CBO 总开关打开
false
spark.sql.cbo.joinReorder.dp.threshold使用 CBO 来自动调整连续 inner join 的表的个数阈值
如果超出该阈值,则不会调整 join 顺序
12

广播join

Spark join 策略中,如果当一张小表足够小并且可以先缓存到内存中,那么可以使用Broadcast Hash Join,其原理就是先将小表聚合到 driver 端,再广播到各个大表分区中,那么再次进行 join 的时候,就相当于大表的各自分区的数据与小表进行本地 join,从而规避了shuffle

方式一:通过参数指定自动广播

spark.sql.autoBroadcastJoinThreshold:广播join默认值 10M

可更改参数值:

  • 方式一:在程序里面添加参数值

    sparkConf.set("spark.sql.autoBroadcastJoinThreshold","20m")
    
  • 方式二:在执行命令配置中添加参数值

    --conf spark.sql.autoBroadcastJoinThreshold=20m
    

方式二:强行广播

使用Hint注解方式

//TODO SQL Hint方式
    val sqlstr1 =
      """
        |select /*+  BROADCASTJOIN(sc) */
        |  sc.courseid,
        |  csc.courseid
        |from sale_course sc join course_shopping_cart csc
        |on sc.courseid=csc.courseid
      """.stripMargin

    val sqlstr2 =
      """
        |select /*+  BROADCAST(sc) */
        |  sc.courseid,
        |  csc.courseid
        |from sale_course sc join course_shopping_cart csc
        |on sc.courseid=csc.courseid
      """.stripMargin

    val sqlstr3 =
      """
        |select /*+  MAPJOIN(sc) */
        |  sc.courseid,
        |  csc.courseid
        |from sale_course sc join course_shopping_cart csc
        |on sc.courseid=csc.courseid
      """.stripMargin

SMB Join

SMB JOIN 是 sort merge bucket 操作,需要进行分桶,首先会进行排序,然后根据 key值合并,把相同 key 的数据放到同一个 bucket 中(按照 key 进行 hash)。分桶的目的其实就是把大表化成小表。相同 key 的数据都在同一个桶中之后,再进行 join 操作,那么在联合的时候就会大幅度的减小无关项的扫描

使用条件:

  • 两表进行分桶,桶的个数必须相等
  • 两边进行 join 时,join 列=排序列=分桶列

数据倾斜

Join 数据倾斜优化

广播join

适用于小表 join 大表。小表足够小,可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中

**解决逻辑:**在小表 join 大表时如果产生数据倾斜,那么广播 join 可以直接规避掉此 shuffle 阶段。直接优化掉 stage。并且广播 join 也是 Spark Sql 中最常用的优化方案

拆分大 key 打散大表 扩容小表

适用于 join 时出现数据倾斜

解决逻辑:

  1. 将存在倾斜的表,根据抽样结果,拆分为倾斜 key(skew 表)和没有倾斜 key(common)的两个数据集
  2. 将 skew 表的 key 全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集(old 表)整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍,得到 new 表)
  3. 打散的 skew 表 join 扩容的 new 表
  4. 将 skew 表的 key 去掉前缀

Job 优化

Map 端优化

Map 端预聚合

map-side 预聚合,就是在每个节点本地对相同的 key 进行一次聚合操作

读取小文件优化

读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的 Task 元信息也会给 Spark Driver 的内存造成压力,带来单点问题

设置参数:

# 一个分区最大字节数,默认 128m
spark.sql.files.maxPartitionBytes=128MB
# 打开一个文件的开销,默认 4m
spark.files.openCostInBytes=4194304

增大 map 溢写时输出流 buffer

  1. map 端 Shuffle Write 有一个缓冲区,初始阈值 5m,超过会尝试增加到 2*当前使用内存。如果申请不到内存,则进行溢写**(这个参数是 internal,指定无效,资源足够会自动扩容,所以不需要我们去设置)**
    spark.shuffle.spill.initialMemoryThreshold:5242880
  2. Shuffle 文件涉及到序列化,是采取批的方式读写,默认按照每批次 1 万条去读写**(这个参数是 internal,指定无效)**
    spark.shuffle.spill.batchSize:10000
  3. 溢写时使用输出流缓冲区默认 32k,这些缓冲区减少了磁盘搜索和系统调用次数,适当提高可以提升溢写效率
    spark.shuffle.file.buffer:32

Reduce 端优化

增大 reduce 缓冲区,减少拉取次数

Spark Shuffle 过程中,shuffle reduce task 的 buffer 缓冲区大小决定了 reduce task 每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能

spark.reducer.maxSizeInFlight reduce 端数据拉取缓冲区的大小设置,默认为 48MB

调节 reduce 端拉取数据重试次数

Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试

spark.shuffle.io.maxRetrie reduce 端拉取数据重试次数设置,该参数就代表了可以重试的最大次数。默认为 3

调节 reduce 端拉取数据等待间隔

Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性

spark.shuffle.io.retryWait reduce 端拉取数据等待间隔设置,默认值为 5s

Spark AQE

Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution),即自适应查询执行。AQE 是Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

动态合并分区

动态切换 Join 策略

动态优化 Join 倾斜

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/762986.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Go线程实现模型-P

P 概述 P是G能够在M中运行关键。Go的运行时系统会适时地让P与不同的M建立或断开关联,以使P中的那些可运行的G能够及时获得,这与操作系统内核在CPU之上实时切换不同进程或线程的情况类似 改变P的数量 改变单个Go程序间拥有的P的最大数量有两种方法 调…

安卓手机的自带录屏在哪里找?5个软件帮助你快速给手机录屏

安卓手机的自带录屏在哪里找?5个软件帮助你快速给手机录屏 在安卓手机上进行屏幕录制是一项非常实用的功能,特别是对于需要录制游戏操作、制作教程或演示的用户来说。虽然部分安卓手机可能已经预装了屏幕录制功能,但有时候这些功能可能隐藏在…

技术派Spring事件监听机制及原理

Spring事件监听机制是Spring框架中的一种重要技术,允许组件之间进行松耦合通信。通过使用事件监听机制,应用程序的各个组件可以在其他组件不直接引用的情况下,相互发送和接受消息。 需求 在技术派中有这样一个需求,当发布文章或…

每日Attention学习7——Frequency-Perception Module

模块出处 [link] [code] [ACM MM 23] Frequency Perception Network for Camouflaged Object Detection 模块名称 Frequency-Perception Module (FPM) 模块作用 获取频域信息,更好识别伪装对象 模块结构 模块代码 import torch import torch.nn as nn import to…

Ubuntu(通用)—网络加固—ufw+防DNS污染+ARP绑定

1. ufw sudo ufw default deny incoming sudo ufw deny in from any to any # sudo ufw allow from any to any port 5353 protocol udp sudo ufw enable # 启动开机自启 # sudo ufw reload 更改后的操作2. 防ARP欺骗 华为云教程 arp -d删除dns记录arp -a显示arp表 ipconfi…

IMU在手语识别中的应用

近期,一款由美国和中国科研团队联合研发的新型的穿戴设备——SignRing,以其独特的IMU(惯性测量单元)技术,为聋哑人士的手语识别带来了革命性的突破。SignRing不仅极大地扩展了手语识别的词汇量,更提高了识别…

C++多态~~的两个特殊情况

目录 1.多态的概念 2.简单认识 (1)一个案例 (2)多态的两个满足条件 (3)虚函数的重写 (4)两个特殊情况 1.多态的概念 (1)多态就是多种形态; …

windows USB设备驱动开发-双角色驱动

在USB的通讯协议中,规定发起连接的一方为主机(Host),接受连接的一方为设备,这可以用U盘插入电脑举个例子,当U盘插入电脑后,电脑这边主动发起查询和枚举,U盘被动响应查询和数据存取。 USB 双角色驱动程序堆…

为Ubuntu-24.04-live-server-amd64磁盘扩容

系列文章目录 Ubuntu-24.04-live-server-amd64安装界面中文版 文章目录 系列文章目录前言一、检查系统本身情况1.用 lsblk 命令查看自己系统磁盘是什么状态2.用 df -h 命令查看文件系统的磁盘空间使用情况3.解决 Ubuntu-24.04 磁盘空间只能用一半的问题3-1扩展逻辑卷&#xff…

二叉树层序遍历

题目描述 给你二叉树的根节点 root ,返回其节点值的 层序遍历 。 (即逐层地,从左到右访问所有节点)。 假设有这样一棵二叉树,那么它经过层序遍历的结果就应该是: [[3],[9,20],[15,7]]解法 我们可以用广度…

css美化滚动条样式

效果展示 实现 滚动条宽,高度 /* 整体滚动条 */ ::-webkit-scrollbar {width: 10px; }/* 滚动条轨道 */ ::-webkit-scrollbar-track {background-color: #ffffff;border-radius: 6px; }/* 滚动条滑块 */ ::-webkit-scrollbar-thumb {background-color: #888;borde…

IDEA安装使用、JDBC

day53续 IDEA安装 浏览器搜索,在idea官网直接下载需要的版本安装包,安装流程基本无脑 对于专业版要激活,可找相关资源也可购买;社区版不需要 配置环境变量 JDBC JDBC:java database connectivity SUN公司提供的一套操作数据库的…

计算机毕业设计Python深度学习美食推荐系统 美食可视化 美食数据分析大屏 美食爬虫 美团爬虫 机器学习 大数据毕业设计 Django Vue.js

Python美食推荐系统开题报告 一、项目背景与意义 随着互联网和移动技术的飞速发展,人们的生活方式发生了巨大变化,尤其是餐饮行业。在线美食平台如雨后春笋般涌现,为用户提供了丰富的美食选择。然而,如何在海量的餐饮信息中快速…

【Excel、RStudio计算T检测的具体操作步骤】

目录 一、基础知识1.1 显著性检验1.2 等方差T检验、异方差T检验1.3 单尾p、双尾p1.3.1 检验目的不同1.3.2 用法不同1.3.3 如何选择 二、Excel2.1 统计分析工具2.1.1 添加统计分析工具2.1.2 数据分析 2.2 公式 -> 插入函数 -> T.TEST 三、RStudio 一、基础知识 参考: 1.…

2.2章节python的变量和常量

在Python中,变量和常量有一些基本的概念和用法,但需要注意的是,Python本身并没有内置的“常量”类型。然而,程序员通常会遵循一种约定,即使用全部大写的变量名来表示常量。 一、变量 在Python中,变量是一…

新手教学系列——【Ubuntu】SSH配置详解

在使用Ubuntu进行远程管理和开发时,SSH(Secure Shell)是必不可少的工具。SSH不仅提供安全的远程登录功能,还支持安全的文件传输和端口转发。然而,有时我们可能会遇到SSH连接中断的问题。本文将详细介绍如何配置SSH以提高其稳定性,并解释关键配置项。 为什么会出现SSH连接…

基于X86+FPGA的精密加工检测设备解决方案

应用场景 随着我国高新技术的发展和国防现代化发展,航空、航天等领域需 要的大型光电子器件,微型电子机械、 光 电信息等领域需要的微型器件,还有一些复杂零件的加工需求日益增加,这些都需要借助精密甚至超精密的加工检测设备 客…

算法 —— 滑动窗口

目录 长度最小的子数组 无重复字符的最长子串 最大连续1的个数 将x减到0的最小操作数 找到字符串中所有字母异位词 长度最小的子数组 sum比target小就进窗口,sum比target大就出窗口,由于数组是正数,所以相加会使sum变大,相减…

实施粘贴式导航_滚动事件

● 所谓的粘贴式导航,就是当我们滑动页面到某一个位置的时候,导航不会因为滑动而消失,会固定在页面的顶部,我们来看一下如何实现; ● 首先我们要获取我们想要滚动到哪一部分的时候让导航栏显示出来,这就需要…

前端工程化09-webpack静态的模块化打包工具(未完结)

9.1、开发模式的进化历史 webpacks是一个非常非常的强大的一个工具,相应的这个东西的学习也是有一定的难度的,里边的东西非常的多,里面涉及到的 概念的话也是非常非常的多的。 这个东西既然非常重要,那么在我们前端到底处于怎样…