【pyspark速成专家】3_Spark之RDD编程1

目录

​编辑

一,创建RDD

二,常用Action操作

三,常用Transformation操作


一,创建RDD

创建RDD主要有两种方式,一个是textFile加载本地或者集群文件系统中的数据,

第二个是用parallelize方法将Driver中的数据结构并行化成RDD。

#从本地文件系统中加载数据
file = "./data/hello.txt"
rdd = sc.textFile(file,3)
rdd.collect()

['hello world',
 'hello spark',
 'spark love jupyter',
 'spark love pandas',
 'spark love sql']

#从集群文件系统中加载数据
#file = "hdfs://localhost:9000/user/hadoop/data.txt"
#也可以省去hdfs://localhost:9000
#rdd = sc.textFile(file,3)

#parallelize将Driver中的数据结构生成RDD,第二个参数指定分区数
rdd = sc.parallelize(range(1,11),2)
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

二,常用Action操作

Action操作将触发基于RDD依赖关系的计算。

collect

rdd = sc.parallelize(range(10),5) 
#collect操作将数据汇集到Driver,数据过大时有超内存风险
all_data = rdd.collect()
all_data

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

take

#take操作将前若干个数据汇集到Driver,相比collect安全
rdd = sc.parallelize(range(10),5) 
part_data = rdd.take(4)
part_data

[0, 1, 2, 3]

takeSample

#takeSample可以随机取若干个到Driver,第一个参数设置是否放回抽样
rdd = sc.parallelize(range(10),5) 
sample_data = rdd.takeSample(False,10,0)
sample_data

[7, 8, 1, 5, 3, 4, 2, 0, 9, 6]

first

#first取第一个数据
rdd = sc.parallelize(range(10),5) 
first_data = rdd.first()
print(first_data)

0

count

#count查看RDD元素数量
rdd = sc.parallelize(range(10),5)
data_count = rdd.count()
print(data_count)

10

reduce

#reduce利用二元函数对数据进行规约
rdd = sc.parallelize(range(10),5) 
rdd.reduce(lambda x,y:x+y)

45

foreach

#foreach对每一个元素执行某种操作,不生成新的RDD
#累加器用法详见共享变量
rdd = sc.parallelize(range(10),5) 
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)

45

countByKey

#countByKey对Pair RDD按key统计数量
pairRdd = sc.parallelize([(1,1),(1,4),(3,9),(2,16)]) 
pairRdd.countByKey()

defaultdict(int, {1: 2, 3: 1, 2: 1})

saveAsTextFile

#saveAsTextFile保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()

['2', '3', '4', '1', '0']

三,常用Transformation操作

Transformation转换操作具有懒惰执行的特性,它只指定新的RDD和其父RDD的依赖关系,只有当Action操作触发到该依赖的时候,它才被计算。

map

#map操作对每个元素进行一个映射转换
rdd = sc.parallelize(range(10),3)
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

rdd.map(lambda x:x**2).collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

filter

#filter应用过滤条件过滤掉一些数据
rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()

[6, 7, 8, 9]

flatMap

#flatMap操作执行将每个元素生成一个Array后压平
rdd = sc.parallelize(["hello world","hello China"])
rdd.map(lambda x:x.split(" ")).collect()

[['hello', 'world'], ['hello', 'China']]

rdd.flatMap(lambda x:x.split(" ")).collect()

['hello', 'world', 'hello', 'China']

sample

#sample对原rdd在每个分区按照比例进行抽样,第一个参数设置是否可以重复抽样
rdd = sc.parallelize(range(10),1)
rdd.sample(False,0.5,0).collect()

[1, 4, 9]

distinct

#distinct去重
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()

[4, 1, 5, 2, 3]

subtract

#subtract找到属于前一个rdd而不属于后一个rdd的元素
a = sc.parallelize(range(10))
b = sc.parallelize(range(5,15))
a.subtract(b).collect()

[0, 1, 2, 3, 4]

union

#union合并数据
a = sc.parallelize(range(5))
b = sc.parallelize(range(3,8))
a.union(b).collect()

