【pyspark速成专家】11_Spark性能调优方法2

目录

​编辑

二,Spark任务UI监控

三,Spark调优案例


二,Spark任务UI监控

Spark任务启动后,可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。

该界面中可以从多个维度以直观的方式非常细粒度地查看Spark任务的执行情况,包括任务进度,耗时分析,存储分析,shuffle数据量大小等。

最常查看的页面是 Stages页面和Excutors页面。

Jobs: 每一个Action操作对应一个Job,以Job粒度显示Application进度。有时间轴Timeline。

Stages: Job在遇到shuffle切开Stage,显示每个Stage进度,以及shuffle数据量。可以点击某个Stage进入详情页,查看其下面每个Task的执行情况以及各个partition执行的费时统计。

Storage:

监控cache或者persist导致的数据存储大小。

Environment:
显示spark和scala版本,依赖的各种jar包及其版本。

Excutors : 监控各个Excutors的存储和shuffle情况。

SQL: 显示各种SQL命令在那些Jobs中被执行。

三,Spark调优案例

下面介绍几个调优的典型案例:

1,资源配置优化

2,利用缓存减少重复计算

3,数据倾斜调优

4,broadcast+map代替join

5,reduceByKey/aggregateByKey代替groupByKey

1,资源配置优化

下面是一个资源配置的例子:

优化前:

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 8 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py 

优化后:这里主要减小了 executor-cores数量,一般设置为1~4,过大的数量可能会造成每个core计算和存储资源不足产生OOM,也会增加GC时间。 此外也将默认分区数调到了1600,并设置了2G的堆外内存。

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 2 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.default.parallelism=1600 \
--conf spark.sql.shuffle.partitions=1600 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g\
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py 

2, 利用缓存减少重复计算

%%time
# 优化前:
import math 
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))
s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n 
print(mean)

%%time 
# 优化后: 
import math 
from  pyspark.storagelevel import StorageLevel
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)

s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n 
rdd_data.unpersist()
print(mean)

3, 数据倾斜调优

%%time 
# 优化前: 
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_count = rdd_one.reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect()) 

%%time 
# 优化后: 
import random 
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_mid_key = rdd_one.map(lambda x:(x[0]+"_"+str(random.randint(0,999)),x[1]))
rdd_mid_count = rdd_mid_key.reduceByKey(lambda a,b:a+b+0.0)
rdd_count = rdd_mid_count.map(lambda x:(x[0].split("_")[0],x[1])).reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect())  

#作者按:此处仅示范原理,单机上该优化方案难以获得性能优势

4, broadcast+map代替join

该优化策略一般限于有一个参与join的rdd的数据量不大的情况。

%%time 
# 优化前:

rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])
rdd_students = rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))

print(rdd_students.collect())

%%time 

# 优化后:
rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)
ages = rdd_age.collect()
broads = sc.broadcast(ages)

def get_age(it):
    result = []
    ages = dict(broads.value)
    for x in it:
        name = x[0]
        age = ages.get(name,0)
        result.append((x[0],age,x[1]))
    return iter(result)

rdd_students = rdd_gender.mapPartitions(get_age)

print(rdd_students.collect())

5,reduceByKey/aggregateByKey代替groupByKey

groupByKey算子是一个低效的算子,其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通过在每个partition内部先做一次数据的合并操作,大大减少了shuffle的数据量。

