Python学习从0到1 day26 第三阶段 Spark ③ 数据计算 Ⅱ

目录

一、Filter方法

功能

语法

代码

总结

filter算子

二、distinct方法

功能

语法

代码

总结

distinct算子

三、SortBy方法

功能

语法

代码 

总结

sortBy算子

四、数据计算练习

需求:

解答

总结

去重函数:

过滤函数:

转换函数:

排序函数:


于是我驻足,享受无法复刻的一些瞬间

                                                        —— 24.11.9

一、Filter方法

功能

过滤想要的数据进行保留

语法

基于filter中我们传入的函数,决定rdd对象中哪个保留哪个丢弃

代码

from pyspark import SparkConf,SparkContext

# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 对RDD的数据进行过滤,保留奇数,去除偶数

# 方法1:
def Retain(data):
    if data % 2 == 1:
        return True
    else:
        return False

# 对RDD数据进行过滤,留下奇数
rdd1 = rdd.filter(Retain)
print(rdd1.collect())

# 方法2:
rdd2 = rdd.filter(lambda num:num % 2 == 1)
print(rdd2.collect())


总结

filter算子

接受一个处理函数,可用lambda匿名函数快速编写

函数对RDD数据逐个处理,得到True的保留到返回值的RDD中


二、distinct方法

功能

对RDD数据进行去重,返回新RDD

语法

rdd.distinct()    # 无需传参

代码

from pyspark import SparkConf,SparkContext

# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1,3,3,4,4,4,7,8,9,9])
rdd = rdd.distinct()
print(rdd.collect())


总结

distinct算子

完成对Rdd内数据的去重操作


三、SortBy方法

功能

对RDD数据进行排序,基于指定的排序依据

语法

rdd.sortBy()

rdd.sortBy(func, ascending = False, numPartitions = 1)
# func:(T) - > U: 告知按照rdd中的哪个数据进行排序,比如 lambda x:x[1] 表示按照rdd中的第二列元素进行排序
# ascending: True升序 False 降序
# numPartitions: 用多少分区排序

代码 

from pyspark import SparkConf,SparkContext

# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 读取数据文件
rdd = sc.textFile("D:/2LFE\Desktop\WordCount.txt")
# 取出全部单词
word_rdd = rdd.flatMap(lambda x:x.split(" "))
print(word_rdd.collect())

# 将所有单词都转换成二元元组,单词为key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word:(word,1))
# 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 对结果进行排序
result_rdd = result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
# 打印并输出结果
print(result_rdd.collect())


总结

sortBy算子

接收一个处理函数,可用lambda快速编写

函数表示用来决定排序的依据

可以控制升序或降序

全局排序需要设置分区数为1


四、数据计算练习

需求:

复制以上内容到文件中,使用Spark读取文件进行计算:

① 各个城市销售额排名,从大到小

② 全部城市,有哪些商品类别在售卖

③ 北京市有哪些商品类别在售卖

解答

from pyspark import SparkConf,SparkContext
import json

# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 读取文件得到RDD
file_rdd = sc.textFile("E:\python.learning\pyspark\sortBy.txt")

# 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x:x.split("|"))

# 将一个JSON字符串转换为字典 json模块
dict_rdd = json_str_rdd.map(lambda x:json.loads(x))

# 取出城市和销售额数据:(城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))

# 按销售额对结果进行聚合然后根据销售额降序排序
city_result_rdd = city_with_money_rdd.reduceByKey(lambda x,y:x+y)
res1 = city_result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
print("需求1结果:" , res1.collect())

# 需求2 对全部商品进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2结果:",category_rdd.collect())

# 需求3 过滤北京市的数据
BJ_data_rdd = dict_rdd.filter(lambda x:x['areaName'] == '北京')
print("需求3结果:",BJ_data_rdd.collect())

# 需求4 对北京市的商品类别进行商品类别去重
res2 = BJ_data_rdd.map(lambda x:x['category']).distinct()
print("需求4结果:",res2.collect())


总结

去重函数:

