PySpark(四)PySpark SQL、Catalyst优化器、Spark SQL的执行流程

目录

PySpark SQL

基础

SparkSession对象

DataFrame入门

 DataFrame构建

DataFrame代码风格

 DSL

SQL

SparkSQL Shuffle 分区数目

 DataFrame数据写出

Spark UDF

Catalyst优化器 

Spark SQL的执行流程


PySpark SQL

基础

PySpark SQL与Hive的异同

Hive和Spark 均是:“分布式SQL计算引擎”
均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。
目前,企业中使用Hive仍旧居多,但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级

这里的重点是:Spark SQL能支持SQL和其他代码混合执行,自由度更高,且其是内存计算,更快。但是其没有元数据管理,然而它最终还是会作用到Hive层面,可以调用Hive的Metasotre

SparkSQL的基本对象是DataFrame,其特点及与其他对象的区别为: 

 SparkSQL 其实有3类数据抽象对象

  • SchemaRDD对象 (已废弃)
  • DataSet对象: 可用于Java、Scala语言
  • DataFrame对象:可用于Java、Scala、Python、R

SparkSession对象

 在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象
SparkSession对象可以:
-用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext

DataFrame入门

DataFrame的组成如下
在结构层面
StructType对象描述整个DataFrame的表结构

StructField对象描述一个列的信息
在数据层面
Row对象记录一行数据
Column对象记录一列数据并包含列的信息

 DataFrame构建

1、用RDD进行构建

rdd的结构要求为:[[xx,xx],[xx,xx]]

spark.createDataFrame(rdd,schema=[])

    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
    print(rdd.collect())
    # [['Michael', 29], ['Andy', 30], ['Justin', 19]]
    df = spark.createDataFrame(rdd,schema=['name','age'])
    df.printSchema()#打印表结构
    df.show()#打印表
#     root
#     | -- name: string(nullable=true)
#     | -- age: long(nullable=true)
# 
# +-------+---+
# | name | age |
# +-------+---+
# | Michael | 29 |
# | Andy | 30 |
# | Justin | 19 |
# +-------+---+

2、利用StructType进行创建

需要先引入StructType,StringType,IntegerType等构建schema

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
if __name__ == '__main__':
    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
#构建schema    
schema =StructType().add("name",StringType(),nullable=False).\
        add('age',IntegerType(),nullable=True)
    df = spark.createDataFrame(rdd,schema=schema)
    df.printSchema()
    df.show()

3、toDF将rdd转换为df

下面展示了两种方式

    # 只设定列名,列的数据结构则是内部自己判断
    df = rdd.toDF(['name','age'])
    df.printSchema()
    # root
    # | -- name: string(nullable=true)
    # | -- age: long(nullable=true)
    # 设定列名和数据类型
    schema =StructType().add("name",StringType(),nullable=False).\
        add('age',IntegerType(),nullable=True)
    df = rdd.toDF(schema=schema)
    df.printSchema()
    # root
    # | -- name: string(nullable=false)
    # | -- age: integer(nullable=true)

4、基于pandas构建 

    dfp = pd.DataFrame({
        "id":[1,2,3],
        'score':[99,98,100]
    })
    df = spark.createDataFrame(dfp)
    df.printSchema()
    df.show()
    # root
    # | -- id: long(nullable=true)
    # | -- score: long(nullable=true)
    # 
    # +---+-----+
    # | id | score |
    # +---+-----+
    # | 1 | 99 |
    # | 2 | 98 |
    # | 3 | 100 |
    # +---+-----+

5、通过文件读取创造

在读取json和parquet文件时不需要设定schema,因为文件已经自带

而读取csv时,还需要使用.option设定 header等参数 

这里说一下parquet文件

parquet:是Spark中常用的一种列式存储文件格式
和Hive中的ORC差不多,他俩都是列存储格式
parquet对比普通的文本文件的区别:

  • parquet 内置schema(列名列类型 是否为空)
  • 存储是以列作为存储格式
  • 存储是序列化存储在文件中的(有压缩属性体积小)

DataFrame代码风格

 DataFrame支持两种风格进行编程,分别是DSL风格SQL风格
