RDD算子(四)、血缘关系、持久化

1. foreach

分布式遍历每一个元素,调用指定函数

val rdd = sc.makeRDD(List(1, 2, 3, 4))
rdd.foreach(println)

结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印,则是在Driver端执行。

RDD的方法之所以叫算子,就是为了与scala集合的方法区分开来, scala集合的方法是在同一个节点执行的,而RDD的算子则是在Executor(分布式节点)执行的。从计算的角度讲,RDD算子外部的操作都是在Driver端执行,算子内部的操作都是在Executor端执行。

2. 闭包检测

class User {
    var age : Int = 30
}
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {
    println("age = " + (user.age + num))
})

因为foreach内部的操作是在Executor上执行的,所以在Driver上创建的user需要传递给各个Executor,如果user没有序列化,则会报错

class User extends Serializable{
    var age : Int = 30
}

或者将User变为样例类,因为样例类在编译时会自动混入序列化特质(实现可序列化接口)

case class User {
    var age : Int = 30
}

如果把原始集合变为空,依然会报错,这是因为RDD算子中传递的函数会包含闭包操作(匿名函数,算子内用到了算子外的数据),所以会进行闭包检测,即检查里面的变量是否序列化

val rdd = sc.makeRDD(List[Int]())
val user = new User()
rdd.foreach(num => {
    println("age = " + (user.age + num))
})

 再看如下案例:

class Search(query:String) {
    def isMatch(s:String): Boolean {
        s.contains(query)
    }

    def getMatch1(rdd: RDD[String]): RDD[String] {
        rdd.filter(isMatch)
    }

    def getMatch2(rdd: RDD[String]): RDD[String] {
        rdd.filter(x => x.contains(query))
    }
}
val rdd = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
search.getMatch1(rdd).collect().foreach(println)

此时会报错Search类没有序列化,因为在rdd的filter算子内调用了query,而query作为类的构造参数,实际上是类的私有变量,isMatch方法相当于:

def isMatch(s:String): Boolean {
    s.contains(this.query)
}

this相当于类的对象,因此需要进行闭包检测。getMatch2也有类似的问题。除了将类序列化以及改为样例类之外,还可以将query赋给方法内部的临时变量:

def getMatch2(rdd: RDD[String]): RDD[String] {
    val s = query
    rdd.filter(x => x.contains(s))
}

3. 依赖关系

 每个RDD会保存血缘关系(不会保存数据),这样提高了容错性,因为如果其中某个RDD转换到另一个RDD失败了,就可以根据血缘关系来重新读取。RDD保存依赖关系而示意图如下:

血缘关系展示代码:

val lines : RDD[String] = sc.textFile("datas")
println(lines.toDebugString)
println("******************")

val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")

val wordToOne = words.map(word=>(word, 1))
println(wordToOne.toDebugString)
println("******************")

val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.toDebugString)
println("******************")

查看依赖关系只需直接将代码中的toDebugString改为dependencies即可

val lines : RDD[String] = sc.textFile("datas")
println(lines.dependencies)
println("******************")

val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.dependencies)
println("******************")

val wordToOne = words.map(word=>(word, 1))
println(wordToOne.dependencies)
println("******************")

val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.dependencies)
println("******************")

​​​​​​​

 一对一的依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的一个分区的数据,也叫窄依赖,而Shuffle依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的多个分区的数据,也叫宽依赖。

4. 阶段和任务划分

窄依赖中,分区有多少个,就有多少个任务,只有一个阶段;宽依赖中,有两个阶段,每个阶段的任务数等于分区数。

RDD阶段由有向无环图(DAG)表示

Application->Job->Stage->Task每一个层级是1对n的关系。

每个Application中可能会提交多个作业,一个作业会划分为多个阶段(阶段数=宽依赖个数+1),一个阶段可能因为多个分区而包含多个任务,一个阶段中的任务数=最后一个RDD的分区数。

5. cache和persist

如果对于同一份数据源,想做多个不同的功能(比如统计单词数以及根据单词分组),这些不同的功能在实现过程中有很多重复的步骤(比如很多相同的RDD转换),此时可能会引入性能问题。虽然看起来RDD转换的过程复用了,但是RDD不存储数据,只有逻辑,所以最终的行为算子会从头开始再读取相同的数据,比如下面代码:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")

val groupRDD = mapRDD.groupByKey()

groupRDD.collect.foreach(println)

 

可以看到,在一行*上下,@都执行了,说明数据从头开始读取,从最开始的RDD再次执行。 

为了解决这种性能问题,可以对mapRDD里的数据进行缓存,要么缓存在内存中,要么缓存在磁盘中,得看具体情况,这就是RDD的持久化操作。使用cache方法缓存在内存中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.cache()

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")

