RDD介绍

RDD设计背景

在实际应用中,存在许多迭代式计算,这些应用场景的共同之处是 : 不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入.
而目前的MapReduce框架都是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销;
如果能将结果保存在内存当中,就可以大量减少IO.
RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,
不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的落地存储,大大降低了数据复制、磁盘IO和序列化开销,最终加快计算速度.

RDD概念

RDD,弹性分布式数据集.
一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合;
一个RDD可以分成多个分区,每个分区就是一个数据集片段(HDFS上的块);
一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算.
弹性:既可以存储在内存又可以存储在磁盘
分布式:可以被分成多个分区,不同的分区可以被保存到不同的节点上进行并行计算
数据集:本质上是一个只读的分区记录集合
RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合;
不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和groupBy)而创建得到新的RDD.

RDD提供了一组丰富的操作以支持常见的数据运算,分为'行动'(Action)'转换'(Transformation)两种类型,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系.
两类操作的主要区别是:转换操作(比如map、filter、groupBy、join等)接受RDD并返回RDD,而行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果).

RDD执行过程

Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作.
RDD典型的执行过程如下:
1. 读入外部数据源(或者内存中的集合)创建RDD;
2. RDD经过一系列的'Transformation'操作,每一次都会产生不同的RDD,供给下一个'Transformation'使用;
3. 最后一个RDD经'Action'操作进行处理,并输出到外部数据源(或者变成Scala/JAVA集合或变量).
需要说明的是,RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的'Action'操作,
对于'Action'之前的所有'Transformation'操作,Spark只是记录下'Transformation'操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算.

在这里插入图片描述

从输入中逻辑上产生了A和C两个RDD,经过一系列'Transformation'操作,逻辑上生成了F(也是一个RDD),之所以说是逻辑上,是因为这时候计算并没有发生,Spark只是记录了RDD之间的生成和依赖关系.
也就当F要进行输出时,是当F进行'Action'操作的时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算.

在这里插入图片描述

血缘关系

上诉一系列处理称为一个'血缘关系(Lineage)',即DAG拓扑排序的结果.
采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化(pipeline),避免了多次转换操作之间数据同步的等待,
而且不用担心有过多的中间数据,因为具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理.
同时,这种通过血缘关系就把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;
相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑.

RDD特性

总体而言,Spark采用RDD以后能够实现高效计算的主要原因如下:
(1) 高效的容错性
现有的分布式共享内存、键值存储、内存数据库等,为了实现容错,必须在集群节点之间进行数据复制或者记录日志,即在节点之间会发生大量的数据传输,这对于数据密集型应用而言会带来很大的开销.
而在RDD的设计中,数据只读,不可修改,如果需要修改数据,必须从父RDD转换到子RDD,由此在不同RDD之间建立了血缘关系;所以,RDD是一种天生具有容错机制的特殊集合,
不需要通过数据冗余的方式(比如详细的记录操作的日志)实现容错,而只需通过RDD父子依赖(血缘)关系重新计算得到丢失的分区来实现容错,无需回滚整个系统,这样就避免了数据复制的高开销,
而且重算过程可以在不同节点之间并行进行,实现了高效的容错;
此外,RDD提供的转换操作都是一些粗粒度的操作(比如map、filter和join),RDD依赖关系只需要记录这种粗粒度的转换操作,而不需要记录具体的数据和各种细粒度操作的日志(比如对哪个数据项进行了修改),这就大大降低了数据密集型应用中的容错开销.
(2) 中间结果持久化到内存
数据在内存中的多个RDD操作之间进行传递,不需要落地到磁盘上,避免了不必要的读写磁盘开销.
(3) 存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化开销.

RDD的依赖关系

RDD中不同的操作会使得不同RDD中的分区会产生不同的依赖; RDD中的依赖关系分为窄依赖(Narrow Dependency)、宽依赖(Wide Dependency).
宽依赖: 一个父RDD的一个分区对应一个子RDD的多个分区; 一对多,伴有shuffle过程.
窄依赖: 一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区;一对一或多对一.
总结:如果父RDD的一个分区只被一个子RDD的一个分区所使用就是窄依赖,否则就是宽依赖.

