Spark Optimization: Reducing Shuffle

“Shuffling is the only thing which Nature cannot undo.” — Arthur Eddington

[Image: shuffling a deck of cards. Caption: "Shuffle Shuffle Shuffle"]

I used to see people playing cards and using the word "shuffle" even before I knew how to play. Shuffling plays a critical role in distributing "power", adding weight to a player's hand. It is nothing but adding randomness to the selection. When we want to deal cards for various games, contract bridge for example, shuffling is the way to create an even (or uneven) distribution across the four hands.

[Image: a dealt hand of cards. Caption: "Sweet hand"]

Well, enough of playing cards!

Let us understand how shuffle impacts big data computation. Ah yes, I think I will once again use card shuffling to explain it. 😀

[Image: a pile of randomly scattered cards. Caption: "Chaos! I love that!"]

Look at the image above and give me the answers to the questions below.

  • How many black cards are present? ♠️♣️
  • How many of the red cards have numbers greater than 4? ♥️♦️
  • How many high-value cards (showing off my knowledge, eh!) are left in clubs? ♣️

No need to explain: you will tell me, "Yes, I can give you the answers, but let me arrange the cards first." Then you will do what is shown here ⬇️

[Image: the cards arranged by suit and rank. Caption: "Ta Da!"]

The Shuffle in the Big Data World

To answer my questions you must arrange the cards so that cards of the same suit sit together, as in the image above. That means you need to find all the cards of each suit one by one and then order them from A to K, or vice versa. This operation of moving cards (data) around to seek and order them is what the big data world calls a shuffle.

Imagine a situation where you are processing thousands of GBs of data, joining it with data of a similar magnitude, and answering similar questions at different grains and groupings. In the distributed computing world, exchanging data across machines and networks creates so much I/O that it slows down the whole computation. Shuffles alone introduce multiple stages into a big data job and delay the outcome.

How does shuffle work in Spark?

In Apache Spark, shuffle describes the data exchange between the map tasks and the reduce tasks of a job. It is considered the costliest operation, and it is implemented differently in Spark than in Hadoop.

On the map side, each map task in Spark writes out a shuffle file (an OS disk buffer) for every reducer; each such file corresponds to a logical block in Spark. These files are not intermediary in the sense that Spark does not merge them into larger partitioned files. Since scheduling overhead in Spark is lower, the number of mappers (M) and reducers (R) can be far higher than in Hadoop, so shipping M*R files to the respective reducers can result in significant overhead.

Similar to Hadoop, Spark provides a parameter, spark.shuffle.compress, to enable compression of map outputs; the compression library, Snappy by default or LZF, is selected via spark.io.compression.codec. Snappy uses only 33KB of buffer for each opened file and significantly reduces the risk of encountering out-of-memory errors.

On the reduce side, Spark requires all shuffled data to fit into the memory of the corresponding reducer task. This is the case when the reducer task demands all shuffled data, for a groupByKey or a reduceByKey operation, for instance. Spark throws an out-of-memory exception in this situation, which has been quite a challenge, because when Spark spills to disk instead it creates further problems of I/O and read slowness.

Also, with Spark there is no overlapping copy phase, unlike Hadoop, where mappers push data to the reducers even before the map phase is complete. This means that the shuffle is a pull operation in Spark, compared to a push operation in Hadoop. Each reducer also maintains a network buffer to fetch map outputs; the size of this buffer is specified through the parameter spark.reducer.maxMbInFlight (48MB by default; newer Spark releases call it spark.reducer.maxSizeInFlight).
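
For reference, here is a minimal sketch of setting these shuffle-related properties when building a session. The values follow the defaults discussed above; the codec line is an assumption based on this text, since newer Spark versions default to lz4 rather than snappy:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("shuffle-config-sketch")
    # Compress map outputs before they are shuffled (enabled by default).
    .config("spark.shuffle.compress", "true")
    # Codec used for shuffle compression; the article cites snappy.
    .config("spark.io.compression.codec", "snappy")
    # Per-reducer network buffer for fetching map outputs (48m default).
    .config("spark.reducer.maxSizeInFlight", "48m")
    .getOrCreate()
)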

Tuning Spark to reduce shuffle

spark.sql.shuffle.partitions

The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that it is grouped differently across partitions. Depending on the volume of data, you may have to reduce or increase the number of partitions of an RDD/DataFrame, using the spark.sql.shuffle.partitions configuration or through code.

Using this configuration we can control the number of partitions produced by shuffle operations. By default its value is 200, but 200 partitions make little sense if we only have files of a few GBs. We should therefore set it according to the amount of data we need to process via Spark SQL.

Let’s see the difference in practice. Here I am creating two small dataframes, the usual employee and department pair, with two employees: Daniel and me.

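The original screenshots are not reproduced here, so below is a minimal PySpark sketch of an equivalent setup. The employee names, department values, and the timing helper are assumptions rather than the author's exact code, and automatic broadcast is disabled so that the join really shuffles (Spark would otherwise broadcast such tiny tables):

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle-partitions-demo").getOrCreate()

# Disable auto-broadcast so the join below actually performs a shuffle.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Illustrative data; the original screenshots are unavailable.
emp = spark.createDataFrame(
    [(1, "Daniel", 10), (2, "Me", 20)],
    ["emp_id", "name", "dept_id"],
)
dept = spark.createDataFrame(
    [(10, "Data Engineering"), (20, "Analytics")],
    ["dept_id", "dept_name"],
)

def timed_join():
    # Join, aggregate, and time the full action.
    start = time.time()
    emp.join(dept, "dept_id").groupBy("dept_name").count().collect()
    print(f"took {(time.time() - start) * 1000:.0f} ms")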

The default value of spark.sql.shuffle.partitions is 200. Let us run with the default and see how long it takes.

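Continuing the sketch, the run with the default setting would look like this (the 6060 ms figure below is from the author's run, not from this sketch):

spark.conf.set("spark.sql.shuffle.partitions", 200)  # the default
timed_join()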

Time taken: 6060 ms with spark.sql.shuffle.partitions = 200

Now let's modify the config, since we don't need 200 shuffle partitions for such a small amount of data and the job can finish faster without them. Here I am setting it to 2.

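Again, as a sketch:

spark.conf.set("spark.sql.shuffle.partitions", 2)
timed_join()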

Time taken: 398 ms with spark.sql.shuffle.partitions = 2

So you can see that tweaking the shuffle partitions alone made this query roughly 15 times faster.

Reduce dataset size

The classic rule of ETL: filtering as much data as possible close to the source matters just as much in Spark. If you are dealing with a lot of data with very fine-grained aggregates and joins, shuffles are pretty much inevitable, so it is essential to cut down the number of records before joins and aggregates so that the data volume shrinks. Use appropriate filter predicates in your SQL query so Spark can push them down to the underlying data source; selective predicates are good, so use them where appropriate, along with partition filters if they apply (see the sketch below).
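
As an illustration of pushing filters close to the source, here is a hedged PySpark sketch; the path, column names, and values are all hypothetical:

# Read a partitioned dataset and filter as early as possible.
orders = (
    spark.read.parquet("/data/orders")            # hypothetical dataset
    .filter("order_date = '2024-01-01'")          # partition filter: prunes directories
    .filter("country = 'IN'")                     # selective predicate, pushed down to Parquet
    .select("order_id", "customer_id", "amount")  # column pruning reduces shuffled bytes
)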

Broadcast Broadcast Broadcast

When you join two datasets, one large and one small, the best option in Spark is to perform a broadcast join (map-side join). With a broadcast join you can very effectively join a large table (fact) with relatively small tables (dimensions) while avoiding sending all the data of the large table over the network.

You can use the broadcast function to mark a dataset to be broadcast when it is used in a join operator, as shown below. Spark uses the spark.sql.autoBroadcastJoinThreshold setting to control the maximum size of a table that will be broadcast automatically to all worker nodes when performing a join.
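
With the DataFrame API the small side is marked like this (fact_df and dim_df are placeholder names):

from pyspark.sql.functions import broadcast

# The dimension table is shipped to every executor once; the fact
# table is joined locally, so its rows are never shuffled.
result = fact_df.join(broadcast(dim_df), "key")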

SELECT /*+ BROADCAST(B) */ * FROM TableA A INNER JOIN TableB B ON A.key = B.key;

This hint broadcasts the entire table B to all the executors and helps Spark avoid a shuffle. The join becomes local to each executor, so no data needs to travel across machines and there is no shuffle of the large table.

More Shuffles vs Fewer Shuffles

Sometimes we encounter situations where we join multiple datasets, but on different keys. For example, look at the SQL statements below.

SELECT * FROM TableA A INNER JOIN TableB B ON A.key1 = B.key1;
SELECT * FROM TableB B INNER JOIN TableC C ON B.key2 = C.key2;

It is evident that when we read A and B, they may or may not already be partitioned in a way that supports the second join; executing the joins without any optimisation technique can therefore cause extra shuffles, since key1 and key2 will not be evenly distributed across executors. In such cases we prefer to repartition B or C accordingly. Repartitioning can be done on a column, optionally with an explicit partition count, or simply with a number that suits the executor and core combination.

SELECT /*+ REPARTITION(key2)*/ * FROM TableB B;
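
The DataFrame API equivalent of this hint is repartition; the partition count of 200 below is purely illustrative and should be chosen to match your executor and core combination:

# Repartition by the join key so the join on key2 reuses this
# partitioning instead of introducing another shuffle.
table_b = table_b.repartition("key2")
# Or with an explicit partition count:
table_b = table_b.repartition(200, "key2")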

There are many other techniques to overcome shuffles, which you will come across as you start dealing with production-level problems. I think the ones above are definitely the most important to start with.

For any kind of help with career counselling, resume building, or design discussions, or to learn more about the latest data engineering trends and technologies, reach out to me at anigos.
