pyspark入门基础详细讲解

1.前言介绍

学习目标:了解什么是Speak、PySpark,了解为什么学习PySpark,了解课程是如何和大数据开发方向进行衔接

使用pyspark库所写出来的代码,既可以在电脑上简单运行,进行数据分析处理,又可以把代码无缝迁移到成百上千的服务器集群上去做分布式计算。

为什么要学习pyspark呢?

总结

2.基础准备

学习目标:掌握pyspark库的安装,掌握pyspark执行环境入口对象的构建,理解pyspark的编程模型。

建议使用国内代理镜像网站下载更快。

 简化代码,本质上是同一个意思,链式结构,链式调用化简程序 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊

代码展示:
"""
演示获取pyspark的执行环境入库对象:SparkContext
并通过SparkContext对象获取当前PySpark的版本
"""

# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象  setMaster是描写运行模式   setAppName是设置当前Spark任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 同一个意思,链式结构,链式调用化简程序
# 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

spark需要启动时间,所以代码的运行一小会,3.5.3就是当前spark的运行版本

这个sc非常非常重要哦,后续给大家讲解。

通过sc拿到数据输入,数据处理计算是通过RDD类对象的一系列成员方法来对数据进行计算,然后把结果对外进行输出

我们只需要记住后期写spark代码的三大步,把数据加载进来,对数据进行计算,把结果输出去

总结

3.数据输入

学习目标:理解RDD对象,掌握PySpark数据输入的2种方法。

RDD就和列表等数据容器差不多

python数据容器转RDD对象

parallelize成员方法把数据容器存入RDD对象

如果要查看RDD里面有什么内容,需要用collect()方法

字符串会把每一个字符都拆出来,存入RDD对象,字典仅有key被存入RDD对象

读取文件转RDD对象

总结

4.数据计算

map方法

学习目标:掌握RDD的map方法

map会把传入的每一个参数都返回一个值

你会发现报错了,报错的原因是spark没有找到python解释器

给他指定一条路径,这样就没有问题了。如果指定路径之后还是没有解决的,可能是因为pycharm版本太新,降低版本就行了,建议是pycharm3.10

对于简单函数我们可以使用lambda匿名函数。

结果是一样的

链式调用

总结

flatMap方法

学习目标:掌握RDD的flatMap方法对数据进行计算。

通过map,可以看到尽管我们把数据分成一个一个的,但是还是存在嵌套,依旧被嵌套在list当中

当我们使用了flatMap方法后,发现解除了嵌套

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize(["ikun 22","ikun 3","ik unh hhhh"])
# 需求:将RDD数据里面的一个个单词提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" ")) # 使用空格进行切分
print(rdd2.collect())

使用flatMap可以解除内部嵌套,语法与map一样

总结:

reduceByKey方法

学习目标:掌握RDD的reduceByKey方法    

 

二元元组指的是元组里面存储的只有两个元素

KV型的RDD一般是两个元素,把第一个元素当成key,第二个当成value,自动按照key分组,然后根据你传入的逻辑计算value

(v,v)->(v)  意思是传入两个相同类型的参数,返回一个返回值,类型和传入要求一致

自动分组并且组内求和

总结

可以完成按key进行分组,并且组内进行逻辑计算

练习案例1

学习目标:完成使用PySpark进行单词计数的案例

数据文件

取出所有的单词,flatMap是把单词一个一个取出来,map是把单词一行一行取出来,一行是一个列表。

把单词转换成二元元组

完整代码

"""
完成练习案例:单词计数统计
"""
from pyspark import SparkConf,SparkContext
import os
# 1.构建执行环境入口对象
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("D:/word.txt")
# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
# (hello,1) (spark,1) (itheima,1) (itcast,1)
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.打印输出结果
print(result_rdd.collect())

filter方法

学习目标:掌握RDD的filter方法

True被保留,False被丢弃

总结

distinct方法

学习目标:掌握RDD的distinct方法

不需要传入参数,功能简单就是去重操作

总结

sortBy方法

学习目标:掌握RDD的sortBy方法进行内容的排序

接收函数传入参数并且有一个返回值

目前我们没有解除到分布式,就先写上numPartitions=1

之前写过一个读取文件,统计单词的个数,现在让我们对他进行排序

可以自己控制升序或者降序,True升序,False降序