在这里插入图片描述

窄依赖典型的操作包括map、filter、union等,宽依赖典型的操作包括groupByKey、sortByKey等;
对于连接(join)操作,可以分为两种情况:
1.对输入进行协同划分,属于窄依赖.
协同划分(co-partitioned)是指多个父RDD的某一分区的所有'键(key)'落在子RDD的同一个分区内,不会产生同一个父RDD的某一分区落在子RDD的两个分区的情况.
2.对输入做非协同划分,属于宽依赖.
对于窄依赖的RDD,可以以流水线的方式计算所有父分区,不会造成网络之间的数据混合;
对于宽依赖的RDD,则通常伴随着Shuffle操作,即首先需要计算好所有父分区数据,然后在节点之间进行Shuffle.

阶段划分(stage)

Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,
具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算.
例如,假设从HDFS中读入数据生成3个不同的RDD(即A、C和E),通过一系列转换操作后再将计算结果保存回HDFS;
对DAG进行解析时,在依赖图中进行反向解析,由于从RDD A到RDD B的转换以及从RDD B和F到RDD G的转换,都属于宽依赖,因此,在宽依赖处断开后可以得到三个阶段,即阶段1、阶段2和阶段3.
可以看出,在阶段2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,
比如,分区7通过map操作生成的分区9,可以不用等待分区8到分区10这个转换操作的计算结束,而是继续进行union操作,转换得到分区13,这样流水线执行大大提高了计算的效率.

在这里插入图片描述

由上述论述可知,把一个DAG图划分成多个'stage'以后,每个阶段都代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集合;
每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给Executor运行.

Spark算子

RDD支持两种类型的操作:
Transformation:从一个RDD转换为一个新的RDD.
Action:基于一个数据集进行运算(引起Job运算),并返回RDD.
例如,map是一个Transformation操作,map将数据集的每一个元素按指定的函数转换为一个RDD返回;reduce是一个action操作.
Spark的所有Transformation操作都是懒执行,它们并不立马执行,而是先记录对数据集的一系列Transformation操作;这种设计让Spark的运算更加高效.
例如,对一个数据集map操作之后使用reduce只返回结果,而不返回庞大的map运算的结果集.
默认情况下,每个转换的RDD在执行不同Action操作时都会重新计算;即使两个Action操作会使用同一个转换的RDD,该RDD也会重新计算.
除非使用persist方法或cache方法将RDD缓存到内存,这样在下次使用这个RDD时将会提高计算效率,也支持将RDD持久化到硬盘上或在多个节点上复制.

Transformation算子

下面列出了Spark常用的transformation操作,详细的细节请参考RDD API文档(Scala、Java、Python、R)和键值对RDD方法文档(Scala、Java).

map(func)
将原来RDD的每个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD.

filter(func)
使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回.

flatMap(func)
类似于map,但是输入数据项可以被映射到0个或多个输出数据集合中,所以函数func的返回值是一个数据项集合而不是一个单一的数据项.

mapPartitions(func)
类似于map,但是该操作是在每个分区上分别执行,所以当操作一个类型为T的RDD时func的格式必须是Iterator<T> => Iterator<U>.
即mapPartitions需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作.

mapPartitionsWithIndex(func)
类似于mapPartitions,但是需要提供给func一个整型值,这个整型值是分区的索引,所以当处理T类型的RDD时,func的格式必须为(Int, Iterator<T>) => Iterator<U>.

union(otherDataset)
返回原数据集和参数指定的数据集合并后的数据集;
使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同;
该操作不进行去重操作,返回的结果会保存所有元素;如果想去重,可以使用distinct().

intersection(otherDataset)
返回两个数据集的交集.

distinct([numTasks]))
将RDD中的元素进行去重操作.

groupByKey([numTasks])
操作(K,V)格式的数据集,返回(K, Iterable)格式的数据集.
注意,如果分组是为了按key进行聚合操作(例如,计算sum、average),此时使用reduceByKey或aggregateByKey计算效率会更高.
注意,默认情况下,并行情况取决于父RDD的分区数,但可以通过参数numTasks来设置任务数.

