【pyspark速成专家】7_SparkSQL编程1

目录

一,RDD,DataFrame和DataSet对比

二,创建DataFrame


本节将介绍SparkSQL编程基本概念和基本用法。

不同于RDD编程的命令式编程范式,SparkSQL编程是一种声明式编程范式,我们可以通过SQL语句或者调用DataFrame的相关API描述我们想要实现的操作。

然后Spark会将我们的描述进行语法解析,找到相应的执行计划并对其进行流程优化,然后调用相应基础命令进行执行。

我们使用pyspark进行RDD编程时,在Excutor上跑的很多时候就是Python代码,当然,少数时候也会跑java字节码。

但我们使用pyspark进行SparkSQL编程时,在Excutor上跑的全部是java字节码,pyspark在Driver端就将相应的Python代码转换成了java任务然后放到Excutor上执行。

因此,使用SparkSQL的编程范式进行编程,我们能够取得几乎和直接使用scala/java进行编程相当的效率(忽略语法解析时间差异)。此外SparkSQL提供了非常方便的数据读写API,我们可以用它和Hive表,HDFS,mysql表,Cassandra,Hbase等各种存储媒介进行数据交换。

美中不足的是,SparkSQL的灵活性会稍差一些,其默认支持的数据类型通常只有Int,Long,Float,Double,String,Boolean 等这些标准SQL数据类型, 类型扩展相对繁琐。对于一些较为SQL中不直接支持的功能,通常可以借助于用户自定义函数(UDF)来实现,如果功能更加复杂,则可以转成RDD来进行实现。

#SparkSQL的许多功能封装在SparkSession的方法接口中

spark = SparkSession.builder \
        .appName("test") \
        .config("master","local[4]") \
        .enableHiveSupport() \
        .getOrCreate()

sc = spark.sparkContext

一,RDD,DataFrame和DataSet对比

DataFrame参照了Pandas的思想,在RDD基础上增加了schma,能够获取列名信息。

DataSet在DataFrame基础上进一步增加了数据类型信息,可以在编译时发现类型错误。

DataFrame可以看成DataSet[Row],两者的API接口完全相同。

DataFrame和DataSet都支持SQL交互式查询,可以和 Hive无缝衔接。

DataSet只有Scala语言和Java语言接口中才支持,在Python和R语言接口只支持DataFrame。

DataFrame数据结构本质上是通过RDD来实现的,但是RDD是一种行存储的数据结构,而DataFrame是一种列存储的数据结构。

二,创建DataFrame

1,通过toDF方法转换成DataFrame

可以将RDD用toDF方法转换成DataFrame

#将RDD转换成DataFrame
rdd = sc.parallelize([("LiLei",15,88),("HanMeiMei",16,90),("DaChui",17,60)])
df = rdd.toDF(["name","age","score"])
df.show()
df.printSchema()

+---------+---+-----+
|     name|age|score|
+---------+---+-----+
|    LiLei| 15|   88|
|HanMeiMei| 16|   90|
|   DaChui| 17|   60|
+---------+---+-----+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)

2, 通过createDataFrame方法将Pandas.DataFrame转换成pyspark中的DataFrame

import pandas as pd 

pdf = pd.DataFrame([("LiLei",18),("HanMeiMei",17)],columns = ["name","age"])
df = spark.createDataFrame(pdf)
df.show()

+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+

# 也可以对列表直接转换
values = [("LiLei",18),("HanMeiMei",17)]
df = spark.createDataFrame(values,["name","age"])
df.show()

+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+

3, 通过createDataFrame方法指定schema动态创建DataFrame

可以通过createDataFrame的方法指定rdd和schema创建DataFrame。

这种方法比较繁琐,但是可以在预先不知道schema和数据类型的情况下在代码中动态创建DataFrame.

from pyspark.sql.types import *
from pyspark.sql import Row
from datetime import datetime

schema = StructType([StructField("name", StringType(), nullable = False),
                     StructField("score", IntegerType(), nullable = True),
                     StructField("birthday", DateType(), nullable = True)])

rdd = sc.parallelize([Row("LiLei",87,datetime(2010,1,5)),
                      Row("HanMeiMei",90,datetime(2009,3,1)),
                      Row("DaChui",None,datetime(2008,7,2))])

dfstudent = spark.createDataFrame(rdd, schema)

dfstudent.show()

+---------+-----+----------+
|     name|score|  birthday|
+---------+-----+----------+
|    LiLei|   87|2010-01-05|
|HanMeiMei|   90|2009-03-01|
|   DaChui| null|2008-07-02|
+---------+-----+----------+

4,通过读取文件创建

可以读取json文件,csv文件,hive数据表或者mysql数据表得到DataFrame。

