Spark UI中 Shuffle Exchange 和 BroadcastExchange 中的 dataSize 值为什么不一样

背景

Spark 3.5
最近在看Spark UI 上的一些指标看到一个很有意思的东西, 相邻的Shuffle Exechange 和 BroadcastExechange 中的 datasize 居然不一样,
前者为 765KB, 后者为 64.5MB。差别还不少,中间就增加了一个 AQEShuffleRead 计划

结论

Shuffle Exechange 中的是真实 UnsafeRow的大小
BroadcastExechange 中的是 MemoryBlock 类型数据结构所占的大小 ,而不是UnsafeRow的大小。
BroadcastExechange中的datasize大小 和 2的整数倍接近。

现象以及分析

上图:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

两个同样的 ShuffleExechange 记录条数和 ShuffleExechange 中 datasize 大小不一样,而在BroadcastExechange 中 dataSize 大小却是一样的(都是64.5MB)
关于 ShuffleExchange中的 dataSize的计算可以参考:Spark UI中Shuffle dataSize 和shuffle bytes written 指标区别,这里重点分析一下后者.
直接看BroadcastExechange代码:

  override lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
    SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
      session, BroadcastExchangeExec.executionContext) {
          try {
            // Setup a job tag here so later it may get cancelled by tag if necessary.
            sparkContext.addJobTag(jobTag)
            sparkContext.setInterruptOnCancel(true)
            val beforeCollect = System.nanoTime()
            // Use executeCollect/executeCollectIterator to avoid conversion to Scala types
            val (numRows, input) = child.executeCollectIterator()
            ...
            val relation = mode.transform(input, Some(numRows))
            val dataSize = relation match {
              case map: HashedRelation =>
                map.estimatedSize
              case arr: Array[InternalRow] =>
                arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
              case _ =>
                throw new SparkException("[BUG] BroadcastMode.transform returned unexpected " +
                  s"type: ${relation.getClass.getName}")
            }
            longMetric("dataSize") += dataSize

其中child.executeCollectIterator() 是在把数据从各个 Executor 收集到 Driver 端来,便于进行广播操作。
最主要的是 mode.transform(input, Some(numRows)),这里的数据流如下:


HashedRelationBroadcastMode.transform
  ||
  \/
HashedRelation.apply(rows, key, numRows.toInt, isNullAware = isNullAware)
  ||
  \/
UnsafeHashedRelation.apply(input, key, sizeEstimate, mm, isNullAware, allowsNullKey,
        ignoresDuplicatedKey)
  ||
  \/
new UnsafeHashedRelation(key.size, numFields, binaryMap)

最终调用的 UnsafeHashedRelation.estimatedSize的方法:

  override def estimatedSize: Long = binaryMap.getTotalMemoryConsumption
    

getTotalMemoryConsumptiondataPages所占用的大小再加上longArray的大小:

  public long getTotalMemoryConsumption() {
    long totalDataPagesSize = 0L;
    for (MemoryBlock dataPage : dataPages) {
      totalDataPagesSize += dataPage.size();
    }
    return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);
  }

那么 BytesToBytesMap 是怎么分配的呢?如下:

    val binaryMap = new BytesToBytesMap(
      taskMemoryManager,
      // Only 70% of the slots can be used before growing, more capacity help to reduce collision
      (sizeEstimate * 1.5 + 1).toInt,
      pageSizeBytes)

默认的PageSize值为:defaultPageSizeBytes:

  private lazy val defaultPageSizeBytes = {
    val minPageSize = 1L * 1024 * 1024   // 1MB
    val maxPageSize = 64L * minPageSize  // 64MB
    val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
    // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
    val safetyFactor = 16
    val maxTungstenMemory: Long = tungstenMemoryMode match {
      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
    }
    val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
    val chosenPageSize = math.min(maxPageSize, math.max(minPageSize, size))
    if (Utils.isG1GC && tungstenMemoryMode == MemoryMode.ON_HEAP) {
      chosenPageSize - Platform.LONG_ARRAY_OFFSET
    } else {
      chosenPageSize
    }
  }

这个跟内存以及core有关。
当在进行val loc = binaryMap.lookup 以及loc.append操作的时候就会进行dataPage以及longArray的分配。而该size的大小并不是实际占用的大小,而是分配给该dataPage的大小。其实你会发现该datasize的大小几乎和2的倍数接近。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/341819.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

c语言:链表

链表的大致思维导图: 链表的相关概念和解释: 节点(Node):链表中的每个元素都是一个节点,节点包含数据和指向下一个节点的指针。 头节点(Head Node):链表的第一个节点称…

【动态规划】879. 盈利计划

作者推荐 【动态规划】【广度优先搜索】【状态压缩】847 访问所有节点的最短路径 本文涉及知识点 动态规划汇总 LeetCode879. 盈利计划 集团里有 n 名员工,他们可以完成各种各样的工作创造利润。 第 i 种工作会产生 profit[i] 的利润,它要求 group[…

在linux部署Prometheus+Grafana+Exporter监控系统性能

Prometheus、Grafana和Report组件是什么? Prometheus、Grafana和Exporter是常用于系统监控和指标收集的组合。 Prometheus是一种开源的系统监控和警报工具。它可以收集各种指标数据,并提供强大的查询语言和灵活的警报规则,用于实时监控系统…

[框架系列]-[通用lock框架]集成及具体配置使用

目录 一:框架集成 1.添加pom依赖 2.开启lock配置 二:配置详细介绍 1.配置清单 2.具体配置介绍 (1)implementer (2)type (3)transactionStrategy (4&#xff09…

利用大模型审合同真的可以吗?

#大模型 #大模型审合同 最近有很多朋友留言,询问关于大模型审合同的问题,今天就小智一起来探讨下这个问题。 (智合同-采用深度学习、自然语言处理技术、知识图谱等人工智能技术,为企业提供专业的合同相关的智能服务。其服务包含…

【Java IO】设计模式 (装饰者模式)

Java I/O 使用了装饰者模式来实现。 装饰者模式 请参考装饰者模式详解 装饰者(Decorator)和具体组件(ConcreteComponent)都继承自组件(Component),具体组件的方法实现不需要依赖于其它对象,而装饰者组合了一个组件,这样它可以装饰其它装饰者…

Excel数据检索省力小工具(文末附源码)

Excel数据检索省力小工具(文末附源码) 引言 ​ 相信很多人都是用过VLOOKUP函数来检索和处理Excel数据。比如教师查看班级学生成绩表,想单独检索某个科目、某个学生,某一分数段(80~90分数段内的成绩)&…

网络安全基础概念

目录 网络安全背景 网络空间安全 --- Cyberspace 常见的网络安全术语 协议栈自身的脆弱性: 常见安全风险: 物理层--物理攻击 物理设备窃听: 链路层-- MAC洪泛攻击: 链路层--ARP欺骗 网络层--ICMP攻击 传输层--TCP SYN Flood攻击: …

信息检索与数据挖掘 | (八)语言建模的IR

文章目录 📚语言生成模型📚平滑🐇线性插值平滑方法(Lelinek-Mercer)🐇dirichlet 平滑🐇Vector space(向量空间) vs BM25 vs LM 📚语言生成模型 传统的语言生成模型可以用于识别或生成…

南京观海微电子---时序分析基本概念(二)——保持时间

1. 概念的理解 以上升沿锁存为例,保持时间(Th)是指在触发器的时钟信号上升沿到来以后,数据稳定不变的时间。如下图所示,一个数据要在上升沿被锁存,那么这个数据需要在时钟上升沿到来后的保持时间内保持稳定…

展会日记:ICCAD2023,Samtec连接器无处不在

【序言】 “作为重要的电子元器件,连接器在如今的数字与现实世界中,扮演了不可或缺的角色。Samtec作为全球知名的连接器厂商,在芯片到板、板到板、射频、光模块等领域都有着卓越表现~ 今年,我们更是将这种存在感在2023 ICCAD上&a…

Nginx 基础使用

目录结构 进入Nginx的主目录我们可以看到这些文件夹 client_body_temp conf fastcgi_temp html logs proxy_temp sbin scgi_temp uwsgi_temp其中这几个文件夹在刚安装后是没有的,主要用来存放运行过程中的临时文件 client_body_temp fastcgi_temp proxy_temp scg…

uniapp中打包Andiord app,在真机调试时地图以及定位功能可以正常使用,打包成app后失效问题(高德地图)

踩坑uniapp中打包Andiord app,在真机调试时地图以及定位功能可以正常使用,打包成app后失效问题_uniapp真机调试高德地图正常 打包apk高德地图就不加载-CSDN博客 问题: 目前两个项目,一个项目是从另一个项目里面分割出来的一整套…

华为云磁盘性能指标(参考)

MD[华为云磁盘性能指标(参考)] 云硬盘(Elastic Volume Service, EVS) 根据性能,磁盘可分为极速型SSD V2、极速型SSD、通用型SSD V2、超高IO、通用型SSD、高IO、普通IO。 性能指标(参考),测速说明:操作系统-windows …

共襄Agent智能体盛举,实在智能2024生态伙伴大会杭州站圆满收官!

1月19日,以“实在Agent智能体”为主题的「2024实在智能生态伙伴大会(杭州站)」在杭州人工智能小镇隆重启幕! 中国电信/联通/中海油等数十家央企子公司领导代表、天翼数科/华为/浪潮/统信/贝锐/vivo集团/新华三/中软国际/中投创展/…

华为AC+FIT AP组网配置

AC配置 vlan batch 100 to 101dhcp enableip pool apgateway-list 192.168.100.254 network 192.168.100.0 mask 255.255.255.0 interface Vlanif100ip address 192.168.100.254 255.255.255.0dhcp select globalinterface GigabitEthernet0/0/1port link-type trunkport trun…

Flutter 自定义AppBar实现滚动渐变

1、使用ListView实现上下滚动。 2、使用Stack:允许将其子部件放在彼此的顶部,第一个子部件将放置在底部。所以AppBar,写在ListView下面。 3、MediaQuery.removePadding:当使用ListView的时候发现,顶部有块默认的Padd…

【蓝桥杯冲冲冲】排队接水--贪心算法巩固 (≧∇≦)

蓝桥杯备赛 | 洛谷做题打卡day15 文章目录 蓝桥杯备赛 | 洛谷做题打卡day15排队接水题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示思路 题解代码我的一些话 排队接水 题目描述 有 n n n 个人在一个水龙头前排队接水,假如每个人接水的时间为 T i T_…

使用torch实现RNN

在实验室的项目遇到了困难,弄不明白LSTM的原理。到网上搜索,发现LSTM是RNN的变种,那就从RNN开始学吧。 带隐藏状态的RNN可以用下面两个公式来表示: 可以看出,一个RNN的参数有W_xh,W_hh,b_h&am…

Linux如何将文件或目录打成rpm包? -- fpm打包详解

👨‍🎓博主简介 🏅云计算领域优质创作者   🏅华为云开发者社区专家博主   🏅阿里云开发者社区专家博主 💊交流社区:运维交流社区 欢迎大家的加入! 🐋 希望大家多多支…