【Python】PySpark

前言

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

Spark对Python语言的支持,重点体现在Python第三方库:PySpark

PySpark是由Spark官方开发的Python语言第三方库。

Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。

在这里插入图片描述

基础准备

安装

同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。

pip install pyspark

或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

构建PySpark执行环境入口对象

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

# 打印PySpark的运行版本
print(sc.version)

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行需要Java环境,推荐jdk8

PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。

PySpark的编程,主要分为如下三大步骤:

在这里插入图片描述

数据输入

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

在这里插入图片描述

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将list/tuple/set/dict/str转换为PySpark的RDD对象

# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([1, 2, 3])    
rdd2 = sc.parallelize((1, 2, 3))    
rdd3 = sc.parallelize({1, 2, 3})    
rdd4 = sc.parallelize({'key1': 'value1', 'key2': 'value2'}) 
rdd5 = sc.parallelize('hello')  

# 输出RDD的内容,需要使用collect()
print(rdd1.collect())   # [1, 2, 3]
print(rdd2.collect())   # [1, 2, 3]
print(rdd3.collect())   # [1, 2, 3]
print(rdd4.collect())   # ['key1', 'key2']
print(rdd5.collect())   # ['h', 'e', 'l', 'l', 'o']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

注意:

  • 字符串会被拆分出一个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象

读取文件转RDD对象

PySpark也支持通过SparkContext入口对象来读取文件,构建出RDD对象。

先提前预备一个txt文件

hello
python
day
# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.textFile('E:\\code\\py-space\\8.27\\hello.txt')

# 输出RDD的内容,需要使用collect()
print(rdd.collect())    # ['hello', 'python', 'day']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据计算

RDD对象内置丰富的:成员方法(算子)

map算子

将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

rdd.map(func)
# func: f:(T) -> U
# f: 表示这是一个函数
# (T) -> U 表示的是方法的定义:()表示无需传入参数,(T)表示传入1个参数
# T是泛型的代称,在这里表示 任意类型
# U是泛型的代称,在这里表示 任意类型

# (T) -> U : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型不限
# (A) -> A : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型和传入参数类型一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# 通过map方法将全部数据乘以10,传入参数为函数
rdd2 = rdd.map(lambda x: x * 10)

# 输出RDD的内容,需要使用collect()
print(rdd2.collect())   # [10, 20, 30, 40, 50, 60]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

由于map()的返回值还是RDD对象,可以继续在尾部进行链式调用

rdd3 = rdd.map(lambda x: x * 10).map(lambda x: x + 9)

flatMap算子

对RDD执行map操作,然后进行解除嵌套操作。

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(['a b c', 'd e f'])

# 输出RDD的内容,需要使用collect()
print(rdd.map(lambda x: x.split(' ')).collect())    # [['a', 'b', 'c'], ['d', 'e', 'f']]
print(rdd.flatMap(lambda x:x.split(' ')).collect())   # ['a', 'b', 'c', 'd', 'e', 'f']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey算子

针对KV型(二元元组)RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

rdd.reduceByKey(func)
# func: (V, V) -> V
# 接收2个传入参数(类型要一致),返回一个返回值,返回值类型和传入参数类型要求一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])

# 输出RDD的内容,需要使用collect()
print(rdd.reduceByKey(lambda a, b: a+b).collect())  # [('b', 3), ('a', 2)]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey中的聚合逻辑是:比如有[1,2,3,4,5],然后聚合函数是:lambda a,b: a+b

在这里插入图片描述

注意:reduceByKey中接收的函数,只负责聚合,不理会分组;分组是自动by key来分组的

filter算子

过滤想要的数据进行保留。

rdd.filter(func)
# func: (T) -> bool
# 传入一个参数任意类型,返回值必须是True/False,返回是True的数据被保留,False的数据被丢弃

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# 输出RDD的内容,需要使用collect()
print(rdd.filter(lambda x: x % 2 == 0).collect())  # [2, 4, 6]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

distinct算子

对RDD数据进行去重,返回新的RDD

rdd.distinct() # 无需传参

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 3, 2, 6])

# 输出RDD的内容,需要使用collect()
print(rdd.distinct().collect())  # [6, 1, 2, 3]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

sortBy算子

对RDD数据进行排序,基于你指定的排序依据。

rdd.sortKey(func, ascending=False, numPartitions=1)
# func: (T) -> U:告知按照RDD中的哪个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序
# ascending:True升序,False降序
# numPartitions:用多少分区排序,全局排序需要设置为1

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('Aiw', 9), ('Tom', 6), ('Jack', 8), ('Bolb', 5)])

# 输出RDD的内容,需要使用collect()
print(rdd.sortBy(lambda x: x[1], ascending=False,
      numPartitions=1).collect())  # [('Aiw', 9), ('Jack', 8), ('Tom', 6), ('Bolb', 5)]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据输出