DSL语法风格
DSL称之为:领域特定语言
其实就是指DataFrame的特有API
DSL风格意思就是以调用API的方式来处理Data比如: df.where0.limit0
SQL语法风格
SQL风格就是使用SQL语句处理DataFrame的数据比如: spark.sql(“SELECT*FROM xxx)

 DSL

其实就是用其内置的API处理数据,举例:

    df.select('id','subject').show()
    df.where('subject="语文"').show()
    df.select('id','subject').where('subject="语文"').show()
    df.groupBy('subject').count().show()

API其实跟SQL类似,这里不详细说明了,个人感觉不如直接写SQL语句

SQL

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sgl0来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表采用如下的方式:

    df.createTempView('tmp') #创建临时视图
    df.createGlobalTempView('global_tmp')#创建全局试图
    # 全局表: 跨SparkSession对象使用在一个程序内的多个SparkSession中均可调用查询前带上前缀:global_tmp
    df.createOrReplaceTempView('repalce_tmp')#创建临时表,如果存在则替换

然后使用spark.sql的形式书写sql代码

    spark.sql('select * from tmp where subject = "语文"').show()
    spark.sql('select id,score from repalce_tmp where score>90').show()
    spark.sql('select subject,max(score) from global_temp.global_tmp group by subject').show()

SparkSQL Shuffle 分区数目

 原因: 在SparkSQL中当Job中产生Shufle时,默认的分区数 spark.sql.shufle,partitions 为200,在实际项目中要合理的设置。
在代码中可以设置:

spark =  SparkSession.builder.appName('lmx').\
master('local[*]').config('spark.sql.shufle,partitions',2).\
getOrCreate()

spark.sqL.shuffle.partitions 参数指的是,在sql计算中,shuffle算子阶段默认的分区数是200

对于集群模式来说,200个默认也算比较合适

如在Local下运行,200个很多,在调度上会带宋限外的损耗,所以在Local下建议修改比较低, 比如2\4\10均可,这个参数和Spark RDD中设置并行度的参数是相互独立的

 DataFrame数据写出

统一API:

下面提供两种方法,分别写出为json和csv

    spark.sql(
        'select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc'
    ).write.mode('overwrite').format('json').save('data/output/1t')

    spark.sql(
        'select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc'
    ).write.mode('overwrite').format('csv')\
        .option('header',True)\
        .option('sep',';')\
        .save('data/output/csv')

其他的一些方法: 

SparkSQL中读取数据和写出数据 - 知乎

不过这里似乎不能自己命名导出的数据文件

Spark UDF

无论Hive还是SparKSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。回顾Hive中自定义函数有三种类型:
第一种:UDF(User-Defined-Function)函数.
一对一的关系,输入一个值经过函数以后输出一个值;
在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

第二种:UDAF(User-Defined Aggregation Function)聚合函数

多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;

第三种:UDTF(User-DefinedTable-Generating Functions)函数

一对多的关系,输入一个值输出多个值(一行变为多行),用户自定义生成函数,有点像flatMap;

在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF 

UDF有两种定义方式

方式1语法
udf对象=sparksession.udfregister(参数1,参数2,参数3)

参数1:UDF名称,可用于SQL风格

参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型

udf对象:返回值对象,是一个UDF对象,可用于DSL风格
方式2语法

from pyspark.sql import functions as F

udf对象 = F.udf(参数1,参数2)

参数1:被注册成UDF的方法名

参数2:声明UDF的返回值类型

udf对象:返回值对象,是一个UDF对象,可用于DSL风格

举例:

    def double_score(num):
        return 2*num

    udf1 = spark.udf.register('udf_1',double_score,IntegerType())
    # dsl风格
    df.select(udf1(df['score'])).show()
    # sql风格
    df.selectExpr('udf_1(score)').show()
    # sql风格2
    df.createTempView('tmp')
    spark.sql("select udf_1(score) from tmp").show()

    udf2 = F.udf(double_score,IntegerType())
    df.select(udf2(df['score'])).show()

当返回值是数组时,需要定义数组内部数据的数据类型:ArrayType(StringType())

    spark =  SparkSession.builder.appName('lmx').master('local[*]').config('spark.sql.shufle,partitions',2).getOrCreate()
    sc = spark.sparkContext

    rdd=sc.parallelize([['i love you'],['i like you']])
    df = rdd.toDF(['ifo'])
    def func(num):
        return num.split(' ')
    udf = spark.udf.register('udf_sql',func,ArrayType(StringType()))

    # dsl风格
    df.select(udf(df['ifo'])).show()