%%time 
# 优化前:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
                               ("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))
names = rdd_names.collect()
print(names)

%%time 
# 优化后:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
                               ("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.aggregateByKey([],lambda arr,name:arr+[name],lambda arr1,arr2:arr1+arr2)

names = rdd_names.collect()
print(names)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/655609.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

防止浏览器缓存了静态的配置等文件(例如外部的config.js 等文件)

防止浏览器缓存了静态的配置文件 前言1、在script引入的时候添加随机数1.1、引入js文件1.2、引入css文件2、通过html文件的<meta>设置防止缓存3、使用HTTP响应头:前言 在实际开发中浏览器的缓存问题一直是一个很让人头疼的问题,尤其是我们打包时候防止的静态配置文件c…

Linux线程:管理与控制

一、引言 随着计算机硬件技术的飞速发展&#xff0c;尤其是多核CPU的普及&#xff0c;多线程编程已成为充分利用系统资源、提高程序并发性和响应速度的关键技术。 多线程编程允许一个程序中同时运行多个线程&#xff0c;每个线程可以独立地执行不同的任务。这种并行处理的方式…

计算机操作系统体系结构

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天给大家讲讲操作系统。 当今的操作系统趋向于越来越复杂&#xff0c;因为它们提供许多服务&#xff0c;并支持各种硬件和软件资源&#xff08;请参见“操作系统思想&#xff1a;尽量保持简单”&#xff0…

Dynadot API调整一览

关于Dynadot Dynadot是通过ICANN认证的域名注册商&#xff0c;自2002年成立以来&#xff0c;服务于全球108个国家和地区的客户&#xff0c;为数以万计的客户提供简洁&#xff0c;优惠&#xff0c;安全的域名注册以及管理服务。 Dynadot平台操作教程索引&#xff08;包括域名邮…

DISCO: Disentangled Control for Realistic Human Dance Generation

NTU&Microsoft CVPR24https://github.com/Wangt-CN/DisCo 问题引入 提高human motion transfer模型的泛化性&#xff1b;给出 f , g f,g f,g作为参考图片的前背景&#xff0c;然后给出单个pose p p t pp_t ppt​或者pose序列 p { p 1 , p 2 , ⋯ , p T } p \{p_1,p_2…

撤销最近一次的提交,使用git revert 和 git reset的区别

文章目录 工作区 暂存区 本地仓库 远程仓库需求&#xff1a;已推送到远程仓库&#xff0c;想要撤销操作git revert &#xff08;添加新的提交来“反做”之前的更改&#xff0c;云端会残留上次的提交记录&#xff09;git reset&#xff08;相当于覆盖上次的提交&#xff09;1.--…

HIGT:用于全景切片图像分析的层次交互图-Transformer

文章目录 HIGT: Hierarchical Interaction Graph-Transformer for Whole Slide Image Analysis摘要方法实验结果 HIGT: Hierarchical Interaction Graph-Transformer for Whole Slide Image Analysis 摘要 在计算病理学领域&#xff0c;全景切片图像&#xff08;WSIs&#xf…

JavaEE-Spring Controller(服务器控制以及Controller的实现和配置)

Spring Controller 服务器控制 响应架构 Spring Boot 内集成了 Tomcat 服务器&#xff0c;也可以外接 Tomcat 服务器。通过控制层接收浏览器的 URL 请求进行操作并返回数据。 底层和浏览器的信息交互仍旧由 servlet 完成&#xff0c;服务器整体架构如下&#xff1a; Server&…

调整表格大小

方法一&#xff1a;使用鼠标拖动表格边框或右下角的调整控点 在Word文档中&#xff0c;选中要缩小的表格&#xff0c;将鼠标指针放在表格的边框线上&#xff0c;直到指针变成双箭头的形状。 按住鼠标左键&#xff0c;拖动边框线&#xff0c;调整表格的宽度或高度。如果同时按住…

01 一文理解,Prometheus详细介绍

01 一文理解&#xff0c;Prometheus详细介绍 介绍 大家好&#xff0c;我是秋意零。 Prometheus 是一个开源的系统监控和报警工具包&#xff0c;最初由SoundCloud开发&#xff0c;并在2012年作为开源项目发布。Prometheus 目前由Cloud Native Computing Foundation&#xff08…

python爬虫之pandas库——数据清洗

安装pandas库 pip install pandas pandas库操作文件 已知在本地桌面有一名为Python开发岗位的csv文件(如果是excel文件可以做简单修改即可&#xff0c;道理是通用的) 打开文件&#xff1a; 打开文件并查看文件内容 from pandas import DataFrame import pandas as pd data_c…

AIGC 010-CLIP第一个文本和图像对齐的大模型!

AIGC 010-CLIP第一个文本和图像对齐的大模型&#xff01; 文章目录 0 论文工作1 论文方法2 效果 0 论文工作 不客气的说CLIP和扩散模型的成功让计算式视觉领域几乎所有工作都重新做了一遍。 CLIP&#xff08;对比语言-图像预训练&#xff09;论文提出了一种新的对比学习方法&a…

【C++课程学习】:二叉树的基本函数实现

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C课程学习 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 &#x1f349;二叉树的结构类型&#xff1a; &#x1f349;1.创建二叉树函数&#xff08;根据数组&am…

如何将云服务器上操作系统由centos切换为ubuntu

本文将介绍如何将我们购买的云服务器上之前装的centos切换为ubuntu&#xff0c;云服务器以华为云为例&#xff0c;要切换的ubuntu版本为ubuntu20.04。 参考官方文档&#xff1a;切换操作系统_弹性云服务器 ECS (huaweicloud.com) 首先打开华为云官网&#xff0c;登录后点击右…

机器学习(五) -- 监督学习(5) -- 线性回归1

系列文章目录及链接 上篇&#xff1a;机器学习&#xff08;五&#xff09; -- 监督学习&#xff08;4&#xff09; -- 集成学习方法 - 随机森林 下篇&#xff1a;机器学习&#xff08;五&#xff09; -- 监督学习&#xff08;5&#xff09; -- 线性回归2 前言 tips&#xff1…

树莓派指令

1.常用指令 2.在终端窗口编辑文本文件 2.1nano编辑器 在文本里ctrlG就可以查看更多的快捷按键 2.2vi编辑器 进入默认为命令模式

Spring-Cloud-OpenFeign源码解析-04-调用流程分析

在Spring-Cloud-OpenFeign源码解析-03-FeignClientFactoryBean分析到&#xff0c;通过Autowired或者Resource注入FeignClient实例的时候&#xff0c;实际上返回的是JDK动态代理对象&#xff0c;具体的实现逻辑在InvocationHandler的invoke方法中 回看ReflectiveFeign.newInsta…

怎么简单的把图片缩小?图片在线改大小的方法

在日常工作中经常需要在网上上传图片&#xff0c;但是一般网上不同的平台对上传的图片大小和尺寸都会有限定的要求&#xff0c;不符合要求无法正常上传使用。所以当遇到图片太大的问题时&#xff0c;该如何快速修改图片大小&#xff0c;有很多的小伙伴都很关注这个问题的解决方…

macOS上用Qt creator编译并跑shotcut

1 简介 Shotcut是一个开源的跨平台的视频编辑软件&#xff0c;支持WIN/MACOS/LINUX等平台&#xff0c;由于该项目的编译较为麻烦&#xff0c;踩坑几许&#xff0c;因此写此文章记录完整编译构建过程&#xff0c;后续按此法编译&#xff0c;可减少走弯路&#xff0c;提高生产力。…

Springboot项目打包:将依赖的jar包输出到指定目录

场景 公司要对springboot项目依赖的jar包进行升级&#xff0c;但是遇到一个问题&#xff0c;项目打包之后&#xff0c;没办法看到他里面依赖的jar包&#xff0c;版本到底是不是升上去了&#xff0c;没办法看到。 下面是项目打的jar包 我们通过反编译工具jdgui&#xff0c;来…