val groupRDD = mapRDD.groupByKey()

groupRDD.collect.foreach(println)

 

放在内存中可能不安全,使用persist方法缓存在磁盘中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.persist(StorageLevel.DISK_ONLY)

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")

val groupRDD = mapRDD.groupByKey()

groupRDD.collect.foreach(println)

注意:持久化操作也是等到行动算子触发才会真正执行。持久化操作不一定都是为了重用才引入的,有些情况下,前面一些RDD转换操作耗时很长或者数据很重要的场合,也可以进行持久化操作,这样一旦中间出了问题,重新执行任务不至于再执行之前耗时很长的操作。

除了以上的cache和persist方法,还可以使用检查点(checkpoint)的方法进行持久化操作。checkpoint需要落盘,因此需要指定存储路径,之前的persist方法也要落盘,只不过它存储在临时路径,任务执行完就会删除,而checkpoint是永久化存储

sc.setCheckPointDir("cp")

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.checkpoint()

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")

val groupRDD = mapRDD.groupByKey()

groupRDD.collect.foreach(println)

但是checkpoint会单独再开启一个作业,因此效率可能更低。但是与cache联合执行,即先cache,再checkpoint,就不会开启新的作业。

另外,使用cache方法会改变RDD的血缘关系:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.checkpoint()
println(mapRDD.toDebugString)

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")
println(mapRDD.toDebugString)


 

 

可以看到,cache方法(其实persis方法也会) 在血缘关系中添加新的依赖(原来的依赖还保留)。但是checkpoint方法会改变原来的血缘关系,建立新的血缘关系(等同于数据源变了):

sc.setCheckPointDir("cp")


val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.checkpoint()
println(mapRDD.toDebugString)

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")
println(mapRDD.toDebugString)


 

6. 自定义分区器

class MyPartitioner extends Partitioner {
    override def numPartitions : Int = n  //自定义,可以写死

    override def getPartition(key : Any) : Int = {
        key match {
            case "xxx" => 0
            case _ => 1
        }
    }
    
}

分区器传给RDD:

val partitionRDD = rdd.partitionBy(new MyPartitioner)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/519000.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ADB(Android Debug Bridge)操作命令详解及示例

ADB(Android Debug Bridge)是一个强大的命令行工具,它是Android SDK的一部分,主要用于Android设备(包括真实手机和平板电脑以及模拟器)的调试、系统控制和应用程序部署。 下面是一些ADB的常用命令&#xff…

全面解析找不到msvcr110.dll,无法继续执行代码的解决方法

MSVCR110.dll的丢失可能导致某些应用程序无法启动。当用户试图打开依赖于该特定版本DLL文件的软件时,可能会遭遇“找不到指定模块”的错误提示,使得程序启动进程戛然而止。这种突如其来的故障不仅打断了用户的正常工作流程,也可能导致重要数据…

[中级]软考_软件设计_计算机组成与体系结构_08_输入输出技术

输入输出技术 前言控制方式考点往年真题 前言 输入输出技术就是IO技术 控制方式 程序控制(查询)方式:分为无条件传送和程序查询方式两种。 方法简单,硬件开销小,但I/O能力不高,严重影响CPU的利用率。 程序中断方式&#xff1…

机器学习第33周周报Airformer

文章目录 week33 AirFormer摘要Abstract一、论文的前置知识1. 多头注意力机制(MSA)2. 具有潜变量的变分模型 二、文献阅读1. 题目2. abstract3. 问题与模型阐述3.1 问题定义3.2 模型概述3.3 跨空间MSA(DS-MSA)3.4 时间相关MSA&…

特定领域软件体系结构