在 PySpark 框架下,distinct函数用于返回一个新的 RDD,其中包含原始 RDD 中的不同元素。

过滤函数:

filter函数用于从弹性分布式数据集(RDD)中筛选出满足特定条件的元素,返回一个新的 RDD 只包含满足条件的元素。

转换函数:

在 PySpark 中,map函数是对弹性分布式数据集(RDD)进行转换操作的一种重要方法。map函数对 RDD 中的每个元素应用一个函数,返回一个新的 RDD,其中包含应用函数后的结果。

排序函数:

sortBy 函数用于对RDD 中的元素进行排序,它接受一个函数或者一个字段名作为参数,根据这个参数来确定排序的依据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/915368.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Jmeter基础篇(23)TPS和QPS的异同

前言 这是一篇性能测试指标的科普文章哦! TPS和QPS是同一个概念吗? TPS(Transactions Per Second)和QPS(Queries Per Second)虽然都是衡量系统性能的指标,但是它们并不是同一个概念。这两个各…

IEC60870-5-104 协议源码架构详细分析

IEC60870-5-104 协议源码架构 前言一、资源三、目录层级一二、目录层级二config/lib60870_config.hdependencies/READMEexamplesCMakeLists.txtcs101_master_balancedcs104_client_asyncmulti_client_servertls_clienttls_server说明 make这些文件的作用是否需要导入这些文件&a…

机器学习在网络安全中的应用

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 机器学习在网络安全中的应用 机器学习在网络安全中的应用 机器学习在网络安全中的应用 引言 机器学习概述 定义与原理 发展历程 …

JUC基础类-AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 1、AbstractQueuedSynchronizer概述2、AbstractQueuedSynchronizer源码分析2.1 AQS源码2.2 Node类 如有侵权,请联系~ 如有问题,也欢迎批评指正~ 1、AbstractQueuedSynchronizer概述 AbstractQueuedSy…

文献阅读 | Nature Methods:使用 STAMP 对空间转录组进行可解释的空间感知降维

文献介绍 文献题目: 使用 STAMP 对空间转录组进行可解释的空间感知降维 研究团队: 陈金妙(新加坡科学技术研究局) 发表时间: 2024-10-15 发表期刊: Nature Methods 影响因子: 36.1&#xff0…

Redis系列之底层数据结构ZipList

Redis系列之底层数据结构ZipList 实验环境 Redis 6.0 什么是Ziplist? Ziplist,压缩列表,这种数据结构会根据存入数据的类型和大小,分配大小不同的空间,所以是为了节省内存而采用的。因为这种数据结构是一种完整连续…

界面控件DevExpress WPF中文教程:TreeList视图及创建分配视图

DevExpress WPF拥有120个控件和库,将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序,这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…

数据结构中数据有序性/ 单调性 ——二分查找

以下记录的都是闭区间写法 例题&#xff1a;34. 在排序数组中查找元素的第一个和最后一个位置 1.关系转换 寻找目标值有四种情况&#xff1a;≥、>、≤、< 比如目标值x&#xff0c; 可以转化为 ≥x、≥ x1、≤x、≤ x1 比如数组大小为6&#xff0c;目标值为…

探索Python的HTTP利器:Requests库的神秘面纱

文章目录 **探索Python的HTTP利器&#xff1a;Requests库的神秘面纱**一、背景&#xff1a;为何选择Requests库&#xff1f;二、Requests库是什么&#xff1f;三、如何安装Requests库&#xff1f;四、Requests库的五个简单函数使用方法1. GET请求2. POST请求3. PUT请求4. DELET…

《Linux从小白到高手》综合应用篇:深入详解Linux swap及其调整优化

1. 引言&#xff1a; Swap是存储设备上的一块空间&#xff08;分区&#xff09;&#xff0c;操作系统可以在这里暂存一些内存里放不下的东西。这从某种程度上相当于增加了服务器的可用内存。虽然从swap读写比内存慢&#xff0c;但总比没有好&#xff0c;算是内存不足时一种比较…

SpringMVC学习笔记(一)

