RDD优化:缓存和checkpoint机制、数据共享(广播变量、累加器)、RDD的依赖关系、shuffle过程、并行度说明

文章目录

  • 1. 缓存和checkpoint机制
    • 1.1 缓存使用
    • 1.2 checkpoint
    • 1.3 缓存和checkpoint的区别
  • 2. 数据共享
    • 2.1 广播变量
    • 2.2 累加器
  • 3. RDD依赖关系
  • 4.shuffle过程
    • 4.1 shuffle介绍
    • 4.2 spark计算要尽量避免shuffle
  • 5. 并行度

1. 缓存和checkpoint机制

缓存和checkpoint也叫作rdd的持久化将rdd的数据存储在指定位置
作用:

  • 计算容错
  • 提升计算速度

1.1 缓存使用

在这里插入图片描述
缓存是将数据存储在内存或者磁盘上,缓存的特点计算结束时,缓存自动清空

  • 缓存级别
    • 指定缓存的数据位置
    • 默认是缓存到内存上
  • 使用
    • persist使用该方法
    • cache内部调用persist
    • 手动释放 unpersist
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

sc = SparkContext()
rdd = sc.parallelize(['a', 'b','a','c'])


# rdd数据进行转化
rdd_kv  = rdd.map(lambda x: (x,1))
#rdd_kv数据进行缓存
rdd_kv.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
#使用action算子触发
rdd_kv.collect()


# 分组处理
rdd_group = rdd_kv.groupByKey()

#求和计算
rdd_reduce = rdd_group.reduceByKey(lambda x,y:x+y)
# 查看计算结果
res = rdd_reduce.collect()
print(res)

1.2 checkpoint

在这里插入图片描述
checkpoint也是将中间rdd数据存储起来,但是存储的位置实时分布式存储系统,可以永久保存,程序结束不会释放
如果需要删除就在HDFS上删除对应的目录文件。

#checkpoint使用
from pyspark import SparkContext
sc = SparkContext()

#使用sc对象指定checkpoint存储位置
sc.setCheckpointDir('hdfs://node1:8020/checkpoint')

rdd = sc.parallelize(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k'])

#rdd数据进行转化
rdd_kv = rdd.map(lambda x: (x, 1))

#rdd 数据进行checkpoint
rdd_kv.checkpoint()
#需要使用action算子触发checkpoint
print(rdd_kv.glom().collect())

#分组处理
rdd_group = rdd_kv.groupByKey()

#求和计算
rdd_reduce = rdd_group.reduceByKey(lambda x, y: x + y)

#查看计算结果
res = rdd_kv.collect()
print(res)

res1 = rdd_group.collect()
print(res1)

res2 = rdd_reduce.collect()
print(res2)

1.3 缓存和checkpoint的区别

  • 生命周期
    • 缓存数据,程序计算结束后自动删除
    • checkpoint 程序结束,数据依然保留在HDFS
  • 存储位置
    • 缓存 优先存储在内存上,也可以选存储在本地磁盘,是在计算任务所在的内存和磁盘上。
    • checkpoint存储在HDFS上
  • 依赖关系
    • 缓存数据后,会保留rdd之间依赖关系,缓存临时存储,数据可能会丢失,需要保留依赖,当缓存丢失后可以按照依赖重新计算。
    • checkpoint,数据存储后会断开依赖,数据保存在HDFS,HDFS三副本机制可以保证数据不丢失,所以没有比较保留依赖关系。
      注意:缓存和checkpoint可以作为rdd优化的方案,提升计算速度,一般对经常要使用的rdd进行缓存和checkpoint,对计算比较复杂的rdd进行缓存或checkpoint。

2. 数据共享

2.1 广播变量

在这里插入图片描述
在这里插入图片描述
如果要在分布式计算里面分发大的变量数据,这个都会由driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task都会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么每个executor都会拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

减少task线程对应变量的定义,节省内存空间

#广播变量
from pyspark import SparkContext

sc= SparkContext()

a_obj = sc.broadcast(10)
#生成rdd
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])

#转化计算
rdd2 = rdd.map(lambda x:x+a_obj.value)

#查看数据
res = rdd2.collect()
print(res)

2.2 累加器

避免资源抢占造成错误

#累加器的使用
from pyspark import SparkContext

sc = SparkContext()
#将变量值添加到累加器中
acc_obj = sc.accumulator(0)