reduceByKey(func, [numTasks])
使用给定的func,将(K,V)对格式的数据集中key相同的值进行聚集,其中func的格式必须为(V,V) => V,可选参数numTasks可以指定reduce任务的数目.

aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])(K,V)格式的数据按key进行聚合操作,聚合时使用给定的合并函数和一个初始值,返回一个(K,U)对格式数据;
需要指定的三个参数:zeroValue为在每个分区中,对key值第一次读取V类型的值时,使用的U类型的初始变量;
seqOp用于在每个分区中,相同的key中V类型的值合并到zeroValue创建的U类型的变量中;combOp是对重新分区后两个分区中传入的U类型数据的合并函数.

sortByKey([ascending], [numTasks])
(K,V)格式的数据集,其中K已实现了Ordered,经过sortByKey操作返回排序后的数据集,指定布尔值参数ascending来指定升序或降序排列.

join(otherDataset, [numTasks])
用于操作两个键值对格式的数据集,操作两个数据集(K,V)(K,W)返回(K,(V, W))格式的数据集,通过leftOuterJoin、rightOuterJoin、fullOuterJoin完成外连接操作.

cogroup(otherDataset, [numTasks])
用于操作两个键值对格式数据集(K,V)(K,W),返回数据集格式为(K,(Iterable, Iterable)).这个操作也称为groupWith.
对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器.

cartesian(otherDataset)
对类型为T和U的两个数据集进行操作,返回包含两个数据集所有元素对的(T,U)格式的数据集;即对两个RDD内的所有元素进行笛卡尔积操作.

pipe(command, [envVars])
以管道(pipe)方式将RDD的各个分区(partition)使用shell命令处理(比如一个 Perl或 bash脚本),
RDD的元素会被写入进程的标准输入(stdin),将进程返回的一个字符串型 RDD(RDD of strings),以一行文本的形式写入进程的标准输出(stdout)中.

coalesce(numPartitions)
把RDD的分区数降低到通过参数numPartitions指定的值,在得到的更大一些数据集上执行操作,会更加高效.

repartition(numPartitions)
随机地对RDD的数据重新洗牌(Reshuffle),从而创建更多或更少的分区,以平衡数据,总是对网络上的所有数据进行洗牌(shuffles).

repartitionAndSortWithinPartitions(partitioner)
根据给定的分区器对RDD进行重新分区,在每个结果分区中,按照key值对记录排序,这在每个分区中比先调用repartition再排序效率更高,因为它可以将排序过程在shuffle操作的机器上进行.

Action算子

下面列出了Spark支持的常用的action操作,详细请参考RDD API文档(Scala、Java、Python、R)和键值对RDD方法文档(Scala、Java).

reduce(func)
使用函数func聚集数据集中的元素,这个函数func输入为两个元素,返回为一个元素;这个函数应该符合结合律和交换率,这样才能保证数据集中各个元素计算的正确性.

collect()
在驱动程序中,以数组的形式返回数据集的所有元素;通常用于filter或其它产生了大量小数据集的情况.

count()
返回数据集中元素的个数.

first()
返回数据集中的第一个元素,类似于take(1).

take(n)
返回数据集中的前n个元素,类似于sql中的limit.

takeOrdered(n,[ordering])
返回RDD按自然顺序或自定义顺序排序后的前n个元素.

saveAsTextFile(path)
将数据集中的元素以文本文件或文本文件集合的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中;
Spark将在每个元素上调用toString方法,将数据元素转换为文本文件中的一行记录.

saveAsSequenceFile(path) (Java and Scala)
将数据集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中;
该操作只支持对实现了Hadoop的Writable接口的键值对RDD进行操作,在Scala中,还支持隐式转换为Writable的类型(Spark包括了基本类型的转换,例如Int、Double、String等).

saveAsObjectFile(path) (Java and Scala)
将数据集中的元素以简单的Java序列化的格式写入指定的路径;这些保存该数据的文件,可以使用SparkContext.objectFile()进行加载.

