Spark 之 Aggregate

Aggregate

参考链接:

  • https://github.com/PZXWHU/SparkSQL-Kernel-Profiling

完整的聚合查询的关键字包括 group by、 cube、 grouping sets 和 rollup 4 种 。 分组语句 group by 后面可以是一个或多个分组表达式( groupingExpressions )。

聚合查询还支持 OLAP 场景下的多维分析,包括 rollup、 cube 和 grouping sets 3 种操作 。

逻辑节点 Aggregate

在这里插入图片描述

逻辑算子树节点通过分组表达式列表( groupingExpressions )、聚合表达式列表( aggregateExpressions )和子节点( child )构造而成,
其中分组表达式类型都是 Expression ,而聚合表达式类型都是 NamedExpression ,意味着聚合表达式一般都需要设置名字。
aggregateExpressions 对应聚合函数,而 resultExpressions 则包含了 Select 语句中选择的所有列信息 。

示例之 partial Aggregate 对应 logical plan

在这里插入图片描述
里面的mode 直接也是 Complete

示例之 final Aggregate 对应 logical plan

在这里插入图片描述

NamedExpression (这里对应的是Alias) 里 的child 是 AggregateFunction,里面的mode 直接就是 Complete

case class Alias(child: Expression, name: String)

case class Alias(child: Expression, name: String)(
    val exprId: ExprId = NamedExpression.newExprId,
    val qualifier: Seq[String] = Seq.empty,
    val explicitMetadata: Option[Metadata] = None,
    val nonInheritableMetadataKeys: Seq[String] = Seq.empty)
  extends UnaryExpression with NamedExpression {
物理 Aggregate

对于聚合查询,逻辑算子树转换为物理算子树,必不可少的是 Aggregation 转换策略 。 实际上, Aggregation 策略是基于 PhysicalAggregation 的 。 与 PhysicalOperation 类似,PhysicalAggregation 也是一种逻辑算子树的模式,用来匹配逻辑算子树中的 Aggregate 节点并提取该节点中的相关信息 。 PhysicalAggregation 在提取信息时会进行以下转换 。

在这里插入图片描述

select id, count(name) from student group by id

在这里插入图片描述

聚合模式

在 SparkSQL 中,聚合过程有 4 种模式,分别是 Partial 模式、 ParitialMerge 模式、 Final 模式 和 Complete 模式 。

在这里插入图片描述

上述聚合过程
中在 map 阶段的 sum 函数处于 Partial 模式,在 reduce 阶段的 sum 函数处于 Final 模式。

在这里插入图片描述

Complete 模式和Partial/Final 组合方式不一样,不进行局部聚合计算 。

在这里插入图片描述

ParitialMerge 主要应用在 distinct 语句中,如图 、所示 。聚合语句针对同一张表进行 sum 和 count (distinct)查询,最终的执行过程包含了 4 步聚合操作 。

  • 第 1 步按照( A,C)分组,对 sum 函数进行 Partial 模式聚合计算;
  • 第 2 步是 PartialMerge 模式,对上一步计算之后的聚合缓冲区进行合井,但此时仍然不是最终的结果;
  • 第 3 步分组的列发生变化,再一次进行 Partial 模式的 count 计算;
  • 第 4 步完成 Final 模式的最终计算 。
HashAggregate

常见的聚合查询语句通常采用 HashAggregate 方式,当存在以下几种情况时,会用 SortAggregate 方式来执行 。

  • 查询中存在不支持 Partial 方式的聚合函数:此时会调用 AggUtils 中的 planAggregateWithoutPartial 方法,直接生成 SortAggregateExec 聚合算子节点 。
  • 聚合函数结果不支持 Buffer 方式:如果结果类型不属于(NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DateType, TimestampType,DecimalType]集合中的任意一种,则需要执行 SortAggregateExec 方式,例如 collect_set和 collect_list 函数。
  • 内存不足:如果在 HashAggregate 执行过程中,内存空间己捕,那么聚合执行会切换到 SortAggregateExec 方式。

注意:
spark 2.2 之后去掉了planAggregateWithoutPartial
参见:
https://issues.apache.org/jira/browse/SPARK-19060
https://github.com/apache/spark/pull/16461

Expand

逻辑计划阶段:
GroupingSets 节点转换为 Aggregate+Expand+Pr付出t3 个节点的组合 。 顾名思义, Expand 表示“扩展”,多维分析在本质上相当于执行多种组合的 group by 操作,因此 Expand 所起的作用就是将一条数据扩展为特定形式的多条数据。

在这里插入图片描述

需要注意的是, Expand 方式执行多维分析虽然能够达到只读一次数据表的效果,但是在某些场景下容易造成中间数据的膨胀。 例如,数据的维度太高, Expand 会产生指数级别的数据量 。 针对这种情况,可以进行相应的优化。

AggregateMode

org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode

sealed trait AggregateMode

/**
 * An [[AggregateFunction]] with [[Partial]] mode is used for partial aggregation.
 * This function updates the given aggregation buffer with the original input of this
 * function. When it has processed all input rows, the aggregation buffer is returned.
 */
case object Partial extends AggregateMode

/**
 * An [[AggregateFunction]] with [[PartialMerge]] mode is used to merge aggregation buffers
 * containing intermediate results for this function.
 * This function updates the given aggregation buffer by merging multiple aggregation buffers.
 * When it has processed all input rows, the aggregation buffer is returned.
 */
case object PartialMerge extends AggregateMode

/**
 * An [[AggregateFunction]] with [[Final]] mode is used to merge aggregation buffers
 * containing intermediate results for this function and then generate final result.
 * This function updates the given aggregation buffer by merging multiple aggregation buffers.
 * When it has processed all input rows, the final result of this function is returned.
 */
case object Final extends AggregateMode

/**
 * An [[AggregateFunction]] with [[Complete]] mode is used to evaluate this function directly
 * from original input rows without any partial aggregation.
 * This function updates the given aggregation buffer with the original input of this
 * function. When it has processed all input rows, the final result of this function is returned.
 */
case object Complete extends AggregateMode
Aggregate 之 inputBufferOffset

org.apache.spark.sql.execution.aggregate.HashAggregateExec

case class HashAggregateExec(
    requiredChildDistributionExpressions: Option[Seq[Expression]],
    isStreaming: Boolean,
    numShufflePartitions: Option[Int],
    groupingExpressions: Seq[NamedExpression],
    aggregateExpressions: Seq[AggregateExpression],
    aggregateAttributes: Seq[Attribute],
    initialInputBufferOffset: Int,
    resultExpressions: Seq[NamedExpression],
    child: SparkPlan)
  extends AggregateCodegenSupport {
        val aggregationIterator =
          new TungstenAggregationIterator(
            partIndex,
            groupingExpressions,
            aggregateExpressions,
            aggregateAttributes,
            initialInputBufferOffset,
            resultExpressions,
            (expressions, inputSchema) =>
              MutableProjection.create(expressions, inputSchema),
            inputAttributes,
            iter,
            testFallbackStartsAt,
            numOutputRows,
            peakMemory,
            spillSize,
            avgHashProbe,
            numTasksFallBacked)

org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator

  extends AggregationIterator(
    partIndex,
    groupingExpressions,
    originalInputAttributes,
    aggregateExpressions,
    aggregateAttributes,
    initialInputBufferOffset,
    resultExpressions,
    newMutableProjection) with Logging {

org.apache.spark.sql.execution.aggregate.AggregationIterator

  protected val aggregateFunctions: Array[AggregateFunction] =
    initializeAggregateFunctions(aggregateExpressions, initialInputBufferOffset)
    for (expression <- expressions) {
      val func = expression.aggregateFunction
      val funcWithBoundReferences: AggregateFunction = expression.mode match {
        case Partial | Complete if func.isInstanceOf[ImperativeAggregate] =>
          // We need to create BoundReferences if the function is not an
          // expression-based aggregate function (it does not support code-gen) and the mode of
          // this function is Partial or Complete because we will call eval of this
          // function's children in the update method of this aggregate function.
          // Those eval calls require BoundReferences to work.
          BindReferences.bindReference(func, inputAttributeSeq)
        case _ =>
          // We only need to set inputBufferOffset for aggregate functions with mode
          // PartialMerge and Final.
          val updatedFunc = func match {
            case function: ImperativeAggregate =>
              function.withNewInputAggBufferOffset(inputBufferOffset)
            case function => function
          }
          inputBufferOffset += func.aggBufferSchema.length
          updatedFunc
      }

可见 inputBufferOffset 对 Partial | Complete 无效

ObjectHashAggregateExec

参考链接:

  • https://dataninjago.com/2022/01/09/spark-sql-query-engine-deep-dive-10-hashaggregateexec-objecthashaggregateexec/
  • https://blog.csdn.net/monkeyboy_tech/article/details/123759074

While the HashAggregateExec, backed by the Tungsten execution engine(基于Tungsten执行引擎), performs well for aggregation operations, it can only support the mutable primitive data type with a fixed size. For the user-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set), they are not supported by the HashAggregateExec. Prior Spark 2.2.0, they have to fall back to the less performant SortAggregateExec. Since Spark 2.2.0, the ObjectHashAggregateExec is released to fill this gap, which enables the performant hash-based aggregations on the data types that are not supported by HashAggregateExec.

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/920851.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C#高级:Winform中的自定义窗体输入

目录 一、多样式输入&#xff08;无封装&#xff09; 1.代码 2.效果 二、单输入框封装 1.使用 2.封装 3.效果 三、组合框批量输入封装 1.使用 2.封装 3.效果 一、多样式输入&#xff08;无封装&#xff09; 1.代码 private async void button1_Click(object send…

使用GDB或Delve对已经运行起来的Go程序进行远程调试

同步发布在我的博客&#xff0c;欢迎来点赞。 使用 GDB 或 Delve 对已经运行起来的 Go 程序进行远程调试 使用 GDB 或 Delve 对已经运行起来的 Go 程序进行远程调试 背景 Java 程序可以很方便地通过 jdwp 参数指定一个对外端口进行远程调试&#xff0c;如 java \ -agentlib…

简单实现QT对象的[json]序列化与反序列化

简单实现QT对象的[json]序列化与反序列化 简介应用场景qt元对象系统思路实现使用方式题外话 简介 众所周知json作为一种轻量级的数据交换格式&#xff0c;在开发中被广泛应用。因此如何方便的将对象数据转为json格式和从json格式中加载数据到对象中就变得尤为重要。 在python类…

Java开发经验——开发常用工具类

摘要 本文介绍了Java开发中常用的工具类&#xff0c;包括Apache Commons Collections的SetUtils、Google Guava的Sets、Apache Commons Lang的ArrayUtils等&#xff0c;以及它们在集合操作、数组操作、字符串处理、JSON处理等方面的应用。文章还涉及了Optional类、Money工具类…

esp32c3开发板通过micropython的mqtt库连MQTT物联网消息服务器

MQTT介绍 MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;是一种轻量级的消息协议&#xff0c;旨在设备之间进行通信&#xff0c;尤其是在网络条件较差的情况下。MQTT v3.1.1 和 MQTT v5 是该协议的两个主要版本。 MQTT v3.1.1&#xff1a; 优点&#xff…

【IDE】使用指南

定期更新实用技能&#xff0c;建议关注收藏点赞。 友情链接&#xff1a; 点击跳转常见代码编辑器的报错解决方案 目录 常用快捷键pycharm右下角边栏脚本头安装IDE的插件git配置TODO 代码编辑器里有许多小技巧&#xff0c;便于办公。本篇主要以pycharm,vscode等主流常用IDE为…

OpenGL入门009——漫反射在片段着色器中的应用

本节将在片段着色器中应用漫反射 文章目录 一些概念漫反射 实战简介dependenciesshadervsshader.fs utilsCube.cpp main.cppCMakeLists.txt最终效果 一些概念 漫反射 概述&#xff1a; 描述的是粗糙表面对光的反射&#xff0c;反射的光线相关各个方向均匀分布&#xff0c;与视…

删库跑路,启动!

起因&#xff1a;这是一个悲伤的故事&#xff0c;在抓logcat时 device待机自动回根目录了&#xff0c;而题主对当前路径的印象还停留在文件夹下&#xff0c;不小心在根目录执行了rm -rf * … 所以&#xff0c;这是个悲伤的故事&#xff0c;东西全没了…device也黑屏了&#xff…

Ubuntu下的Eigen库的安装及基本使用教程

一、Eigen库介绍 简介 Eigen [1]目前最新的版本是3.4&#xff0c;除了C标准库以外&#xff0c;不需要任何其他的依赖包。Eigen使用的CMake建立配置文件和单元测试&#xff0c;并自动安装。如果使用Eigen库&#xff0c;只需包特定模块的的头文件即可。 基本功能 Eigen适用范…

OpenCV与AI深度学习|16个含源码和数据集的计算机视觉实战项目(建议收藏!)

本文来源公众号“OpenCV与AI深度学习”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;分享&#xff5c;16个含源码和数据集的计算机视觉实战项目 本文将分享16个含源码和数据集的计算机视觉实战项目。具体包括&#xff1a; 1. 人…

MySQL win安装 和 pymysql使用示例

目录 一、MySQL安装 下载压缩包&#xff1a; 编写配置文件&#xff1a; 配置环境变量&#xff1a; 初始化服务和账户 关闭mysql开机自启&#xff08;可选&#xff09; 建议找一个数据库可视化软件 二、使用pymysql操作数据库 安装pymysql 示例代码 报错处理 一、My…

springboot基于微信小程序的停车场管理系统

摘 要 停车场管理系统是一种基于移动端的应用程序&#xff0c;旨在方便车主停车的事务办理。该小程序提供了便捷的停车和功能&#xff0c;使车主能够快速完成各项必要的手续和信息填写。旨在提供一种便捷、高效的预约停车方式&#xff0c;减少停车手续的时间和精力成本。通过该…

js:数组转换为字符串

1、使用join 通过join&#xff0c;将数组拼接&#xff0c;使用&#xff0c;进行分割 let array [a, b, c] let str array.join(,); console.log(str) 2、使用toString() const array [a, b, c] const string array.toString() console.log(string) 3、使用扩展运算符和…

npm上传自己封装的插件(vue+vite)

一、npm账号及发包删包等命令 若没有账号&#xff0c;可在npm官网&#xff1a;https://www.npmjs.com/login 进行注册。 在当前项目根目录下打开终端命令窗口&#xff0c;常见命令如下&#xff1a; 1、登录命令&#xff1a;npm login&#xff08;不用每次都重新登录&#xff0…

路由缓存后跳转到新路由时,上一路由中的tip信息框不销毁问题解决

上一路由tip信息框不销毁问题解决 路由缓存篇问题描述及截图解决思路关键代码 路由缓存篇 传送门 问题描述及截图 路由缓存后跳转新路由时&#xff0c;上一个路由的tip信息框没销毁。 解决思路 在全局路由守卫中获取DOM元素&#xff0c;通过css去控制 关键代码 修改文…

uni-app 界面TabBar中间大图标设置的两种方法

一、前言 最近写基于uni-app 写app项目的时候&#xff0c;底部导航栏 中间有一个固定的大图标&#xff0c;并且没有激活状态。这里记录下实现方案。效果如下&#xff08;党组织这个图标&#xff09;&#xff1a; 方法一&#xff1a;midButton的使用 官方文档&#xff1a;ta…

Apple Vision Pro开发003-PolySpatial2.0新建项目

unity6.0下载链接:Unity 实时开发平台 | 3D、2D、VR 和 AR 引擎 一、新建项目 二、导入开发包 com.unity.polyspatial.visionos 输入版本号 2.0.4 com.unity.polyspatial&#xff08;单独导入&#xff09;&#xff0c;或者直接安装 三、对应设置 其他的操作与之前的版本相同…

xiaolin coding 图解网络笔记——基础篇

基础篇 Linux 系统是如何收发网络包的&#xff1f; 网络模型 为了使多种设备能通过网络相互通信&#xff0c;和为了解决不同设备在网络互连中的兼容性问题&#xff0c;国际标准化组织制定了开放式系统互连通信参考模型&#xff08;Open System Interconnection Reference Mo…

【vba源码】导入excel批注信息

Hi&#xff0c;大家好呀&#xff01; 又到了一周一分享的时间&#xff0c;上周繁忙的我都没有给大家直播&#xff0c;视频也没更新&#xff0c;那这周大家放心&#xff0c;都会给大家更新&#xff0c;今天我们来讲点啥呢&#xff1f;每周找优质的内容给大家更新是我最最痛苦的…

跨平台WPF框架Avalonia教程 十三

AutoCompleteBox 自动补全输入框 自动补全输入框提供了一个供用户输入的文本框和一个包含可能匹配项的下拉列表。下拉列表会在用户开始输入时显示&#xff0c;并且每输入一个字符&#xff0c;匹配项都会更新。用户可以从下拉列表中选择匹配项。 文本与可能项匹配的方式是可配…