Spark---RDD持久化

文章目录

  • 1.RDD持久化
      • 1.1 RDD Cache 缓存
      • 1.2 RDD CheckPoint 检查点
      • 1.3 缓存和检查点区别
  • 2.RDD分区器
      • 2.1 Hash 分区:
      • 2.2 Range 分区:
      • 2.3 用户自定义分区

1.RDD持久化

在Spark中,持久化是将RDD存储在内存中,以便在多次计算之间重复使用。这可以显著减少不必要的计算,提高Spark应用程序的性能。
在这里插入图片描述

    val lines = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")
    //执行扁平化操作
    //扁平化就是将多个集合打散为一个集合
    val words = lines.flatMap((a: String) => a.split(" "))

    //对每个单词进行改造(hello,1)
    val wordMap = words.map(word => {
      println("@@@@@@@@@@@@@@@@@")
      (word, 1)
    })


    //reduceByKey要使用wordMap
    val wordToCount=wordMap.reduceByKey((t1,t2)=>t1+t2)
    wordToCount.collect().foreach(println)

    //groupByKey要使用wordMap
    val wordToGroup = wordMap.groupByKey()
    wordToGroup.collect().foreach(println)

在这里插入图片描述

如上述代码,reduceByKey和groupByKey都要是用wordMap的结果,由于RDD中是不存储数据的,在reduceByKey使用完成之后,groupByKey想要再次使用的时候,需要查找血缘关系,从头开始一步一步的执行,如果数据量大的情况下,会造成很大的成本上的浪费。

1.1 RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。 但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

可以通过设置persist来将数据存储到内存或者磁盘中

    // 数据缓存。
    wordMap.cache()
    // 可以更改存储级别
    //wordMap.persist(StorageLevel.MEMORY_AND_DISK_2)

在这里插入图片描述
可以看出将中间结果放入缓存中后,第二次使用中间结果的时候,将不会从头再执行一遍,而是直接从缓存中读取数据,

存储级别:

object StorageLevel {
 val NONE = new StorageLevel(false, false, false, false)
 val DISK_ONLY = new StorageLevel(true, false, false, false)
 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
 val MEMORY_ONLY = new StorageLevel(false, true, false, true)
 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
 val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
 val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
 val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

在这里插入图片描述
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist 或 cache。

1.2 RDD CheckPoint 检查点

检查点其实就是通过将 RDD 中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

检查点的数据需要进行落盘,当作业执行结束后,不会被删除。检查点数据一般都是存储在hdfs上。

    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")//配置文件
    val context = new SparkContext(wordCount)//读取配置文件

    // 设置检查点路径
    //这里选择将数据落盘在本地
    context.setCheckpointDir("./checkpoint1")

    //执行业务操作
    val lines = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")
    //执行扁平化操作
    //扁平化就是将多个集合打散为一个集合
    val words = lines.flatMap((a: String) => a.split(" "))

    //对每个单词进行改造(hello,1)
    val wordMap = words.map(word => {
      println("@@@@@@@@@@@@@@@@@")
      (word, 1)
    })

    // 数据缓存。
    wordMap.cache()

    // 数据检查点:针对 wordToOneRdd 做检查点计算
    wordMap.checkpoint()


    //reduceByKey要使用wordMap
    val wordToCount=wordMap.reduceByKey((t1,t2)=>t1+t2)
    wordToCount.collect().foreach(println)

    //groupByKey要使用wordMap
    val wordToGroup = wordMap.groupByKey()
    wordToGroup.collect().foreach(println)



    //关闭连接
    context.stop()

1.3 缓存和检查点区别

1)Cache 缓存只是将数据保存起来,不切断血缘依赖,即Cache相当于增加一个依赖关系。Checkpoint 检查点切断血缘依赖。
2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存
储在 HDFS 等容错、高可用的文件系统,可靠性高。
3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存
中读取数据即可,否则需要再从头计算一次 RDD。

2.RDD分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。

只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

val dataRdd=sparkRdd.makeRDD(List(
      1,2,3,4
    ),2)

在上述代码中创建的dataRDD是没有分区器的,但是Spark会根据指定的分区数(在这个例子中是2)来将数据分布到不同的分区中,但这种分布并不是由分区器控制的。而是根据数据的本地性(locality)或者简单的轮询(round-robin)方式来分配的。

