Spark核心之02:RDD、算子分类、常用算子

spark内存计算框架

在这里插入图片描述

一、目标

  1. 深入理解RDD弹性分布式数据集底层原理
  2. 掌握RDD弹性分布式数据集的常用算子操作

二、要点

⭐️1. RDD是什么

  • RDD(Resilient Distributed Dataset)叫做**弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算**的集合.
    • Dataset: 就是一个集合,存储很多数据.
    • Distributed:它内部的元素进行了分布式存储,方便于后期进行分布式计算.
    • Resilient: 表示弹性,rdd的数据是可以保存在内存或者是磁盘中.

⭐️2. RDD的五大属性

在这里插入图片描述

  • (1)A list of partitions
    • 一个分区(Partition)列表,数据集的基本组成单位。
	这里表示一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据,
spark中任务是以task线程的方式运行, 一个分区就对应一个task线程。

	用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值。
    val rdd=sparkContext.textFile("/words.txt")
    如果该文件的block块个数小于等于2,这里生产的RDD分区数就为2
    如果该文件的block块个数大于2,这里生产的RDD分区数就与block块个数保持一致
    	
  • (2)A function for computing each split
    • 一个计算每个分区的函数
	Spark中RDD的计算是以分区为单位的,每个RDD都会实现compute计算函数以达到这个目的.
  • (3)A list of dependencies on other RDDs
    • 一个rdd会依赖于其他多个rdd
  这里就涉及到rdd与rdd之间的依赖关系,spark任务的容错机制就是根据这个特性(血统)而来。
  
  • (4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • 一个Partitioner,即RDD的分区函数(可选项)
当前Spark中实现了两种类型的分区函数,
一个是基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)
另外一个是基于范围的RangePartitioner。
只有对于key-value的RDD,并且产生shuffle,才会有Partitioner,

非key-value的RDD的Parititioner的值是None。
  • (5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    • 一个列表,存储每个Partition的优先位置(可选项)
这里涉及到数据的本地性,数据块位置最优。
spark任务在调度的时候会优先考虑存有数据的节点开启计算任务,减少数据的网络传输,提升计算效率。

3. 基于spark的单词统计程序剖析rdd的五大属性

  • 需求

    HDFS上有一个大小为300M的文件,通过spark实现文件单词统计,最后把结果数据保存到HDFS上
    
  • 代码

    sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")
    
  • 流程分析

在这里插入图片描述

4. RDD的创建方式

  • 1、通过已经存在的scala集合去构建

    val rdd1=sc.parallelize(List(1,2,3,4,5))
    val rdd2=sc.parallelize(Array("hadoop","hive","spark"))
    val rdd3=sc.makeRDD(List(1,2,3,4))
    
  • 2、加载外部的数据源去构建

    val rdd1=sc.textFile("/words.txt")
    
  • 3、从已经存在的rdd进行转换生成一个新的rdd

    val rdd2=rdd1.flatMap(_.split(" "))
    val rdd3=rdd2.map((_,1))
    

⭐️5. RDD的算子分类

  • 1、transformation(转换)
    • 根据已经存在的rdd转换生成一个新的rdd, 它是延迟加载,它不会立即执行
    • 例如
      • map / flatMap / reduceByKey 等
  • 2、action (动作)
    • 它会真正触发任务的运行
      • 将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中
    • 例如
      • collect / saveAsTextFile 等

6. RDD常见的算子操作说明

6.1 transformation算子
转换含义
map(func)返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func)返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func)类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func)类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
union(otherDataset)对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset)对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks]))对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks])在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
sortByKey([ascending], [numTasks])在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks])与sortByKey类似,但是更灵活
join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
coalesce(numPartitions)减少 RDD 的分区数到指定值。
repartition(numPartitions)重新给 RDD 分区
repartitionAndSortWithinPartitions(partitioner)重新给 RDD 分区,并且每个分区内以记录的 key 排序
6.2 action算子
动作含义
reduce(func)reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
collect()在驱动程序中,以数组的形式返回数据集的所有元素
count()返回RDD的元素个数
first()返回RDD的第一个元素(类似于take(1))
take(n)返回一个由数据集的前n个元素组成的数组
takeOrdered(n, [ordering])返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey()针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
⭐️foreach(func)在数据集的每一个元素上,运行函数func
⭐️foreachPartition(func)在数据集的每一个分区上,运行函数func