当返回值是字典时,需要使用StructType(),且定义每个列的名字(需要跟函数返回值的列名一样)和数据类型

    rdd=sc.parallelize([[1],[2],[3],[4],[5]])
    df = rdd.toDF(['ifo'])
    df.show()
    def func(num):
        return {'num':num,'num1':num+10}
    udf = spark.udf.register('udf_sql',func,StructType().\
                             add('num',IntegerType(),nullable=False).\
                             add('num1',IntegerType(),nullable=False))
    df.select(udf(df['ifo'])).show()

Catalyst优化器 

RDD的执行流程为:

代码 ->DAG调度器逻辑任务 ->Task调度器任务分配和管理监控 ->Worker干活

SparkSQL会对写完的代码,执行“自动优化”,既Catalyst优化器,以提升代码运行效率,避免开发者水平影响到代码执行效率。 (RDD代码不会,是因为RDD的数据对象太过复杂,无法被针对性的优化)

加入优化的SparkSQL大致架构为:

1.API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句

2.收到 SQL 语句以后,将其交给 Catalyst,Catalyst 负责解析 SQL,生成执行计划等

3.Catalyst 的输出应该是 RDD 的执行计划

4.最终交由集群运行 

 Catalyst优化器主要分为四个步骤

1、解析sql,生成AST(抽象语法树)

2、在 AST 中加入元数据信息,做这一步主要是为了一些优化,例如 col=col 这样的条件

以上面的图为例:

  • score.id → id#1#L 为 score.id 生成 id 为1,类型是 Long
  • score.math_score→math_score#2#L为 score.math_score 生成 id 为 2,类型为 Long
  • people.id→id#3#L为 people.id 生成 id 为3,类型为 Long
  • people.age→age#4#L为 people.age 生成 id 为 4,类型为 Long 

3、对已经加入元数据的 AST,输入优化器,进行优化,主要包含两种常见的优化:

谓词下推(Predicate Pushdown)\ 断言下推:将逻辑判断 提前到前面,以减少shuffle阶段的数据量。

以上面的demo举例,可以先进行people.age>10的判断再进行Join等操作。

列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的宽度

以上面的demo举例,由于只select了score和id,所以开始的时候,可以只保留这两个列,由于parquet是按列存储的,所以很适合这个操作

4、上面的过程生成的 AST 其实最终还没办法直接运行,这个 AST 叫做 逻辑计划,结束后,需要生成 物理计划,从而生成 RDD 来运行

Spark SQL的执行流程

如此,Spark SQL的执行流程为: 

1.提交SparkSQL代码
2.catalyst优化
        a.生成原始AST语法数
        b.标记AST元数据
        c.进行断言下推和列值裁剪 以及其它方面的优化作用在AST上
        d.将最终AST得到,生成执行计划
        e.将执行计划翻译为RDD代码
3.Driver执行环境入口构建(SparkSession)
4.DAG 调度器规划逻辑任务
5.TASK 调度区分配逻辑任务到具体Executor上工作并监控管理任务
6.Worker干活

Spark新特性

自适应查询(SparkSQL)

即:Adaptive Query Execution

由于缺乏或者不准确的数据统计信息(元数据)和对成本的错误估算(执行计划调度)导致生成的初始执行计划不理想

在Spark3.x版本提供Adaptive Query Execution自适应查询技术 通过在”运行时”对查询执行计划进行优化, 允许Planner在运行时执行可选计划,这些可选计划将会基于运行时数据统 计进行动态优化, 从而提高性能,其开启方式为:

set spark.sql.adaptive.enabled = true;

Adaptive Query Execution AQE主要提供了三个自适应优化:

动态合并

即:Dynamically coalescing shuffle partitions 

 可以动态调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区 合并为较大的分区。

动态调整Join策略

即:Dynamically switching join strategies 

此优化可以在一定程度上避免由于缺少统计信息或着错误估计大小(当然也可能两种情况同时存在),而导致执行计 划性能不佳的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能

动态优化倾斜Join 

 skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可 以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更 好的整体性能。