#rdd操作
rdd = sc.parallelize([1,2,3])

#使用累加器进行数据累加
rdd2 = rdd.map(lambda x:acc_obj.add(x))

#查看结果
res = rdd.collect()
print(res)

res1 = rdd2.collect()
print(res1)

#查看累加器的结果
print(acc_obj.value)

查看结果:
在这里插入图片描述

3. RDD依赖关系

  • 窄依赖
    • 每个父RDD的一个partition最多被子RDD的一个partition所使用。
      • map
      • flatMap
      • filter
  • 宽依赖
    • 一个父RDD的partition会被多个子RDD的partition所使用
      • groupByKey
      • reduceByKey
      • sortByKey
    • 在宽依赖中rdd之间会发生数据交换,这个交换的过程称为rdd的shuffle
      • 只要是宽依赖必然发生shuffle
      • 在宽依赖进行数据交换时,只有等待所有分区交换完成后,才能进行后续的计算,非常影响计算速度。

那么如何判断是宽依赖还是窄依赖呢?

#判断宽窄依赖
from pyspark import SparkContext
sc = SparkContext()

rdd = sc.parallelize([1,2,3,4])
rdd_kv = sc.parallelize([('a',1), ('b',2), ('c',3)])

#算子演示
rdd2 = rdd.map(lambda x:x+1)
rdd3 = rdd_kv.groupByKey()

#查看结果
# res = rdd2.collect()
# print(res)

res = rdd3.collect()
print(res)

DAG 管理维护rdd之间依赖关系,保证代码的执行顺序,


DAG会根据依赖关系划分stage,每个stage都是一个独立的计算步骤,当发生宽依赖时,会单独拆分一个计算步骤(stage),进行相关数据计算,可以保证每个单独的stage可以并行执行

在发生宽依赖进行shuffle时,会独立的方法执行shuffle计算


拆分计算步骤的本质是为了保证数据计算的并行执行

查看spark的计算过程,通过DAG判断算子是宽依赖还是窄依赖

拆分了计算stage是宽依赖,没有拆分是窄依赖

启动spark的历史日志

start-history-server.sh

在这里插入图片描述
在这里插入图片描述

4.shuffle过程

mapreduce的shuffle作用: 将map计算后的数据传递给redue使用。
mapreduce的shuffle过程: 分区(将相同key的数据放在一个分区,采用hash),排序,合并(规约)。
将map计算的数据传递给reduce


spark中也有shuffle
当执行宽依赖的算子就会进行shuffle
将rdd的数据传递给下一个rdd,进行数据交换


无论是spark还是mr,shuffle的本质是传递交换数据。

在这里插入图片描述

4.1 shuffle介绍

  • spark的shuffle的两个部分
    • shuffle write 写
    • shuffle read 读
    • 会进行文件的读写,影响spark的计算速度
  • spark的shuffle方法类
    • 是spark封装好的处理shuffle的方法
    • hashshuffle类
      • 进行的是hash计算
      • spark1.2版本之前主要使用,之后引入了sortshuffle
      • spark2.0之后,删除了hashshuffle,从2.0版本开始使用sortshuffle类
      • 优化的hashshuffle和未优化的ashshuffle
    • sortshuffle类
      • 排序方式将相同key值数据放在一起
      • sortshuffle类使用时,有两个方法实现shuffle
        • bypass模式版本和普通模式版本
        • bypass模式版本不会排序,会进行hash操作
        • 普通模式版本会排序进行shuffle
      • 可以通过配置指定按照那种模式执行 根据task数量决定 默认 task数量小于等于200 采用bypass,task数量超过200个则使用普通模式的方法进行shuffle
      • 一个分区对应一个task,所以task数量由分区数决定

普通模式和bypass模式的主要区别在于如何将相同key值的数据放在一起

  • 排序 普通模式采用的策略
  • 哈希取余 bypass模式采用的策略

4.2 spark计算要尽量避免shuffle

# 优化计算,减少shuffle
from pyspark import SparkContext
sc = SparkContext()

rdd = sc.parallelize([('男',20),('男',23),('女',20),('女',22)])
#求不同性别的年龄和
#reduceByKey  是宽依赖算子
rdd2 = rdd.reduceByKey(lambda x,y:x+y)

# 避免shuffle,需要将宽依赖算子计算的过程换成窄依赖
boy = sc.accumulator(0)
girl = sc.accumulator(0)
def func(x):
    if x[0]=='男':
        boy.add(x[1])
    else :
        girl.add(x[1])

    return None