collect算子

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。

rdd.collect()
# 返回值是一个List

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3])

rdd_list: list = rdd.collect()

print(rdd_list)   # [1, 2, 3]
print(type(rdd_list))   # <class 'list'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduce算子

对RDD数据集按照你传入的逻辑进行聚合

rdd.reduce(func)
# func:(T, T) -> T
# 传入2个参数,1个返回值,要求返回值和参数类型一致

在这里插入图片描述

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

print(rdd.reduce(lambda a, b: a+b))   # 45

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

take算子

取RDD的前N个元素,组合成List进行返回。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd_take: list = rdd.take(3)

print(rdd_take)   # [1, 2, 3]
print(type(rdd_take))   # <class 'list'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

count算子

计算RDD有多少条数据,返回值是一个数字。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd_count: int = rdd.count()

print(rdd_count)   # 9
print(type(rdd_count))   # <class 'int'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

saveAsTextFile算子

将RDD的数据写入文本文件中。支持本地写出、HDFS等文件系统。

注意事项:

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'
os.environ['HADOOP_HOME'] = 'D:/Hadoop-3.0.0'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd.saveAsTextFile('./8.27/output') # 运行之前确保输出文件夹不存在,否则报错

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

上述代码输出结果,输出文件夹内有多个分区文件

修改RDD分区为1个

方式一:SparkConf对象设置属性全局并行度为1:

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')
# 设置属性全局并行度为1
conf.set('spark.default.parallelism','1')
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

方式二:创建RDD的时候设置(parallelize方法传入numSlices参数为1)