触发条件: 1. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionFactor (default=10) * "median partition size(中位数分区大小)"

2. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default = 256MB )
 

动态分区裁剪(SparkSQL)

即:Dynamic Partition Pruning

当优化器在编译时无法识别可跳过的分区时,可以使用"动态分区裁剪",即基于运行时推断的信息来进一步进行分区 裁剪。这在星型模型中很常见,星型模型是由一个或多个并且引用了任意数量的维度表的事实表组成。在这种连接操 作中,我们可以通过识别维度表过滤之后的分区来裁剪从事实表中读取的分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/373640.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot-基础篇03

之前搭建了整个开发环境实现了登录注册,springBoot整合mybatis完成增删改查,今天完成分页查询,使用阿里云oss存储照片等资源,后期会尝试自己搭建分布式文件系统来实现。 一,SpringBootMybatis完成分页查询 1&#xff…

计算机项目SpringBoot项目 办公小程序开发

从零构建后端项目、利用UNI-APP创建移动端项目 实现注册与登陆、人脸考勤签到、实现系统通知模块 实现会议管理功能、完成在线视频会议功能、 发布Emos在线办公系统 项目分享: SpringBoot项目 办公小程序开发https://pan.baidu.com/s/1sYPLOAMtaopJCFHAWDa2xQ?…

幻兽帕鲁mac可以玩吗?

《幻兽帕鲁》(英文:Palworld)是一款近期在 Steam 爆红的动作冒险生存游戏,游戏设置在一个居住着「帕鲁」的开放世界中,玩家可以战斗并捕捉帕鲁,也能用它们来建造基地、骑乘和战斗。 不过目前《幻兽帕鲁》仅…

基于CEVA DSP BX2的架构分析(五)- 标量处理单元(二)

目录 5.3.5 结果饱和度 5.3.4 乘法饱和度 5.3.5 乘法后移位 5.3.6 标量浮点支持 5.3.7 复数支持 5.3.7.1 ​​​​​​​​​​​​​​16位复杂算法支持 ​​​​​​​5.3.7.2 32位复杂算法支持 5.4 SPU算术标志 ​​​​​​​5.4.1 进位标志 ​​​​​​​5.4.2 溢出标志 …

docker部署docker管理工具easydockerweb

重要提示 功能比较少,建议体验一下即可 安装 docker run -it -d -p 10041:3000 -e EDW_USERNAMEadmin -e EDW_PASSWORDadmin -v /var/run/docker.sock:/var/run/docker.sock qfdk/easydockerweb 使用 概览 镜像管理 容器管理

Python命令行工具库之argcomplete使用详解

概要 命令行工具是开发者和系统管理员的得力助手,但随着命令行选项的增多,用户可能会感到困惑。Python 中的 argcomplete 库可以帮助轻松地为命令行工具添加自动补全功能,提高用户体验。本文将介绍如何使用 Python argcomplete 库实现命令行…

[嵌入式AI从0开始到入土]13_orangepi aipro开箱测评

[嵌入式AI从0开始到入土]嵌入式AI系列教程 注:等我摸完鱼再把链接补上 可以关注我的B站号工具人呵呵的个人空间,后期会考虑出视频教程,务必催更,以防我变身鸽王。 第1期 昇腾Altas 200 DK上手 第2期 下载昇腾案例并运行 第3期 官…

好看的安全跳转单页html源码

好看的安全跳转单页html源码,效果如下 代码如下&#xff1a; <!DOCTYPE html> <html> <head> <meta charset"UTF-8"> <!--[if IE 8]><style>.ie8 .alert-circle,.ie8 .alert-footer{display:none}.ie8 .alert-box{padding-top:…

基于Vue2用keydown、keyup事件实现长按键盘任意键(或组合键)3秒触发自定义事件(以F1键为例)