rdd3 = rdd.map(func)

rdd3.collect()
print(boy.value)
print(girl.value)

5. 并行度

  • 资源并行度

    • task在指定任务能够使用到的cpu核心数量

    • 多任务 多个进程或多个线程执行任务

      • 两种方式
        • 并行 多个任务同时执行
        • 并发 任务交替执行
        • 和cpu的核心数有关
          • 例如
          • cpu核心是4核 有两个线程任务 两个线程任务可以 并行执行
          • cpu核心是4核 有八个线程任务 并发执行
    • spark中cpu核心数据设置

      • –num-executors=2 设置executors数量 和服务器数量保持一致
      • –executor-cores=2 设置每个executors中的cpu核心数 每个服务器中cpu核心数一致
      spark-submit  --master yarn  --num-executors=3   --executor-cores=2
      

      最大支持的task并行数量是 num-executors* executor-cores =6

      需要按照服务器实际的cpu核心数指定 lscpu

  • 数据并行度

    • 就是task数量,task由分区数决定
    • 为了保证task能充分利用cpu资源,实现并行计算,需要设置分区数应该和资源并行度一致
    • 在实际公司中就要根据公司资源并行度进行设置分区数
    • 有的场景下公司会要求数据并行度大于资源并行度

资源并行度,

按照yarn安装的服务器数量指定excutor数量 3

核心数量按照yarn中的nodemanager中的核心数指定 2
数据并行度指定

官方建议 数据并行度的task数量和资源并行度数量一致

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/893691.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SAM应用:医学图像和视频中的任何内容分割中的基准测试与部署

医学图像和视频中的任何内容分割:基准测试与部署 目录 摘要:一、引言1.1 SAM2 在医学图像和视频中的应用 二.结果2.1 数据集和评估协议2.2 二维图像分割的评估结果 三 讨论四 局限性和未来的工作五、方法5.1数据来源和预处理5.2 微调协议5.3 评估指标 总…

无人机视角下火灾检测数据集 共12736张 标注文件为YOLO适用的txt格式。已划分为训练集、验证集、测试集。类别:Fire yolov5-v10通用

无人机视角下火灾检测数据集 共12736张 标注文件为YOLO适用的txt格式。已划分为训练集、验证集、测试集。类别:Fire yolov5-v10通用 无人机视角下火灾检测数据集 共12736张 标注文件为YOLO适用的txt格式。已划分为训练集、验证集、测试集。类别:Fire yol…

Apple Find My介绍

0 Preface/Foreword 1 Apple Find My介绍

【Vue】Vue(八)Vue3.0 使用ref 和 reactive创建响应式数据

ref 创建:基本类型的响应式数据 **作用:**定义响应式变量。语法:let xxx ref(初始值)。**返回值:**一个RefImpl的实例对象,简称ref对象或ref,ref对象的value属性是响应式的。注意点: JS中操作…

达梦8-SQL日志配置与分析工具

以 dmsql_数据库实例名.log 类型命名的文件为跟踪日志文件,跟踪日志内容包含系统各会话执行的 SQL 语句、参数信息、错误信息等。跟踪日志主要用于分析错误和分析性能问题,比如,可以挑出系统现在执行速度较慢的 SQL 语句,进而对其…

以JavaScript的学习角度看Axios,并以spring boot+vue3为例具体分析实现

什么是Axios Axios 是一个基于 Promise 的 HTTP 客户端,用于在浏览器和 后端 中发送异步的 HTTP 请求。它功能强大、易用,常用于与 API 交互,发送 GET、POST、PUT、DELETE 等请求。 Axios 的主要特点: 支持 Promise Axios 基于 …

鸿蒙应用开发:全面认识鸿蒙系统

前言 随着智能设备的普及和物联网的发展,对操作系统的需求也越来越多样化。鸿蒙操作系统作为一款面向全场景的分布式操作系统,其适用范围非常广泛,从智能手机到家用电器,再到工业设备,都能找到应用场景。特别是在智能…

【含开题报告+文档+PPT+源码】基于SSM的景行天下旅游网站的设计与实现

开题报告 随着互联网的快速发展,旅游业也逐渐进入了数字化时代。作为一个旅游目的地,云浮市意识到了互联网在促进旅游业发展方面的巨大潜力。为了更好地推广云浮的旅游资源,提高旅游服务质量,云浮市决定开发一个专门的旅游网站。…

