Spark AQE 导致的 Driver OOM问题

背景

最近在做Spark 3.1 升级 Spark 3.5的过程中,遇到了一批SQL在运行的过程中 Driver OOM的情况,排查到是AQE开启导致的问题,再次分析记录一下,顺便了解一下Spark中指标的事件处理情况

结论

SQLAppStatusListener 类在内存中存放着 一个整个SQL查询链的所有stage以及stage的指标信息,在AQE中 一个job会被拆分成很多job,甚至几百上千的job,这个时候 stageMetrics的数据就会成百上倍的被存储在内存中,从而导致Driver OOM
解决方法:

  1. 关闭AQE spark.sql.adaptive.enabled false
  2. 合并对应的PR-SPARK-45439

分析

背景知识:对于一个完整链接的sql语句来说(比如说从 读取数据源,到 数据处理操作,再到插入hive表),这可以称其为一个最小的SQL执行单元,这最小的数据执行单元在Spark内部是可以跟踪的,也就是用executionId来进行跟踪的。
对于一个sql,举例来说 :

insert into  TableA select * from TableB;

在生成 物理计划的过程中会调用 QueryExecution.assertOptimized 方法,该方法会触发eagerlyExecuteCommands调用,最终会到SQLExecution.withNewExecutionId方法:

  def assertOptimized(): Unit = optimizedPlan

  ...
  lazy val commandExecuted: LogicalPlan = mode match {
    case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
    case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
    case CommandExecutionMode.SKIP => analyzed
  }
  ...
  lazy val optimizedPlan: LogicalPlan = {
  // We need to materialize the commandExecuted here because optimizedPlan is also tracked under
  // the optimizing phase
  assertCommandExecuted()
  executePhase(QueryPlanningTracker.OPTIMIZATION) {
    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
    // optimizing and planning.
    val plan =
      sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
    // We do not want optimized plans to be re-analyzed as literals that have been constant
    // folded and such can cause issues during analysis. While `clone` should maintain the
    // `analyzed` state of the LogicalPlan, we set the plan as analyzed here as well out of
    // paranoia.
    plan.setAnalyzed()
    plan
  }
  
  def assertCommandExecuted(): Unit = commandExecuted
  ...
  private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
    case c: Command =>
      // Since Command execution will eagerly take place here,
      // and in most cases be the bulk of time and effort,
      // with the rest of processing of the root plan being just outputting command results,
      // for eagerly executed commands we mark this place as beginning of execution.
      tracker.setReadyForExecution()
      val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
      val name = commandExecutionName(c)
      val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
        SQLExecution.withNewExecutionId(qe, Some(name)) {
          qe.executedPlan.executeCollect()
        }
      }  

SQLExecution.withNewExecutionId主要的作用是设置当前计划的所属的executionId:

    val executionId = SQLExecution.nextExecutionId
    sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)

EXECUTION_ID_KEY的值会在JobStart的时候传递给Event,以便记录跟踪整个执行过程中的指标信息。
同时我们在方法中eagerlyExecuteCommands看到qe.executedPlan.executeCollect()这是具体的执行方法,针对于insert into 操作来说,物理计划就是
InsertIntoHadoopFsRelationCommand,这里的run方法最终会流转到DAGScheduler.submitJob方法:

    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      JobArtifactSet.getActiveOrDefault(sc),
      Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleJobSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
        Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理,这里有会涉及到指标信息的存储,这里我们截图出dump的内存占用情况:
在这里插入图片描述

可以看到 SQLAppStatusListener 的 LiveStageMetrics 占用很大,也就是 accumIdsToMetricType占用很大