countByKey()
仅支持对(K,V)格式的键值对类型的RDD进行操作;返回(K,Int)格式的Hashmap,(K,Int)为每个key值对应的记录数目.

foreach(func)
对数据集中每个元素使用函数func进行处理.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/642671.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker安装、入门及VSCode链接(地平线OE docker镜像)

最近在地平线的SDK X3上做开发&#xff0c;有高手做了一些编译方法的对比&#xff1a; [X3容器应用开发探索-0]开篇&#xff1a;从裸机编译到交叉编译 X86 Qemu for Hobot X3 PI(AARCH64) vs Hobot X3 PI 这里想借助Docker编译提速&#xff0c;做个笔记&#xff1a; 目录 一、…

算法刷题笔记 高精度乘法(C++实现)

文章目录 题目描述解题思路解题代码 题目描述 给定两个非负整数&#xff08;不含前导0&#xff09;A和B&#xff0c;请你计算 AB的值。 输入格式 共两行&#xff0c;第一行包含整数 A&#xff0c;第二行包含整数 B。 输出格式 共一行&#xff0c;包含AB的值。 数据范围 …

最新文章合集

GitHub宝藏项目&#xff1a;每天一个&#xff0c;让你的技术库增值不停&#xff01; STORM、SuperMemory、Awesome Chinese LLM、AI写作助手、资料搜集、文章生成、视角问题引导、模拟对话策略、内容导入、浏览器插件、资源库、开源微调模型 开发者必看&#xff1a;Linux终端…

开关电源AC-DC(15W 3-18V可调)

简介: 该模块使用PI的TNY268PN电源芯片制作的开关电源,实现最大功率15W 3-18V可调输出(更改反馈电阻)隔离式反激电源; 简介:该模块使用PI的TNY268PN电源芯片制作的开关电源,实现最大功率15W 3-18V可调输出(更改反馈电阻,现电路图输出5V)隔离式反激电源; 一、产品简…

leecode 1206|跳表的设计

跳表 跳表&#xff0c;一种链表数据结构&#xff0c;其增删改茶的效率能和平衡树相媲美 leecode1206 可以看上面的那个动画&#xff0c;动画效果很贴切。 我简单讲讲它的机制吧&#xff0c;每个节点不单单是一个&#xff0c;测试好几层&#xff0c;然后同一层的节点和统一节点…

力扣96. 不同的二叉搜索树

Problem: 96. 不同的二叉搜索树 文章目录 题目描述思路复杂度Code 题目描述 思路 一个数字做根节点的话可能的结果为&#xff1a;其左边数字做子树的组合数字乘以其右边数字做子树的个数之积 1.创建备忘录memo&#xff1b; 2.递归分别求取当前数字左边和右边数字做子树的数量&…

Putty: 随心御剑——远程启动服务工具plink

一、引言:如何远程控制 也许你会有这样的场景,交互程序(以下简称UI程序)跑在windows端,而控制程序跑在Linux上。我们想要通过windows端 UI程序来启动Linux下面的服务,来一场酣畅淋漓的御剑飞行咋办,难道要自己十年磨一剑,在Linux下编写一个受控服务程序么.计算机科技发…

Stable Diffusion【艺术特效】【霓虹灯】:霓虹灯像素化马赛克特效

提示词 Neon pixelated mosaic of [Subject Description],highly detailed [主题]的霓虹灯像素化马赛克&#xff0c;高度详细 参数设置 大模型&#xff1a;万享XL_超写实摄影V8.2 采样器&#xff1a;Euler a 采样迭代步数&#xff1a;25 CFG&#xff1a;3 反向提示词&#x…

GmSSL3.X编译iOS和Android动态库

一、环境准备 我用的Mac电脑编译&#xff0c;Xcode版本15.2&#xff0c;安卓的NDK版本是android-ndk-r21e。 1.1、下载国密源码 下载最新的国密SDK源码到本地。 1.2、安装Xcode 前往Mac系统的AppStore下载安装最新Xcode。 1.3、安卓NDK下载 下载NDK到本地&#xff0c;选…

微服务可用性之隔离

