RDD算子介绍(二)

1. coalesce

用于缩减分区,减少分区个数,减少任务调度成本。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
val newRDD = rdd.coalesce(2)
newRDD.saveAsTextFile("output")

分区数可以减少,但是减少后的分区里的数据分布并不一定是均匀分布的,比如以下场景:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD = rdd.coalesce(2)
newRDD.saveAsTextFile("output")

结果现实1和2在一个分区,3 4 5 6四个数在第二个分区。因为coalesce算子默认不会打乱分区的数据进行重新组合的。原来1和2,3和4,5和6分别在三个分区里,如果缩减分区之后1 2 3在一个分区,4 5 6在一个分区,意味着将原来的3和4所在的分区里的数据打乱重新组合了。所以缩减分区后,应该将5和6所在的分区里的数据移到其他分区中去,即3 4 5 6最终在一个分区了。

coalesce算子可能会导致数据倾斜。如果想要数据均衡,需要进行shuffle处理,coalesce算子第二个参数就表示是否shuffle处理,默认是false,改为true即可,但是数据不一定有规律,这就是shuffle的效果。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD = rdd.coalesce(2, true)
newRDD.saveAsTextFile("output")

结果显示1 4 5在一个分区,2 3 6在一个分区。

2. repartition

coalesce算子也可以增加分区,但是第二个参数须为true,但用得更多的是repartition算子。 

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val newRDD = rdd.repartition(3)
newRDD.saveAsTextFile("output")

repartition算子源码中还是调用了coalesce算子(第二个参数为true)。

3. sortBy

见名知义,就是排序。

val rdd : RDD[Int] = sc.makeRDD(List(1, 4, 2, 3, 6, 5), 2)
val newRDD = rdd.sortBy(num=>num)
newRDD.saveAsTextFile("output")

数据重新排序,但是分区数不变,存在shuffle过程。 默认是升序排序,第二个参数传false,表示降序排序。

4. intersection、union、subtract、zip

val rdd1 : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 : RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))

val rdd3 = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))

val rdd4 = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))

val rdd5 = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))

val rdd6 : RDD[(Int, Int)]= rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))

 

 注意事项:

1)交集不去重

2)如果两个rdd的数据类型不同,不能做交集、并集、差集操作,但拉链可以

3)拉链操作的两个rdd的分区数需要一致,且分区中的数据数量也要一致

5. partitionBy

只有数据类型为key-value类型的rdd,才有partitionBy操作。partitionBy本身不是RDD的方法,是通过隐式转化得到的PairRDDFunctions的方法。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD = rdd.map((_, 1))

val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
newRDD.saveAsTextFile("output")

这里默认使用HashPartitioner,即按照key的哈希值对分区数取模得到分区号,后续会介绍自定义分区器。

 6. reduceByKey

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Int)] = rdd.reduceByKey((x:Int, y:Int) => {x + y})
newRDD.collect().foreach(println)

reduceByKey将相同key的值进行聚合,具体来说是两两聚合。 但是上例中"b"只有一个,是不会做两两聚合计算的。

7. groupByKey

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Iterable[Int])] = rdd.groupByKey()
newRDD.collect().foreach(println)

groupByKey根据相同key的值进行分组,形成一个可迭代的集合。这与groupBy类似,但是区别是groupBy的可迭代集合不是原有value的集合,而是原来每个元素(即tuple)的集合:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Iterable[(String, Int)])] = rdd.groupBy(_._1)

reduceByKey和groupByKey的区别:

1) reduceByKey相比于groupByKey不仅做了分组,还做了聚合计算

2)groupByKey会将数据打乱重新组合,即存在shuffle操作。既然存在shuffle操作,如果后续还有map等转换操作,原来一个分区的数据处理完之后还需要等待其他分区的数据处理完,因为shuffle后的分区的数据可能不止来源于原来的一个分区。这种等待可能很耗时,并且占用大量内存,因此需要进行落盘操作。简而言之,shuffle操作必须有落盘处理,不能在内存中进行数据等待,否则可能会导致内存溢出,因此性能也不高

3)reduceByKey也会有shuffle,也会有落盘操作,但是在落盘之前,会对原来每个分区内的数据事先进行分组并聚合计算(预聚合,combine)。这样落盘的数据量少了,磁盘IO也少了,性能也提高了

8. aggregateByKey

reduceByKey会进行预聚合,这是分区内的聚合,然后shuffle操作打乱数据,进行分区间的聚合,此时分区内和分区间的聚合规则是一样的。如果分区内和分区间的聚合计算规则不一样,那就要使用aggregateByKey算子。例如,分区内求最大值,分区间求和:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
val newRDD : RDD[(String,Int)] = rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)
newRDD.collect().foreach(println)