2.1 Hash 分区:

对于给定的 key,计算其 hashCode,并除以分区个数取余

2.2 Range 分区:

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

2.3 用户自定义分区

自定义分区器,实现将王同学分到一个分区,张同学分到另一个分区

package bigdata.wordcount.RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
 * 自定义分区器
 */
object SelfPartitioner {
  def main(args: Array[String]): Unit = {
    //建立与Spark框架的连接
    val rdd = new SparkConf().setMaster("local[*]").setAppName("RDD") //配置文件
    val sparkRdd = new SparkContext(rdd) //读取配置文件

    val dataRDD: RDD[(String, String)] = sparkRdd.makeRDD(List(
      ("小王", "xxxxxxxxx"),
      ("小张", "xxxxxxxxx"),
      ("小王", "xxxxxxxxx"),
    ), 2)
    //采用自定义分区器
    val dataRes: RDD[(String, String)] = dataRDD.partitionBy(new MyPartitioner)
    dataRes.saveAsTextFile("output0")

    sparkRdd.stop()
  }

}

/**
 * 自定义分区器
 * 1.继承Partitioner
 * 2.重写相关方法
 */
class MyPartitioner extends Partitioner
{
  //设置分区数量
  override def numPartitions: Int = 2

  //根据得到的key值获取到分区的索引(从0开始)
  override def getPartition(key: Any): Int = {
     key match {
     case "小王" => 0
     case "小张" => 1
     }
     }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/324753.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HDFS WebHDFS 读写文件分析及HTTP Chunk Transfer Encoding相关问题探究

文章目录 前言需要回答的首要问题DataNode端基于Netty的WebHDFS Service的实现基于重定向的文件写入流程写入一个大文件时WebHDFS和Hadoop Native的块分布差异 基于重定向的数据读取流程尝试读取一个小文件尝试读取一个大文件 读写过程中的Chunk Transfer-Encoding支持写文件使…

快慢指针-Floyd判圈算法

对于环形链表是否存在环的做法,普通算法可以通过额外Hash数组来存储链表元素,直到Hash数组中出现重复元素。时间复杂度O(n),空间复杂度O(n) Floyd判圈算法通过利用快慢指针的移动来实现,时间复杂度O(n)&am…

Elasticsearch:聊天机器人教程(二)

这是继上一篇文章 “Elasticsearch:聊天机器人教程(一)”的续篇。本教程的这一部分讨论聊天机器人实现中最有趣的方面,以帮助你理解它并对其进行自定义。 数据摄入 在此应用程序中,所有示例文档的摄取都是通过 flask …

搭建知识付费小程序平台:如何避免被坑,选择最佳方案?

随着知识经济的兴起,知识付费已经成为一种趋势。越来越多的人开始将自己的知识和技能进行变现,而知识付费小程序平台则成为了一个重要的渠道。然而,市面上的知识付费小程序平台琳琅满目,其中不乏一些不良平台,让老实人…

git提交报错:remote: Please remove the file from history and try again.

1. 报错信息 remote: error: File: fba7046b22fd74b77425aa3e4eae0ea992d44998 500.28 MB, exceeds 100.00 MB. remote: Please remove the file from history and try again. git rev-list --objects --all | grep fba7046b22fd74b77425aa3e4eae0ea992d44998 2. 分析原因 e…

单例模式实现最好的方式即枚举实现

单例类作为23种设计模式当中最常用的设计模式,实现方式有很多种,比较流行的是DCL(DoubleCheckLock)双重检查的实现,线程安全,又比较好,除了存在序列化的问题之外,还算不错,如果对DCL模式还不熟悉…

UE5 RPG使用GAS技能系统

之前也介绍过GAS的使用: UE 5 GAS Gameplay Ability System UE 5 GAS 在项目中处理AttributeSet相关 UE 5 GAS 在项目中通过数据初始化 基础的讲解这里不再诉说,有兴趣的可以翻我之前的博客。 接下来,在RPG游戏中实现GAS系统的使用。 GAS系统…

微店商品详情API(micro.item_get)的数据分析和挖掘

随着电商行业的迅猛发展,微店作为电商平台的重要组成部分,提供了丰富的API接口供开发者使用。其中,微店商品详情API(micro.item_get)是用于获取商品详情的接口,为数据分析和挖掘提供了大量有价值的数据源。…

YOLOv8改进 | 融合改进篇 | 轻量化CCFM + SENetv2进行融合改进涨点 (全网独家首发)

一、本文介绍 本文给大家带来的改进机制是轻量化的Neck结构CCFM配合SENetv2改进的网络结构进行融合改进,其中CCFM为我本人根据RT-DETR模型一比一总结出来的,文中配其手撕结构图,其中SENetV2为网络结构重构化模块,通过其改进主干从而提取更有效的特征,这两个模块搭配在一起…

2.1.2 一个关于y=ax+b的故事

跳转到根目录:知行合一:投资篇 已完成: 1、投资&技术   1.1.1 投资-编程基础-numpy   1.1.2 投资-编程基础-pandas   1.2 金融数据处理   1.3 金融数据可视化 2、投资方法论   2.1.1 预期年化收益率   2.1.2 一个关于yaxb的…

Linux/Uinx 什么是栈帧?

什么是栈帧? 栈帧是计算机内存中的一个独立区域,用于存储程序函数调用过程中的局部变量、参数和返回地址。每当一个函数被调用时,都会在栈上创建一个新的栈帧。函数执行完毕后,对应的栈帧将被销毁。栈帧的概念有助于理解程序函数…

MS-DETR: Efficient DETR Training with Mixed Supervision论文学习笔记

论文地址:https://arxiv.org/pdf/2401.03989.pdf 代码地址(中稿后开源):GitHub - Atten4Vis/MS-DETR: The official implementation for "MS-DETR: Efficient DETR Training with Mixed Supervision" 摘要 DETR 通过迭代…

canvas绘制图片的三种方法(图文示例)

查看专栏目录 canvas示例教程100专栏,提供canvas的基础知识,高级动画,相关应用扩展等信息。canvas作为html的一部分,是图像图标地图可视化的一个重要的基础,学好了canvas,在其他的一些应用上将会起到非常重…

我如何知道我的MinIO集群复制是最新的?

客户可以在任何需要快速、弹性、可扩展对象存储的地方运行 MinIO。MinIO 包括多种类型的复制,以确保每个应用程序都使用最新的数据,无论它在哪里运行。在之前有关批量复制、站点复制和存储桶复制的文章中,我们详细介绍了各种可用的复制选项及…

(N-139)基于springboot,vue宠物领养系统

开发工具:IDEA 服务器:Tomcat9.0, jdk1.8 项目构建:maven 数据库:mysql5.7 系统分前后台,项目采用前后端分离 前端技术:vue3element-plus 服务端技术:springbootmybatis-plusr…

MySQL体系结构

MySQL体系结构 MySQL采用的是客户/服务器体系结构,实际是有两个程序,一个是MySQL服务器程序,指的是mysqld程序,运行在存放数据库的机器上,负责在网络上监听并处理来自客户的服务请求,根据这些请求去访问数据…

彩超框架EchoSight开发日志记录

EchoSight开发记录 蒋志强 我会不定期的更新 开发进展。最近更新进展于2024年1月15日 1.背景 由于某些不可抗逆的原因,离开了以前的彩超大厂,竞业在家,难得有空闲的时间。我计划利用这段时间 自己独立 从零开始 搭建一套 彩超系统的软件工…

datax关系数据库插件设计和实现解释

背景 DataX是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路&#xff0…

GO——cobra

定义 Cobra 是 Go 的 CLI 框架 CLI,command-line interface,命令行界面 使用 注意 第一个cmd的USE即使命名了也没有意义,一般保持和项目名一致。 示例 package mainimport ("fmt""github.com/spf13/cobra" )func …

构建稳健的Web应用:LAMP 实践

LAMP 介绍 LAMP 代表 Linux、Apache、MySQL 和 PHP/Python/Perl(这些选项中一种)的组合,用于搭建 Web 应用程序的开发和运行环境。 Linux:作为操作系统的基础,提供整个 LAMP 堆栈的基础。Linux 提供稳定、安全的环境&…