Hadoop+Spark大数据技术 实验8 Spark SQL结构化

9.2 创建DataFrame对象的方式

val dfUsers = spark.read.load("/usr/local/spark/examples/src/main/resources/users.parquet")

dfUsers: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

dfUsers.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

9.2.2 json文件创建DataFrame对象

val dfGrade = spark.read.format("json").load("file:/media/sf_download/grade.json")

dfGrade: org.apache.spark.sql.DataFrame = [Class: string, ID: string ... 3 more fields]

dfGrade.show(3)

+-----+---+----+-----+-----+
|Class| ID|Name|Scala|Spark|
+-----+---+----+-----+-----+
|    1|106|Ding|   92|   91|
|    2|242| Yan|   96|   90|
|    1|107|Feng|   84|   91|
+-----+---+----+-----+-----+
only showing top 3 rows

9.2.3 RDD创建DataFrame对象

val list = List(

("zhangsan" , "19") , ("B" , "29") , ("C" , "9")

)

val df = sc.parallelize(list).toDF("name","age")

list: List[(String, String)] = List((zhangsan,19), (B,29), (C,9))
df: org.apache.spark.sql.DataFrame = [name: string, age: string]

df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

df.show()

+--------+---+
|    name|age|
+--------+---+
|zhangsan| 19|
|       B| 29|
|       C|  9|
+--------+---+

9.2.4 SparkSession创建DataFrame对象

// 1.json创建Datarame对象

val dfGrade = spark.read.format("json").load("file:/media/sf_download/grade.json")

dfGrade.show(3)

+-----+---+----+-----+-----+
|Class| ID|Name|Scala|Spark|
+-----+---+----+-----+-----+
|    1|106|Ding|   92|   91|
|    2|242| Yan|   96|   90|
|    1|107|Feng|   84|   91|
+-----+---+----+-----+-----+
only showing top 3 rows

dfGrade: org.apache.spark.sql.DataFrame = [Class: string, ID: string ... 3 more fields]

Selection deleted

// 2.csv创建Datarame对象

val dfGrade2 = spark.read.option("header",true).csv("file:/media/sf_download/grade.json")

