【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )

文章目录

  • 一、RDD#sortBy 方法
    • 1、RDD#sortBy 语法简介
    • 2、RDD#sortBy 传入的函数参数分析
  • 二、代码示例 - RDD#sortBy 示例
    • 1、需求分析
    • 2、代码示例
    • 3、执行结果





一、RDD#sortBy 方法




1、RDD#sortBy 语法简介


RDD#sortBy 方法 用于 按照 指定的 键 对 RDD 中的元素进行排序 , 该方法 接受一个 函数 作为 参数 , 该函数从 RDD 中的每个元素提取 排序键 ;

根据 传入 sortBy 方法 的 函数参数 和 其它参数 , 将 RDD 中的元素按 升序 或 降序 进行排序 , 同时还可以指定 新的 RDD 对象的 分区数 ;


RDD#sortBy 语法 :

sortBy(f: (T) ⇒ U, ascending: Boolean, numPartitions: Int): RDD[T]
  • 参数说明 :
    • f: (T) ⇒ U 参数 : 函数 或 lambda 匿名函数 , 用于 指定 RDD 中的每个元素 的 排序键 ;
    • ascending: Boolean 参数 : 排序的升降设置 , True 生序排序 , False 降序排序 ;
    • numPartitions: Int 参数 : 设置 排序结果 ( 新的 RDD 对象 ) 中的 分区数 ;
      • 当前没有接触到分布式 , 将该参数设置为 1 即可 , 排序完毕后是全局有序的 ;
  • 返回值说明 : 返回一个新的 RDD 对象 , 其中的元素是 按照指定的 排序键 进行排序的结果 ;

2、RDD#sortBy 传入的函数参数分析


RDD#sortBy 传入的函数参数 类型为 :

(T) ⇒ U

T 是泛型 , 表示传入的参数类型可以是任意类型 ;

U 也是泛型 , 表示 函数 返回值 的类型 可以是任意类型 ;

T 类型的参数 和 U 类型的返回值 , 可以是相同的类型 , 也可以是不同的类型 ;





二、代码示例 - RDD#sortBy 示例




1、需求分析


统计 文本文件 word.txt 中出现的每个单词的个数 , 并且为每个单词出现的次数进行排序 ;

Tom Jerry
Tom Jerry Tom
Jack Jerry Jack Tom

在这里插入图片描述

读取文件中的内容 , 统计文件中单词的个数并排序 ;

思路 :

  • 读取数据到 RDD 中 ,
  • 然后 按照空格分割开 再展平 , 获取到每个单词 ,
  • 根据上述单词列表 , 生成一个 二元元组 列表 , 列表中每个元素的 键 Key 为单词 , 值 Value 为 数字 1 ,
  • 对上述 二元元组 列表 进行 聚合操作 , 相同的 键 Key 对应的 值 Value 进行相加 ;
  • 将聚合后的结果的 单词出现次数作为 排序键 进行排序 , 按照升序进行排序 ;

2、代码示例


对 RDD 数据进行排序的核心代码如下 :

# 对 rdd4 中的数据进行排序
rdd5 = rdd4.sortBy(lambda element: element[1], ascending=True, numPartitions=1)

要排序的数据如下 :

[('Tom', 4), ('Jack', 2), ('Jerry', 3)]

按照上述二元元素的 第二个 元素 进行排序 , 对应的 lambda 表达式为 :

lambda element: element[1]

ascending=True 表示升序排序 ,

numPartitions=1 表示分区个数为 1 ;


排序后的结果为 :

[('Jack', 2), ('Jerry', 3), ('Tom', 4)]

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 将 文件 转为 RDD 对象
rdd = sparkContext.textFile("word.txt")
print("查看文件内容 : ", rdd.collect())

# 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表
#   然后展平数据解除嵌套
rdd2 = rdd.flatMap(lambda element: element.split(" "))
print("查看文件内容展平效果 : ", rdd2.collect())