7. RDD常用的算子操作演示

  • 为了方便前期的测试和学习,可以使用spark-shell进行演示

    spark-shell --master local[2]
    
7.1 map(Trans转换算子)

**map(func)**返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

//把rdd1中每一个元素乘以10
rdd1.map(_*10).collect
7.2 filter(Trans转换算子)

**filter(func)**返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

//把rdd1中大于5的元素进行过滤
rdd1.filter(x => x >5).collect
7.3 flatMap(Trans转换算子)

flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//获取rdd1中元素的每一个字母
rdd1.flatMap(_.split(" ")).collect
7.4 intersection、union(Trans转换算子)

union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD

intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求交集
rdd1.intersection(rdd2).collect

//求并集
rdd1.union(rdd2).collect
7.5 distinct(Trans转换算子)

distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD

val rdd1 = sc.parallelize(List(1,1,2,3,3,4,5,6,7))
//去重
rdd1.distinct
7.6 join、groupByKey(Trans转换算子)

join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求join
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求并集
val rdd4 = rdd1 union rdd2
rdd4.groupByKey.collect
7.7 cogroup(Trans转换算子)

cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

collect() 在驱动程序中,以数组的形式返回数据集的所有元素

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))
//分组
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect
//
//res0: Array[(String, (Iterable[Int], Iterable[Int]))] = 
//Array(
//    (jim,(CompactBuffer(),CompactBuffer(2))), 
//    (tom,(CompactBuffer(1, 2),CompactBuffer(1))), 
//    (jerry,(CompactBuffer(3),CompactBuffer(2))), 
//    (kitty,(CompactBuffer(2),CompactBuffer()))
//  )
7.8 reduce (Action动作算子)

reduce(func) reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))

//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect

val rdd3 = sc.parallelize(List("1","2","3","4","5"))
rdd3.reduce(_+_)

这里可能会出现多个不同的结果,由于元素在不同的分区中,每一个分区都是一个独立的task线程去运行。这些task运行有先后关系
7.9 reduceByKey、sortByKey(Trans转换算子)

groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 ,不同于groupByKey(),reduceByKey会在map端join

sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)

//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect

//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
7.10 repartition、coalesce(Trans转换算子)

coalesce(numPartitions) 减少 RDD 的分区数到指定值,默认不会产生shuffle,传入true开启shuffle

repartition(numPartitions) 重新给 RDD 分区,会产生shuffle 相当于coalesce(numPatitions,true)**

val rdd1 = sc.parallelize(1 to 10,3)
//打印rdd1的分区数
rdd1.partitions.size

//利用repartition改变rdd1分区数
//减少分区
rdd1.repartition(2).partitions.size

//增加分区
rdd1.repartition(4).partitions.size

//利用coalesce改变rdd1分区数
//减少分区
rdd1.coalesce(2).partitions.size


//repartition:  重新分区, 有shuffle
//coalesce:     合并分区 / 减少分区 	默认不shuffle   
//默认 coalesce 不能扩大分区数量。除非添加true的参数,或者使用repartition。

//适用场景:
    //1、如果要shuffle,都用 repartition
    //2、不需要shuffle,仅仅是做分区的合并,coalesce
    //3、repartition常用于扩大分区。

⭐️7.11 map、mapPartitions 、mapPartitionsWithIndex(Trans转换算子)

map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]

val rdd1=sc.parallelize(1 to 10,5)
rdd1.map(x => x*10)).collect
rdd1.mapPartitions(iter => iter.map(x=>x*10)).collect

//index表示分区号  可以获取得到每一个元素属于哪一个分区
rdd1.mapPartitionsWithIndex((index,iter)=>iter.map(x=>(index,x)))

map:用于遍历RDD,将函数f应用于每一个元素,返回新的RDD(transformation算子)。
mapPartitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)。

总结:
如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
⭐️7.12 foreach、foreachPartition (Action动作算子)