dfGrade2: org.apache.spark.sql.DataFrame = [{"ID":"106": string, "Name":"Ding": string ... 3 more fields]

dfGrade2.show(3)

+-----------+-------------+-----------+----------+-----------+
|{"ID":"106"|"Name":"Ding"|"Class":"1"|"Scala":92|"Spark":91}|
+-----------+-------------+-----------+----------+-----------+
|{"ID":"242"| "Name":"Yan"|"Class":"2"|"Scala":96|"Spark":90}|
|{"ID":"107"|"Name":"Feng"|"Class":"1"|"Scala":84|"Spark":91}|
|{"ID":"230"|"Name":"Wang"|"Class":"2"|"Scala":87|"Spark":91}|
+-----------+-------------+-----------+----------+-----------+
only showing top 3 rows

Selection deleted

// 3.Parquet创建Datarame对象

val dfGrade3 = spark.read.parquet("file:/usr/local/spark/examples/src/main/resources/users.parquet")

dfGrade3: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

dfGrade3.show(3)

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

9.2.5 Seq创建DataFrame对象

val dfGrade4 = spark.createDataFrame(

Seq(

("A" , 20 ,98),

("B" , 19 ,93),

("C" , 21 ,92),

)

)toDF("Name" , "Age" , "Score")

dfGrade4: org.apache.spark.sql.DataFrame = [Name: string, Age: int ... 1 more field]

dfGrade4.show()

+----+---+-----+
|Name|Age|Score|
+----+---+-----+
|   A| 20|   98|
|   B| 19|   93|
|   C| 21|   92|
+----+---+-----+

9.3 DataFrame对象保存为不同格式

9.3.1 write.()保存DataFrame对象

DataFrame.write() 提供了一种方便的方式将 DataFrame 保存为各种格式。以下是几种常见格式的保存方法:

1. 保存为JSON格式

df.write.json("path/to/file.json")

2. 保存为Parquet文件

df.write.parquet("path/to/file.parquet")

3. 保存为CSV文件

df.write.csv("path/to/file.csv")

9.3.2 write.format()保存DataFrame对象

DataFrame.write.format() 提供了一种更灵活的方式来保存 DataFrame,可以通过指定格式名称来选择输出格式。以下是几种常见格式的保存方法:

1. 保存为JSON格式

df.write.format("json").save("path/to/file.json")

2. 保存为Parquet文件

df.write.format("parquet").save("path/to/file.parquet")

3. 保存为CSV文件

df.write.format("csv").save("path/to/file.csv")

9.3.3 先将DataFrame对象转化为RDD再保存文件

虽然可以直接使用 DataFrame 的 write 方法保存文件,但有时需要先将 DataFrame 转换为 RDD 再进行保存。这可能是因为需要对数据进行一些 RDD 特定的操作,或者需要使用 RDD 的保存方法。

rdd = df.rdd.map(lambda row: ",".join(str(x) for x in row))
rdd.saveAsTextFile("path/to/file.txt")

注意:

  • 上述代码将 DataFrame 的每一行转换为逗号分隔的字符串,并将结果保存为文本文件。
  • 可以根据需要修改代码以使用不同的分隔符或保存为其他格式。

9.4 DataFrame对象常用操作

9.4.1 展示数据

1. show()

// 显示前20行数据

gradedf.show()

 

// 显示前10行数据

gradedf.show(10)

 

// 不截断列宽显示数据

gradedf.show(truncate=False)

 

2. collect()

// 将 DataFrame 转换成 Dataset 或 RDD,返回 Array 对象

gradedf.collect()

 

3. collectAsList()

// 将 DataFrame 转换成 Dataset 或 RDD,返回 Java List 对象

gradedf.collectAsList()

 

4. printSchema()

// 打印 DataFrame 的模式(schema)

gradedf.printSchema()

 

5. count()

// 统计 DataFrame 中的行数

gradedf.count()

 

6. first()、head()、take()、takeAsList()

// 返回第一行数据

gradedf.first()

 

// 返回前3行数据

gradedf.head(3)

 

// 返回前5行数据

gradedf.take(5)

 

// 返回前5行数据,以 Java List 形式

gradedf.takeAsList(5)

 

7. distinct()

// 返回 DataFrame 中唯一的行数据

gradedf.distinct.show()

 

8. dropDuplicates()

// 删除 DataFrame 中重复的行数据

gradedf.dropDuplicates(Seq("Spark")).show()

9.4.2 筛选

1. where()
   - 根据条件过滤 DataFrame 中的行数据。
   - 示例: gradedf.where("Class = '1' and Spark = '91'").show()

2. filter()
   - 与 where() 功能相同,根据条件过滤 DataFrame 中的行数据。
   - 示例: gradedf.filter("Class = '1'").show()

3. select()
   - 选择 DataFrame 中的指定列。
   - 示例: gradedf.select("Name", "Class","Scala").show(3,false)

修改名称:gradedf.select(gradedf("Name").as("name")).show()

4. selectExpr()
   - 允许使用 SQL 表达式选择列。
   - 示例: gradedf.selectExpr("name", "name as names" ,"upper(Name)","Scala * 10").show(3)

5. col()
   - 获取 DataFrame 中指定列的引用。
   - 示例: gradedf.col("name")

6. apply()
   - 对 DataFrame 中的每一行应用函数。
   - 示例: def get_grade_level(grade): return "A" if grade > 90 else "B" 
   gradedf.select("name", "grade", "grade_level").apply(get_grade_level, "grade_level")

7. drop()
   - 从 DataFrame 中删除指定的列。
   - 示例: gradedf.drop("grade")

8. limit()
   - 限制返回的行数。
   - 示例: gradedf.limit(10)

9.4.3 排序

按ID排序

1. orderBy()、sort()

orderBy() 和 sort() 方法都可以用于对 DataFrame 进行排序,它们的功能相同。

// 按id升序排序

gradedf.orderBy("id").show()

gradedf.sort(gradedf("Class").desc,gradedf("Scala").asc).show(3)

// 按id降序排序

gradedf.orderBy(desc("id")).show()

gradedf.sort(desc("id")).show()

2. sortWithinPartitions()

示例:gradedf.sortWithinPartitions("id").show(5)

引申:sortWithinPartitions() 方法用于对 DataFrame 的每个分区内进行排序。

// 首先对 DataFrame 进行重新分区,使其包含两个分区

val partitionedDF = gradedf.repartition(2)

// 对每个分区内的 id 进行升序排序

partitionedDF.sortWithinPartitions("id").show()

需要注意的是,sortWithinPartitions() 方法不会改变 DataFrame 的分区数量,它只是对每个分区内部进行排序。

9.4.4 汇总与聚合

1. groupBy()

groupBy() 方法用于根据指定的列对 DataFrame 进行分组。

(1) 结合 count()

// 统计每个名字的学生人数
gradedf.groupBy("name").count().show()

(2) 结合 max()

// 找出每个课程学生的最高成绩
gradedf.groupBy("Class").max("Scala","Spark").show()

(3) 结合 min()

// 找出每个名字学生的最低成绩
gradedf.groupBy("name").min("grade").show()

(4) 结合 sum()

// 计算每个名字学生的总成绩
gradedf.groupBy("name").sum("grade").show()

gradedf.groupBy("Class").sum("Scala","Spark").show()

(5) 结合 mean()

// 计算课程的平均成绩

gradedf.groupBy("Class").sum("Scala","Spark").show()

2. agg()

agg() 方法允许对 DataFrame 应用多个聚合函数。

(1) 结合 countDistinct()

// 统计不重复的名字数量
gradedf.agg(countDistinct("name")).show()

(2) 结合 avg()

gradedf.agg(max("Spark"), avg("Scala")).show()

// 计算所有学生的平均成绩
gradedf.agg(avg("grade")).show()

(3) 结合 count()

// 统计学生总数
gradedf.agg(count("*")).show()

(4) 结合 first()

// 获取第一个学生的姓名
gradedf.agg(first("name")).show()

(5) 结合 last()

// 获取最后一个学生的姓名
gradedf.agg(last("name")).show()

(6) 结合 max()、min()

// 获取最高和最低成绩
gradedf.agg(max("grade"), min("grade")).show()

(7) 结合 mean()

// 计算所有学生的平均成绩
gradedf.agg(mean("grade")).show()

(8) 结合 sum()

// 计算所有学生的总成绩
gradedf.agg(sum("grade")).show()

(9) 结合 var_pop()、variance()

// 计算成绩的总体方差
gradedf.agg(var_pop("grade"), variance("grade")).show()

(10) 结合 covar_pop()

// 计算 id 和 grade 之间的总体协方差
gradedf.agg(covar_pop("id", "grade")).show()

(11) 结合 corr()

gradedf.agg(corr("Spark","Scala")).show()

9.4.5 统计

771468c1170e42e88ed99fd7cc2fc4d2.png

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/641319.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Nginx <三>⭐️⭐️⭐️】Nginx 负载均衡使用

目录 👋前言 👀一、 负载均衡概述 🌱二、项目模拟 2.1 环境准备 2.2 启动多个服务器 2.3 配置 Nginx 2.4 测试配置 💞️三、章末 👋前言 小伙伴们大家好,前不久开始学习了 Nginx 的使用,在…

[图解]产品经理创新之阿布思考法

0 00:00:00,000 --> 00:00:01,900 那刚才我们讲到了 1 00:00:02,730 --> 00:00:03,746 业务序列图 2 00:00:03,746 --> 00:00:04,560 然后怎么 3 00:00:05,530 --> 00:00:06,963 画现状,怎么改进 4 00:00:06,963 --> 00:00:09,012 然后改进的模式…

PaddleSeg训练推理及模型转换全流程

文章目录 1、数据准备1.1 数据标注1.2 数据导出1.3 标签较验1.4 数据集整理1.5 标签可视化 2、 模型训练3、模型验证4、模型推理5、模型导出6、导出文件的推理7、将模型转换成onnx8、使用onnx进行推理 本文记录一下使用paddleseg进行语议分割模型对人体进行分割的使用流程。事实…

DTC 2024回顾丨云和恩墨重塑数据库内核技术,革新企业降本增效之道

在数字化浪潮席卷全球的当下,关系型数据库作为市场主导力量的地位依然稳固。然而,面对新兴数据库与服务形态的挑战,以及企业日益强烈的降本增效需求,数据库技术的发展必须紧跟时代步伐,充分发挥资源效能以提升企业竞争…

A股重磅!史上最严减持新规,发布!

此次减持新规被市场视为A股史上最严、最全面的规则,“花式”减持通道被全面“封堵”。 5月24日晚间,证监会正式发布《上市公司股东减持股份管理暂行办法》(以下简称《减持管理办法》)及相关配套规则。 据了解,《减持…

HTTP 错误 404.3 - Not Found 问题处理

问题描述 HTTP 错误 404.3 - Not Found 由于扩展配置问题而无法提供您请求的页面。如果该页面是脚本,请添加处理程序。如果应下载文件,请添加 MIME 映射。 解决对策

【算法】前缀和算法——和可被K整除的子数组

题解:和可被K整除的子数组(前缀和算法) 目录 1.题目2.前置知识2.1同余定理2.2CPP中‘%’的计算方式与数学‘%’的差异 及其 修正2.3题目思路 3.代码示例4.总结 1.题目 题目链接:LINK 2.前置知识 2.1同余定理 注:这里的‘/’代表的是数学…

订单id的设计问题探讨

如何设计一个订单id 设计一个订单ID系统需要考虑多个因素,包括唯一性、排序性(时间顺序)、可读性(可选)以及系统的扩展性和性能。结合这些因素,可以选择不同的方案来生成订单ID。以下是几种常见的订单ID设…

论文阅读--GLIP

把detection和phrase ground(对于给定的sentence,要定位其中提到的全部物体)这两个任务合起来变成统一框架,从而扩展数据来源,因为文本图像对的数据还是很好收集的 目标检测的loss是分类loss定位loss,它与phrase ground的定位los…

内网穿透--Nps-自定义-上线

免责声明:本文仅做技术交流与学习... 目录 Nps项目: 一图通解: 1-下载nps/npc 2-服务端启动 访问web网页: 添加客户端,生成密匙. 3-kali客户端连接服务端 4-添加协议隧道. 5-kali生成后门: 6-kali创建监听: Nps项目: https://github.com/ehang…

《2024年中国机器人行业投融资报告》| 附下载

近年来,国内机器人行业取得了显著的技术进步,包括人工智能、感知技术、自主导航等技术方面的突破,使得机器人能够更好地适应复杂环境和任务需求,带动了机器人行业加快发展。 当然,技术的进步是外在驱动因素&#xff0…

【JAVA基础之网络编程】UDP和TCP协议以及三次握手和四次挥手的过程

🔥作者主页:小林同学的学习笔录 🔥mysql专栏:小林同学的专栏 目录 1. 网络编程 1.1 概述 1.2 网络编程的三要素 1.2.1 IP地址 1.2.2 InetAddress 1.2.3 端口和协议 1.3 UDP协议 1.3.1 UDP发送数据 1.3.2 UDP接收数据 1.4…

Terminal Web终端基础(Web IDE 技术探索 二)

Terminal是web终端技术,类似cmd命令窗口,Webcontainer 中推荐使用的是Xterm.js,这里就不细说Xterm.js 的使用了,我们使用第三方库来实现(原生确实有点难用)。 vue-web-terminal 一个由 Vue 构建的支持多内容…

基础5 探索JAVA图形编程桌面:字符操作组件详解

在繁华都市的一个角落,卧龙和凤雏相聚在他们常去的台球厅。灯光洒在绿色的台球桌上,彩色的台球整齐地排列着,仿佛在等待着一场激烈的角逐。 卧龙轻轻地拿起球杆,微微瞄准,然后用力一击,白球带着一股强大的力…

Vue.js - Vue 的安装 以及 常用的 Vue 指令 【0基础向 Vue 基础学习】

文章目录 Vue 快速上手1、Vue.js 官网 & Vue.js 的获取2、创建 Vue 实例,初始化渲染3、插值表达式 安装 Vue 开发者工具:装插件调试 Vue 应用Vue 指令1、v-show 指令2、v-if3、v-else & v-else-if4、v-onv-on 调用传参 5、v-bindv-bind 对于样式…

类和对象(下篇)(未完结)!

文章目录 在谈构造函数1.构造函数体赋值2.初始化列表尽量使用初始化列表?初始化列表的初始化顺序?成员变量声明处的缺省值构造函数支持类型转换3.explicit关键字 static成员 在谈构造函数 1.构造函数体赋值 class Date{public:Date(int year, int mont…

Python设计模式之适配器模式

目录 一、适配器模式 适配器模式的组成部分 适配器模式的种类 应用场景 实现步骤 二、测试例子 一、适配器模式 适配器模式(Adapter Pattern)是一种结构型设计模式,它通过将一个现有接口转换为另一个期望的接口来让不兼容的接口能够合作…

香港服务器负载过高的原因和应对办法

保持网站正常运行看似简单,但事实上,有许多问题会影响网站和应用程序的性能,并可能导致停机。其中一个问题就是服务器过载。而香港服务器作为一种常见的服务器类型,有时会出现负载过高的情况。为了帮助您确保在香港服务器过载不会…

跨境电商投放Facebook广告推广攻略!

在出海浪潮中,跨境电商已经成为企业连接不同市场、拓展国际业务的重要途径。Facebook,作为全球最大的社交平台之一,拥有超过20亿的活跃用户,为跨境卖家提供了一个无与伦比的营销舞台。有效利用Facebook广告,不仅能帮助…

捕捉二氧化碳也能赚钱?深入探索CCUS技术与商业前景

引言 随着全球变暖和气候变化的加剧,如何有效减少二氧化碳(CO2)排放成为各国亟待解决的问题。近日,全球最大的二氧化碳捕集工厂在冰岛正式运营,这一消息引起了广泛关注。本文将深入探讨捕集二氧化碳技术(C…