1.DSSA的定义 简单地说,DSSA(Domain Specific Software Architecture)就是在一个特定应用领域中为一组应用提供组织结构参考的标准软件体系结构。 从功能覆盖的范围的角度有两种理解DSSA中领域的含义的方式: (1&#x…

微信小程序生命周期管理:从数据初始化到事件绑定

作为一个独立的应用开发平台,微信小程序提供了自己的生命周期机制,与我们熟悉的Vue.js框架有一些差异。掌握小程序生命周期的特点和使用技巧,对于开发高质量的小程序应用至关重要。深入理解和掌握小程序生命周期的使用技巧,将有助于我们构建出更加健壮和可维护的小程序应用。 小…

c语言数据结构(10)——冒泡排序、快速排序

欢迎来到博主的专栏——C语言数据结构 博主ID:代码小豪 文章目录 冒泡排序冒泡排序的代码及原理快速排序快速排序的代码和原理快速排序的其他排序方法非递归的快速排序 冒泡排序 相信冒泡排序是绝大多数计科学子接触的第一个排序算法。作为最简单、最容易理解的排序…

【软件测试】测试常见知识点汇总

测试常见知识点汇总 一、什么是测试1.1 测试和调试的区别1.2 什么是需求1.2.1 用户需求1.2.2 软件需求 1.3 测试用例要素1.4 软件的生命周期及各阶段概述1.5 开发模型和测试模型(记住特点和适用场景)1.5.1 开发模型1.5.1.1 瀑布模型(自上而下…

解密项目管理工具数据安全:防火防盗,保密有招

相关数据显示,2021年中国数字经济规模总量达到45.5万亿元,占到国内GDP总量的39.8%。数字经济已经渗入我们工作生活的方方面面,项目管理工具就是其中之一,在数据安全备受重视的今天如何保证项目管理工具的数据安全性?Zo…

Linux+HA高可用24X7的安全保证

一. 介绍作为服务器,需要提供一定的24X7的安全保证,这样可以防止关键节点的宕机引起系统的全面崩溃。利用OpenSource开源软件,完成系统的高可靠双机热备方案。基于linux的 HA软件可靠稳定,比使用商业版本的HA软件降低成…

微信小程序python+uniapp高校图书馆图书借阅管理系统ljr9i

根据日常实际需要,一方面需要在系统中实现基础信息的管理,同时还需要结合实际情况的需要,提供图书信息管理功能,方便图书管理工作的展开,综合考虑,本套系统应该满足如下要求: 首先,在…

人工智能基础概念5:使用L1范数惩罚进行Lasso回归(正则化)解决机器学习线性回归模型幻觉和过拟合的原理

一、引言 在老猿CSDN的博文《人工智能基础概念3:模型陷阱、过拟合、模型幻觉》中介绍了通过L1或L2正则化来限制模型的复杂度来解决过拟合的问题,老猿当时并不了解这背后的原理,这2天通过查阅资料终于明白了相关知识,在此一L1正则…

Linux故障排查(亲身经历),Linux运维开发6年了

这里输入数字时注意不要按小键盘,要按键盘字母区上面的那排数字键; 比如我们要关闭pid为2的进程,输入2后按回车,会出现以下提示,此时再按回车就ok 注意 如果执行top命令后,发现没有cpu占用率较高的进程&a…

如何在Linux中安装软件

文章目录 一、Linux应用程序基础1.Linux软件安装包分类2.应用程序和系统命令的关系3.常见的软件包的封装类型 二、安装软件的方式1.RPM包管理工具2.yum安装3.编译 一、Linux应用程序基础 1.Linux软件安装包分类 Linux源码包: 实际上,源码包就是一大堆源…

基于JAVAEE技术校园车辆管理系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本校园车辆管理系统就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息…

python_web1(前端开发之HTML、CSS、Bootstap、Javascript、JQuery)

文章目录 一、Flask网页开发1.1创建一个名为web1.py的python文件1.2 templates目录创建文件index.html 二、html标签2.1 编码2.2title < head >2.3 标题< h>2.4 div和span2.5超链接1.在index.xml文件中补充。2.修改web1.py文件3.添加get_self.html4.效果 2.6图片1.…

Python常用算法思想--回溯算法思想详解【附源码】

通过回溯算法解决“组合”问题、“排序”问题、“搜索”之八皇后问题、“子集和”之0-1背包问题、字符串匹配等六个经典案例进行介绍: 一、解决“组合”问题 从给定的一组元素中找到所有可能的组合,这段代码中的 backtrack_combinations 函数使用了回溯思想,调用 backtrack…

【论文精读】Detecting Out-of-Distribution Examples with Gram Matrices 使用Gram矩阵检测分布外实例

文章目录 一、文章概览&#xff08;一&#xff09;Gram矩阵1、Gram&#xff08;格朗姆&#xff09;矩阵的定义2、Gram矩阵计算特征表示3、风格迁移中的Gram矩阵 &#xff08;二&#xff09;ood检测&#xff08;三&#xff09;核心思路&#xff1a;扩展 Gram 矩阵以进行分布外检…

DHCP工作过程以及抓包分析

从PC1的e0/0/1接口进行抓包 客户端基于UDP、源端口68、目标端口67进行广播请求&#xff0c;源IP0.0.0.0&#xff0c;&#xff08;无效地址&#xff0c;代表本地无地址&#xff09;目标IP255.255.255.255&#xff1b; 从下面截图可以看出&#xff1a; 源mac为电脑mac&#xff…

steam和epic的使用

steam和epic的使用 介绍 这俩都是游戏平台。 登录注册 steam 使用网吧uu加速器打开steam 点击启动游戏&#xff1a;&#xff08;网吧实例&#xff0c;接着点启动&#xff09; 两种方法&#xff1a; 1.直接点内个“创建免费账户”。然后直接注册就行&#xff08;我在网…