SparkSQL优化

SparkSQL优化

优化说明

缓存数据到内存

Spark SQL可以通过调用spark.sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表用一种柱状格式( an in­memory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。

通过sc.broadcast(spark.table("表名")),将表广播出去,进行表与表之间的join相关操作。

可通过两种配置方式开启缓存数据功能:

1)使用spark.sqlContext的setConf方法。

2)执行SQL命令 SET key=value。

表-2 优化方式

Property Name

Default

Meaning

spark.sql.inMemoryColumnarStorage.compressed

true

如果假如设置为true,SparkSql会根据统计信息自动的为每个列选择压缩方式进行压缩

spark.sql.inMemoryColumnarStorage.batchSize

10000

控制列缓存的批量大小。批次大有助于改善内存使用和压缩,但是缓存数据会有OOM的风险

参数调优

可以通过配置下表中的参数调节Spark SQL的性能。

表-3 参数调优

Property Name

Default

Meaning

spark.sql.files.maxPartitionBytes

134217728 (128 MB)

获取数据到分区中的最大字节数。

spark.sql.files.openCostInBytes

4194304 (4 MB)

该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。

spark.sql.broadcastTimeout

300

广播等待超时时间,单位秒。

spark.sql.autoBroadcastJoinThreshold

10485760 (10 MB)

最大广播表的大小。设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表。

spark.sql.shuffle.partitions

200

设置shuffle分区数,默认200。

SQL炸裂函数

Explode:SparkSql中的列转行函数:专门针对array或map操作。

//使用explode方法必须导入下面的包:
import org.apache.spark.sql.functions._

object explode_Demo{
    def main(args: Array[String]): Unit = {
        //创建程序入口
        val spark: SparkSession = SparkSession.builder()
            .appName("createDF")
            .master("local[*]")
            .getOrCreate()
        //调用sparkContext
        val sc: SparkContext = spark.sparkContext
        //设置控制台日志输出级别
        sc.setLogLevel("WARN")
        //导包
        import spark.implicits._
        //加载数据
        val positionDF = spark.read.json("E:\\资料\\position.json")
        //查看表结构
        positionDF.printSchema()
        //DSL方法处理
        val listData: DataFrame = positionDF.select(explode($"data.list")).toDF("position")
        //查看表结构
        listData.printSchema()
        //查看表数据
        listData.show(false)
        //查看workName并统计个数
        listData.select($"position.workName" as "positions")

   .groupBy($"positions")
            .count()
            .orderBy($"count".desc)
            .show()

  }
}



//SQL风格操作
        /*positionDF.createOrReplaceTempView("t_position")
        val sql =
            """
              |select position.workName as workNames,count(*) as counts
              |from(
              |select explode(data.list) as position
              |from t_position)
              |group by workNames
              |order by counts desc
            """.stripMargin
        spark.sql(sql).show()*/

SparkSQL运行架构

Spark SQL对SQL语句的处理和关系型数据库类似,即词法/语法解析、绑定、优化、执行。Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对Tree进行绑定、优化等处理过程。Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部分构成:

1)Core: 负责处理数据的输入和输出,如获取数据,查询结果输出成DataFrame等。

2)Catalyst: 负责处理整个查询过程,包括解析、绑定、优化等。

3)Hive: 负责对Hive数据进行处理。

4)Hive-ThriftServer: 主要用于对hive的访问。

DataFrame性能上比RDD要高,主要有两方面原因:

1)定制化内存管理:Rdd数据都放在堆内存,JAVA(JVM)内存,内存管理回收分配不是由spark管理,是由JAVA(GC)管理,有时候会出现资源不一致问题,spark不是直接的内存管理者。

2)DataFrame数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。涉及到序列化和反序列化,如图-13。

图-13 GC占比关系图

优化的执行计划

查询计划通过Spark catalyst optimiser进行优化,例子如图-14。

图-14 案例图

SparkSQL针对案例优化如图-15所示:

图-15 优化流程

为了说明查询优化,我们来看图-15展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

