Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下

  • stage部分task执行特别慢

在这里插入图片描述

一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。

  • 作业重试多次某几个task总会失败
    在这里插入图片描述

常见的退出码143、53、137、52以及heartbeat timed out异常,通常可认为是executor内存被打满。

RDD调优方法

  1. 查看数据分布
    Spark Core中shuffle算子出现数据倾斜时,可在Spark作业中加入查看key分布的代码,也可以将代码拆解出来使用spark-shell做测试
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))

// 数据量较少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 调整shuffle并行度
    原理:Spark在做shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适如比较小,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其它task,从而造成数据倾斜
    在这里插入图片描述

调优建议:

  • 使用spark.default.parallelism调整分区数,默认值200建议500或更大
  • 在shuffle的算子上直接设置分区数,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join转map join
    原理:不使用join算子直接进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的出现
    在这里插入图片描述

调优建议:

  • broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>
  val rdd1DataMap = rdd1Broadcast.value.toMap
  rdd1DataMap.get(x._1) match {
    case Some(v) => (x._1, (x._2, v))
    case None => (x._1, (x._2, null))
  }
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐(随机前缀),另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者join之后去掉前缀;然后将不包含倾斜key的剩余数据进行join;最后将两次join的结果集通过union合并,即可得到全部join结果。
    在这里插入图片描述

调优建议:

// 1.统计数量最大的key
val skewedKeySet = rdd1.sample(false, 0.2)
  .reduceByKey(_ + _)
  .sortBy(_._2, false)
  .take(10)
  .map(x => x._1)
  .toSet

// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>
  val prefix = scala.util.Random.nextInt(10).toString
  (s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))

// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1))
  .flatMap { x =>
    val list = 0 until 10
    list.map(i => (s"${i}_${x._1}", x._2))
  }

val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))

// 4.倾斜key的rdd进行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)

SQL调优方法

  1. 查看数据分布
    统计某个查询结果或表中出现次数超过200次的key
WITH a AS (
            ${query}
        )
SELECT k,s
FROM (
        SELECT ${key} AS k,count(*) AS s
        FROM a
        GROUP BY ${key}
)
WHERE s > 200
  1. 自动调整shuffle并行度
    原理:自适应执行开启的前提下(AQE),假设我们设置的shuffle partition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)
    在这里插入图片描述

Spark参数:

参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数,默认值executor*core数
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量,默认值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m
spark.sql.shuffle.partitionsjoin等操作分区数,默认值200推荐500或更大
  1. 自动优化Join
    原理:自适应执行开启的前提下(AQE),我们可以获得SortMergeJoin两个子stage的数据量,在满足条件的情况下,即一张表小于broadcast阈值,可以将SortMergeJoin转化成BroadcastHashJoin
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.autoBroadcastJoinThreshold默认10M,设置为-1可以禁用广播;实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast,由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor=10推荐此参数保持默认,调整自适应的broadcast参数
spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值;设置为-1可以禁用广播;默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确,driver内存足够的前提下可以将此值调大如200M
  1. 自动处理数据倾斜
    原理:自适应执行开启的前提下(AQE),我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜,默认值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子,某分区数据大小超过所有分区中位数与影响因子乘积,才会被认为发生了数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/767974.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【18】认证服务02—微博社交登录

持续学习&持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【18】认证服务02—微博社交登录 微博社交登录图示原理前置准备实现流程完整代码 参考 微博社交登录 OAuth: OAuth(开放授权)是一个开放标准&#xff0…

qt6 通过http查询天气的实现

