Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

结论

OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。

分析

直接到OptimizeSkewInRebalancePartitions中的代码中来:

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
      return plan
    }

    plan transformUp {
      case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
        tryOptimizeSkewedPartitions(stage)
    }
  }

如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

在这里插入图片描述
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。

在看合并的计算公式,该数据流如下:

 tryOptimizeSkewedPartitions
      ||
      \/
 optimizeSkewedPartitions
      ||
      \/
 ShufflePartitionsUtil.createSkewPartitionSpecs
      ||
      \/
 ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的参数解释如下 :

  • 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
    表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

在这里插入图片描述
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.

  • 参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB
  • 参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默认是0.2
    这里有个计算公式:
    def tryMergePartitions() = {
      // When we are going to start a new partition, it's possible that the current partition or
      // the previous partition is very small and it's better to merge the current partition into
      // the previous partition.
      val shouldMergePartitions = lastPartitionSize > -1 &&
        ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
        (currentPartitionSize < targetSize * smallPartitionFactor ||
          lastPartitionSize < targetSize * smallPartitionFactor))
      if (shouldMergePartitions) {
        // We decide to merge the current partition into the previous one, so the start index of
        // the current partition should be removed.
        partitionStartIndices.remove(partitionStartIndices.length - 1)
        lastPartitionSize += currentPartitionSize
      } else {
        lastPartitionSize = currentPartitionSize
      }
    }
    。。。
    while (i < sizes.length) {
      // If including the next size in the current partition exceeds the target size, package the
      // current partition and start a new partition.
      if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
        tryMergePartitions()
        partitionStartIndices += i
        currentPartitionSize = sizes(i)
      } else {
        currentPartitionSize += sizes(i)
      }
      i += 1
    }
    tryMergePartitions()
    partitionStartIndices.toArray

这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:

    val targetSize = 100
    val smallPartitionFactor2 = 0.5
    // merge last two partition if their size is not bigger than smallPartitionFactor * target
    val sizeList5 = Array[Long](50, 50, 40, 5)
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
      sizeList5, targetSize, smallPartitionFactor2).toSeq ==
      Seq(0))

    val sizeList6 = Array[Long](40, 5, 50, 45)
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
      sizeList6, targetSize, smallPartitionFactor2).toSeq ==
      Seq(0))

这种情况下,就会只有一个reduce任务运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/477409.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【算法训练营】周测4

清华大学驭风计划课程链接 学堂在线 - 精品在线课程学习平台 (xuetangx.com) 如果需要答案代码可以私聊博主 有任何疑问或者问题&#xff0c;也欢迎私信博主&#xff0c;大家可以相互讨论交流哟~~ 考题11-4 题目描述 输入格式 从标准输入读入数据。 输入第一行为两个正整…

Vue+jquery+jquery.maphilight实现图片热区高亮以及点击效果

//鼠标悬浮效果 mounted() {this.setCurrentTask(0); //对于id为mapAll的热区图&#xff0c;设置鼠标放置在上面有一个颜色 fillColor填充颜色 strokeColor边框颜色 strokeWidth边框宽度 fillOpacity 是设置热区填充颜色的不透明度的属性。 alwaysOn:true 保持常量$(function(…

个人网站制作 Part 14 添加网站分析工具 | Web开发项目

文章目录 &#x1f469;‍&#x1f4bb; 基础Web开发练手项目系列&#xff1a;个人网站制作&#x1f680; 添加网站分析工具&#x1f528;使用Google Analytics&#x1f527;步骤 1: 注册Google Analytics账户&#x1f527;步骤 2: 获取跟踪代码 &#x1f528;使用Vue.js&#…

部署单节点k8s并允许master节点调度pod

安装k8s 需要注意的是k8s1.24 已经弃用dockershim&#xff0c;现在使用docker需要cri-docker插件作为垫片&#xff0c;对接k8s的CRI。 硬件环境&#xff1a; 2c2g 主机环境&#xff1a; CentOS Linux release 7.9.2009 (Core) IP地址&#xff1a; 192.168.44.161 一、 主机配…

垃圾回收-垃圾回收中的相关概念

目录 System.gc()的理解 内存泄漏&#xff08;Memory Leak&#xff09; 内存溢出&#xff08;OOM&#xff09; Stop The World 垃圾回收的串行、并行与并发 安全点与安全区域 强、软、弱、虚引用 强、软、弱、虚引用 终结器引用 System.gc()的理解 在默认情况下&#…

【蓝桥杯】第15届蓝桥杯青少组stema选拔赛C++中高级真题答案(20240310)

一、选择题 第 1 题 第 2 题 表达式1000/3的结果是( A )。 A.333 B.333.3 C.334 D.333.0 第 3 题 下列选项中&#xff0c;判断a等于1并且b等于1正确的表达式是( B )。 A.!((a!1)&&(b!1)) B.!((a!1)||(b!1)) C.!(a1)&&(b1) D.(a1)&&(b1) 【解析】 A…

数据机构-2(顺序表)