# 将 rdd 数据 的 列表中的元素 转为二元元组, 第二个元素设置为 1
rdd3 = rdd2.map(lambda element: (element, 1))
print("转为二元元组效果 : ", rdd3.collect())

# 应用 reduceByKey 操作,
#   将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print("统计单词 : ", rdd4.collect())

# 对 rdd4 中的数据进行排序
rdd5 = rdd4.sortBy(lambda element: element[1], ascending=True, numPartitions=1)
print("最终统计单词并排序 : ", rdd4.collect())

# 停止 PySpark 程序
sparkContext.stop()



3、执行结果


执行结果 :

D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
23/08/04 10:49:06 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: Could not locate Hadoop executable: D:\001_Develop\052_Hadoop\hadoop-3.3.4\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
PySpark 版本号 :  3.4.1
查看文件内容 :  ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry Jack Tom']
查看文件内容展平效果 :  ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry', 'Jack', 'Tom']
转为二元元组效果 :  [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1), ('Jack', 1), ('Tom', 1)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
统计单词 :  [('Tom', 4), ('Jack', 2), ('Jerry', 3)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
最终统计单词并排序 :  [('Jack', 2), ('Jerry', 3), ('Tom', 4)]

Process finished with exit code 0

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/58690.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

细讲一个 TCP 连接能发多少个 HTTP 请求(一)

一道经典的面试题是从 URL 在浏览器被被输入到页面展现的过程中发生了什么,大多数回答都是说请求响应之后 DOM 怎么被构建,被绘制出来。但是你有没有想过,收到的 HTML 如果包含几十个图片标签,这些图片是以什么方式、什么顺序、建…

Arthas GC日志-JVM(十八)

上篇文章说jvm的实际运行情况。 Jvm实际运行情况-JVM(十七) Arthas介绍 因为arthas完全是java代码写的,我们直接用命令启动: Java -jar arthas-boot.jar 启动成功后,选择我们项目的进程。 进入我们可用dashboard…

django实现部门表的增删改查界面

1、前期准备 部署好mysql数据库,创建好unicom数据库下载好bootstap的插件下载好jquery的插件下载好mysqlclient-1.4.6-cp36-cp36m-win_amd64.whl的安装包,根据python的版本下载 2、创建项目 在pycharm中创建项目 在pycharm的终端创建虚拟环境 py -m v…

【Docker】Docker安装Elasticsearch服务的正确方式

文章目录 1. 什么是Elasticsearch2. Docker安装Elasticsearch2.1 确定Elasticsearch的版本2.2. Docker安装Elasticsearch2.3. 给Elasticsearch安装中文分词器IKAnalyzer(可选) 点击跳转:Docker安装MySQL、Redis、RabbitMQ、Elasticsearch、Na…

Django使用uwsgi+nginx部署,admin没有样式解决办法

Django使用uwsginginx部署,admin没有样式解决办法 如果使用了虚拟环境则修改nginx.conf文件中的/static/路径为你虚拟环境的路径,没有使用虚拟环境则改为你python安装路径下的static server {listen 8008;server_name location; #改为自己的域名,没域名…

mybatisplus实现自动填充 时间

mybatisplus实现自动填充功能——自动填充时间 数据库表中的字段 创建时间 (createTime)更新时间 (updateTime) 每次 增删改查的时候,需要通过对Entity的字段(createTime,updateTime)进行set设置,但是,每…

途乐证券|俄罗斯宣布9月削减石油出口量

当地时间周四,美股兜售潮仍在持续,三大股指连续第二个交易日团体收跌。到收盘,道指跌落0.19%,标普500指数跌落0.25%,纳指跌幅为0.10%。 美国ISM7月非制造业PMI下滑 数据面上,美国供应办理协会ISM周四发布的…

电力巡检无人机助力迎峰度夏,保障夏季电力供应

夏季是电力需求量较高的时期,随着高温天气的来临,风扇、空调和冰箱等电器的使用量也大大增加,从而迎来夏季用电高峰期,电网用电负荷不断攀升。为了保障夏季电网供电稳定,供电公司会加强对电力设施设备的巡检&#xff0…

mmap函数详解

1、什么是mmap mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。 实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存&#x…

Android入门教程||Android 架构||Android 应用程序组件

Android 架构 Android 操作系统是一个软件组件的栈,在架构图中它大致可以分为五个部分和四个主要层。 Linux内核 在所有层的最底下是 Linux - 包括大约115个补丁的 Linux 3.6。它提供了基本的系统功能,比如进程管理,内存管理,设…

Android 面试题 应用程序结构 十一

🔥 Framework主要包含以下模块 🔥 ActivityManagerService 这是一个Activity的管理者,负责管理所有Activity的生命周期。WindowManagerService 它是手机屏幕的的管理者,管理着屏幕的详细情况,所有对屏幕的操作最终都…

express学习笔记4 - 热更新以及express-boom

我们每次改动代码的时候都要重启项目,现在我们给项目添加一个热更新 npm install --save-dev nodemon # or using yarn: yarn add nodemon -D 在package.json添加一行代码 "dev": "nodemon ./bin/www" 重启项目 然后随便做改动&#xff…

【初阶C语言】学会结构体

1.结构体类型的声明 2.结构体初始化 3.结构体成员访问 4.结构体传参 前言:结构是一些值的集合,这些值称为成员变量。结构的每个成员可以是不同类型的变量。 一、结构体类型的声明 1.结构的声明 结构体声明的模板: struct tag {member-li…

Java版工程行业管理系统源码-专业的工程管理软件-em提供一站式服务

​ Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下: 首页 工作台:待办工作、消息通知、预警信息,点击可进入相应的列表 项目进度图表:选择(总体或单个)项目…

sqlyog导出mysql数据字典

1.打开sqlyog执行sql获取字典数据 SELECTt.COLUMN_NAME AS 字段名,t.COLUMN_TYPE AS 数据类型,CASE IFNULL(t.COLUMN_DEFAULT,Null) WHEN THEN 空字符串 WHEN Null THEN NULL ELSE t.COLUMN_DEFAULT END AS 默认值,CASE t.IS_NULLABLE WHEN YES THEN 是 ELSE 否 END AS 是否…

docker的使用

docker安装 https://docs.docker.com/engine/install/debian/ 设置国内镜像 创建或修改 /etc/docker/daemon.json 文件,修改为如下形式 {"registry-mirrors": ["https://registry.hub.docker.com","http://hub-mirror.c.163.com"…

音频光耦合器

音频光耦合器是一种能够将电信号转换为光信号并进行传输的设备。它通常由发光二极管(LED)和光敏电阻(光电二极管或光敏电阻器)组成。 在音频光耦合器中,音频信号经过放大和调节后,被转换为电流信号&#xf…

opencv基础40-礼帽运算(原始图像减去其开运算)cv2.MORPH_TOPHAT

礼帽运算是用原始图像减去其开运算图像的操作。礼帽运算能够获取图像的噪声信息,或者得到比原始图像的边缘更亮的边缘信息。 例如,图 8-22 是一个礼帽运算示例,其中: 左图是原始图像。中间的图是开运算图像。右图是原始图像减开运…

LeetCode 热题 100 JavaScript--543. 二叉树的直径

给你一棵二叉树的根节点,返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也可能不经过根节点 root 。 两节点之间路径的 长度 由它们之间边数表示。 var diameterOfBinaryTree function(root) {var maxDiameter…

多雷达探测论文阅读笔记:雷达学报 2023, 多雷达协同探测技术研究进展:认知跟踪与资源调度算法

多雷达协同探测技术 原始笔记链接:https://mp.weixin.qq.com/s?__biz=Mzg4MjgxMjgyMg==&mid=2247486627&idx=1&sn=f32c31bfea98b85f2105254a4e64d210&chksm=cf51be5af826374c706f3c9dcd5392e0ed2a5fb31ab20924b7dd38e1b1ae32abe9a48afa8174#rd ↑ \uparrow …