Spark Job优化

1 Map端优化

1.1 Map端聚合

map-side预聚合,就是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。

RDD的话建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

SparkSQL本身的HashAggregte就会实现本地预聚合+全局聚合。

1.2 读取小文件优化

读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的Task元信息也会给Spark Driver的内存造成压力,带来单点问题。

设置参数:

spark.sql.files.maxPartitionBytes=128MB   默认128m

spark.files.openCostInBytes=4194304     默认4m

参数(单位都是bytes):

  • maxPartitionBytes:一个分区最大字节数。
  • openCostInBytes:打开一个文件的开销。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.map.MapSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源码理解: DataSourceScanExec.createNonBucketedReadRDD()

FilePartition. getFilePartitions()

1)切片大小= Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

计算totalBytes的时候,每个文件都要加上一个open开销

defaultParallelism就是RDD的并行度

2)当(文件1大小+ openCostInBytes)+(文件2大小+ openCostInBytes)+…+(文件n-1大小+ openCostInBytes)+ 文件n <= maxPartitionBytes时,n个文件可以读入同一个分区,即满足: N个小文件总大小 (N-1*openCostInBytes <=  maxPartitionBytes的话。

1.3 增大map溢写时输出流buffer

1mapShuffle Write有一个缓冲区初始阈值5m,超过会尝试增加到2*当前使用内存。如果申请不到内存,则进行溢写。这个参数是internal,指定无效(见下方源码)。也就是说资源足够会自动扩容,所以不需要我们去设置。

2)溢写时使用输出流缓冲区默认32k,这些缓冲区减少了磁盘搜索和系统调用次数,适当提高可以提升溢写效率。

3Shuffle文件涉及到序列化,是采取的方式读写,默认按照每批次1万条去读写。设置得太低会导致在序列化时过度复制,因为一些序列化器通过增长和复制的方式来翻倍内部数据结构。这个参数是internal,指定无效(见下方源码)。

综合以上分析,我们可以调整的就是输出缓冲区的大小。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.map.MapFileBufferTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源码理解:

2 Reduce端优化

2.1 合理设置Reduce数

过多的cpu资源出现空转浪费,过少影响任务性能。关于并行度、并发度的相关参数介绍,在2.2.1中已经介绍过。

2.2 输出产生小文件优化

1Join后的结果插入新表

join结果插入新表,生成的文件数等于shuffle并行度,默认就是200份文件插入到hdfs上。

解决方式:

1)可以在插入表数据前进行缩小分区操作来解决小文件过多问题,如coalescerepartition算子

2)调整shuffle并行度。根据2.2.2的原则来设置。

2、动态分区插入数据

1)没有Shuffle的情况下。最差的情况下,每个Task中都有表各个分区的记录,那文件数最终文件数将达到  Task数量 * 表分区数。这种情况下是极易产生小文件的。

INSERT overwrite table A partition ( aa ) 

SELECT * FROM B;

2)有Shuffle的情况下,上面的Task数量 就变成了spark.sql.shuffle.partitions(默认值200)。那么最差情况就会有 spark.sql.shuffle.partitions * 表分区数。

当spark.sql.shuffle.partitions设置过大时,小文件问题就产生了;当spark.sql.shuffle.partitions设置过小时,任务的并行度就下降了,性能随之受到影响。

最理想的情况是根据分区字段进行shuffle,在上面的sql中加上distribute by aa。把同一分区的记录都哈希到同一个分区中去,由一个Spark的Task进行写入,这样的话只会产生N个文件, 但是这种情况下也容易出现数据倾斜的问题。

解决思路:

结合第4章解决倾斜的思路,在确定哪个分区键倾斜的情况下,将倾斜的分区键单独拎出来:

将入库的SQL拆成(where 分区 !倾斜分区键 )和 (where 分区 = 倾斜分区键) 几个部分,非倾斜分区键的部分正常distribute by 分区字段,倾斜分区键的部分 distribute by随机数,sql如下:

//1.非倾斜键部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa != 大key

distribute by aa;

//2.倾斜键部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa = 大key

distribute by cast(rand() * 5 as int);

案例实操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.DynamicPartitionSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2.3 增大reduce缓冲区,减少拉取次数

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB。

源码:BlockStoreShuffleReader.read()

2.4 调节reduce端拉取数据重试次数

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3:

2.5 调节reduce端拉取数据等待间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。

reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s。

综合2.32.42.5,案例实操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.ReduceShuffleTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2.6 合理利用bypass

当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200)且不需要map端进行合并操作,则shuffle write过程中不会进行排序操作,使用BypassMergeSortShuffleWriter去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

当你使用SortShuffleManager时,如果确实不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

源码分析:SortShuffleManager.registerShuffle()

SortShuffleManager.getWriter()

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.BypassTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3 整体优化

3.1 调节数据本地化等待时长

在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 Task 数据本地化的级别,如果大部分都是 PROCESS_LOCAL、NODE_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 RACK_LOCAL、ANY,那么需要对本地化的等待时长进行调节,应该是反复调节,每次调节完以后,再来运行观察日志,看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短。