核心代码 <template></template> <script> export default {created() {//监听长按快捷键addEventListener("keydown", this.keydown);addEventListener("keyup", this.keyup);},destroyed(d) {//移除长按快捷键removeEventListener(&…

一文get国自然热点“组蛋白乳酸化”的研究方向和思路

作为近些年的国自然热点&#xff0c;“组蛋白修饰”不仅是细胞记忆的守护者&#xff0c;也是生命过程调控的重要的参与者。组蛋白是构成染色质的基本蛋白质单位&#xff0c;它们能够通过各种化学修饰如乙酰化、甲基化、磷酸化和乳酸化等&#xff0c;精确调控基因的表达&#xf…

【翻译】Processing安卓模式的安装使用及打包发布(内含中文版截图)

原文链接在下面的每一章的最前面。 原文有三篇&#xff0c;译者不知道贴哪篇了&#xff0c;这篇干脆标了原创。。 译者声明&#xff1a;本文原文来自于GNU协议支持下的项目&#xff0c;具备开源二改授权&#xff0c;可翻译后公开。 文章目录 Install&#xff08;安装&#xff0…

通过docker-compose部署NGINX服务,并使该服务开机自启

要在通过docker-compose部署的NGINX服务实现开机自启&#xff0c;你需要确保Docker守护进程在系统启动时自动运行&#xff0c;并配置docker-compose.yml文件以在容器中运行NGINX服务。以下是步骤&#xff1a; 确保Docker守护进程开机启动&#xff1a; 在Ubuntu/Debian上&#x…

DuiLib示例代码研究1

DuiLib有一个示例,名为360Safe,跑起来如下,纯界面的; 下面大体看一下代码; 首先它是从CWindowWnd和INotifyUI继承了一个类C360SafeFrameWnd出来,CWindowWnd和INotifyUI这两是DuiLib的类; 初始化的时候看上去是创建了窗口右上角的四个按钮; 如果注释了这四句代码,…

在 MacOS 上虚拟化 x86Linux 的最佳方法(通过 Rosetta)

categories: [VM] tags: MacOS VM 写在前面 买了 ARM 的 mac, 就注定了要折腾一下虚拟机了… 之前写过一篇文章是通过 utm 虚拟化archlinux, 其实本质上还是调用了 qemu-system-x86_64, 所以速度并不快, 后来想着能不能借用 Rosetta 的优势即原生转译, 来虚拟化 Intel 的 Linu…

“极简壁纸“爬虫JS逆向·实战

文章目录 声明目标分析确定目标目标检索 代码补全完整代码 爬虫逻辑完整代码 运行结果 声明 本教程只用于交流学习&#xff0c;不可用于商业用途&#xff0c;不可对目标网站进行破坏性请求&#xff0c;请遵守相关法律法规。 目标分析 确定目标 获取图片下载链接 目标检索…

Vite+Vue3项目配置启动项目后自动打开浏览器

有时候&#xff0c;我们在启动前端项目时&#xff0c;输入了启动命令&#xff0c;但却需要我们手动点开控制台输出的URL链接 如果每次都要点这个连接&#xff0c;会十分繁琐 为了提高开发效率&#xff0c;减少不必要的操作&#xff0c;我们就来解决这个问题。 1.在文件资源管…

Linux进程信号(3)--信号的处理

目录 前置知识 捕捉信号 内核如何实现信号的捕捉 sigaction 信号的其他补充问题 可重入函数 volatile关键字 SIGCHILD信号 前置知识 什么是用户态&#xff0c;内核态呢&#xff1f; 这里我们再来看看进程的地址空间&#xff1a; 我们知道每一个进程都会有自己的地址空…

大数据Zookeeper--案例

文章目录 服务器动态上下线监听案例需求需求分析具体实现测试 Zookeeper分布式锁案例原生Zookeeper实现分布式锁Curator框架实现分布式锁 Zookeeper面试重点选举机制生产集群安装多少zk合适zk常用命令 服务器动态上下线监听案例 需求 某分布式系统中&#xff0c;主节点可以有…

unity 增加系统时间显示、FPS帧率、ms延迟

代码 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;using UnityEngine;public class Frame : MonoBehaviour {// 记录帧数private int _frame;// 上一次计算帧率的时间private float _lastTime;// 平…

Docker 一小时从入门到实战 —— Docker commands | Create your own image | vs VM ... 基本概念扫盲

Docker crash course 文章目录 Docker crash course1. What and Why of Docker?2.1 What2.2 What problem does it solve?2.2.1 before containers2.1.2 with containers 2. Docker vs Virtual Machines2.1 Difference2.2 Benefits 3. Install docker locally4. Images vs Co…