Spark SQL概述与基本操作

目录

一、Spark SQL概述

        (1)概念

        (2)特点

        (3)Spark SQL与Hive异同

        (4)Spark的数据抽象

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        (2)代码演示

三、DataFrame创建

        (1)DataFrame组成

        (2)DataFrame创建方式(转换)

        (3)DataFrame创建方式(标准API读取)

四、DataFrame编程

        (1)DSL语法风格

        (2)SQL语法风格

五、Spark SQL——wordcount代码示例

        (1)pyspark.sql.functions包

        (2)代码示例


一、Spark SQL概述

        (1)概念

        Spark SQL是Apache Spark的一个模块,它用于处理结构化和半结构化的数据。Spark SQL允许用户使用SQL查询和操作数据,这种操作可以直接在Spark的DataFrame/Dataset API中进行。此外,Spark SQL还支持多种语言,包括Scala、Java、Python和R。

        (2)特点

        ①融合性:SQL可以无缝集成在代码中,随时用SQL处理数据。

        ②统一数据访问:一套标准API可读写不同的数据源。

        ③Hive兼容:可以使用Spark SQL直接计算生成Hive数据表。

        ④标准化连接:支持标准化JDBC \ ODBC连接,方便和各种数据库进行数据交互。

        (3)Spark SQL与Hive异同

        共同点:Hive和Spark均是:“分布式SQL计算引擎”,均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。

        (4)Spark的数据抽象

        Spark SQL的数据抽象:

        Data Frame与RDD:

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        在RDD阶段,程序的执行入口对象是:SparkContext。在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。

        Spark Session对象作用:

        ①用于SparkSQL编程作为入口对象。

        ②用于SparkCore编程,可以通过Spark Session对象中获取到Spark Context。

        (2)代码演示
# cording:utf8

# Spark Session对象的导包,对象是来自于pyspark.sql包中
from pyspark.sql import SparkSession
if __name__ == '__main__':
    # 构建Spark Session执行环境入口对象
    spark = SparkSession.builder.\
            appName('test').\
            master('local[*]').\
            getOrCreate()
    # 通过Spark Session对象 获取SparkContext对象
    sc = spark.sparkContext

    # SparkSQL测试
    df = spark.read.csv('../input/stu_score.txt', sep=',', header=False)
    df2 = df.toDF('id', 'name', 'score')
    # 打印表结构
    # df2.printSchema()
    # 打印数据内容
    # df2.show()

    df2.createTempView('score')
    # SQL风格
    spark.sql("""SELECT * FROM score WHERE name='语文' LIMIT 5
    """).show()

    # DSL 风格
    df2.where("name='语文'").limit(5).show()

三、DataFrame创建

        (1)DataFrame组成

        DataFrame是一个二维表结构,表格结构的组成:

                ①行

                ②列

                ③表结构描述

        比如,在MySQL中的一个表:

                ①有许多列组成

                ②数据也被分为多个列

                ③表也有表结构信息(列、列名、列类型、列约束等)

        基于这个前提下,DataFrame的组成如下:

                在结构层面:

                        ①StructType对象描述整个DataFrame的表结构

                        ②StructField对象描述一个列的信息

                在数据层面:

                        ①Row对象记录一行数据

                        ②Column对象记录一列数据并包含列的信息

        (2)DataFrame创建方式(转换)

        ①基于RDD方式

# cording:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # 构建DataFrame对象
    # 参数1,被转换的RDD
    # 参数2,指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd,schema=['name', 'age'])

    # 打印Data Frame的表结构
    df.printSchema()

    # 打印df中的数据
    # 参数1,表示 展示出多少条数据,默认不传的话是20
    # 参数2,表示是否对列进行截断,如果列的数据长度超过20个字符串长度,厚旬欸日不显示,以....代替
    # 如果给False 表示不截断全部显示,默认是True
    df.show(20,False)

    # 将DF对象转换成临时视图表,可供sql语句查询
    df.createOrReplaceTempView('people')
    spark.sql('SELECT * FROM people WHERE age < 30').show()

        ②通过StructType对象来定义DataFrame的 ‘ 表结构 ’ 转换RDD

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # 构建表结构的描述对象:StructType 对象
    # 参数1,列名
    # 参数2,列数据类型
    # 参数3,是否允许为空
    schema = StructType().add('name', StringType(), nullable=True).\
        add('age', IntegerType(), nullable=False)

    # 构建DataFrame对象
    # 参数1,被转换的RDD
    # 参数2,指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd, schema=schema)

    df.printSchema()
    df.show()

        ③通过RDD的toDF方法创建RDD

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # toDF构建DataFrame
    # 第一种构建方式,只能设置列名,列类型靠RDD推断,默认允许为空
    df1 = rdd.toDF(['name', 'name'])
    df1.printSchema()
    df1.show()
    # toDF方式2:通过StructType来构造
    # 设置全面,能设置列名、列数据类型、是否为空
    # 构建表结构的描述对象:StructType 对象
    # 参数1,列名
    # 参数2,列数据类型
    # 参数3,是否允许为空
    schema = StructType().add('name', StringType(), nullable=True).\
        add('age', IntegerType(), nullable=False)

    df2 = rdd.toDF(schema=schema)
    df2.printSchema()
    df2.show()




        ④基于Pandas的DataFrame创建DataFrame