线性表 概念 顺序表 示例&#xff1a;创建一个存储学生信息的顺序表 表头&#xff08;Tlen总长度&#xff0c; Clen当前长度&#xff09; 函数 #include <seqlist.c> #include <stdio.h> #include <stdlib.h> #include "seqlist.h" #include &…

mysql四种事务隔离级别,2024金三银四

TransactionDefinition.PROPAGATION_MANDATORY&#xff1a;如果当前存在事务&#xff0c;则加入该事务&#xff1b;如果当前没有事务&#xff0c;则抛出异常。 TransactionDefinition.PROPAGATION_NESTED&#xff1a;如果当前存在事务&#xff0c;则创建一个事务作为当前事务的…

快来围观!我自制的 AI 周报小能手:自动收集整理周报,一键发送邮件

前言 上篇文章分享了《跟着我的步骤&#xff0c;轻松打造出 AI 智能体》&#xff0c;很多朋友都比较感兴趣&#xff0c;咨询我问 “AI 小白能学吗&#xff1f;” 我感觉问题不大&#xff0c;完全可以&#xff0c;只要把要做的事情屡明白了&#xff0c;遇到的卡点问题直接问 GPT…

oracle 19c单机版本补丁升级

文章目录 一、补丁包概述二、备份opatch三、替换高版本opatch四、打DB补丁1、关闭数据库2、关闭监听3、解压补丁4、冲突检测5、补丁空间检查6、执行补丁升级7、将更新内容加载到数据库8、最后查看数据库版本9、卸载补丁包 一、补丁包概述 补丁升级包 链接&#xff1a;https://…

GStreamer简单看看

主要是现在弄摄像头&#xff0c;要用到这东西。所以学学。 最权威主页&#xff1a;GStreamer: open source multimedia framework 大概看了下&#xff0c;好像命令也不难。 gst-launch-1.0 v4l2src device/dev/video0 ! video/x-raw,formatYUY2,width640,height480,framerat…

Java-SSM电影购票系统

Java-SSM电影购票系统 1.服务承诺&#xff1a; 包安装运行&#xff0c;如有需要欢迎联系&#xff08;VX:yuanchengruanjian&#xff09;。 2.项目所用框架: 前端:JSP、layui、bootstrap等。 后端:SSM,即Spring、SpringMvc、Mybatis等。 3.项目功能点: 3-1.后端功能: 1.用户管…

【漏洞复现】Arris 路由器 basic_sett 信息泄露漏洞

免责声明&#xff1a;文章来源互联网收集整理&#xff0c;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;所产生的一切不良后果与文章作者无关。该…

小车侧方位停车过程的动态模拟matlab仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 小车侧方位停车过程的动态模拟matlab仿真。仿真得到小车的停车动画&#xff0c;小车移动的xy轴坐标以及角度变换。 2.系统仿真结果 3.核心程序与模型 版本&#xff1a;MATLA…

Linux系统资源管理

Linux系统资源命令 在Linux中查看系统资源常用命令有哪些 在Linux中&#xff0c;系统资源是指计算机硬件、软件和网络设备等可以利用的一切物质和能量。Linux中的系统资源包括&#xff1a; CPU&#xff08;中央处理器&#xff09;&#xff1a;用于处理计算机中的指令和数据的…

opencv各个模块介绍(2)

Features2D 模块&#xff1a;特征检测和描述子计算模块&#xff0c;包括SIFT、SURF等算法。 Features2D 模块提供了许多用于特征检测和描述子匹配的函数和类&#xff0c;这些函数和类可用于图像特征的提取、匹配和跟踪。 FeatureDetector&#xff1a;特征检测器的基类&#xf…

AI应用开发-基于python的知识图谱技术

AI应用开发相关目录 本专栏包括AI应用开发相关内容分享&#xff0c;包括不限于AI算法部署实施细节、AI应用后端分析服务相关概念及开发技巧、AI应用后端应用服务相关概念及开发技巧、AI应用前端实现路径及开发技巧 适用于具备一定算法及Python使用基础的人群 AI应用开发流程概…

19 反向迭代器

反向迭代器和正向迭代器相反&#xff0c;比如一个数组内容是1,2,3,4,5。正向迭代器就是按顺序输出&#xff0c;反向迭代器是5,4,3,2,1&#xff0c;顺序倒着。想要第一个输出5&#xff0c;需要反向迭代器rbegin在5的位置,判断输出完的条件,rend在头节点的位置就行&#xff0c;只…

【go从入门到精通】select条件控制

作者简介&#xff1a; 高科&#xff0c;先后在 IBM PlatformComputing从事网格计算&#xff0c;淘米网&#xff0c;网易从事游戏服务器开发&#xff0c;拥有丰富的C&#xff0c;go等语言开发经验&#xff0c;mysql&#xff0c;mongo&#xff0c;redis等数据库&#xff0c;设计模…

【八股】ThreadLocal原理

1. Thread.java 我们首先打开Thread.java源码&#xff0c;看到里面有一个ThreadLocalMap类型的变量threadLocals 2. ThreadLocal.java -> getMap(thread t) 然后ThreadLocal.java里面有一个getMap函数&#xff0c;传入的是线程&#xff0c;返回的是线程里面的ThreadLoca…