大数据分析与应用实验任务九

大数据分析与应用实验任务九

实验目的

  • 进一步熟悉pyspark程序运行方式;

  • 熟练掌握pysaprkRDD基本操作相关的方法、函数,解决基本问题。

实验任务

进入pyspark实验环境,打开命令行窗口,输入pyspark,完成下列任务:

在实验环境中自行选择路径新建以自己姓名拼音命名的文件夹,后续代码中涉及的文件请保存到该文件夹下(需要时文件夹中可以创建新的文件夹)。

一、参考书中相应代码,练习RDD持久性、分区及写入文件(p64、67、80页相应代码)。
1.持久化

迭代计算经常需要多次重复使用同一组数据。下面就是多次计算同一个RDD的例子。

listlzy=["Hadoop","Spark","Hive","Darcy"]
rddlzy=sc.parallelize(listlzy)
print(rddlzy.count())#行动操作,触发一次真正从头到尾的计算
print(','.join(rddlzy.collect()))#行动操作,触发一次真正从头到尾的计算

image-20231123112954850

一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。针对上面的实例,增加持久化语句以后的执行过程如下:

listlzy=["Hadoop","Spark","Hive","Darcy"]
rdd=sc.parallelize(listlzy)
rdd.cache()#会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
print(rdd.count())#第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
print(','.join(rdd.collect()))#第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd

image-20231123113247048

2.分区
  • 设置分区的个数

    在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下:

    sc.textFile(path,partitionNum)
    

    其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。下面是一个分区的实例。

    listlzy=[5,2,0,1,3,1,4]
    rddlzy=sc.parallelize(listlzy,2)//设置两个分区
    

    image-20231123113420685

  • 使用repartition方法重新设置分区个数

    通过转换操作得到新RDD时,直接调用repartition方法即可。例如

datalzy=sc.parallelize([1,2,3,4,5],2)
len(datalzy.glom().collect())#显示datalzy这个RDD的分区数量
rdd = datalzy.repartition(1) #对 data 这个 RDD 进行重新分区
len(rdd.glom().collect()) #显示 rdd 这个 RDD 的分区数量

image-20231123113519540

  • 自定义分区方法

    下面是一个实例,要求根据 key 值的最后一位数字将 key 写入到不同的文件中,比如,10 写入到 part-00000,11 写入到 part-00001,12 写入到 part-00002。打开一个 Linux 终端,使用 vim 编辑器创建一个代码文件“/root/Desktop/luozhongye/TestPartitioner.py”,输入以下代码:

    from pyspark import SparkConf, SparkContext
    
    
    def MyPartitioner(key):
    	print("MyPartitioner is running")
    	print('The key is %d' % key)
    	return key % 10
    
    
    def main():
    	print("The main function is running")
    	conf = SparkConf().setMaster("local").setAppName("MyApp")
    	sc = SparkContext(conf=conf)
    	data = sc.parallelize(range(10), 5)
    	data.map(lambda x: (x, 1)).partitionBy(10, MyPartitioner).map(lambda x: x[0]).saveAsTextFile(
    		"file:///root/Desktop/luozhongye/partitioner")
    
    
    if __name__ == '__main__':
    	main()
    
    

使用如下命令运行 TestPartitioner.py:

cd /root/Desktop/luozhongye
python3 TestPartitioner.py

或者,使用如下命令运行 TestPartitioner.py:

cd /root/Desktop/luozhongye
spark-submit TestPartitioner.py

程序运行后会返回如下信息:

image-20231123114351343

3.文件数据写入
  • 把 RDD 写入到文本文件中
textFile = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
textFile.saveAsTextFile("file:///root/Desktop/luozhongye/writeback")

其中wordlzy.txt的内容

Hadoop is good
Spark is fast
Spark is better
luozhongye is handsome

image-20231123115053912

Spark 采用惰性机制。可以使用如下的“行动”类型的操作查看 textFile 中的内容:

textFile.first()

正因为 Spark 采用了惰性机制,在执行转换操作的时候,即使输入了错误的语句,pyspark 也不会马上报错,而是等到执行“行动”类型的语句启动真正的计算时,“转换”操作语句中的错误才会显示出来,比如:

textFile = sc.textFile("file:///root/Desktop/luozhongye/wordcount/word123.txt")

image-20231123115210529

  • 把 RDD 写入到文本文件中

可以使用 saveAsTextFile()方法把 RDD 中的数据保存到文本文件中。下面把 textFile 变量中的内容再次写回到另外一个目录 writeback 中,命令如下:

textFile = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
textFile.saveAsTextFile("file:///root/Desktop/luozhongye/writeback")

进入到“/root/Desktop/luozhongye/writeback”目录查看

cd /root/Desktop/luozhongye/writeback
ls

image-20231123115411983

二、逐行理解并运行4.4.2实例“文件排序”。

新建多个txt文件file1.txt 、file2.txt 、file3.txt ,其内容分别如下:

33
37
12
40
4
16
39
5
1
45
25

要求读取所有文件中的整数,进行排序后,输出到一个新的文件中,输出的内容为每行两个整数,第一个整数为第二个整数的排序位次,第二个整数为原待排序的整数。