# cording:utf8

from pyspark.sql import SparkSession
import pandas as pd

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 基于pandas的DataFrame构建SparkSQL的DataFrame对象
    pdf = pd.DataFrame(
        {
            'id': [1, 2, 3],
            'name': ['张大仙', '王晓晓', '吕不韦'],
            'age': [1, 2, 3]
        }
    )

    df = spark.createDataFrame(pdf)

    df.printSchema()
    df.show()

        (3)DataFrame创建方式(标准API读取)

        统一API示例代码:

        ①读取本地text文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 构建StructType,text数据源,
    # text读取数据的特点是:将一整行只作为一个列读取,默认列名是value 类型是String
    schema = StructType().add('data', StringType(),nullable=True)
    df = spark.read.format('text').\
        schema(schema=schema).\
        load('../input/people.txt')

    df.printSchema()
    df.show()

        ②读取json文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # json文件类型自带Schema信息
    df = spark.read.format('json').load('../input/people.json')
    df.printSchema()
    df.show()

        ③读取csv文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 读取csv文件
    df = spark.read.format('csv').\
        option('sep', ';').\
        option('header', True).\
        option('encoding', 'utf-8').\
        schema('name STRING, age INT, job STRING').\
        load('../input/people.csv')

    df.printSchema()
    df.show()

        ④读取parquet文件

        parquet文件:是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多,他们都是列存储格式。

        parquet对比普通的文本文件的区别:

                ①parquet内置schema(列名、列类型、是否为空)

                ②存储是以列作为存储格式

                ③存储是序列化存储在文件中的(有压缩属性体积小)

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 读取parquet文件
    df = spark.read.format('parquet').load('../input/users.parquet')

    df.printSchema()
    df.show()

四、DataFrame编程

        (1)DSL语法风格
# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv').\
        schema('id INT, subject STRING, score INT').\
        load('../input/stu_score.txt')

    # Column对象的获取
    id_column = df['id']
    subject_column = df['subject']

    # DLS风格
    df.select(['id', 'subject']).show()
    df.select('id', 'subject').show()
    df.select(id_column, subject_column).show()

    # filter API
    df.filter('score < 99').show()
    df.filter(df['score'] < 99).show()

    # where API
    df.where('score < 99').show()
    df.where(df['score'] < 99).show()

    # group By API
    # df.groupBy API的返回值为 GroupedData类型1
    # GroupedData对象不是DataFrame
    # 它是一个 有分组关系的数据结构,有一些API供我们对分组做聚合
    # SQL:group by 后接上聚合: sum avg count min max
    # GroupedData 类似于SQL分组后的数据结构,同样由上述5中聚合方法
    # GroupedData 调用聚合方法后,返回值依旧是DayaFrame
    # GroupedData 只是一个中转的对象,最终还是会获得DataFrame的结果
    df.groupBy('subject').count().show()
    df.groupBy(df['subject']).count().show()
        (2)SQL语法风格

        DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql()来执行SQL语句查询,结果返回一个DataFrame。
        如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:        

df.createTempView( "score")            #注册一个临时视图(表)
df.create0rReplaceTempView("score")    #注册一个临时表,如果存在进行替换。
df.createGlobalTempView( "score")      #注册一个全局表

        全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
        global_temp.
        临时表:只在当前SparkSession中可用

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv').\
        schema('id INT, subject STRING, score INT').\
        load('../input/stu_score.txt')

    # 注册成临时表
    df.createTempView('score')              # 注册临时视图(表)
    df.createOrReplaceTempView('score_2')   # 注册或者替换为临时视图
    df.createGlobalTempView('score_3')      # 注册全局临时视图 全局临时视图使用的时候 需要在前面带上global_temp. 前缀

    # 可以通过SparkSession对象的sql api来完成sql语句的执行
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM score GROUP BY subject").show()
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()