from pyspark import SparkConf,SparkContext
import os
# 1.构建执行环境入口对象
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("D:/word.txt")
# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
# (hello,1) (spark,1) (itheima,1) (itcast,1)
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.对结果进行排序
final_rdd = result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(final_rdd.collect())

总结:

练习案例2

学习目标:完成练习案例2的开发

完整代码:

"""
完成练习案例:json商品统计
"""
# 1.各个城市销售额排名,从小到大
# 2.全部城市,有哪些商品类别在售卖
# 3.北京市有哪些商品类别在售卖
from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] ='D:/python/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 需求1:城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/2222.txt")
# 1.2 取出一个个json字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))

# 1.3 将一个个json字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'],int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a,b:a+b)
# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("需求1的结果:",result1_rdd.collect())
# 需求2:全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
# 2.2 对全部商品类别进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:",category_rdd.collect())
# 需求3:北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出全部商品类别
# 3.3 进行商品类别去重
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:",result3_rdd.collect())

5.数据输出

输出为python对象

学习目标:掌握将RDD的结果输出为python对象的各类方法

collect算子

reduce算子

reduce和reducebykey的区别是reducebykey是获取key然后组内计算,reduce是单纯的直接计算

take算子

就是取前N个元素

count算子

总结

from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] ='D:/python/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备RDD
rdd = sc.parallelize([1,2,3,4,5])
# collect算子,输出RDD为list对象
rdd_list:list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,对RDD进行两两融合
num = rdd.reduce(lambda a,b: a+b)
print(num)
# take算子,取出RDD前N个元素,组成list返回
take_list = rdd.take(3)
print(take_list)
# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()

输出到文件中

学习目标:掌握将RDD的内容输出到文件中,了解如何更改RDD的分区数为1

报错了,原因是配置的问题,接下来我们给他配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/915671.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Qt 编写插件plugin,支持接口定义信号

https://blog.csdn.net/u014213012/article/details/122434193?spm1001.2014.3001.5506 本教程基于该链接的内容进行升级,在编写插件的基础上,支持接口类定义信号。 环境:Qt5.12.12 MSVC2017 一、创建项目 新建一个子项目便于程序管理【…

【python】python使用虚拟环境

使用虚拟环境的好处是创建一个独立干净的环境 首先cd到新项目的目录下 创建虚拟环境 使用日期命名方便自己找到版本 python -m venv venv20241114激活虚拟环境 .\venv20241114\Scripts\activate会创建一个文件夹 点进去可以看到是python的脚本所存文件结构 纯净环境 p…

CSS 技巧:如何让 div 完美填充 td 高度

引言 一天哈比比突然冒出一个毫无理头的一个问题: 本文就该问题进行展开… 原文链接: 昆仑虚F2E 一、需求说明 大致需求如下, 当然这里做了些简化 有如下初始代码: 一个自适应的表格每个单元格的宽度固定 200px每个单元格高度则是自适应每个单元格内是一个 div 标签, div 标签…

跟上AI的浪潮

现在AI技术已广泛应用至语音助手、写作、绘图、视频,甚至是各种语言的代码编写。平常我们都是应用别人开发好的模型,或者说智能体,那么我们自己能否做那个开发AI智能体的人,近期加了一个AI学习的大社区,几万在AI道路上…

专题十八_动态规划_斐波那契数列模型_路径问题_算法专题详细总结

目录 动态规划 动态规范五步走: 1. 第 N 个泰波那契数(easy) 解析: 1.状态表达式: 2.状态转移方程: 3.初始化: 4.填表顺序: 5.返回值 编写代码: 总结&#xff…

MySQL技巧之跨服务器数据查询:基础篇-更新语句如何写

MySQL技巧之跨服务器数据查询:基础篇-更新语句如何写 上一篇已经描述:借用微软的SQL Server ODBC 即可实现MySQL跨服务器间的数据查询。 而且还介绍了如何获得一个在MS SQL Server 可以连接指定实例的MySQL数据库的连接名: MY_ODBC_MYSQL 以及用同样的…

C/C++语言 多项式加法和乘法

多项式加法和乘法 多项式的加法题目描述输入输出样例步骤代码段全局变量设定新建结点合并链表 完整代码 多项式乘法题目描述输入输出样例代码段计算两多项式结果输入 完整代码 多项式的加法 题目描述 输入输出 样例 步骤 总体思想是用链表来做 ① 我们发现输入样例中&#xf…

ArkTs面向对象编程