实现上述功能的代码文件“/root/Desktop/luozhongye/FileSort.py”的内容如下:

#!/usr/bin/env python3 
from pyspark import SparkConf, SparkContext

index = 0


def getindex():
	global index
	index += 1
	return index


def main():
	conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
	sc = SparkContext(conf=conf)
	lines = sc.textFile("file:///root/Desktop/luozhongye/file*.txt")
	index = 0
	result1 = lines.filter(lambda line: (len(line.strip()) > 0))
	result2 = result1.map(lambda x: (int(x.strip()), ""))
	result3 = result2.repartition(1)
	result4 = result3.sortByKey(True)
	result5 = result4.map(lambda x: x[0])
	result6 = result5.map(lambda x: (getindex(), x))
	result6.foreach(print)
	result6.saveAsTextFile("file:///root/Desktop/luozhongye/sortresult")
    
    
if __name__ == '__main__':
		main()

image-20231123232005765

三、完成p96实验内容3,即“编写独立应用程序实现求平均值问题”,注意每位同学自己修改题目中的数据。

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个字段是学生名字,第二个字段是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。

数学成绩.txt:

小罗 110
小红 107
小新 100
小丽 99

英语成绩.txt:

小罗 95 
小红 81 
小新 82
小丽 76

政治成绩.txt:

小罗 65 
小红 71 
小新 61 
小丽 66

408成绩.txt:

小罗 100
小红 103
小新 94
小丽 110

实现代码如下:

from pyspark import SparkConf, SparkContext

# 初始化Spark配置和上下文
conf = SparkConf().setAppName("AverageScore")
sc = SparkContext(conf=conf)