五、Spark SQL——wordcount代码示例

        (1)pyspark.sql.functions包

        这个包里面提供了一系列的计算函数供SparkSQL使用

        导包:from pyspark.sql import functions as F

        这些函数返回值多数都是Column对象。

        (2)代码示例
# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.appName('wordcount').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # TODO 1:SQL风格进行处理
    rdd = sc.textFile('../input/words.txt').\
        flatMap(lambda x: x.split(' ')).\
        map(lambda x: [x])

    df = rdd.toDF(['word'])

    # 注册DF为表格
    df.createTempView('words')

    spark.sql('SELECT word,COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC').show()

    # TODO 2:DSL 风格处理
    df = spark.read.format('text').load('../input/words.txt')

    # withColumn 方法
    # 方法功能:对已存在的列进行操作,返回一个新的列,如果名字和老列相同,那么替换,否则作为新列存在
    df2 = df.withColumn('value', F.explode(F.split(df['value'], ' ')))
    df2.groupBy('value').\
        count().\
        withColumnRenamed('value', 'word').\
        withColumnRenamed('count', 'cnt').\
        orderBy('cnt', ascending=False).show()

    # withColumnRenamed() 对列名进行重命名
    # orderBy() 排序

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/104790.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【深度学习实验】循环神经网络(五):基于GRU的语言模型训练(包括自定义门控循环单元GRU)

文章目录 一、实验介绍二、实验环境1. 配置虚拟环境2. 库版本介绍 三、实验内容&#xff08;一&#xff09;自定义门控循环单元&#xff08;GRU&#xff0c;Gated Recurrent Unit&#xff09;1. get_params2. init_gru_state3. gru &#xff08;二&#xff09;创建模型0. 超参数…

3. 博弈树 (15分)

下棋属于一种博弈游戏&#xff0c;博弈过程可以用树&#xff08;博弈树&#xff09;来表示。假设游戏由两个人&#xff08; A 和 B &#xff09;玩&#xff0c;开始由某个人从根结点开始走&#xff0c;两个人轮流走棋&#xff0c;每次只能走一步&#xff0c; 下一步棋只能选择当…

OTA语音芯片NV040C在智能电动牙刷的应用

以往我们对牙齿的清洁是使用的是手动方式进行&#xff0c;用柔软的牙刷刷毛去进行牙齿的清洁。但现在我们拥有了一种新颖的刷牙方式&#xff0c;靠电力去驱动、清洁我们的牙齿。电动牙刷的刷头通过快速旋转&#xff0c;产生高频振动&#xff0c;将牙膏迅速分解为细小的泡沫&…

支付宝支付接入流程

一、 接入准备 支付宝支付流程没有微信那么复杂&#xff0c;而且支付宝支持沙箱。登录支付宝开放平台控制台 点击开发工具中的沙箱 接口加密方式&#xff0c;我这里使用的是自定义密钥。生成密钥的方式 使用支付宝官方提供的密钥工具&#xff0c;唯一要注意的是支付宝密钥工具…

idea + Docker-Compose 实现自动化打包部署(仅限测试环境)

一、修改docker.service文件&#xff0c;添加监听端口 vi /usr/lib/systemd/system/docker.service ExecStart/usr/bin/dockerd -H fd:// --containerd/run/containerd/containerd.sock -H tcp://0.0.0.0:2375 -H unix://var/run/docker.sock重启docker服务 systemctl daemo…

线性代数3:矢量方程

一、前言 欢迎回到系列文章的第三篇文章&#xff0c;内容是线性代数的基础知识&#xff0c;线性代数是机器学习背后的基础数学。在我之前的文章中&#xff0c;我介绍了梯队矩阵形式。本文将介绍向量、跨度和线性组合&#xff0c;并将这些新想法与我们已经学到的内容联系起来。本…

基于 Redis + Lua 脚本实现分布式锁,确保操作的原子性

1.加锁的Lua脚本&#xff1a; lock.lua --- -1 failed --- 1 success--- getLock key local result redis.call(setnx , KEYS[1] , ARGV[1]) if result 1 then--PEXPIRE:以毫秒的形式指定过期时间redis.call(pexpire , KEYS[1] , 3600000) elseresult -1;-- 如果value相同&…

数据清洗(data clean)

整理了下数据清洗的基本流程&#xff0c;异常值部分整理在了EDA中。

什么是简单网络管理协议(SNMP)

简单网络管理协议&#xff08;SNMP&#xff1a;Simple Network Management Protocol&#xff09;是由互联网工程任务组&#xff08;IETF&#xff1a;Internet Engineering Task Force &#xff09;定义的一套网络管理协议。该协议基于简单网关监视协议&#xff08;SGMP&#xf…