rdd = sc.parallelize(range(1, 10), numSlices=1)
rdd = sc.parallelize(range(1, 10), 1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/101942.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

知识图谱实战应用26-基于知识图谱构建《本草纲目》的中药查询与推荐项目应用

大家好,我是微学AI,今天给大家介绍一下知识图谱实战应用26-基于知识图谱构建《本草纲目》的中药查询与推荐项目应用,本文通过Py2neo连接到知识图谱数据库,系统实现了中药的快速查询、关系分析、智能推荐和知识展示等功能。用户可以输入中药的名称或特征进行查询,系统将从知…

分页功能实现

大家好 , 我是苏麟 , 今天聊一聊分页功能 . Page分页构造器是mybatisplus包中的一个分页类 . Page分页 引入依赖 <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.1</ver…

【LeetCode每日一题】——274.H指数

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 排序 二【题目难度】 中等 三【题目编号】 274.H指数 四【题目描述】 给你一个整数数组 ci…

linux系统中串口驱动框架基本分析(经典)

第一&#xff1a;区分不同的终端类型 串行端口终端&#xff08;/dev/ttySn&#xff09; 串行端口终端&#xff08;Serial Port Terminal&#xff09;是使用计算机串行端口连接的终端设备。计算机把每个串行端口都看作是一个字符设备。 有段时间这些串行端口设备通常被称为终…

Python:列表推导式

相关阅读 Python专栏https://blog.csdn.net/weixin_45791458/category_12403403.html?spm1001.2014.3001.5482 列表推导式使得创建特定列表的方式更简洁。常见的用法为&#xff0c;对序列或可迭代对象中的每个元素应用某种操作&#xff0c;用生成的结果创建新的列表&#xff…

Python—匹配字段

1. 「概述」 在日常开发中&#xff0c;经常需要对数据中的某些字段进行匹配&#xff0c;但这些字段可能存在微小的差异。例如&#xff0c;同一个招聘岗位的数据中&#xff0c;省份字段可能有“广西”、“广西壮族自治区”和“广西省”等不同的写法。为了处理这些情况&#xff…

(数字图像处理MATLAB+Python)第十章图像分割-第四,五节:分水岭分割和综合案例

文章目录 一&#xff1a;分水岭分割&#xff08;1&#xff09;原理&#xff08;2&#xff09;程序 二&#xff1a;综合案例&#xff1a;答题卡图像分割&#xff08;1&#xff09;设计思路&#xff08;2&#xff09;各模块设计&#xff08;3&#xff09;代码 一&#xff1a;分水…

three.js(二):webpack + three.js + ts

用webpackts 开发 three.js 项目 webpack 依旧是主流的模块打包工具;ts和three.js 是绝配&#xff0c;three.js本身就是用ts写的&#xff0c;ts可以为three 项目提前做好规则约束&#xff0c;使项目的开发更加顺畅。 1.创建一个目录&#xff0c;初始化 npm mkdir demo cd de…

第五章 树与二叉树 二、二叉树的定义和常考考点,WPL的算法

一、定义 二叉树可以用以下方式详细定义&#xff1a; 二叉树是由节点构成的树形结构&#xff0c;每个节点最多可以有两个子节点。每个节点有以下几个属性&#xff1a; 值&#xff1a;存储该节点的数据。左子节点&#xff1a;有一个左子节点&#xff0c;如果没有则为空。右子节…

Visual Studio 2017安装和项目配置

目录 前言1. What、Why and How1.1 What1.2 Why1.3 How 2. 安装3. 创建新项目4. 配置OpenCV库4.1 下载opencv安装包4.2 配置系统环境变量4.3 VS项目环境配置4.4 总结 5. 已有项目添加6. Tips6.1 常用快捷键6.2 字体和颜色选择6.3 配置编译路径 结语下载链接参考 前言 最近因为项…

【STM32教程】第二章 通用输入输出口GPIO

资料下载链接&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1hsIibEmsB91xFclJd-YTYA?pwdjauj 提取码&#xff1a;jauj 1. GPIO的基本结构 1.1 概述 GPIO&#xff08;General Purpose Input Output&#xff09;意思是通用输入输出口可配置为8种输入输出模式&a…

I IntelliJ IDEA 2023.2 最新解锁方式,支持java20

在 IntelliJ IDEA 2023.1 中&#xff0c;我们根据用户的宝贵反馈对新 UI 做出了大量改进。 我们还实现了性能增强&#xff0c;从而更快导入 Maven&#xff0c;以及在打开项目时更早提供 IDE 功能。 新版本通过后台提交检查提供了简化的提交流程。 IntelliJ IDEA Ultimate 现在支…

【Terraform学习】使用 Terraform创建DynamoDB添加项目(Terraform-AWS最佳实战学习)

本站以分享各种运维经验和运维所需要的技能为主 《python》&#xff1a;python零基础入门学习 《shell》&#xff1a;shell学习 《terraform》持续更新中&#xff1a;terraform_Aws学习零基础入门到最佳实战 《k8》暂未更新 《docker学习》暂未更新 《ceph学习》ceph日常问题解…

Django静态文件媒体文件文件上传

文章目录 一、静态文件和媒体文件1.在django中使用静态文件实践2.在django中使用媒体文件 二、文件上传单文件上传实践多文件上传 一、静态文件和媒体文件 媒体文件: 用户上传的文件&#xff0c;叫做media 静态文件:存放在服务器的css,js,image,font等 叫做static1.在django中…

Docker(三) 创建Docker镜像

一、在Docker中拉取最基本的Ubuntu系统镜像 搜索Ubuntu镜像 Explore Dockers Container Image Repository | Docker Hub 下载镜像 docker pull ubuntu:22.04 二、在镜像中添加自己的内容 使用ubuntu镜像创建容器 docker run -it ubuntu:20.04 /bin/bash 在容器中创建了一个文…

文心一言接入Promptulate,开发复杂LLM应用程序

简介 最近在尝试将文心一言的LLM能力接入Promptulate&#xff0c;故写了一篇博客记录一下&#xff0c;Promptulate 是 Promptulate AI 旗下的大语言模型自动化与应用开发框架&#xff0c;旨在帮助开发者通过更小的成本构建行业级的大模型应用&#xff0c;其包含了LLM领域应用层…

CP Autosar-Ethernet配置

文章目录 前言一、Eth层级结构介绍二、Autosar实践2.1 ETH Driver2.2 Eth InterfaceEth Interface Autosar配置2.3 TcpIp模块Eth TcpIp Autosar配置2.4 SoAdEth SoAd配置前言 因汽车E/E架构和功能的复杂度提升而带来的对车辆数据传输带宽提高和通讯方式改变(基于服务的通讯-S…

万字长文:Stable Diffusion 保姆级教程

万字长文&#xff1a;Stable Diffusion 保姆级教程 2022年绝对是人工智能爆发的元年&#xff0c;前有 stability.ai 开源 Stable Diffusion 模型&#xff0c;后有 Open AI 发布 ChatGPT&#xff0c;二者都是里程碑式的节点事件&#xff0c;其重要性不亚于当年苹果发布iPhone&a…

ELK安装、部署、调试 (七)kibana的安装与配置

1.介绍 Kibana 是一个基于浏览器的开源可视化工具&#xff0c;主要用于分析大量日志&#xff0c;以折线图、条形图、饼图、热图、区域图、坐标图、仪表、目标、时间等形式。预测或查看输入源的错误或其他重大事件趋势的变化。Kibana 与 Elasticsearch 和 Logstash 同步工作&am…

【C++习题集】-- 顺序表、链表

&#xff08;用于复习&#xff09; 目录 线性表 顺序表 链表 单链表 单向 \ 双向 带哨兵位 \ 不带哨兵位 循环 \ 非循环 无头单向非循环链表实现 oj题 203. 移除链表元素 206. 反转链表 快慢指针 141.环形链表 【解题思路】 带头双向循环链表 顺序表和链表的区…