那在AQE中是怎么回事呢?
我们知道再AQE中,任务会从source节点按照shuffle进行分割,从而形成单独的job,从而生成对应的shuffle指标,具体的分割以及执行代码在AdaptiveSparkPlanExec.getFinalPhysicalPlan中,如下:

      var result = createQueryStages(currentPhysicalPlan)
      val events = new LinkedBlockingQueue[StageMaterializationEvent]()
      val errors = new mutable.ArrayBuffer[Throwable]()
      var stagesToReplace = Seq.empty[QueryStageExec]
      while (!result.allChildStagesMaterialized) {
        currentPhysicalPlan = result.newPlan
        if (result.newStages.nonEmpty) {
          stagesToReplace = result.newStages ++ stagesToReplace
          executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

          // SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting
          // for tasks to be scheduled and leading to broadcast timeout.
          // This partial fix only guarantees the start of materialization for BroadcastQueryStage
          // is prior to others, but because the submission of collect job for broadcasting is
          // running in another thread, the issue is not completely resolved.
          val reorderedNewStages = result.newStages
            .sortWith {
              case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => false
              case (_: BroadcastQueryStageExec, _) => true
              case _ => false
            }

          // Start materialization of all new stages and fail fast if any stages failed eagerly
          reorderedNewStages.foreach { stage =>
            try {
              stage.materialize().onComplete { res =>
                if (res.isSuccess) {
                  events.offer(StageSuccess(stage, res.get))
                } else {
                  events.offer(StageFailure(stage, res.failed.get))
                }
                // explicitly clean up the resources in this stage
                stage.cleanupResources()
              }(AdaptiveSparkPlanExec.executionContext)

这里就是得看stage.materialize()这个方法,这两个stage只有两类:BroadcastQueryStageExec 和 ShuffleQueryStageExec
这两个物理计划稍微分析一下如下:

  • BroadcastQueryStageExec
    数据流如下:
    broadcast.submitBroadcastJob
          ||
          \/
    promise.future
          ||
          \/
    relationFuture
          ||
          \/
    child.executeCollectIterator()
    
    
    其中 promise的设置在relationFuture方法中,而relationFuture 会被doPrepare调用,而submitBroadcastJob会调用executeQuery,从而调用doPrepare,executeCollectIterator()最终也会发送JobSubmitted事件,分析和上面的一样
  • ShuffleQueryStageExec
     shuffle.submitShuffleJob
          ||
          \/
     sparkContext.submitMapStage(shuffleDependency)
          ||
          \/
     dagScheduler.submitMapStage
    
    

submitMapStage会发送MapStageSubmitted事件:

    eventProcessLoop.post(MapStageSubmitted(
      jobId, dependency, callSite, waiter, JobArtifactSet.getActiveOrDefault(sc),
      Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleMapStageSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
        Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理:

  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
   ...
 
  override def onJobStart(event: SparkListenerJobStart): Unit = {
    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    if (executionIdString == null) {
      // This is not a job created by SQL
      return
    }

    val executionId = executionIdString.toLong
    val jobId = event.jobId
    val exec = Option(liveExecutions.get(executionId))

该方法会获取事件中的executionId,在AQE中,同一个执行单元的executionId是一样的,所以stageMetrics内存占用会越来越大。
而这里指标的更新是在AdaptiveSparkPlanExec.onUpdatePlan等方法中。

这样整个事件的数据流以及问题的产生原因就应该很清楚了。

其他

为啥AQE以后多个Job还是共享一个executionId呢?因为原则上来说,如果没有开启AQE之前,一个SQL执行单元的是属于同一个Job的,开启了AQE之后,因为AQE的原因,一个Job被拆成了了多个Job,但是从逻辑上来说,还是属于同一个SQL处理单元的所以还是得归属到一次执行中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/577347.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

mac 教程 终端如何拆墙

一直觉得自己写的不是技术,而是情怀,一个个的教程是自己这一路走来的痕迹。靠专业技能的成功是最具可复制性的,希望我的这条路能让你们少走弯路,希望我能帮你们抹去知识的蒙尘,希望我能帮你们理清知识的脉络&#xff0…

面试:finalize

一、概述 将资源释放和清理放在finalize方法中非常不好,非常影响性能,严重时甚至会引起OOM(Out Of Memory),从Java9开始就被标注为Deprecated,不建议被使用了。 二、两个重要的队列 1、unfinalized 队列 当…

SpringBoot中多数据源灵活切换解决方案

本篇内容介绍了“SpringBoot中如何使用Dynamic Datasource配置多数据源”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成! 源码地址/文档说明 功能特性: 支持 数据源分组…

vue与Spring boot数据交互例子【简单版】

文章目录 什么是Vue?快速体验Vueaxios是什么?向Springboot后端发送数据接收Springboot后端数据小结 什么是Vue? 官网解释:Vue 是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是,Vue 被设计为可以自底向上…

JAVA12

JAVA12 1 概述2 语法层次的变化1_swich表达式(预览) 3 API层次的变化1_支持数字压缩格式化2_String新方法3_Files新增mismatch方法 4 关于GC方面的新特性1_Shenandoah GC:低停顿时间的GC(预览)2_可中断的 G1 Mixed GC3_ 增强G1 5 其他新特性简…

ENVI下基于劈窗算法从MODIS数据中反演海表温度

劈窗算法最初是为反演海面温度开发的,具体地说是针对NOAA/AVHRR的4和5通道设计的,后来也被用来反演地表温度,这种算法较成熟,精度也高。劈窗算法以地表热辐射传导方程为基础,利用10~13μm 大气窗口内,两个相…

全志ARM-修改开发板内核启动日志

修改开发板内核日志输出级别: 默认输出级别为1,需要用超级用户权限修改 sudo vi /boot/orangepiEvn.txt 把第一行内核启动输出权限改为7,第二行把输出方式该为“serial”串口输出

Typora for Mac:轻量级Markdown编辑器

Typora for Mac是一款专为Mac用户设计的轻量级Markdown编辑器,它以其简洁的界面和强大的功能,成为了Markdown写作爱好者的首选工具。 Typora for Mac v1.8.10中文激活版下载 Typora的最大特色在于其所见即所得的编辑模式,用户无需关心复杂的M…

Ubuntu Mysql修改密码时遇到的问题

参考: ubuntu18.04 首次登录mysql未设置密码或忘记密码解决方法_ubuntu中mysql设置密码-CSDN博客 1. use mysql; #连接到mysql数据库 2. update mysql.user set authentication_stringpassword(123456) where userroot and Host localhost; #修改密码123456是密码…

微信小程序:8.WXSS

WXSS和CSS的关系 WXSS具有CSS大部分特性,同时,WXSS还对CSS进行扩充以及修改,适应微信小程序的开发。 与CSS相比,WXSS扩展的特性有: rpx尺寸单位imprt样式导入 rpx尺寸单位 rpx是微信小程序中独有的,用来…

相关运算及实现

本文介绍相关运算及实现。 相关运算在相关检测及数字锁相放大中经常用到,其与卷积运算又有一定的联系,本文简要介绍其基本运算及与卷积运算的联系,并给出实现。 1.定义 这里以长度为N的离散时间序列x(n),y(n)为例,相关运算定义如…

OSPF域间路由

注:区域(area)是以接口进行划分的 描述: R1的g0/0/1接口属于area 0 √ R1属于区域0和区域1 1.设计原则 1、OSPF区域的设计原则: 骨干区域有且只能存在一个 非骨干区域必须和骨干区域相连 多区域时&#…

使用JavaScript日历小部件和DHTMLX Gantt的应用场景(一)

DHTMLX Suite UI 组件库允许您更快地构建跨平台、跨浏览器 Web 和移动应用程序。它包括一组丰富的即用式 HTML5 组件,这些组件可以轻松组合到单个应用程序界面中。 DHTMLX Gantt是用于跨浏览器和跨平台应用程序的功能齐全的Gantt图表,可满足项目管理应用…

深入了解MySQL:从基础到特性,全面解读关系数据库管理系统的历史与应用

文章目录 1. MySQL简介1.1 概述1.2 架构与兼容性1.3 开源与社区支持 2. MySQL的历史2.1 创始与初衷2.2 发展历程2.3 在Oracle的持续发展2.4 开源与商业结合 3. MySQL的核心特性4. MySQL在实际应用中的作用4.1 网站建设与内容管理4.2 商业智能与客户关系管理4.3 企业级应用与云集…

新媒体运营-----短视频运营-----PR视频剪辑----抠像及美颜磨皮

新媒体运营-----短视频运营-----PR视频剪辑-----持续更新(进不去说明我没写完):https://blog.csdn.net/grd_java/article/details/138079659 文章目录 1. 超级键抠像绿(蓝)幕背景2. 常规视频抠像3. 美颜磨皮 1. 超级键抠像绿(蓝)幕背景 如果我们的素材是在摄影棚进行…

【R语言实战】——kNN和朴素贝叶斯方法实战

🍉CSDN小墨&晓末:https://blog.csdn.net/jd1813346972 个人介绍: 研一|统计学|干货分享          擅长Python、Matlab、R等主流编程软件          累计十余项国家级比赛奖项,参与研究经费10w、40w级横向 文…

.net8系列-04图文并茂手把手教你配置Swagger支持token以及实现Swagger扩展,Swagger代码单独抽离

前情提要 接上篇文章,我们当前已完成如下内容: 创建应用成功创建接口成功配置Swagger实现接口注释和版本控制 本文章主要内容为:配置Swagger支持token传值测试接口 快速上手-代码配置 添加如下代码 文件目录:\xiaojinWebAppl…

06_Scala流程控制

文章目录 [toc] 1.流程控制**小结:** **2. Scala中流程控制没有三元运算符****2.1 Scala中如果逻辑代码只有一行可以省略花括号****小结:** **3. 循环控制****3.1 for控制****3.2循环守卫 --> 循环表达式添加逻辑判断****3.3 循环步长 --> 表示循环…

​「Python大数据」词频数据渲染词云图导出HTML

前言 本文主要介绍通过python实现数据聚类、脚本开发、办公自动化。词频数据渲染词云图导出HTML。 一、业务逻辑 读取voc数据采集的数据批处理,使用jieba进行分词,去除停用词词频数据渲染词云图将可视化结果保存到HTML文件中二、具体产出 三、执行脚本 python wordCloud.p…

Flutter - 折叠面板

demo 地址: https://github.com/iotjin/jh_flutter_demo 代码不定时更新,请前往github查看最新代码 flutter 自定义折叠组件 支持三种类型和两种展示效果可自定义title和被折叠的内容 效果图 示例 import package:flutter/material.dart; import /jh_common/widge…