得到的优化执行计划在转换成物理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。

对于普通开发者而言,查询优化器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/600584.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

有什么方便实用的成人口语外教软件?6个软件教你快速进行口语练习

有什么方便实用的成人口语外教软件?6个软件教你快速进行口语练习 口语能力在语言学习中占据着重要的位置,因为它直接关系到我们与他人进行交流和沟通的效果。为了提高口语能力,很多成人选择通过外教软件进行口语练习,这些软件提供…

【强训笔记】day13

NO.1 代码实现&#xff1a; #include <iostream>#include<string>using namespace std;int n,k,t; string s;int func() {int ret0;for(int i0;i<n;i){char chs[i];if(chL) ret-1;else{if(i-1>0&&i-2>0&&s[i-1]W&&s[i-2]W) retk…

基于Springboot 的 Excel表格的导入导出

首先 &#xff0c;引入相关依赖EasyPOI <dependency><groupId>cn.afterturn</groupId><artifactId>easypoi-spring-boot-starter</artifactId><version>4.4.0</version></dependency> 编写实体类&#xff1a; Data AllArgs…

Java 框架安全:Struts2 漏洞序列测试.

什么是 Struts2 框架 Struts 2 是一个用于创建企业级 Java 应用程序的开源框架。它是一个 MVC&#xff08;模型-视图-控制器&#xff09;框架&#xff0c;用于开发基于 Java EE&#xff08;Java Platform, Enterprise Edition&#xff09;的 Web 应用程序。Struts 2 主要解决…

ROS机器人实用技术与常见问题解决

问题速查手册&#xff08;时实更新&#xff09;更加全面丰富的问题手册记录 1.机器人使用GPARTED挂载未分配空间 需要在图型界面下操作&#xff0c;建议使用no machine连接 安装gparted磁盘分区工具, sudo apt-get install gparted -y 启动软件 sudo gparted 点击磁盘/内存…

C# OpenCvSharp 图片找茬

C# OpenCvSharp 图片找茬 目录 效果 项目 代码 下载 效果 项目 代码 using OpenCvSharp; using System; using System.Diagnostics; using System.Drawing; using System.Windows.Forms; namespace OpenCvSharp_Demo { public partial class Form1 : Form { …

1688快速获取整店铺列表 采集接口php Python

在电子商务的浪潮中&#xff0c;1688平台作为中国领先的批发交易平台&#xff0c;为广大商家提供了一个展示和销售商品的广阔舞台&#xff1b;然而&#xff0c;要在众多店铺中脱颖而出&#xff0c;快速获取商品列表并进行有效营销是关键。 竞争对手分析 价格比较&#xff1a;…

mysql5.7数据库安装及性能测试

mysql5.7数据库安装及性能测试 记录Centos7.9下安装mysql 5.7并利用benchmark工具简单测试mysql的性能。 测试机&#xff1a;centos7.9 配置&#xff1a;4C8G40G 1. 下安装mysql5.7 安装mysql5.7&#xff1a; # 通过官方镜像源安装$ wget http://dev.mysql.com/get/mysql57-com…

pandas索引

pandas索引 一、索引1.1 建立索引1.2 重置索引1.3 索引类型1.4 索引的属性1.5 索引的操作 一、索引 1.1 建立索引 建立索引可以在数据读取加载中指定索引&#xff1a; import pandas as pd df pd.read_excel(team.xlsx, index_colname) # 将name列设置为索引 df.head()效…

C语言 函数的定义与调用

上文 C语言 函数概述 我们对函数进行了概述 本文 我们来说函数的定义和调用 C语言规定 使用函数之前&#xff0c;首先要对函数进行定义。 根据模块化程序设计思想&#xff0c;C语言的函数定义是互相平行、独立的&#xff0c;即函数定义不能嵌套 C语言函数定义 分为三种 有参函…

Kansformer?变形金刚来自过去的新敌人