React中的Virtual DOM(看这一篇就够了)

文章目录 前言了解Virtual DOMreact创建虚拟dom的方式React Element虚拟dom的流程虚拟dom和真实dom的对比后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;react合集 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌…

DevOps持续集成-Jenkins(3)

文章目录 DevOpsDevOps概述Jenkins实战3&#xff1a;实战1和实战2的加强版&#xff08;新增SonarQube和Harbor&#xff09;⭐环境准备⭐项目架构图对比Jenkins实战1和实战2&#xff0c;新增内容有哪些&#xff1f;SonarQube教程采用Docker安装SonarQube &#xff08;在Jenkins所…

生成树协议:监控 STP 端口和交换机

什么是生成树协议 生成树协议 &#xff08;STP&#xff09; 用于网络交换机&#xff0c;以防止循环和广播风暴。在局域网 &#xff08;LAN&#xff09; 中&#xff0c;两条或多条冗余路径可以连接到同一网段。当交换机或网桥从所有可用端口传输帧时&#xff0c;这些帧开始在网…

基于单片机设计的智能窗帘控制系统

一、前言 智能家居技术在近年来取得了巨大的发展&#xff0c;并逐渐成为人们日常生活中的一部分。智能家居系统带来了便利、舒适和高效的生活体验&#xff0c;拥有广泛的应用领域&#xff0c;其中之一就是智能窗帘控制系统。 传统窗帘需要手动操作&#xff0c;打开或关闭窗帘…

华硕天选1天选2天选3天选4天选air原厂预装出厂系统恢复安装教程方法

华硕天选1天选2天选3天选4天选air原厂预装出厂系统恢复安装教程方法 第一:自备原装swm/esd/wim/iso等格式系统文件&#xff0c;以上这几种格式文件安装恢复非常简单&#xff0c;使用PE工具即可完成恢复安装&#xff0c;还有一种安装方法就是华硕zip工厂恢复模式 1.首先需要自…

Adaptive AUTOSAR RTA-VRTE工具链介绍

ETAS Adaptive AUTOSAR RTA-VRTE是一种面向服务架构的中间件方案,提供了自适应AutoSAR平台,为应用层软件提供了运行环境. RTA-VRTE start kit的构建系统在主机VM内执行,可以创建AUTOSAR自适应应用程序并将其部署到一个或多个目标ECU虚拟机.

【VPX610】 青翼科技基于6U VPX总线架构的高性能实时信号处理平台

板卡概述 VPX610是一款基于6U VPX架构的高性能实时信号处理平台&#xff0c;该平台采用2片TI的KeyStone系列多核DSP TMS320C6678作为主处理单元&#xff0c;采用1片Xilinx的Virtex-7系列FPGA XC7VX690T作为协处理单元&#xff0c;具有2个FMC子卡接口&#xff0c;各个处理节点之…

linux-文件系统

目录 一、文件系统 1.分区 2.文件系统分类 3.文件系统创建工具 4.查看文件系统的属性 5.挂载 6.buffer和cache 一、文件系统 1.分区 1-4个主分区 第五个序号开始&#xff0c;是逻辑分区 2.文件系统分类 vfs文件系统 ------------- virtualenv file System&#xff0…

智慧社区燃气管网监测系统

燃气易燃易爆&#xff0c;一旦操作不当或疏忽大意&#xff0c;极易引发燃气安全事故&#xff0c;造成严重后果&#xff0c;2023年10月24日&#xff0c;在吉林某小区&#xff0c;发生了燃气使用不当产生的爆炸导致了1人死亡&#xff0c;1人重伤&#xff0c;15人轻伤&#xff0c;…

【嵌入式开源库】timeslice的使用,完全解耦的时间片轮询框架构

完全解耦的时间片轮询框架构 简介项目代码timeslice.htimeslice.clist.hlist.c 创建工程移植代码实验函数说明timeslice_task_inittimeslice_task_addtimeslice_tak_deltimeslice_get_task_num 结尾 简介 timeslice是一个时间片轮询框架&#xff0c;他是一个完全解耦的时间片轮…

电脑视频怎么转音频mp3

如果你在电脑上观看视频时喜欢上某个片段的背景音乐&#xff0c;且想将喜欢的背景音乐制作为手机铃声。我是建议你将此视频转换为 MP3 格式&#xff0c;因为 MP3 几乎与所有设备相兼容&#xff0c;让你可以在不同设备上不受限制地去聆听它。那该如何转换呢&#xff1f;无需担心…