《深入理解Spark RDD缓存机制》(第4天)

文章目录

  • 前言
  • 一、小试牛刀:解剖RDD缓存机制?
    • 1. 什么是Spark RDD缓存策略
      • 1.1 为什幺RDD要做缓存
      • 1.2 缓存相关API:
      • 1.3 缓存案例解析:
      • 1.4 图解缓存效果:
    • 2. 什么是checkpoint缓存
      • 2.1 为什么要做checkpoint缓存
      • 2.2 checkpoint相关API:
      • 2.3 checkpoint案例解析
    • 3. 缓存和checkpoint的区别
      • 3.1 案例解析
  • 二、打铁趁热:面试题思考
    • 1. cache缓存和checkpoint检查点的区别
    • 2. 既然持久化的方案有两种,那么在生产环境中应该使用哪种方案呢?
  • 总结


前言

Apache Spark是一个大规模数据处理框架,它提供了高效、快速和通用的数据处理能力。在Spark中,弹性分布式数据集(RDD, Resilient Distributed Dataset)是一个核心概念,而RDD的缓存机制则是确保Spark性能高效的关键因素之一。本文将通过’案例’,'图文’等解析方式深入探讨Spark RDD的缓存机制。


一、小试牛刀:解剖RDD缓存机制?

1. 什么是Spark RDD缓存策略

  • 在Spark中,RDD的缓存机制允许我们将计算的结果存储在内存中,从而避免在后续的计算中重复计算相同的RDD。这对于迭代计算、机器学习等场景尤为重要,可以显著提高计算效率。
    在这里插入图片描述

1.1 为什幺RDD要做缓存

  • 当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。
  • 主要作用: 提升Spark程序的计算效率
  • 注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此缓存的数据是不太稳定可靠。
    由于是临时存储,可能会存在丢失,所以缓存操作,并不会将RDD之间的依赖关系给截断掉(丢失掉),因为当缓存失效后,可以全部重新计算且缓存的API都是Lazy惰性的,如果需要触发缓存操作,推荐调用“count算子” ,因为运行效率高

1.2 缓存相关API:

设置缓存的API: 
	rdd.cache(): 将RDD的数据缓存储内存中
	rdd.persist(缓存的级别/位置): 将RDD的数据存储在指定位置

手动清理缓存API:
	rdd.unpersist()
默认情况下,当整个Spark应用程序执行完成后,缓存数据会自动失效,会被自动删除


缓存的级别/位置:
    DISK_ONLY: 只存储在磁盘
    DISK_ONLY_2: 只存储在磁盘,并且有2个副本
    DISK_ONLY_3: 只存储在磁盘,并且有3个副本
    MEMORY_ONLY: 只存储在内存中
    MEMORY_ONLY_2: 只存储在内存中,并且有2个副本
    MEMORY_AND_DISK: 存储在内存和磁盘中,先放在内存,再放在磁盘
    MEMORY_AND_DISK_2: 存储在内存和磁盘中,先放在内存,再放在磁盘,并且有2个副本
    OFF_HEAP: Executor进程的堆外内存
    
工作中最常用的是: MEMORY_AND_DISK和MEMORY_AND_DISK_2

1.3 缓存案例解析:

# 导包
import os
import time

import jieba
from pyspark import SparkConf, SparkContext, StorageLevel

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'


def get_topN_keyword(etlRDD, n):
    r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \
        .filter(lambda word: word not in ('.', '+', '的')) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r1)


def get_topN_search(etlRDD, n):
    r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r2)


# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    # 2.数据输入
    textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')
    # 3.数据处理(切分,转换,分组聚合)
    etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(
        lambda line_list: len(line_list) >= 6)
    # 去除搜索内容两端的 [ ]
    etlRDD = etlRDD.map(lambda line_list:
                        [
                            line_list[0],
                            line_list[1],
                            line_list[2][1:-1],
                            line_list[3],
                            line_list[4],
                            line_list[5]
                        ])
    # 不加缓存
    # etlRDD.count()
    # 7.TODO: cache添加缓存,注意: 只能把缓存添加内存!相对用的少
    # etlRDD.cache().count()
    # 8.TODO: persist添加缓存,注意: 可以修改缓存级别
    etlRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2).count()

    # 4.数据输出
    # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤
    # 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词
    get_topN_keyword(etlRDD, 10)

    # 8.TODO: 如果不想用缓存,可以使用unpersist释放缓存,给哪个rdd加的,就给哪个释放
    etlRDD.unpersist()

    # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据
    # 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容
    get_topN_search(etlRDD, 5)
    # 6.为了方便查看页面,可以让程序多睡会儿
    time.sleep(500)
    # 5.关闭资源
    sc.stop()