foreach(func) 在数据集的每一个元素上,运行函数func

foreachPartition(func) 在数据集的每一个分区上,运行函数func

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

//foreach实现对rdd1里的每一个元素乘10然后打印输出
rdd1.foreach(x=>println(x * 10))

//foreachPartition实现对rdd1里的每一个元素乘10然后打印输出
rdd1.foreachPartition(iter => iter.foreach(x=>println(x * 10)))

foreach:用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)。
foreachPartition: 用于遍历操作RDD中的每一个分区。无返回值(action算子)。


总结:
一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推荐使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/981259.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Redis中常见的延迟问题

使用复杂度高的命令 Redis提供了慢日志命令的统计功能 首先设置Redis的慢日志阈值,只有超过阈值的命令才会被记录,这里的单位是微妙,例如设置慢日志的阈值为5毫秒,同时设置只保留最近1000条慢日志记录: # 命令执行超过…

LangGraph实战:构建智能文本分析流水线

LangGraph实战:构建智能文本分析流水线 1. 智能文本分析 LangGraph是基于图结构的工作流开发框架,通过节点函数和条件流转实现复杂业务逻辑。四大核心能力: 1.1 状态容器 统一管理流程执行上下文,支持JSON序列化存储 1.2 智能路由 基于条件判断实现动态分支跳转 1.3 可…

【北京迅为】itop-3568 开发板openharmony鸿蒙烧写及测试-第1章 体验OpenHarmony—烧写镜像

瑞芯微RK3568芯片是一款定位中高端的通用型SOC,采用22nm制程工艺,搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码,支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU,可用于轻量级人工…

MyBatis - 单元测试 参数传递 注解 CRUD

目录 1. MyBatis 简介 2. 简单使用 MyBatis 2.1 创建 MyBatis 项目 2.2 连接数据库 2.3 创建 Java 类 2.4 创建 Mapper 接口 2.5 在测试类中执行 3. 单元测试 3.1 Test 3.2 SpringBootTest 3.3 BeforeEach / AfterEach 4. MyBatis 基础操作 4.1 配置 MyBatis 打印日…

课程2. 机器学习方法论

课程2. 机器学习方法论 训练算法并评估其质量将样本分成训练和测试。分层 交叉验证方法sklearn 接口算法模型训练模型的应用质量评估 数据预处理标准缩放Violinplot 数据集使用模型Pipeline 在上一讲中,我们讨论了机器学习专家面临的挑战。无论解决的问题类型和解决…

CentOS 7使用RPM安装MySQL

MySQL是一个开源的关系型数据库管理系统(RDBMS),允许用户高效地存储、管理和检索数据。它被广泛用于各种应用,从小型的web应用到大型企业解决方案。 MySQL提供了丰富的功能,包括支持多个存储引擎、事务能力、数据完整…

涂层,如同一道守护之光,有效遏制了QD(量子点)那如星辰般忽明忽暗的闪烁与如垂暮手电筒般黯淡无光的褪色现象。

涂层,如同一道守护之光,有效遏制了QD(量子点)那如星辰般忽明忽暗的闪烁与如垂暮手电筒般黯淡无光的褪色现象。俄克拉荷马大学(University of Oklahoma)的一项卓越研究,犹如破晓之光,…

C++第六节:stack和queue

本节目标: stack的介绍与使用queue的介绍与使用priority_queue的介绍与使用容器适配器模拟实现与结语 1 stack(堆)的介绍 stack是一种容器适配器,专门用在具有后进先出操作的上下文环境中,只能从容器的一端进行元素的插…

五分钟快速学习优秀网站的HTML骨架布局设计

一.编写多级过滤脚本&#xff0c;在控制台执行copy方法进行提取&#xff1a; 过滤脚本脚本 // 在浏览器F12的控制台里&#xff0c;直接执行以下脚本 copy(document.documentElement.outerHTML// 一级过滤&#xff1a;移除动态内容.replace(/<script\b[^>]*>[\s\S]*?…

硬件学习笔记--47 LDO相关基础知识介绍