摘要 ​ 本文主要微服务场景下服务的可用性保障之隔离。隔离又分为几种情况&#xff0c;动静隔离、读写隔离、热点隔离、资源隔离等场景。 为什么要隔离 ​ 本质上是对资源进行分割确保在出现故障的时候服务只是部分不可用&#xff0c;不至于系统陷入整体性瘫痪&#xff0c;…

推特热帖:大语言模型自荐能够替代的20种人类工作!快来看你是否需要转行!

最近推特上有一个例子引起了广泛的讨论&#xff0c;事情的起因是这样的&#xff1a;网友让 GPT-4o 预测一下自己未来将会替代人类哪些工作&#xff1f; 这听起来很有趣&#xff01;GPT-4o会给出什么样的预测呢&#xff1f; 3.5研究测试&#xff1a;hujiaoai.cn 4研究测试&…

TypeScript学习日志-第三十二天(infer关键字)

infer关键字 一、作用与使用 infer 的作用就是推导泛型参数&#xff0c;infer 声明只能出现在 extends 子语句中&#xff0c;使用如下&#xff1a; 可以看出 已经推导出类型是 User 了 二、协变 infer 的 协变会返回联合类型&#xff0c;如图&#xff1a; 三、逆变 infer…

Redis - 优惠卷秒杀

场景分析 为了避免对数据库造成压力&#xff0c;我们在新增优惠卷的时候&#xff0c;可以将优惠卷的信息储存在Redis中&#xff0c;这样用户抢购的时候访问优惠卷信息&#xff0c;通过Redis读取信息。 抢购流程&#xff1a; 业务分析 既然在新增优惠卷的时候&#xff0c;我…

【数据结构与算法】之堆的应用——堆排序及Top_K问题!

目录 1、堆排序 2、Top_K问题 3、完结散花 个人主页&#xff1a;秋风起&#xff0c;再归来~ 数据结构与算法 个人格言&#xff1a;悟已往之不谏&#xff0c;知来者犹可追 克心守己&#xff0c;律己则安&#xff01; 1、堆排序 对一个无序的数组…

安卓开发--安卓使用Echatrs绘制折线图

安卓开发--安卓使用Echatrs绘制折线图 前期资料安卓使用Echarts绘制折线图1.1 下载 Echarts 安卓资源1.2 新建assets文件1.3 新建布局文件1.4 在布局文件中布局WebView1.5 在活动文件中调用 最终效果 前期资料 Echarts 官网样式预览: https://echarts.apache.org/examples/zh/…

Java开发者必知的时间处理工具:SimpleDateFormat类详解

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一…

【论文阅读】 YOLOv10: Real-Time End-to-End Object Detection

文章目录 AbstractIntroductionRelated WorkMethodologyConsistent Dual Assignments for NMS-free Training &#xff08;无NMS训练的一致性双重任务分配&#xff09;Holistic Efficiency-Accuracy Driven Model Design &#xff08;效率-精度驱动的整体模型设计&#xff09; …

ABB 任务 模块 程序

1&#xff0c;任务由模块组成 &#xff0c; 2&#xff0c;模块分为程序模块和系统模块 3&#xff0c;可以通过新建程序模块和删除程序模块 4.可以在程序模块中构建程序 5&#xff0c;系统模块不能够被删除 6&#xff0c;main 程序主要体现在自动运行中

C++—— set、map、multiset、multimap的介绍及使用

目录 关联式容器 关联式容器的特点和使用场景 树形结构与哈希结构 树形结构 哈希结构 键值对 set set的介绍 set的定义方式 set的使用 multiset map map的介绍 map的定义方式 map的使用 multimap 关联式容器 C标准模板库&#xff08;STL&#xff09;中的关联…

【2024最新华为OD-C卷试题汇总】传递悄悄话的最长时间(100分) - 三语言AC题解(Python/Java/Cpp)

&#x1f36d; 大家好这里是清隆学长 &#xff0c;一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C卷的三语言AC题解 &#x1f4bb; ACM银牌&#x1f948;| 多次AK大厂笔试 &#xff5c; 编程一对一辅导 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x1f497; 文章目录 前…