步骤如下: cmakelist 当中,增加如下配置 引入包 访问远端api 解析返回的数据 cmakelist 当中,增加如下配置,作用是引入Network库。 引入包 3、访问远端api void Form1::on_pushButton_clicked() {//根据URL(http://t.weather.…

6.Android逆向协议-配置FD抓包环境

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 内容参考于:微尘网校 上一个内容:5.Android逆向协议-初识HTTP和HTTPS协议 工具下载: 链接:https://pan.baidu.…

20.《C语言》——【移位操作符】

🌹开场语 亲爱的读者,大家好!我是一名正在学习编程的高校生。在这个博客里,我将和大家一起探讨编程技巧、分享实用工具,并交流学习心得。希望通过我的博客,你能学到有用的知识,提高自己的技能&a…

大数据之Linux部署常用命令脚本封装

文章目录 编写集群命令执行脚本xcall集群命令执行1. 脚本需求2. 需求分析3. 脚本实现3.1 创建脚本存放目录3.2 编写xcall脚本3.3 修改脚本执行权限3.4 测试脚本 编写集群分发脚本xsync集群分发脚本1. 脚本需求2. 需求分析3. 脚本实现3.1 创建脚本存放目录3.2 编写xsync脚本3.3 …

cad由于找不到mfc140u.dll的解决方法,彻底解决mfc140u.dll丢失问题

在计算机辅助设计(CAD)的时候,我们可能会遇到各种错误和问题。其中,“CAD由于找不到mfc140u.dll,无法继续执行代码”的错误提示。这个问题可能会导致CAD无法启动运行,因此,我希望通过分享我的经…

外籍学员报到,四川眼科医院开启国际屈光手术专科医生培训

“能够来到中国、来到四川眼科医院学习,我真的很幸运!”这个夏天,对于马来西亚眼科医生Ivan Cheng En Yoo来说,充满了期待和挑战。他是首位来到四川眼科医院进修学习的马来西亚籍医生,Ivan Cheng将在周进院长的带领下&…

AzureDataFactory 实体间的关联如何处理(Lookup)

使用ADF从外部数据源(例如Sql Server)往D365推数时,实体间的Lookup一定是要做的,本篇以我项目中的设备为例,设备表中有产品的lookup字段 设备表结构如下 msdyn_customerasset 表名ID 设备表guidSerialNumber设备序列号ProductCode设备对应的…

docker部署简单的Kafka

文章目录 1. 拉取镜像2. 运行创建网络运行 ZooKeeper 容器运行 Kafka 容器 3. 简单的校验1. 检查容器状态2. 检查 ZooKeeper 日志3. 检查 Kafka 日志4. 使用 Kafka 命令行工具检查5. 创建和删除测试主题 1. 拉取镜像 选择一组兼容性好的版本。 docker pull bitnami/kafka:3.6…

2024“国培“来也UiBot6.0 RPA数字机器人开发综合应用

前言 (本博客中会有部分课程ppt截屏,如有侵权请及请及时与小北我取得联系~) 国培笔记: 依次读取数组中每个元素 输出调试信息 [ value=[ "vivian", value[0] "老师", "上午好,O(∩_∩)O哈哈~" ], v…

小红书 达芬奇:生活问答 AI 机器人

小红书去年 9 月开始内测的生活问答 AI 机器人:达芬奇,现在可以在小红书 APP 上用了 得益于小红书平台的特性,该助手擅长吃、住、宠、喝、学等等各类生活知识,目前还在搞活动,写评测笔记最高得 666 元

DMA学习笔记

参考文章 https://blog.csdn.net/as480133937/article/details/104927922 DMA简介 DMA,全称Direct Memory Access,即直接存储器访问。DMAC 即 DMA 控制器,提供了一种硬件的数据传输方式,无需 CPU 的介入,可以处理外…

VS Code 常用快捷键大全

Visual Studio Code 是目前最好用的代码编辑器之一。它提供了许多开箱即用的功能以及丰富的第三方扩展,本文将分享常用的 VS Code 快捷键,助你提高开发效率! 代码导航 跳转指定行:快速跳转到文件中的指定行,只需按下快…

从0开始transformer代码理解(附带调试和个人原理理解)

代码来源 本次代码来源自github https://github.com/graykode/nlp-tutorial 里面的5.1 transformer代码 本文目录 代码来源第一步 数据准备(从main函数开始)make_batch函数 **Transformer 主体函数定义代码**Encoder层词向量维度嵌入掩码部分实现多层e…

Linux运维之需掌握的基本Linux命令

前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除 目录 一、SHELL 二、执行命令 三、常用系统工作命令 四、系统状态检测命令 五、查找定位文件命令 六、文本文件编辑命令 七、文件目录管理命令…

echarts--Tree的label上添加图片

使用echarts的rich富文本,配合lable的formatter去实现 主要代码:label里 rich: {img1: {backgroundColor: {image: Cloudy,},height: 40}},formatter: function (param) {var res "";res {img1|} param.name;return res;}, 如果想要哪一节…

Python 生成Md文件带超链 和 PDF文件 带分页显示内容

software.md # -*- coding: utf-8 -*- import os f open("software.md", "w", encoding"utf-8") f.write(内部测试版2024 MD版\n) for root, dirs, files in os.walk(path): dax os.path.basename(root)if dax "":print("空白…

大模型补贴政策来了!!!

广州琶洲人工智能与数字经济试验区管理委员会 广州市海珠区科技工业商务和信息化局关于印发广州市海珠区建设人工智能大模型应用示范区实施细则的通知 各有关单位: 为进一步促进海珠区人工智能大模型产业发展,加快建设人工智能大模型应用示范区&#xf…

昇思MindSpore学习总结八——模型保存与加载

在训练网络模型的过程中,实际上我们希望保存中间和最后的结果,用于微调(fine-tune)和后续的模型推理与部署,接下来将介绍如何保存与加载模型。 1.构建模型 import numpy as np import mindspore from mindspore impo…

【RocketMQ】记录一次RocketMQ消费延迟问题排查思路

文章目录 背景问题排查Consumer负载均衡机制订阅关系的一致 背景 业务团队反馈使用我提供的RocketMQ集群,上游生产的消息,部分消息,消费程序需要等1分钟,甚至几分钟后,才能收到。 问题排查 见怪不怪,大部…