注意过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得 Spark 作业的运行时间反而增加了。

下面几个参数,默认都是3s,可以改成如下:

spark.locality.wait //建议6s、10s

spark.locality.wait.process //建议60s

spark.locality.wait.node //建议30s

spark.locality.wait.rack //建议20s

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.job.LocalityWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3.2 使用堆外内存

1、堆外内存参数

讲到堆外内存,就必须去提一个东西,那就是去yarn申请资源的单位,容器。Spark  on yarn模式,一个容器到底申请多少内存资源。

一个容器最多可以申请多大资源,是由yarn参数yarn.scheduler.maximum-allocation-mb决定, 需要满足:

spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size

≤ yarn.scheduler.maximum-allocation-mb

参数解释:

  • spark.executor.memory:提交任务时指定的堆内内存。
  • spark.executor.memoryOverhead:堆外内存参数,内存额外开销。

默认开启,默认值为spark.executor.memory*0.1并且会与最小值384mb做对比,取最大值。所以spark on yarn任务堆内内存申请1个g,而实际去yarn申请的内存大于1个g的原因。

  • spark.memory.offHeap.size:堆外内存参数,spark中默认关闭,需要将spark.memory.enable.offheap.enable参数设置为true。

注意:很多网上资料说spark.executor.memoryOverhead包含spark.memory.offHeap.size,这是由版本区别的,仅限于spark3.0之前的版本。3.0之后就发生改变,实际去yarn申请的内存资源由三个参数相加。

测试申请容器上限:

yarn.scheduler.maximum-allocation-mb修改为7G,将三个参数设为如下大于7G,会报错:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

spark.memory.offHeap.size修改为1g后再次提交:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2、使用堆外缓存

使用堆外内存可以减轻垃圾回收的工作,也加快了复制的速度。

当需要缓存非常大的数据量时,虚拟机将承受非常大的GC压力,因为虚拟机必须检查每个对象是否可以收集并必须访问所有内存页。本地缓存是最快的,但会给虚拟机带来GC压力,所以,当你需要处理非常多GB的数据量时可以考虑使用堆外内存来进行优化,因为这不会给Java垃圾收集器带来任何压力。让JAVA GC为应用程序完成工作,缓存操作交给堆外。

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.job.OFFHeapCache spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3.3 调节连接等待时长

在Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。

如果task在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。

在生产环境下,有时会遇到file not found、file lost这类错误,在这种情况下,很有可能是Executor的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长120s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃。这种情况也可能会导致DAGScheduler反复提交几次stage,TaskScheduler反复提交几次task,大大延长了我们的Spark作业的运行时间。

为了避免长时间暂停(如GC)导致的超时,可以考虑调节连接的超时时长,连接等待时长需要在spark-submit脚本中进行设置,设置方式可以在提交时指定:

--conf spark.core.connection.ack.wait.timeout=300s

调节连接等待时长后,通常可以避免部分的XX文件拉取失败、XX文件lost等报错。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 1g --conf spark.core.connection.ack.wait.timeout=300s --class com.atguigu.sparktuning.job.AckWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/137612.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

pychon/PIL/opencv/json学习过程中遇到的问题

1. 使用PIL.Image读取图片 注意&#xff1a;pytorch中对图像预处理是transforms的输入必须是PIL格式的文件&#xff0c;使用cv2读取的图片就按照第二条的代码处理&#xff08;3通道合并、归一化处理&#xff09; from PIL import Image img Image.open("test1.jpg"…

数据结构 队列(C语言实现)

目录 1.队列的概念及结构2.队列的代码实现 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站&#xff0c; 通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。 点击跳转到网站。 1.队列的概念及结构 队列&#xff1a;只允许在…

【多线程 - 03、线程的生命周期】

生命周期 当线程被创建并启动以后&#xff0c;它不是一启动就进入执行状态&#xff0c;也不会一直处于执行状态&#xff0c;而是会经历五种状态。 线程状态的五个阶段&#xff1a; 新建状态&#xff08;New&#xff09;就绪状态&#xff08;Runnable&#xff09;运行状态&…

【c++随笔12】继承

【c随笔12】继承 一、继承1、继承的概念2、3种继承方式3、父类和子类对象赋值转换4、继承中的作用域——隐藏5、继承与友元6、继承与静态成员 二、继承和子类默认成员函数1、子类构造函数 二、子类拷贝构造函数3、子类的赋值重载4、子类析构函数 三、单继承、多继承、菱形继承1…

MyBatis研究

入门级使用 参照MyBatis官网的简介与入门部分&#xff0c;尝试使用MyBatis&#xff0c;可创建新的Maven项目&#xff0c;引入以下依赖&#xff1a; <dependencies> <dependency><groupId>org.mybatis</groupId><artifactId>mybatis</…

Spark 资源调优

1 资源规划 1.1 资源设定考虑 1、总体原则 以单台服务器128G内存&#xff0c;32线程为例。 先设定单个Executor核数&#xff0c;根据Yarn配置得出每个节点最多的Executor数量&#xff0c;每个节点的yarn内存/每个节点数量单个节点的数量 总的executor数单节点数量*节点数。 2、…