# 读取数学成绩文件
math_rdd = sc.textFile("数学成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))

# 读取英语成绩文件
english_rdd = sc.textFile("英语成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))

# 读取政治成绩文件
politics_rdd = sc.textFile("政治成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))

# 读取408成绩文件
computer_rdd = sc.textFile("408成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))

# 合并所有成绩数据
all_scores_rdd = math_rdd.union(english_rdd).union(politics_rdd).union(computer_rdd)

# 计算每个学生的成绩总和和成绩数量
sum_count_rdd = all_scores_rdd.combineByKey(lambda value: (value, 1),
                                            lambda acc, value: (acc[0] + value, acc[1] + 1),
                                            lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))

# 计算平均成绩
average_scores_rdd = sum_count_rdd.mapValues(lambda x: x[0] / x[1])

# 输出到新文件
average_scores_rdd.saveAsTextFile("平均成绩")

# 关闭Spark上下文
sc.stop()

image-20231123233508387

实验心得

在这次实验中,我进一步熟悉了使用PySpark进行大数据处理和分析的方法,并深入了解了PySpark RDD的基本操作。学会了分区、持久化、数据写入文件,并解决实际问题。这次实验让我对PySpark有了更深入的理解,并增强了我处理和分析大数据的能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/183026.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue3中如何响应式解构 props

目录 1,前言2,解决2.1,利用插件,实现编译时转换2.2,toRef 和 toRefs 1,前言 Vue3 中为了保持响应性,始终需要以 props.x 的方式访问这些 prop。这意味着不能够解构 defineProps 的返回值&#…

linux的基础命令

文章目录 linux的基础命令一、linux的目录结构(一)Linux路径的描述方式 二、Linux命令入门(一)Linux命令基础格式 三、ls命令(一)HOME目录和工作目录(二)ls命令的参数1.ls命令的-a选…

ChatGLM2-6B微调过程说明文档

参考文档: ChatGLM2-6B 微调(初体验) - 知乎 环境配置 下载anaconda,版本是Anaconda3-2023.03-0-Linux-x86_64.sh,其对应的python版本是3.10,试过3.7和3.11版本的在运行时都报错。 执行下面的命令安装anaconda sh Anaconda3-202…

Django之Cookie与Session,CBV加装饰器

前言 会话跟踪技术 在一个会话的多个请求中共享数据,这就是会话跟踪技术。例如在一个会话中的请求如下:  请求银行主页; 请求登录(请求参数是用户名和密码);请求转账(请求参数与转账相关的数…

winlogbeat采集windows日志

下载链接 https://www.elastic.co/cn/downloads/past-releases/winlogbeat-7-16-2 配置文件 # ---------------------------- Elasticsearch Output ---------------------------- output.elasticsearch:# Array of hosts to connect to.hosts: ["192.168.227.160:9200&…

wagtail-安装配置

系列文章目录 文章目录 系列文章目录安装虚拟环境安装wagtail查看安装后的包 创建wagtail项目安装依赖迁移创建超级用户运行项目 安装虚拟环境 https://blog.csdn.net/gsl371/article/details/117917857 安装wagtail (wagenv) C:\djproject\wagprj>pip list Package V…

Mac下载的软件显示文件已损坏,如何解决文件已损坏问题,让文件可以正常运行

Mac下载的软件显示文件已损坏,如何解决文件已损坏问题,让文件可以正常运行 设备/引擎:Mac(11.6)/Mac Mini 开发工具:终端 开发需求:让显示已损坏的文件顺利安装到电脑 大家肯定都遇到过下载…

geoserver发布tif矢量数据图层

cesium加载上传至geoserver的tif矢量数据_cesium加载tiff-CSDN博客 geoserver安装及跨域问题解决方案:geoserver安装及跨域问题解决方案_geoserver 跨域_1 1王的博客-CSDN博客 将TIF上传至geoserver 启动geoserver服务,并进入geoserver主页。 1. 新建…

【Java 进阶篇】Redis持久化之RDB:数据的安全守护者

Redis,作为一款高性能的键值存储系统,支持多种持久化方式,其中RDB(Redis DataBase)是其最常用的一种。RDB可以将当前时刻的数据快照保存到磁盘,以便在Redis重启时快速恢复数据。本文将深入探讨RDB的原理、配…

走近科学之《MySQL 的秘密》

走近科学之《MySQL 的秘密》 mysql 存储引擎、索引、执行计划、事务、锁、分库分表、优化 1、存储引擎(storage engines) 存储引擎规定了数据存储时的不同底层实现,如存储机制、索引、锁、事务等。 可以通过 show engines 命令查看当前服务…

web前端之若依框架图标对照表、node获取文件夹中的文件名,并通过数组返回文件名、在html文件中引入.svg文件、require、icon

MENU 前言效果图htmlJavaScripstylenode获取文件夹中的文件名 前言 需要把若依原有的icon的svg文件拿到哦&#xff01; 注意看生成svg的路径。 效果图 html <div id"idSvg" class"svg_box"></div>JavaScrip let listSvg [404, bug, build, …

TypeScript 学习笔记 第三部分 贪吃蛇游戏

尚硅谷TypeScript教程&#xff08;李立超老师TS新课&#xff09; 1. 创建开发环境 创建工程&#xff0c;使用学习笔记的第二部分安装css部分 npm i -D less less-loader css-loader style-loader对css部分处理&#xff0c;能够运行在低版本浏览器 npm i -D postcss postcss…

【Docker】从零开始:9.Docker命令:Push推送仓库(Docker Hub,阿里云)

【Docker】从零开始&#xff1a;9.Docker命令:Push推送仓库 知识点1.Docker Push有什么作用&#xff1f;2.Docker仓库有哪几种2.1 公有仓库2.2 第三方仓库2.3 私有仓库2.4 搭建私有仓库的方法有哪几种 3.Docker公有仓库与私有仓库的优缺点对比 Docker Push 命令标准语法操作参数…

Design Guidelines for 100 Gbps

文章目录 Stratix V GT Transceiver ChannelsCFP2 Host Connector Assembly and PinoutStratix V GT to CFP2 Interface Layout DesignBoard Stack Up DimensionsExample Design Channel PerformanceSimulation Results for Stratix V GT to CFP2 Connector Layout Design Desi…

【JavaSE】基础笔记 - 异常(Exception)

目录 1、异常的概念和体系结构 1.1、异常的概念 1.2、 异常的体系结构 1.3 异常的分类 2、异常的处理 2.1、防御式编程 2.2、异常的抛出 2.3、异常的捕获 2.3.1、异常声明throws 2.3.2、try-catch捕获并处理 3、自定义异常类 1、异常的概念和体系结构 1.1、异常的…

Mac安装配置typescript及在VSCode上运行ts

一、Mac上安装typescript sudo npm install -g typescript 测试一下&#xff1a;出现Version则证明安装成功 tsc -v 二、在VSCode上运行 新建一个xxx.ts文件&#xff0c;测试能否运行 console.log("helloworld") 运行报错&#xff1a;ts-node: command not…

LabVIEW中如何达到NI SMU最大采样率

LabVIEW中如何达到NI SMU最大采样率 NISMU的数字化仪功能对于捕获SMU详细的瞬态响应特性或表征待测设备&#xff08;DUT&#xff09;响应&#xff08;例如线性调整率和负载调整率&#xff09;至关重要。没有此功能&#xff0c;将需要一个外部示波器。 例如&#xff0c;假设在…

uniapp 轮播图(含组件封装,自动注册全局组件)

效果预览 组件封装 src\components\SUI_Swiper.vue 可参考官网配置更多属性 swipernavigator <script setup lang"ts"> import { ref } from vue defineProps({config: Object, })const activeIndex ref(0) const change: UniHelper.SwiperOnChange (e) &…

关于elementui和ant design vue无法禁止浏览器自动填充问题

以and design vue 为例&#xff1a; 图标用来显隐账号密码 html&#xff1a; <a-form-model-item label"账号密码:" prop"password"><a-input v-if"passwordTab" ref"passwordInput" v-model"form.password" typ…

element中el-switch的v-model自定义值

一、问题 element中的el-switch的值默认都是true或false&#xff0c;但是有些时候后端接口该字段可能是0或者1&#xff0c;如果说再转换一次值&#xff0c;那就有点太费力了。如下所示&#xff1a; <template><el-switchinactive-text"否"active-text&quo…