使用开源的 Vue 移动端表单设计器创建表单

FcDesigner Vant 版是一款基于 Vue3.0 的移动端低代码可视化表单设计器工具,通过数据驱动表单渲染。可以通过拖拽的方式快速创建表单,提高开发者对表单的开发效率,节省开发者的时间。 源码下载 | 演示地址 | 帮助文档 本项目采用 Vue3.0 和 …

数字后端零基础入门系列 | Innovus零基础LAB学习Day1

一 Floorplan 数字IC后端设计如何从零基础快速入门?(内附数字IC后端学习视频) Lab5-1这个lab学习目标很明确——启动Innovus工具并完成设计的导入。 在进入lab之前,我们需要进入我们的FPR工作目录。 其中ic062为个人服务器账户。比如你端…

竞品分析|用户体验五要素|KANO模型

用户体验五要素 我感觉产品的设计师从最底层的战略层确定,再依次上升一层层确定直至最后确定表现层输出给用户的视觉效果。 KANO模型 KANO 模型是东京理工大学教授狩野纪昭(Noriaki Kano)发明的对用户需求分类和优先排序的有用工具,以分析用户需求对用…

harbor 如何做到物理删除镜像 harbor镜像清理脚本

一、背景 相比于nexus,harbor的一大优点是方便及时清理无用的docker镜像。本文就harbor怎么设置清理,梳理一下具体的操作办法。 harbor 版本是 v2.9.0 二、目标 随着我们推送至仓库的镜像越来越多,带来的一个最大运维问题就是存储空间的浪…

SL3037B降压恒压芯片DC24伏输入5伏输出带单片机,电流100mA

一、SL3037B芯片概述 SL3037B是一款内置功率MOSFET的单片降压型开关模式转换器,具有高效、稳定、外围元器件少等特点。它能够在宽输入电源范围(5.5~60V)内实现0.6A的峰值输出电流,并具有出色的线电压和负载调整率。此外&#xff…

利用LangChain实现大语言模型与数据库的交互对话

大语言模型使用LangChain与数据库对话 大型语言模型(LLMs)的兴起在技术上带来了重大转变,使开发者能够创建曾经难以想象的应用程序。LangChain 是一个提示编排工具,利用LLMs的能力改变你与数据库的通信方式。通过LangChain&#…

从零讲解线性回归(Linear Regression)

Linear Regression 线性回归 线性回归是一种简单且常用的技术,用来预测连续变量,假设预测变量(自变量, x_i )和结果变量(因变量, y_i )之间存在线性关系。线性回归公式&#xff08…

Qt自定义一个圆角对话框

如何得到一个圆角对话框? 步骤: 1、继承自QDiaglog 2、去掉系统自带的边框 3、设置背景透明,不设置4个角会有多余的部分出现颜色 4、对话框内部添加1个QWidget,给这个widget设置圆角,并添加到布局中让他充满对话框 5、后续对…

智慧校园打架斗殴检测预警系统 异常奔跑检测系统 Python 和 OpenCV 实现简单

在当今数字化时代,智慧校园建设已成为教育领域的重要发展方向。校园安全作为学校管理的重中之重,如何借助先进的技术手段实现高效、精准的安全监控,成为了教育工作者和技术专家共同关注的焦点。其中,智慧校园打架斗殴检测预警系统…

linux线程 | 线程的控制(上)

前言:本节内容为线程的控制。在本篇文章中, 博主不仅将会带友友们认识接口, 使用接口。 而且也会剖析底层,带领友友们理解线程的底层原理。 相信友友们学完本节内容, 一定会对线程的控制有一个很好的把握。 那么&#…

Spring AI 整体介绍_关键组件快速入门_prompt_embedding等

Spring AI:Java开发者的AI集成新利器 在过去,Java开发者在构建AI应用时面临着缺乏统一框架的问题,导致不同AI服务的集成过程复杂且耗时。Spring AI应运而生,旨在为基于Java的应用程序提供一个标准化、高效且易于使用的AI开发平台…

用PHP爬虫API数据获取商品SKU信息实战指南

在电子商务的精细化运营中,SKU(Stock Keeping Unit,库存单位)信息是商品管理的核心。它不仅包含了商品的规格、价格、库存等关键数据,还直接影响到库存管理、价格策略和市场分析等多个方面。本文将介绍如何使用PHP爬虫…