1.4 图解缓存效果:

  • 无缓存的DAG流程图显示:
    在这里插入图片描述

  • 有缓存的DAG流程图显示:
    在这里插入图片描述

  • cache基于内存
    在这里插入图片描述

  • persist可以修改缓存级别: 同时基于内存和磁盘
    在这里插入图片描述

2. 什么是checkpoint缓存

Checkpoint缓存,或称Checkpoint机制,是Apache Spark中用于确保数据一致性和容错性的一种技术。在不同的系统中,其实现方式和用途略有不同,但核心思想是一致的:确保关键数据或中间计算结果被安全地存储,以便在系统崩溃或需要恢复时能够重新使用。

2.1 为什么要做checkpoint缓存

  • RDD缓存主要是将数据存储在内存中,是临时存储,不太稳定,它主要是用来提升程序运行效率的。RDD的checkpoint(检查点)主要是将数据存储在HDFS上,是持久化存储。而HDFS存储数据有3副本的机制,让数据更加安全可靠。

  • checkpoint认为使用磁盘或者HDFS存储数据之后,数据非常的安全可靠,因此checkpoint会将RDD间的依赖关系给删除/丢弃掉。因此如果checkpoint的数据真的出现了问题,是无法在从头开始计算。

  • checkpoint主要作用: 提高程序的容错性
    注意事项: checkpoint可以将数据存储在磁盘或者HDFS上,主要是将数据存储在HDFS上。

2.2 checkpoint相关API:

	sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径
	rdd.checkpoint(): 对指定RDD启用checkpoint
	rdd.count(): 触发checkpoint

2.3 checkpoint案例解析

# 导包
import os
import time

import jieba
from pyspark import SparkConf, SparkContext, StorageLevel

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'


def get_topN_keyword(etlRDD, n):
    r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \
        .filter(lambda word: word not in ('.', '+', '的')) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r1)


def get_topN_search(etlRDD, n):
    r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r2)


# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    # 2.数据输入
    textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')
    # 3.数据处理(切分,转换,分组聚合)
    etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(
        lambda line_list: len(line_list) >= 6)
    # 去除搜索内容两端的 [ ]
    etlRDD = etlRDD.map(lambda line_list:
                        [
                            line_list[0],
                            line_list[1],
                            line_list[2][1:-1],
                            line_list[3],
                            line_list[4],
                            line_list[5]
                        ])
    # 不加缓存
    # etlRDD.count()
    # 7.TODO: 先拿着sc对象设置检查点保存位置, 建议用hdfs,这样能利用hdfs的高可靠高可用性
    sc.setCheckpointDir('hdfs://node1:8020/ckpt')
    # 8.TODO: 添加检查点checkpoint
    etlRDD.checkpoint()
    etlRDD.count()

    # 4.数据输出
    # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤
    # 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词
    get_topN_keyword(etlRDD, 10)

    # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据
    # 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容
    get_topN_search(etlRDD, 5)
    # 6.为了方便查看页面,可以让程序多睡会儿
    time.sleep(500)
    # 5.关闭资源
    sc.stop()
  • 没有设置检查点正常的DAG执行流图:
    在这里插入图片描述
  • 设置检查点后:
    在这里插入图片描述

3. 缓存和checkpoint的区别

  • 面试题:Spark提供了两种持久化方案。一种为缓存操作,一种为checkpoint方案。请问有什么区别呢?
1- 数据存储位置不同
	缓存: 存储在内存或者磁盘 或者 堆外内存中
	checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上

2- 数据生命周期:
	缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
	checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除

3- 血缘关系:
	缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
	checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
	
4- 主要作用不同:
	缓存: 提高Spark程序的运行效率和容错性
	checkpoint检查点: 提高Spark程序的容错性和高可用,高可靠性
  • 思考:既然持久化的方案有两种,那么在生产环境中应该使用哪种方案呢?
在同一个项目中,推荐缓存和checkpoint(检查点)同时配合使用。

使用顺序如下: 在代码中设置缓存和checkpoint检查点,然后再一同使用Action算子触发!!! 
使用count算子触发

实际过程如下: 程序会优先从缓存中读取数据,如果发现缓存中没有数据。
再从checkpoint中读取数据,并且接着将读取到的数据重新在内存中放置一份,
后续还是优先从缓存中读取

在这里插入图片描述

3.1 案例解析

# 导包
import os
import time

import jieba
from pyspark import SparkConf, SparkContext, StorageLevel

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'


def get_topN_keyword(etlRDD, n):
    r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \
        .filter(lambda word: word not in ('.', '+', '的')) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r1)


def get_topN_search(etlRDD, n):
    r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r2)


# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    # 2.数据输入
    textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')
    # 3.数据处理(切分,转换,分组聚合)
    etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(
        lambda line_list: len(line_list) >= 6)
    # 去除搜索内容两端的 [ ]
    etlRDD = etlRDD.map(lambda line_list:
                        [
                            line_list[0],
                            line_list[1],
                            line_list[2][1:-1],
                            line_list[3],
                            line_list[4],
                            line_list[5]
                        ])
    # 不加缓存
    # etlRDD.count()
    # 7.TODO: persist添加缓存,注意: 可以修改缓存级别
    etlRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2)
    # 8.TODO: 先拿着sc对象设置检查点保存位置, 建议用hdfs,这样能利用hdfs的高可靠高可用性
    sc.setCheckpointDir('hdfs://node1:8020/ckpt')
    etlRDD.checkpoint()
    etlRDD.count()
    # TODO:触发缓存和检查点
    etlRDD.count()

    # 4.数据输出
    # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤
    # 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词
    get_topN_keyword(etlRDD, 10)

    # 7.TODO: 如果不想用缓存,可以使用unpersist释放缓存,给哪个rdd加的,就给哪个释放
    # etlRDD.unpersist()

    # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据
    # 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容
    get_topN_search(etlRDD, 5)
    # 6.为了方便查看页面,可以让程序多睡会儿
    time.sleep(500)
    # 5.关闭资源
    sc.stop()
  • DAG有向无环图:
    在这里插入图片描述

二、打铁趁热:面试题思考

1. cache缓存和checkpoint检查点的区别

1- 数据存储位置不同
	缓存: 存储在内存或者磁盘 或者 堆外内存中
	checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上

2- 数据生命周期:
	缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
	checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除

3- 血缘关系:
	缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
	checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
	
4- 主要作用不同:
	缓存: 提高Spark程序的运行效率和容错性
	checkpoint检查点: 提高Spark程序的容错性和高可用,高可靠性

2. 既然持久化的方案有两种,那么在生产环境中应该使用哪种方案呢?

在同一个项目中,推荐缓存和checkpoint(检查点)同时配合使用。

使用顺序如下: 在代码中设置缓存和checkpoint检查点,然后再一同使用Action算子触发!!! 
使用count算子触发

实际过程如下: 程序会优先从缓存中读取数据,如果发现缓存中没有数据。
再从checkpoint中读取数据,并且接着将读取到的数据重新在内存中放置一份,
后续还是优先从缓存中读取

总结

本文主要通过案例和图文的方式详解了Spark RDD 数据持久化的2种方案,重点思考项目中该采取什么方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/728080.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

爬取CSDN博文到本地(包含图片,标签等信息)

文章目录 csdnToMD改进将CSDN文章转化为Markdown文档那有什么办法快速得到md文档?例如:获取单个文章markdown获取所有的文章markdown 项目中待解决的问题 csdnToMD 项目原作者:https://gitee.com/liushili888/csdn-is—mark-down 改进后仓库…

Linux-账号和权限管理

目录 一、管理用户账号 1、用户账号类型 2、UID--身份标识 3、UID的分类 ​4、用户账号文件​ 5、chage-修改账号密码 5.1、chage—使用格式: 5.2、chage—使用参数: ​6、添加用户账号与管理 6.1、useradd—添加用户 6.2、passwd—设置/修改…

(创新)基于VMD-CNN-BiLSTM的电力负荷预测—代码+数据

