Spark RDD惰性计算的自主优化

原创/朱季谦

RDD(弹性分布式数据集)中的数据就如final定义一般,只可读而无法修改,若要对RDD进行转换或操作,那就需要创建一个新的RDD来保存结果。故而就需要用到转换和行动的算子。

Spark运行是惰性的,在RDD转换阶段,只会记录该转换逻辑而不会执行,只有在遇到行动算子时,才会触发真正的运算,若整个生命周期都没有行动算子,那么RDD的转换代码便不会运行。

这样的惰性计算,其实是有好处的,它在遇到行动算子需要对整个DAG(有向无环图)做优化,以下是一些优化说明——

本文的样本部分内容如下,可以基于这些数据做验证——

Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
Allison Carroll,28,男,18292,环保可持续,美妆护肤,唯品会,信用卡,8,免费赠品,日常使用
Robert Rice,47,男,5347,时尚潮流,图书音像,拼多多,微信支付,8,有优惠券,兴趣爱好
Jason Bradley,25,男,9480,性价比,汽车配件,拼多多,信用卡,5,折扣优惠,促销打折
Joel Small,18,女,15586,社交影响,食品饮料,亚马逊,支付宝,5,无优惠券,日常使用
Stephanie Austin,33,男,7653,舒适度,汽车配件,亚马逊,银联支付,3,无优惠券,跟风购买
Kathy Myers,33,男,18159,舒适度,美妆护肤,亚马逊,货到付款,4,无优惠券,商品推荐
Gabrielle Mccarty,57,男,19561,环保可持续,母婴用品,网易考拉,支付宝,5,免费赠品,日常使用
Joan Smith,43,女,11896,品牌追求,图书音像,亚马逊,支付宝,4,免费赠品,商品推荐
Monica Garcia,19,男,16665,时尚潮流,电子产品,京东,货到付款,7,免费赠品,商品推荐
Christopher Faulkner,55,男,3621,社交影响,美妆护肤,苏宁易购,支付宝,7,无优惠券,日常使用

一、减少不必要的计算

RDD的惰性计算可以通过优化执行计划去避免不必要的计算,同时可以将过滤操作下推到数据源或者其他转换操作之前,减少需要处理的数据量,进而达到计算的优化。

例如,执行以下这段spark代码时,

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("count")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    val filePath: String = "transaction_data.csv"
    val lineRDD = ss.sparkContext.textFile(filePath)
    val value = lineRDD.map { x => {
      println(s"打印 $x")
      x.split(",")
    } }
    value.take(10).foreach(println)
    ss.stop()
  }

若Spark不是惰性计算的情况下,代码顺序运行到这行 val lineRDD = ss.sparkContext.textFile(filePath)代码时,就会将transaction_data.csv文件里的几万条数据全部加载出来,然后再做计算。

而在惰性计算的情况下,直至运行这行代码 value.take(10).foreach(println)而遇到foreach这个行动算子时,才会去执行前面的转换,这时它会基于RDD的转化自行做一个优化——在这个例子里,它会基于lineRDD.take(5)这行代码只会从transaction_data.csv取出前5行,避免了将文件里的几万条数据全部取出。

打印结果如下,发现lineRDD.map确实只处理了前5条数据——