C/C++满足条件的数累加 2021年9月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析

目录 C/C满足条件的数累加 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C满足条件的数累加 2021年9月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 现有n个整数&#xff0c;将其中个位数…

react 组件进阶

目标&#xff1a;1.能够使用props接收数据 2.能够实现父子组建之间的通讯 3.能够实现兄弟组建之间的通讯 4.能够给组建添加props校验 5.能够说出生命周期常用的钩子函数 6.能够知道高阶组件的作用 一&#xff0c;组件通讯介绍 组件是独立且封闭的单元&#xff0c;默认情况下&a…

U-Mail邮件中继,让海外邮件沟通更顺畅

在海外&#xff0c;电子邮件是人们主要的通信工具&#xff0c;尤其是商务往来沟通&#xff0c;企业邮箱是标配。这主要是因为西方国家互联网发展较早&#xff0c;在互联网早期&#xff0c;电子邮件技术较为成熟&#xff0c;大家都用电子邮件交流&#xff0c;于是这成了一种潮流…

2022年03月 Python(五级)真题解析#中国电子学会#全国青少年软件编程等级考试

Python等级考试(1~6级)全部真题・点这里 一、单选题(共25题,每题2分,共50分) 第1题 下面有关random的常用方法,描述错误的是? A: random.random()生成一个[0.0,1.0)之间的随机小数 B: random.randint(a,b)生成一个[a,b]之间的随机整数 C: random.choice(seq)从序列中…

SpringBoot2.X整合集成Dubbo

环境安装 Dubbo使用zookeeper作为注册中心&#xff0c;首先要安装zookeeper。 Windows安装zookeeper如下&#xff1a; https://blog.csdn.net/qq_33316784/article/details/88563482 Linux安装zookeeper如下&#xff1a; https://www.cnblogs.com/expiator/p/9853378.html Sp…

高防CDN:护航网络安全的卓越之选

在当今数字化时代&#xff0c;网络攻击与日俱增&#xff0c;为了确保网站和应用程序的稳定运行&#xff0c;高防CDN&#xff08;高防御内容分发网络&#xff09;应运而生。选择高防CDN的理由不仅源于其强大的防护性能&#xff0c;还体现了其与硬件防火墙异曲同工的奥妙。 选择高…

wordpress是什么?快速搭网站经验分享

​作者主页 &#x1f4da;lovewold少个r博客主页 ⚠️本文重点&#xff1a;c入门第一个程序和基本知识讲解 &#x1f449;【C-C入门系列专栏】&#xff1a;博客文章专栏传送门 &#x1f604;每日一言&#xff1a;宁静是一片强大而治愈的神奇海洋&#xff01; 目录 前言 wordp…

[ASP]数据库编辑与管理V1.0

本地测试&#xff1a;需要运行 ASP专业调试工具&#xff08;自己搜索下载&#xff09; 默认登陆口令&#xff1a;admin 修改口令&#xff1a;打开index.asp找到第3行把admin"admin"改成其他&#xff0c;如admin"abc123" 程序功能齐全&#xff0c;代码精简…

[.NET]启明星电子文档管理系统edoc v33.0

启明星电子文档库是一个简单、实用的企业文档在线存储工具。系统采用ASP.NETMSSQL2008 Express开发&#xff0c;所有文档数据都以二进制方式存储在数据库里方便备份。 系统的特点包括&#xff1a; &#xff08;1&#xff09;支持文档在线预览&#xff0c;可以在线预览word&…

SolidWorks绘制花瓶教程

这个花瓶是我学习solidworks画图以来用时最长的一个图形了&#xff0c;特此记录一下&#xff0c;用了我足足两个早晨才把他给画出来&#xff0c;我这是跟着哔站里的隔壁老王学习的&#xff0c;下面是视频地址&#xff1a;点击我一下看视频教程 下面是我的绘图过程&#xff0c;…

Spring源码系列-Spring AOP

AOP 要实现的是在我们原来写的代码的基础上&#xff0c;进行一定的包装&#xff0c;如在方法执行前、方法返回后、方法抛出异常后等地方进行一定的拦截处理或者叫增强处理。 AOP 的实现并不是因为 Java 提供了什么神奇的钩子&#xff0c;可以把方法的几个生命周期告诉我们&…

Leetcode刷题详解—— 有效的数独

1. 题目链接&#xff1a;36. 有效的数独 2. 题目描述&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的…

11/12总结

项目进度&#xff1a; 界面画了搜索机票&#xff0c;预定机票&#xff0c;搜索酒店&#xff0c;预定酒店&#xff0c; 然后是开始写这些功能的后端逻辑

RT-DTER 引入用于低分辨率图像和小物体的新 CNN 模块 SPD-Conv

论文地址:https://arxiv.org/pdf/2208.03641v1.pdf 代码地址:https://github.com/labsaint/spd-conv 卷积神经网络(CNN)在图像分类、目标检测等计算机视觉任务中取得了巨大的成功。然而,在图像分辨率较低或对象较小的更困难的任务中,它们的性能会迅速下降。 这源于现有CNN…