aggregateByKey有两个参数列表,第一个参数列表表示初始值(aggregateByKey的最终计算结果与这个初始值类型是相同的),用于和第一个key的value进行分区内计算,第二个参数列表的两个参数分别表示分区内和分区间的计算规则。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val newRDD : RDD[(String,Int)] = rdd.aggregateByKey(5)((x, y) => math.max(x, y), (x, y) => x + y)
newRDD.collect().foreach(println)

 

计算相同key的value的平均值:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)

val newRDD : RDD[(String,(Int, Int))] = rdd.aggregateByKey((0, 0))((t, v) => (t._1 + v, t._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

val result = newRDD.mapValue {
    case (val, num) => {
        val / num
    }
}

result.collect().foreach(println)

9. foldByKey

如果分区内和分区间的计算规则一样,可以使用foldByKey算子。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val newRDD : RDD[(String,Int)] = rdd.foldByKey(0)(_+_)
newRDD.collect().foreach(println)

 10. combineByKey

aggreagteByKey的初始值在一些场景其实很难确定,但如果初始值是相同key的第一个value或者其适当转换,就更为合理。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)

val newRDD : RDD[(String,(Int, Int))] = rdd.combineByKey(v => (v, 1))((t : (Int, Int), v) => (t._1 + v, t._2 + 1), (t1 : (Int, Int), t2 : (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2))

val result = newRDD.mapValue {
    case (val, num) => {
        val / num
    }
}

result.collect().foreach(println)

 

wordCount的多种实现方式(假设已经得到所有的(单词, 1)的tuple):

rdd.reduceByKey(_+_)
rdd.aggregateByKey(0)(_+_, _+_)
rdd.foldByKey(0)(_+_)
rdd.comnbineByKey(v=>v)((x:Int, y:Int)=>x+y, (x:Int, y:Int)=>x+y)

观察源码,发现他们底层调用的都是combineByKey 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/447132.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

政安晨:【深度学习处理实践】(五)—— 初识RNN-循环神经网络

RNN(循环神经网络)是一种在深度学习中常用的神经网络结构,用于处理序列数据。与传统的前馈神经网络不同,RNN通过引入循环连接在网络中保留了历史信息。 RNN中的每个神经元都有一个隐藏状态,它会根据当前输入和前一个时…

SRS(Simple Realtime Server)

SRS(Simple Realtime Server - github) SRS 中文官网 docker安装srs ##(安全组放开1935端口、8080端口) docker run --rm -it -p 1935:1935 -p 1985:1985 -p 8080:8080 -p 8000:8000/udp -p 10080:10080/udp ossrs/srs:5推流 ## 不需要加端口 ffmpeg…

深度学习armv8/armv9 cache的原理

文章目录 1、为什么要用cache?2、背景:架构的变化?2、cache的层级关系 ––big.LITTLE架构(A53为例)3、cache的层级关系 –-- DynamIQ架构(A76为例)4、DSU / L3 cache5、L1/L2/L3 cache都是多大呢6、cache相关的术语介绍7、cache的分配策略(alocation,…

Python读取influxDB数据库

1. influxDB连接 首先用InfluxDBStudio软件连接influxDB数据库来查看所有表: 2. 写sql语句来查询数据 然后和平时写sql查询语句一样,先创建连接client,然后调用其query函数来查询获取数据 self.client influxdb.InfluxDBClient(hostinflu…

前端文件上传

文件上传方式 前端文件上传有两种方式,第一种通过二进制blob传输(formData传输),第二种是通过base64传输 文件相关的对象 file对象其实是blob的子类 blob对象的第一个参数必须是一个数组,你可以把一个file对象放进去…

HCS-华为云Stack-计算节点内部网络结构

HCS-华为云Stack-计算节点内部网络结构 图中表示的仅为计算节点是两网口的模式,如果是四网口模式,系统会再自动创建一个网桥出来 图中未画出存储平面和Internal Base平面,它们和tunnel bearing、External OM-样,都是通过trunk0的…

模型驱动架构MDA

MDE 模型驱动工程(MDE, Model-Driven Engineering)是软件工程的一个分支,它将模型与建模拓展到软件开发的所有方面,形成一个多维建模空间,从而将工程活动建立在这些模型的映射和转换之上。[1] MDE的基本原则是将模型视…

《量子计算:下一个大风口,还是一个热炒概念?》

引言 量子计算,作为一项颠覆性的技术,一直以来备受关注。它被认为是未来计算领域的一次革命,可能改变我们对计算能力和数据处理的理解。然而,随着技术的不断进步和商业应用的探索,人们开始思考,量子计算到底是一个即将到来的大风口,还是一个被过度炒作的概念? 量子计…

空间复杂度(数据结构)

概念: 空间复杂度也是一个数学表达式,是对一个算法在运行过程中临时占用存储空间大小的量度 。 空间复杂度不是程序占用了多少bytes的空间,因为这个也没太大意义,所以空间复杂度算的是变量的个数。空间复杂度计算规则基本跟实践复…

软考73-上午题-【面向对象技术2-UML】-UML中的图4

一、构件图(组件图) 1-1、构件图的定义 展现了,一组构件之间的组织和依赖。 构件图专注于系统的静态实现图。 构件图与类图相关,通常把构件映射为一个、多个类、接口、协作。 【回顾】: 类图展示了一组对象、接口、…

C++之map与set的使用与原理+拓展avl树(详解)

前言:map和set是C里面的两种常用STL容器,他们被设计为关联式容器,这两个容器存储的元素都是唯一的,并且每个元素与键(key)相关联,简单来说就是两个存储不重复元素的容器。那就有人有疑问了&…

PostgreSQL中In, Exists在SQL查询中到底有无区别

前言 SQL查询当中,In和Exists子查询到底有无区别?记得很多年以前,确实是有相关的使用戒条的,或者说存在一些使用的惯用法。试图完全抹开两者的区别,就有点过了。 两者的主要区别: 从目的上讲&#xff0c…

微信小程序开发系列(二十七)·小程序生命周期详细介绍

目录 1. 小程序生命周期介绍 2. 应用生命周期 3. 页面生命周期 1. 小程序生命周期介绍 一个小程序完整的生命周期由 应用生命周期、页面生命周期 和 组件生命周期 三部分来组成。 应用生命周期:是指应用程序进程从创建到消亡的整个过程。 小程序的生命&#…

python——By.ID/By.NAME

一、通过元素的id属性来定位元素 from selenuim import webdriver from selenuim.webdriver.common.by import Bydriver webdriver.Chrome() driver.maximize_window() driver.get(http://xxx) time.sleep(3)# 通过By.ID找到唯一页面元素后,输入abcd driver.find_…

简析内部审计数字化转型的方法和路径

简析内部审计数字化转型的方法和路径 内部审计是一种独立的、客观的确认和咨询活动,包括鉴证、识别和分析问题以及提供管理建议和解决方案。狭义的数字化转型是指将企业经营管理和业务操作的各种行为、状态和结果用数字的形式来记录和存储,据此再对数据…

华为OD机试 - 模拟数据序列化传输(Java JS Python C C++)

题目描述 模拟一套简化的序列化传输方式,请实现下面的数据编码与解码过程 编码前数据格式为 [位置,类型,值],多个数据的时候用逗号分隔,位置仅支持数字,不考虑重复等场景;类型仅支持:Integer / String / Compose(Compose的数据类型表示该存储的数据也需要编码)编码后数…

VSCode安装C语言编译环境

目录 一、在vscode下载C/C扩展 二、配置gcc环境 1.访问网站:https://sourceforge.net/projects/mingw-w64/files/ 2.解压并复制bin目录 三、配置gcc环境 四、在cmd检查是否配置成功 五、vscode配置gcc环境 六、在vscode运行C文件 运行.c代码 七、在vscode运…

JVM-2

目录 1.虚拟机类加载机制 2.JVM常见回收算法 2.1标记清除算法 2.2标记整理算法 2.3标记复制算法 3.分代垃圾回收机制 新生代收集 第一次垃圾回收 第二次垃圾回收 第N此垃圾回收 老年代收集 4.补充 Stop The World Java的对象结构 1.虚拟机类加载机制 双亲委派模式…

理解STM32的低功耗模式

低功耗模式简介 TM32的低功耗模式是特别设计来减少微控制器在不活跃状态下的能耗。这些模式允许STM32在保持核心功能的同时尽可能减少电力消耗,适合用在电池供电或需长期运行的场景。理解各种低功耗模式如何节能,主要包括以下几个方面: 关闭…

DenseNet笔记

📒from ©实现pytorch实现DenseNet(CNN经典网络模型详解) - 知乎 (zhihu.com) 是什么之 DenseBlock 读图: x0是inputH1的输入是x0 (input)H2的输入是x0和x1 (x1是H1的输出) Summary: 传统卷积网,网…