ArkTs面向对象编程 1.1 面向对象编程概述 1.1.1 什么是面向对象编程 面向对象编程是一种编程范式,它使用“对象”来设计软件和创建可重用的程序设计 对象是包含数据和方法的实体,可以与其他对象进行交互 面相对象编程鼓励使用已有的对象来组合或修改以…

乳腺癌诊断分析——基于聚类分析实现

一、研究背景 乳腺癌属于恶性肿瘤,在早期发现后需要及早将病变组织切除,而且术后还要化疗和放射等辅助治疗,能够抑制癌细胞的扩散和增长。 二、研究目的 研究乳腺癌病人的患病特征通过聚类分析方法对特征进行分类通过上述聚类结果对乳腺诊…

丹摩征文活动|FLUX.1 和 ComfyUI:从部署到上手,轻松驾驭!

FLUX.1 和 ComfyUI:从部署到上手,轻松驾驭! FLUX.1历史曲线 黑森林实验室推出了一款名为FLUX.1的先进图像生成模型,根据不同用户需求,提供了三种独特的版本。 FLUX.1-pro:作为专为企业打造的强大闭源版本…

数据分析:16s差异分析DESeq2 | Corncob | MaAsLin2 | ALDEx2

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍DESeq2原理计算步骤结果Corncob原理计算步骤结果MaAsLin2原理计算步骤结果ALDEx2原理计算步骤结果加载R包数据链接数据预处理微生物数据样本信息提取物种名称过滤零值保留结果读取…

OCR识别铁路电子客票

随着中国铁路客运领域进入全面数字化时代,国家税务总局、财政部和国铁集团于2024年10月18日联合发布公告,自2024年11月1日起,推广使用“电子发票(铁路电子客票)”。这一举措不仅为旅客出行提供了极大的便利&#xff0c…

【MySQL基础刷题】总结题型(三)

十题左右,便于复习 1.查询结果的质量和占比2.每月交易I3.销售分析III4.只出现一次的最大数字5.买下所有产品的客户6.员工的直属部门7.指定日期的产品价格 1.查询结果的质量和占比 avg大神啊… SELECT query_name, ROUND(avg(rating / position), 2) as quality, …

python 同时控制多部手机

在这个智能时代,我们的手机早已成为生活和工作中不可或缺的工具。无论是管理多个社交媒体账号,还是处理多台设备上的事务,如何更高效地控制多个手机成为了每个人的痛点。 今天带来的这个的软件为你提供了一键控制多部手机的强大功能。无论是办公、娱乐,还是社交,你都能通过…

外星人入侵

学习于Python编程从入门到实践(Eric Matthes 著) 整体目录:外星人入侵文件夹是打包后的不必在意 图片和音效都是网上下载的 音效下载网站:Free 游戏爆击中 Sound Effects Download - Pixabay 运行效果:可以上下左右移…

前端监控与埋点 全总结

一、概念 前端埋点是指在网页或者应用程序中插入特定的代码,用于收集用户的行为数据并发送给服务器进行分析。这些数据可以包括用户的点击、浏览、输入等操作,帮助开发者了解用户的在其网站中的行为,从而进行针对性的优化和改进。 前端埋点…

Python简单文件操作day9

1、文件操作的重要性和场景 重要性: 数据持久化、跨平台兼容性、数据备份与恢复、数据共享、配置管理、日志记录 应用场景: 数据分析、web开发、文本处理 2、文件的概念 文件是一个存储在某种持久性存储介质【硬盘、光盘、磁盘等】上的数据的结合。 …

指令存储和指令流水线

要求存储器的编址单位,首先观察到计算机采用的是32位定长指令字,因此一条指令就是32位,即4B,根据表中可知一条指令所占地址空间为08048104H-08048100H4H,因此所用的编制单位为字节(B) 将所有指令…

kafka管理工具

文章目录 前言一、Kafka Assistan1.1 描述1.2、配置安装 二、Conduktor2.1、描述2.2、配置安装 三、kafka-maneger3.1、描述3.2、配置安装3.3、命令启动3.4、[refer to](https://www.ctyun.cn/document/10000120/10033218#section-39755766f4910e4b) 前言 提示:这里…

JavaWeb常见注解

1.Controller 在 JavaWeb 开发中,Controller是 Spring 框架中的一个注解,主要用于定义控制器类(Controller),是 Spring MVC 模式的核心组件之一。它表示该类是一个 Spring MVC 控制器,用来处理 HTTP 请求并…