目录 一、主要内容: 二、运行效果: 三、VMD-BiLSTM负荷预测理论: 四、代码数据下载: 一、主要内容: 本代码结合变分模态分解( Variational Mode Decomposition,VMD) 和卷积神经网络(Convolutional neu…

【0基础学爬虫】爬虫基础之自动化工具 Appium 的使用

大数据时代,各行各业对数据采集的需求日益增多,网络爬虫的运用也更为广泛,越来越多的人开始学习网络爬虫这项技术,K哥爬虫此前已经推出不少爬虫进阶、逆向相关文章,为实现从易到难全方位覆盖,特设【0基础学…

目标检测——SCUT-HEAD:大规模人头检测数据集的深度剖析

引言 亲爱的读者们,您是否在寻找某个特定的数据集,用于研究或项目实践?欢迎您在评论区留言,或者通过公众号私信告诉我,您想要的数据集的类型主题。小编会竭尽全力为您寻找,并在找到后第一时间与您分享。 在…

Python学习打卡:day11

day11 笔记来源于:黑马程序员python教程,8天python从入门到精通,学python看这套就够了 目录 day1183、自定义 Python 包创建包导入包方式1方式2方式3方式4 84、安装第三方包安装第三方包——pippip的网络优化 安装第三方包——PyCharm 85、…

小林图解系统-三、操作系统结构

Linux 内核 vs Windows 内核 内核 作为应用连接硬件设备的桥梁,保证应用程序只需要关心与内核交互,不需要关心硬件的细节 内核具备四个基本能力: 管理进程、线程,决定哪个进程、线程使用CPU,也就是进程调度的能力&a…

openh264 帧级码率控制原理:RcCalculateIdrQp 函数

RcCalculateIdrQp函数 功能 在码控中,当eSliceType为I_SLICE时 计算 IDR 帧的帧级量化参数QP 值。 原理过程 初始化变量: dBpp:初始化为0,用来存储比特率每像素(bits per pixel)的值。i:一个…

nginx的正向代理

目录 1 正向代理 1.1 使用正向代理的作用 1.2 Nginx正向代理实战 1.2.1 下载对应版本的nginx(源码编译) 1.2.2 下载 https 代理模块 1.2.3 使用https代理模块对源代码修改 1.2.4 源码安装 1.2.5 编写systemd 服务单元 1.2.6 修改nginx的主配置文件 1.2.…

虚拟现实环境下的远程教育和智能评估系统(十一)

视频帧画面知识点区域划分 知识点区域精确分割技术: 在深度学习检测模型结果基础上使用基于交并比(IoU)阈值的目标合并算法,合并过度重合目标区域面积,实现知识点区域精确分割 多模态知识点内容匹配策略: 图像:利用…

【人工智能,机器学习,统计学习,科学表征】开源商用与研发合作

个体工户linjing-lab托管在Github,现公开招募商用与合作人员,目标人群分为以下几个方向: 数学、信息科学、计算机专业的大学高年级学生,熟悉C和面向对象模型,擅长Pybind11编译算子到Python环境。26岁以下的大学本科毕…

PDF文档翻译软件哪个好?分享5款快速翻译的工具

世界各地的交流日益密切,文档翻译服务因此变得不可或缺。 无论是企业间的跨国商务合同,还是学术领域的专业研究论文,准确无误地将文档内容翻译成目标语言,对于保障信息的清晰传达和正确理解极为关键。 在这样的背景下&#xff0…

今日分享:中国石油年金系统交互、视觉设计和vue开发

金融系统交互及UI设计时,需注意简洁明了、色彩合理、字体统一、交互易用、安全感和用户控制。确保用户快速理解、安全操作并提升体验。

JavaScript:at()方法遇到的问题并解决

目录 第一章 前言 第二章 使用at方法 第三章 分析原因并解决问题 第一章 前言 最近上线了一个项目,测试过程中并没有什么问题,但是上线后使用的用户多了,结果出现了这么一个问题:.at方法对低版本手机的浏览器不兼容问题&#x…

英伟达中国特供芯片降价背后:巨头与市场的较量

英伟达,这家曾经在人工智能芯片领域独领风骚的巨头,近期在中国市场遭遇了一些挑战。为了应对来自华为等中国本土企业的竞争,英伟达不得不采取降价策略,调整其专为中国市场打造的H20芯片价格,甚至低于华为的同类产品。这…

STM32 串口通讯

使用STM32的串口通讯,接收串口助手的数据,并且将接收到的数据返回串口,重定义printf功能。 配置引脚信息 由于每次新建工程都需要配置信息,比较麻烦,好在STM32CubeIDE提供了导入.ioc文件的功能,可以帮我们…

Flutter【组件】按钮

简介 flutter 按钮组件。提供一种封装按钮组件的思路,并不支持过多的自定义属性。根据使用场景及设计规范进行封装,使用起来比较方便。 github地址:https://github.com/ThinkerJack/jac_uikit pub地址:https://pub.dev/package…

Faiss:加速大规模数据相似性搜索的利器

在机器学习和数据挖掘领域,相似性搜索是一项基本且重要的任务,它涉及到在大型数据集中找到与特定对象最相似的对象。Faiss是一个由Facebook AI Research开发的库,专门用于高效地进行相似性搜索和聚类,它之所以重要,是因…

uni-app的uni-list列表组件高效使用举例 (仿知乎日报实现)

目录 前言 uni-list组件介绍 基本使用 高级配置与自定义 仿知乎日报实现 知乎的api接口 后台服务实现 知乎日报首页 轮播图界面实现 客户端接口实现 uni-list列表使用 插入日期分割线 下滑分页的实现 完整页面代码 其他资源 前言 在移动应用开发领域&#xff0…

2024年【N1叉车司机】作业考试题库及N1叉车司机实操考试视频

题库来源:安全生产模拟考试一点通公众号小程序 2024年N1叉车司机作业考试题库为正在备考N1叉车司机操作证的学员准备的理论考试专题,每个月更新的N1叉车司机实操考试视频祝您顺利通过N1叉车司机考试。 1、【多选题】《中华人民共和国特种设备安全法》第…