目录 1.LDO主要功能介绍 2.LDO相关参数介绍 3.使用方法 4.优、缺点 1.LDO主要功能介绍 LDO&#xff08;Low Dropout Regulator&#xff09;是一种线性稳压器&#xff0c;用于将输入电压转换为稳定的输出电压。其主要功能包括&#xff1a; 1&#xff09;稳压功能&#xff1…

利用矩阵相乘手动实现卷积操作

卷积&#xff08;Convolution&#xff09; 是信号处理和图像处理中的一种重要操作&#xff0c;广泛应用于深度学习&#xff08;尤其是卷积神经网络&#xff0c;CNN&#xff09;中。它的核心思想是通过一个卷积核&#xff08;Kernel&#xff09; 或 滤波器&#xff08;Filter&am…

STM32-HAL库初始化时钟

使能和失能外设GPIOA 时钟信号初始化函数 HAL_RCC_OscConfig函数&#xff1a; HAL_StatusTypeDef是该函数的返回值类型,最顶上的那句话只是这个函数的原型 HAL_RCC_ClockConfig函数&#xff1a; 因为FLASH实际上只能支持24MHz的时钟信号所以如果用高于24MHz的信号输入则要用到等…

windows环境执行composer install出错

现在的项目环境都是要求比较新的版本&#xff0c;就比如今天部署测试一个新框架遇到了下面这些问题&#xff0c;报错原因有以下几点&#xff1a; PHP版本低了&#xff0c;现在的新项目都是要求PHP8以上版本&#xff1b;指令废弃&#xff0c;配置文件禁用即可&#xff1b;切换P…

Three.js 入门(光线投射实现3d场景交互事件)

本篇主要学习内容 : 光线投射器交互事件 点赞 关注 收藏 学会了 1.光线投射器 Raycaster 此类旨在协助光线投射。光线投射用于鼠标拾取&#xff08;确定鼠标在 3D 空间中的哪些对象上&#xff09;等。 Raycaster( origin : Vector3, direction : Vector3, near : Float,…

蓝桥杯web第三天

展开扇子题目&#xff0c; #box:hover #item1 { transform:rotate(-60deg); } 当悬浮在父盒子&#xff0c;子元素旋转 webkit display: -webkit-box&#xff1a;将元素设置为弹性伸缩盒子模型。-webkit-box-orient: vertical&#xff1a;设置伸缩盒子的子元素排列方…

Unity 使用NGUI制作无限滑动列表

原理&#xff1a; 复用几个子物体&#xff0c;通过子物体的循环移动实现&#xff0c;如下图 在第一个子物体滑动到超出一定数值时&#xff0c;使其放到最下方 --------------------------------------------------------------》 然后不停的循环往复&#xff0c;向下滑动也是这…

网络安全蜜罐产品研究现状

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 一、知识点总结 1、蜜罐&#xff08;Honeypot&#xff09;&#xff1a;诱捕攻击者的一个陷阱。 2、蜜网&#xff08;Honeynet&#xff09;&#xff1a;采用了技术…

SpringBoot3—场景整合:环境准备

一、云服务器 阿里云服务器开通安装以下组件 dockerrediskafkaprometheusgrafana 下载windterm&#xff1a;https://github.com/kingToolbox/WindTerm/releases/download/2.5.0/WindTerm_2.5.0_Windows_Portable_x86_64.zip 重要&#xff1a;开通云服务器以后&#xff0c;请一…

Ollama进行DeepSeek本地部署存在安全风险解决方案,nginx反向代理配置

文章目录 概要整体架构流程技术细节**## 1.下载nginx [https://nginx.org/en/download.html](https://nginx.org/en/download.html),推荐Stable version稳定版**2.下载完成解压文件,打开conf文件夹下的nginx.conf,贴上反向代理配置3.然后点击解压文件夹下的nginx.exe,启动成…

【音视频】ffmpeg音视频处理基本流程

一、ffmpeg音视频处理基本流程 首先先看两条命令 ffmpeg -i 1.mp4 -acodec copy -vcodec libx264 -s 1280x720 2.flv ffmpeg -i 1.mp4 -acodec copy -vcodec libx265 -s 1280x720 3.mkv-i :表示输入源&#xff0c;这里是1.mp4&#xff0c;是当前路径下的视频文件-acodec copy…