#读取json文件生成DataFrame
df = spark.read.json("data/people.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

#读取csv文件
df = spark.read.option("header","true") \
 .option("inferSchema","true") \
 .option("delimiter", ",") \
 .csv("data/iris.csv")
df.show(5)
df.printSchema()

+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
+-----------+----------+-----------+----------+-----+
only showing top 5 rows

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: integer (nullable = true)


#读取csv文件
df = spark.read.format("com.databricks.spark.csv") \
 .option("header","true") \
 .option("inferSchema","true") \
 .option("delimiter", ",") \
 .load("data/iris.csv")
df.show(5)
df.printSchema()

+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
+-----------+----------+-----------+----------+-----+
only showing top 5 rows

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: integer (nullable = true)

#读取parquet文件
df = spark.read.parquet("data/users.parquet")
df.show()

#读取hive数据表生成DataFrame

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
df.show(5)

#读取mysql数据表生成DataFrame
"""
url = "jdbc:mysql://localhost:3306/test"
df = spark.read.format("jdbc") \
 .option("url", url) \
 .option("dbtable", "runoob_tbl") \
 .option("user", "root") \
 .option("password", "0845") \
 .load()\
df.show()
"""

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/644051.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

编程实现标题栏窗口摇动——显示桌面的未公开细节研究

目录 前言 一、“窗口摇动”功能内部原理 二、explorer.exe 中的 “窗口抖动” 实现 三、“切换到桌面” 功能所扩展的内部细节 四、概念验证 五、进一步研究如何自定义保留窗口列表 原文出处链接&#xff1a;[https://blog.csdn.net/qq_59075481/article/details/139204…

ELK 日志监控平台(二)- 优化日志格式

文章目录 ELK 日志监控平台&#xff08;二&#xff09;- 优化日志格式1.日志输出要点2.优化应用的日志格式2.1.确定日志输出要点来源2.1.1.服务名称2.1.2.服务环境2.1.3.日志级别2.1.4.日志输出时间2.1.5.日志内容2.1.6.日志输出对象2.1.7.线程名称 2.2.logback.xml修改日志输出…

win10安装rabbitmq

安装 第一步&#xff1a;下载并安装erlang RabbitMQ服务端代码是使用并发式语言Erlang编写&#xff0c;因此首先需要安装Erlang下载地址&#xff1a;http://www.erlang.org/downloads采用默认安装即可&#xff0c;选择适合的安装路径 添加环境变量 第二步&#xff1a;下载并…

【DevOps】深入了解RabbitMQ:AMQP协议基础、消息队列工作原理和应用场景

目录 一、核心功能 二、优势 三、核心概念 四、工作原理 五、交换机类型 六、消息确认 七、持久性和可靠性 八、插件和扩展 九、集群和镜像队列 十、客户端库 十一、管理界面 十二、应用场景 RabbitMQ是一个基于AMQP协议的消息队列中间件&#xff0c;提供高可用、可…

【数据结构与算法 刷题系列】移除链表元素

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;数据结构与算法刷题系列&#xff08;C语言&#xff09; 期待您的关注 目录 一、问题描述 二、解题思路 三、源代码实现 一、问题…

Qt for android 获取USB设备列表(二)JNI方式 获取

简介 基于上篇 [Qt for android 获取USB设备列表&#xff08;一&#xff09;Java方式 获取]&#xff0c; 这篇就纯粹多了&#xff0c; 直接将上篇代码转换成JNI方式即可。即所有的设备连接与上篇一致。 (https://listentome.blog.csdn.net/article/details/139205850) 关键代码…

UTC与GPS时间转换-[week, sow]

UTC与GPS时间转换-[week, sow] utc2gpsgps2utc测试参考 Ref: Global Positioning System utc2gps matlab源码 function res utc2gps(utc_t, weekStart)%% parameterssec_day 86400;sec_week 604800;leapsec 18; % 默认周一为一周的开始if nargin < 2weekStart d…

HarmonyOS-MPChart绘制一条虚实相接的曲线

本文是基于鸿蒙三方库mpchart&#xff08;OpenHarmony-SIG/ohos-MPChart&#xff09;的使用&#xff0c;自定义绘制方法&#xff0c;绘制一条虚实相接的曲线。 mpchart本身的绘制功能是不支持虚实相接的曲线的&#xff0c;要么完全是实线&#xff0c;要么完全是虚线。那么当我…

VTK 数据处理:特征边提取

VTK 数据处理&#xff1a;特征边提取 VTK 数据处理&#xff1a;特征边提取原理实例 1&#xff1a;边界边提取实例 2&#xff1a;模型特征边提取实例 3&#xff1a;利用 vtkFeatureEdges 提取的边界补洞实例 4&#xff1a;利用 vtkFillHolesFilter 补洞 VTK 数据处理&#xff1a…

[C语言]自定义类型详解:结构体、联合体、枚举

目录 &#x1f680;结构体 &#x1f525;结构体类型的声明 &#x1f525;结构的自引用 &#x1f525;结构体变量的定义和初始化 &#x1f525;结构体内存对齐 &#x1f525;结构体传参 &#x1f525;结构体实现位段&#xff08;位段的填充&可移植性&#xff09; &a…

企业异地网络组网:SD-WAN解决方案的优势

在当今全球化的商业环境中&#xff0c;企业的业务扩展已不再局限于本地或单一国家。随着分公司和子公司的不断增加&#xff0c;企业总部与这些分支机构之间的数据通信和资源共享变得尤为重要。然而&#xff0c;传统的网络访问方式&#xff0c;如点对点电路和多协议标签交换&…

17.7K星开源产品分析平台:Posthog

Posthog&#xff1a;开源洞察&#xff0c;产品优化的得力助手 - 精选真开源&#xff0c;释放新价值。 概览 PostHog是一个全面开源的平台&#xff0c;旨在帮助团队构建更好的产品。它提供了从产品分析到会话回放、功能标志和A/B测试等一系列工具&#xff0c;支持自托管&#x…

个人博客网站搭建笔记1

文章目录 前言要求自己的理解资源过程视频教程SpringBoot开发一个小而美的个人博客p1课程介绍p2需求和功能 前言 自己之前其实就想搭建一个属于自己的网站&#xff0c;但是不知道怎么操作&#xff0c;没找到合适的教程&#xff0c;&#xff08;手把手的那种&#xff09;&#…

Prometheus+Grafana监控服务器、mysql数据库并配置报警规则推送邮箱

文章目录 一、安装prometheus1.1下载1.2 安装1.3 开机启动1.4 验证 二、安装 Grafana2.1 下载2.2 安装2.3 启动2.4 验证 三、安装服务器监控 node_exporter3.1 下载3.2 安装3.3 设置 node_exporter 系统服务3.4 设置开机自动启动3.5 验证3.6配置Prometheus3.7 修改 Prometheus …

Tomcat部署项目的方式

目录 1、Tomcat发布项目的方式 方式1&#xff1a; 直接把项目发布到webapps目录下 方式2&#xff1a;项目发布到ROOT目录 方式3&#xff1a;虚拟路径方式发布项目 方式4&#xff1a;(推荐)虚拟路径&#xff0c;另外的方式&#xff01; 方式5&#xff1a;发布多个网站 1、…

[windows系统安装/重装系统][step-4][番外篇-2]N卡驱动重装 |解决:开机几小时后电脑卡顿 | 后台自动运行了上千个Rundll32进程问题

现象 开机几小时后&#xff0c;电脑变卡&#xff0c;打开后台管理器都卡&#xff0c;后台管理去转圈圈一小会儿后看到后台进程上千个&#xff0c;好多个Rundll32进程 重启下运行会稍快 重启后运行快&#xff0c;后台管理器反应也快 打开后台管理器不卡&#xff08;几小时后打…

Python语法(全)

前言&#xff1a; 下面是Python基本的语法&#xff0c;大家耐心观看&#xff01; 1.基础语法 1.1字面量 字面量&#xff1a;在代码中&#xff0c;被写下来的的固定的值&#xff0c;称之为字面 1.2字符串 字符串&#xff08;string&#xff09;&#xff0c;又称文本&#xff…

C语言数据结构栈的概念及结构、栈的实现、栈的初始化、销毁栈、入栈、出栈、检查是否为空、获取栈顶元素、获取有效元素个数等的介绍

文章目录 前言栈的概念及结构栈的实现一、 栈结构创建二、 初始化结构三、销毁栈四、入栈五、出栈六、检查是否为空七、获取栈顶元素八、获取有效元素的个数九、测试 1十、测试 2总结 前言 C语言数据结构栈的概念及结构、栈的实现、栈的初始化、销毁栈、入栈、出栈、检查是否为…

blender 布尔运算,切割模型。

1.创建一个立方体和球体。 2.选中立方体&#xff0c;在属性面板添加布尔修改器。点击物体属性右边的按钮选中球体。参数如下。 3.此时隐藏球体&#xff0c;就可以看到被切掉的效果了。

C结构详解

目录 1、结构模板 1. 建立结构声明 2. 定义结构变量 3. 访问结构成员 4. 初始化结构 声明结构数组 声明和初始化结构指针 1、结构模板 1. 建立结构声明 struct book{char title[MAXTITL];char author[MAXAUTL];float value; }&#xff1b; 该声明描述了一个又两个字符…