一、SpringMVC的基本概念 &#xff08;一&#xff09;三层架构和MVC 1、三层架构概述 我们的开发架构一般都是基于两种形式&#xff0c;一种是 C/S 架构&#xff0c;也就是客户端/服务器&#xff0c;另一种是 B/S 架构&#xff0c;也就是浏览器服务器。在 JavaEE 开发中&…

小面馆叫号取餐流程 佳易王面馆米线店点餐叫号管理系统操作教程

一、概述 【软件资源文件下载在文章最后】 小面馆叫号取餐流程 佳易王面馆米线店点餐叫号管理系统操作教程 点餐软件以其实用的功能和简便的操作&#xff0c;为小型餐饮店提供了高效的点餐管理解决方案&#xff0c;提高了工作效率和服务质量 ‌点餐管理‌&#xff1a;支持电…

【.NET 8 实战--孢子记账--从单体到微服务】--简易权限--角色可访问接口管理

咱们继续来编写孢子记账的简易权限&#xff0c;这篇文章中我们将编写角色可访问接口的管理API&#xff0c;同样我不会把完整的代码全都列出来&#xff0c;只会列出部分代码&#xff0c;其余代码我希望大家能自己手动编写&#xff0c;然后对比项目代码。废话不多说&#xff0c;开…

Linux上Python使用MySQLdb包连接MySQL5.7和MySQL8的问题

在一台安装有MySQL8的Linux上用MySQLdb包连接MySQL5.7&#xff0c;连接参数中加上ssl_mode‘DISABLED’,能正常连接&#xff1b;不加ssl_mode参数&#xff0c;会报 而在连接MySQL8时加不加ssl_mode都能正常连接&#xff0c;但在使用过程&#xff0c;加了ssl_mode参数&#xff…

列表(list)

一、前言 本次博客主要讲解 list 容器的基本操作、常用接口做一个系统的整理&#xff0c;结合具体案例熟悉自定义内部排序方法的使用。如有任何错误&#xff0c;欢迎在评论区指出&#xff0c;我会积极改正。 二、什么是list list是C的一个序列容器&#xff0c;插入和删除元素…

spring使用xml文件整合事务+druid+mybatis

1.事务 事务&#xff08;Transaction&#xff09;是数据库管理系统中的一个重要概念&#xff0c;它表示一组不可分割的操作序列&#xff0c;这些操作要么全部执行成功&#xff0c;要么全部不执行&#xff0c;以确保数据库从一个一致性状态转换到另一个一致性状态。事务具有以下…

大语言模型LLM综述

一、LM主要发展阶段 1.1、统计语言模型SLM 基于统计学习方法&#xff0c;基本思想是基于马尔可夫假设HMM建立词概率预测模型。如n-gram语言模型 1.2、神经语言模型NLM 基于神经网络来做词的分布式表示。如word2vec模型 1.3、 预训练语言模型PLM 预训练一个网络模型来做词表…

【Jenkins实战】Windows安装服务启动失败

写此篇短文&#xff0c;望告诫后人。 如果你之前装过Jenkins&#xff0c;出于换域账号/本地帐号的原因想重新安装&#xff0c;你大概率会遇上一次Jenkins服务启动失败提示&#xff1a; Jenkins failed to start - Verify that you have sufficient privileges to start system…

Linux kernel 堆溢出利用方法(二)

前言 本文我们通过我们的老朋友heap_bof来讲解Linux kernel中off-by-null的利用手法。在通过讲解另一道相对来说比较困难的kernel off-by-null docker escape来深入了解这种漏洞的利用手法。&#xff08;没了解过docker逃逸的朋友也可以看懂&#xff0c;毕竟有了root权限后&a…

微服务(一)

目录 1.认识微服务 1.1.单体架构 1.2.微服务 1.3.SpringCloud SpringCloud版本 SpringBoot版本 2.服务注册和发现 2.1.注册中心原理 2.2.Nacos注册中心 2.3.服务注册 2.3.1.添加依赖 2.3.2.配置Nacos 2.4.服务发现 2.4.1.引入依赖 2.4.2.配置Nacos地址 2.4.3.发…