[0, 1, 2, 3, 4, 3, 4, 5, 6, 7]

intersection

#intersection求交集
a = sc.parallelize(range(1,6))
b = sc.parallelize(range(3,9))
a.intersection(b).collect()

[3, 4, 5]

cartesian

#cartesian笛卡尔积
boys = sc.parallelize(["LiLei","Tom"])
girls = sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()

[('LiLei', 'HanMeiMei'),
 ('LiLei', 'Lily'),
 ('Tom', 'HanMeiMei'),
 ('Tom', 'Lily')]

sortBy

#按照某种方式进行排序
#指定按照第3个元素大小进行排序
rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda x:x[2]).collect()

[(4, 1, 1), (3, 2, 2), (1, 2, 3)]

zip

#按照拉链方式连接两个RDD,效果类似python的zip函数
#需要两个RDD具有相同的分区,每个分区元素数量相同

rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age = sc.parallelize([19,18,20])

rdd_zip = rdd_name.zip(rdd_age)
print(rdd_zip.collect())

[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]

zipWithIndex

#将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name =  sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())

[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/638602.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue-officef实现pdf文件在线预览

一、参考网址 https://www.cnblogs.com/guozhiqiang/p/17957288 1、引入依赖 npm install vue-office/pdf vue-demi2、编写组件 <template><vue-office-pdf :src"pdf"/> </template> <script> // import pdf from vue-pdf import VueOffice…

机器学习云环境测试

等待创建完成后&#xff0c;点击 PyTorch 打开&#xff0c;创建一个全新的 notebook 在 Cell 中输入如下代码&#xff0c;并点击 Run 完成后点击 New Cell &#xff0c;在 New Cell 中输入如下代码 输入完成后点击 Run &#xff0c;运行 New Cell 。&#xff08;每个 Cell 代…

云和恩墨海外首秀在吉隆坡召开的2024中国智能科技与文化展览会

作为中马建交50周年官方重点推荐的活动之一&#xff0c;2024中国智能科技与文化展览会&#xff08;第四届&#xff09;于5月20至21日在毗邻吉隆坡双子塔的吉隆坡国际会展中心举办。本次展览会获得马来西亚科学技术创新部、马来西亚通讯部、中国驻马来西亚大使馆和马来西亚中华总…

获得 AI Applied Skills 凭证:微软在线评估认证的注意事项

在你踏上微软的亚洲AI奥德赛之旅&#xff0c;完成基础课程学习后&#xff0c;你可以继续进行相应的评估&#xff0c;在交互式实验室体验中完成一系列任务&#xff0c;通过线上即时评估赢得认证。通过本文的介绍&#xff0c;可以帮助你了解评估认证的一些细节以及注意事项&#…

C++中的四种类型转换运算符

隐式类型转换是安全的&#xff0c;显式类型转换是有风险的&#xff0c;C语言之所以增加强制类型转换的语法&#xff0c;就是为了强调风险&#xff0c;让程序员意识到自己在做什么。但是&#xff0c;这种强调风险的方式还是比较粗放&#xff0c;粒度比较大&#xff0c;它并没有表…

[协议]stm32读取AHT20程序示例

AHT20温度传感器使用程序&#xff1a; 使用i2c读取温度传感器数据很简单&#xff0c;但市面上有至少两个手册&#xff0c;我这个对应的手册贴出来&#xff1a; main: #include "stm32f10x.h" // Device header #include <stdint.h> #includ…

【408精华知识】页、页面、页框、页帧、内存块、物理块、物理页面还傻傻分不清?

在做题过程中&#xff0c;我们经常能看到页、页框、块等概念&#xff0c;初接触时&#xff0c;常感觉傻傻分不清&#xff0c;这篇文章将简洁地介绍它们之间的联系与区别。 这些概念之间的根本区别&#xff0c;在于是物理上的概念还是逻辑上的概念&#xff0c;也即是虚地址还是实…

认知架构 cognitive architecture

Assistants API&#xff1a;以开发人员为中心。 有状态的API&#xff1a;允许存储以前的消息、上传文件、访问内置工具&#xff08;代码解释器&#xff09;、通过函数调用控制其他工具。 认知架构应用的两个组件&#xff1a;&#xff08;1&#xff09;如何提供上下文给应用 &…

Widows技术专题系列-系统装机教程

Windows技术专题系列 注&#xff1a; 本教程由羞涩梦整理同步发布&#xff0c;本人技术分享站点&#xff1a;blog.hukanfa.com转发本文请备注原文链接&#xff0c;本文内容整理日期&#xff1a;2024-05-20csdn 博客名称&#xff1a;五维空间-影子&#xff0c;欢迎关注 1 系统…

使用 Supabase 的 Realtime + Storage 非常方便呢

文章目录 &#xff08;一&#xff09;Supabase&#xff08;二&#xff09;Realtime&#xff08;消息&#xff09;&#xff08;2.1&#xff09;Python 消息订阅&#xff08;2.2&#xff09;JavaScript 消息订阅 &#xff08;三&#xff09;Storage&#xff08;存储&#xff09;&…

【Centos7+JDK1.8】Jenkins安装手册

一、安装环境 Centos7 JDK1.8 Jenkins-2.346.3 JDK1.8安装以及网络配置等 自行搜索资料解决。 二、卸载历史安装的Jenkins&#xff0c;直接全部复制粘贴下面的命令 service jenkins stop yum -y remove jenkins rpm -e jenkins rpm -ql jenkins rm -rf /etc/sysconfig/je…

探索Facebook:数字社交的新时代

Facebook&#xff0c;作为全球最大的社交网络平台之一&#xff0c;一直在引领着数字社交的发展潮流。随着科技的不断进步和社会的不断变迁&#xff0c;Facebook也在不断演进和创新&#xff0c;迎接着数字社交的新时代。本文将探索Facebook在数字社交领域的新发展&#xff0c;以…

一个开源的工具类轮子是怎么造出来的

心路历程 为什么要做 在22年9月的某一天&#xff0c;在公司开需求评审时&#xff0c;接到了一个给PDF、图片添加水印的需求。做为一个刚工作的CURD程序员&#xff0c;在遇到这些问题时&#xff0c;第一反应是去github上找找有没有类似的开源框架。但是&#xff0c;出乎我意料…

基于springboot实现旅游管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现旅游管理系统演示 摘要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本旅游管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助使用者在…

XSS---DOM破坏

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 一.什么是DOM破坏 DOM破坏总结为一句话就是&#xff1a;利用HTML元素来响应JS代码的执行结果。 举个例子&#xff1a; <body> <img id"x"> <img name"y"…

145.栈和队列:删除字符串中的所有相邻重复项(力扣)

题目描述 代码解决 class Solution { public:string removeDuplicates(string s) {// 定义一个栈来存储字符stack<char> st;// 遍历字符串中的每一个字符for(int i 0; i < s.size(); i){// 如果栈为空或栈顶字符与当前字符不相同&#xff0c;则将当前字符入栈if(st.e…

JVM运行时内存:垃圾回收器(Serial ParNew Parallel )详解

文章目录 1. 查看默认GC2. Serial GC : 串行回收3. ParNew GC&#xff1a;并行回收4. Parallel GC&#xff1a;吞吐量优先 1. 查看默认GC -XX:PrintCommandLineFlags&#xff1a;查看命令行相关参数&#xff08;包含使用的垃圾收集器&#xff09;使用命令行指令&#xff1a;ji…

【简单介绍下爬山算法】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

java-查询字符串当中是否包含中文

文章目录 前言java-查询字符串当中是否包含中文 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff0c;实在白嫖的话&#xff0c;那欢迎常来啊…

订餐系统总结、

应用层&#xff1a; SpringBoot:快速构建Spring项目&#xff0c;采用“约定大于配置”的思想&#xff0c;简化Spring项目的配置开发。 SpringMvc&#xff1a;Spring框架的一个模块&#xff0c;springmvc和spring无需通过中间整合层进行整合&#xff0c;可以无缝集成。 Sprin…