打印 Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
打印 Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
打印 Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
打印 Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
打印 Allison Carroll,28,男,18292,环保可持续,美妆护肤,唯品会,信用卡,8,免费赠品,日常使用
[Ljava.lang.String;@3c87e6b7
[Ljava.lang.String;@77bbadc
[Ljava.lang.String;@3c3a0032
[Ljava.lang.String;@7ceb4478
[Ljava.lang.String;@7fdab70c

二、操作合并和优化

Spark在执行行动算子时,会自动将存在连续转换的RDD操作合并到更为高效的执行计划,这样可以减少中间不是必要的RDD数据的生成和传输,可以整体提高计算的效率。这很像是,摆在你面前是一条弯弯曲曲的道路,但是因为你手里有地图,知道这条路是怎么走的,因此,可以基于这样的地图,去尝试发掘下是否有更好的直径。

还是以一个代码案例说明,假如需要统计薪资在10000以上的人数。

运行的代码,是从transaction_data.csv读取了几万条数据,然后将每行数据按","分割成数组,再基于每个数组去过滤出满足薪资大于10000的数据,最后再做count统计出满足条件的人数。

以下是最冗余的代码,每个步骤都转换生成一个新的RDD,彼此之间是连续的,这些RDD是会占内存空间,同时增加了很多不必要的计算。

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("count")
  val ss = SparkSession.builder().config(conf).getOrCreate()
  val filePath: String = "transaction_data.csv"
  val lineRDD = ss.sparkContext.textFile(filePath)
  val array = lineRDD.map(_.split(","))
  //过滤出薪资10000的数据
  val valueRdd = array.filter(x => x.apply(3).toInt > 10000)
  //统计薪资10000以上的人数
  val count = valueRdd.count()
  ss.stop()
}

Spark就可能会将这些存在连续的RDD进行优化,将其合并成一个单独的转换操作,直接就对原始RDD进行映射和过滤——

val value = ss.sparkContext.textFile(filePath).map(_.split(",")).filter(x =>{x.apply(3).toInt > 10000})
value.count()

这样优化同时避免了多次循环遍历,每个映射的数组只需要遍历一次即可。

可以通过coalesce(1)只设置一个分区,使代码串行运行,然后增加打印验证一下效果——

val value = ss.sparkContext.textFile(filePath).coalesce(1).map(x =>{
  println(s"分割打印 $x")
  x.split(",")
}).filter(x =>
  {
    println(s"过滤打印 ${x.apply(0)}")
    x.apply(3).toInt > 10000
  }
 )
value.count()

打印部分结果,发现没每遍历一次,就把映射数组和过滤都完成了,没有像先前多个RDD那样需要每次都得遍历,这样就能达到一定优化效果——

分割打印 Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
过滤打印 Amy Harris
分割打印 Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
过滤打印 Lori Willis
分割打印 Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
过滤打印 Jim Williams
分割打印 Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
过滤打印 Anthony Perez
分割打印 Allison Carroll,28,男,18292,环保可持续,美妆护肤,唯品会,信用卡,8,免费赠品,日常使用
过滤打印 Allison Carroll
分割打印 Robert Rice,47,男,5347,时尚潮流,图书音像,拼多多,微信支付,8,有优惠券,兴趣爱好
过滤打印 Robert Rice

这样也提醒了我们,在遇到连续转换的RDD时,其实可以自行做代码优化,避免产生中间可优化的RDD和遍历操作。

三、窄依赖优化

RDD在执行惰性计算时,会尽可能进行窄依赖优化。

有窄依赖,便会有宽依赖,两者有什么区别呢?

窄依赖指的是父RDD的每个分区只需要通过简单的转换操作就可以计算出对应的子RDD分区,不涉及跨多个分区的数据交换,即父子之间每个分区都是一对一的。

前文提到的map、filter等转换都属于窄依赖的操作。

例如,array.filter(x => x.apply(3).toInt > 10000),父RDD有三个分区,那么三个分区就会分别执行array.filter(x => x.apply(3).toInt > 10000)将过滤的数据传给子RDD对应的分区——

image

宽依赖指父RDD的每个分区会通过跨区计算将原本同一个分区数据分发到不同子分区上,这中间涉及到shuffle重新洗牌操作,会存在较大的计算,父子之间分区是一对多的。可以看到,父RDD同一个分区的数据,在宽依赖情况下,会将相同的key传输到同一个分区里,这就意味着,同一个父RDD,如果存在多个不同的key,可能会分发到多个不同的子分区上,进而出现shuffle重新洗牌操作。

image

因此,RDD会尽可能的进行窄依赖优化,在无需跨区计算的情况下,就避免进行shuffle重新洗牌操作,将父分区一对一地传输给子分区。同时,窄依赖还有一个好处是,在子分区出现丢失数据异常时,只需要重新计算对应的父分区数据即可,无需将父分区全部数据进行计算。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/227025.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

UE Http笔记

c参考链接 UE4 开发如何使用 Http 请求_wx61ae2f5191643的技术博客_51CTO博客 虚幻引擎:UEC如何对JSON文件进行读写?-CSDN博客 UE4 HTTP使用 官方免费插件 VaRest 在代码插件创建的VaRest - 虚幻引擎商城 UE5在蓝图中使用Varest插件Get,Post两种常见请求方式…

C# Solidworks二次开发:三种获取SW设计结构树的方法-第二讲

今天这篇文章是接上一篇文章的,主要讲述的是获取SW设计结构树节点的第二种方法。 这个方法获取节点的逻辑是先获取最顶层节点,然后再通过获取顶层节点的子节点一层一层的把所有节点都找出来,也就是需要递归。想要用这个方法就要了解下面几个…

常见的校验码

在计算机领域中,校验码是一种用于检测或纠正数据传输或存储中错误的技术。校验码通常通过在数据中添加一些冗余信息来实现。其主要目的是确保数据的完整性和准确性。 奇偶校验码(Parity Check) 奇校验: 确保数据中二进制位中的1的…

JWT安全及WebGoat靶场

JWT 安全 cookie(放在浏览器) cookie 是一个非常具体的东西,指的就是浏览器里面能永久存储的一种数据,仅仅是浏览器实现的一种数据存储功能。 cookie 由服务器生成,发送给浏览器,浏览器把 cookie 以 kv 形式保存到某个目录下的…

文件同步及实现简单监控

1. 软件简介 rsync rsync 是一款开源的、快速的、多功能的、可实现全量及增量的本地或远程 数据同步备份的优秀工具。在同步备份数据时,默认情况下,Rsync 通过其 独特的“quick check”算法,它仅同步大小或者最后修改时间发生变化的文 件或…

CentOS上配置和管理HTTP服务器的工具和实用程序

在CentOS系统上,有多个工具和实用程序可以帮助你配置和管理HTTP服务器。以下是一些常用的工具和实用程序: Apache HTTP服务器: Apache是CentOS上最常用的HTTP服务器之一。它是一个开源的Web服务器软件,具有高度的可配置性和可扩…

在jupyter notebook中修改其他文件的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

dell服务器安装PERCCLI

因在linux 系统中无法查看系统磁盘的raid级别,也无法得知raid状态,需要安装额外的包来监控,因是dell服务器,就在dell网站中下载并安装 1、下载链接:驱动程序和下载 | Dell 中国https://www.dell.com/support/home/zh-…

ELK(四)—els基本操作

目录 elasticsearch基本概念RESTful API创建非结构化索引(增)创建空索引(删)删除索引(改)插入数据(改)数据更新(查)搜索数据(id)&…

查看端口号是否被占用

windows10查看端口号是否被占用及解除占用的常用命令 netstat -ano:查看所有端口号占用情况 netstat -ano |findstr “XXX”:查看端口号为XXX的占用情况,如下: 得到进程号为12160的进程正在占用本地的9090端口号(如果只…

Python+requests+unittest+excel实现接口自动化测试框架

在刚刚进入测试行业的时候,最开始也是做功能测试,我想很多伙伴和我一样,觉得自动化测试都很高端,很神秘。迫不及待的想去学习作自动化测试。 以前比较常用数据库python做自动化,后面发现excel个人觉得更加适合&#x…

flex布局的flex为1到底是什么

参考博客:flex:1什么意思_公孙元二的博客-CSDN博客 flex:1即为flex-grow:1,经常用作自适应布局,将父容器的display:flex,侧边栏大小固定后,将内容区flex:1,内…

算术运算(这么简单?进来坐坐?)

先热热身 算术运算,也称为四则运算,包括加法、减法、乘法和除法。此外,算术运算还包括乘方和开方。 在算术中,加减被视为一级运算,乘除被视为二级运算,乘方和开方被视为三级运算。在一道算式中,…

GDPU 数据结构 天码行空13

文章目录 一、【实验目的】二、【实验内容】三、实验源代码四、实验结果五、实验总结 一、【实验目的】 (1) 理解插入排序算法的实现过程; (2)理解不同排序算法的时间复杂度及适用环境; (3)了解算法性能…

华为数通---配置Smart Link负载分担案例

定义 Smart Link,又叫做备份链路。一个Smart Link由两个接口组成,其中一个接口作为另一个的备份。Smart Link常用于双上行组网,提供可靠高效的备份和快速的切换机制。 目的 下游设备连接到上游设备,当使用单上行方式时&#x…

算能 MilkV Duo开发板实战——opencv-mobile (迷你版opencv库)的移植和应用

前言 OpenCV是一种开源的计算机视觉和机器学习软件库,旨在提供一组通用的计算机视觉工具。它用于图像处理、目标识别、人脸识别、机器学习等领域,广泛应用于计算机视觉任务。 OpenCV-Mobile是OpenCV库的轻量版本,专为移动平台(A…

服务器感染了.DevicData-D-XXXXXXXX勒索病毒,如何确保数据文件完整恢复?

引言: 勒索病毒成为网络安全的严峻挑战,而最新的.DevicData-D-XXXXXXXX勒索病毒更是引起广泛关注。本文将深入介绍.DevicData-D-XXXXXXXX勒索病毒的特征,提供恢复被其加密的数据文件的方法,并分享预防措施,以确保您的数…

单细胞seurat-细胞比例分析-画图详细教程

大家好,今天我们来画单细胞中最简单的细胞比例图~ 1.老规矩,先加载pbmc数据 dir.create("~/gzh/细胞比例") setwd("~/gzh/细胞比例")subset_datareadRDS("~/gzh/pbmc3k_final.rds") table(stringr::str_split(string c…

Bounding boxes augmentation for object detection

Different annotations formats Bounding boxes are rectangles that mark objects on an image. There are multiple formats of bounding boxes annotations. Each format uses its specific representation of bouning boxes coordinates 每种格式都使用其特定的边界框坐标…

TCP聊天

一、项目创建 二、代码 Client类 package tcp; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.Scanner; public class Client { public sta…