​1.前言 多层感知器(MLPs),也被称为全连接前馈神经网络,是当今深度学习模型的基础组成部分。 MLPs在机器学习中扮演着至关重要的角色,因为它们是用于近似非线性函数的默认模型,这得益于通用近似定理所保证的表达能力。然而,MLPs真的是我们能构建的最佳非线性回归器吗?尽管ML…

鸿蒙OpenHarmony南向:【Hi3516标准系统入门(命令行方式)】

Hi3516标准系统入门&#xff08;命令行方式&#xff09; 注意&#xff1a; 从3.2版本起&#xff0c;标准系统不再针对Hi3516DV300进行适配验证&#xff0c;建议您使用RK3568进行标准系统的设备开发。 如您仍然需要使用Hi3516DV300进行标准系统相关开发操作&#xff0c;则可能会…

人工智能编程的创新探索 卧龙与凤雏的畅想

在一间宽敞明亮的办公室内&#xff0c;阳光透过窗户洒在地上&#xff0c;形成一片片光斑。卧龙和凤雏正坐在舒适的办公椅上休息&#xff0c;享受着这片刻的宁静。 卧龙微微皱眉&#xff0c;一只手托着下巴&#xff0c;略显苦恼地说道&#xff1a;“现在的人工智能&#xff0c;也…

vue2人力资源项目5组织架构的增删改查

编辑表单回显 父组件&#xff1a;这里用到了父亲调子组件的方法和同步异步先后方法的处理 //methods里else if (type edit) {this.showDialog true// 显示弹层this.currentNodeId id// 记录id&#xff0c;要用它获取数据// 在子组件中获取数据// 父组件调用子组件的方法来获…

【力扣】203、环形链表 II

142. 环形链表 II 要解决这道题&#xff0c;首先需要对问题进行拆解&#xff1a; 确定链表是否存在环确定环的入口点 如何判断是否存在环呢&#xff1f;这个比较容易想到&#xff0c;使用快慢指针即可判断链表是否存在环。我们定义两个指针&#xff1a; ListNode slow head…

Redis学习4——Redis应用之限流

引言 Redis作为一个内存数据库其读写速度非常快&#xff0c;并且支持原子操作&#xff0c;这使得它非常适合处理频繁的请求&#xff0c;一般情况下&#xff0c;我们会使用Redis作为缓存数据库&#xff0c;但处理做缓存数据库之外&#xff0c;Redis的应用还十分广泛&#xff0c…

远程服务器 docker XRDP 桌面访问 记录

需求描述: 我现在在远程连接 一台服务器&#xff0c;由于需要实验环境需要GUI 和 桌面系统&#xff0c;但是又想在 docker 中运行。因此&#xff0c;我现在首先需要通过 ssh 连接服务器&#xff0c;然后再服务器中连接 docker. REF: https://github.com/danielguerra69/ubuntu-…

代码随想录day19day20打卡

二叉树 1 二叉树的最大深度和最小深度 最大深度已经学习过了&#xff0c;实质就是递归的去判断左右子节点的深度&#xff0c;然后对其进行返回。 附加两个学习的部分&#xff1a; &#xff08;1&#xff09;使用前序遍历的方法求解 int result; void getdepth(TreeNode* nod…

NXP i.MX8系列平台开发讲解 - 3.11 Linux PCIe设备调试(WIFI模块)

专栏文章目录传送门&#xff1a;返回专栏目录 文章目录 目录 1. WIFI 模块简单介绍 2. 设备驱动原理介绍 3. PCIE WIFI驱动实例分析 3.1 查看设备树 3.2 wifi 设备驱动代码分析 3.3 内核配置选项 4. WIFI驱动调试相关 根据前面对PCIe的讲解&#xff0c;对PCIe的整体都有…

Ansible --- playbook 脚本+inventory 主机清单

一 inventory 主机清单 Inventory支持对主机进行分组&#xff0c;每个组内可以定义多个主机&#xff0c;每个主机都可以定义在任何一个或 多个主机组内。 如果是名称类似的主机&#xff0c;可以使用列表的方式标识各个主机。vim /etc/ansible/hosts[webservers]192.168.10.1…