实战干货|Spark 在袋鼠云数栈的深度探索与实践

Spark 是一个快速、通用、可扩展的大数据计算引擎,具有高性能、易用、容错、可以与 Hadoop 生态无缝集成、社区活跃度高等优点。在实际使用中,具有广泛的应用场景:

· 数据清洗和预处理:在大数据分析场景下,数据通常需要进行清洗和预处理操作以确保数据质量和一致性,Spark 提供了丰富的 API,可以对数据进行清洗、过滤、转换等操作

· 批处理分析:Spark 适用于各种应用场景下的批处理任务,包括统计分析、数据挖掘、特征提取等,用户可以利用 Spark 强大的 API 和内置库进行复杂的数据处理和分析,从而挖掘数据中的内在价值

· 交互式查询:Spark 提供了支持 SQL 查询的 Spark SQL 模块,用户可以使用标准的 SQL 语句进行交互式查询和大规模数据分析

Spark 在袋鼠云的使用

在袋鼠云数栈离线开发平台,我们提供了三种使用 Spark 的方式:

● 创建 Spark SQL 任务

用户可以直接通过编写 SQL 的方式实现自己的业务逻辑。这种方式是目前数栈离线平台使用 Spark 最广泛的方式,也是最为推荐的一种方式。

● 创建 Spark Jar 任务

用户需要在 IDEA 上使用 Scala 或者 Java 语言实现业务逻辑,然后对该项目进行编译打包,并将得到的 Jar 包上传到离线平台,随后在创建 Spark Jar 任务的时候引用这个 Jar 包,最后将任务提交到调度运行即可。

对于使用 SQL 难以实现或表达的需求,或者用户有其他更深层次的需求,Spark Jar 任务无疑给用户提供了一种更为灵活的使用 Spark 的方式。

● 创建 PySpark 任务

用户可以直接编写对应的 Python 代码。在我们的客户群体中,有相当一部分客户,他们除了 SQL 之外,Python 可能是他们的主力语言。特别是针对有一定数据分析基础、算法基础的用户,他们往往会对处理好的数据进行更深层的分析,此时 PySpark 任务自然是他们的不二之选。

Spark 在袋鼠云数栈离线开发平台发挥着重要的作用,因此,我们内部对 Spark 做了也不少的优化,使客户在使用 Spark 提交任务时更加方便。我们还基于 Spark 做了一些工具来增强整个数栈离线开发平台的功能。

除此之外,在数据湖场景下,Spark 也发挥着相当重要的作用。在袋鼠云的湖仓一体模块中,已经支持了 Iceberg 和 Hudi 两大数据湖,用户可以使用 Spark 对湖表进行读写,湖表的治理底层也是通过使用 Spark 调用不同的存储过程实现。

下文就将从引擎侧和 Spark 本身两个方面来阐述袋鼠云内部所做的优化。

引擎端优化

袋鼠云内部引擎端的功能主要是用于任务提交、任务状态获取、任务日志获取、停止任务、语法校验等。每个功能点我们都做了不同程度的优化,下文通过两个例子进行简单介绍。

Spark on Yarn 提交速度提升

随着引擎端 Spark 插件上新功能的不断开发和完善,引擎端提交 Spark 任务所需的时间也在相应的增加,因此需要对提交 Spark 任务相关的代码进行优化,以缩短 Spark 任务提交的时长,提升用户体验。

为此,我们做了以下工作,对于一些公用的配置文件,如 core-site.xml、yarn-site.xml、keytab 文件、spark-sql-application.jar 等,原来每次提交任务都需要预先从服务器下载并提交这些配置文件。现在经过优化后,上述文件仅仅需要在客户端 SparkYarnClient 初始化的时候下载一次,然后上传到指定的 HDFS 路径,后续提交 Spark 任务只需要通过参数的方式指定到对应的 HDFS 路径即可。通过这种方式大大缩短了每次 Spark 任务的提交时间。

在新版本的数栈中,对于临时查询,我们还会根据自定义的规则判断待执行 SQL 的复杂度,将复杂度不高的 SQL 发送到引擎端启动的 SparkSQLEngine 运行,以加快运行速度。这个内部的 SparkSQLEngine 在以前仅仅用于语法校验,现在也承担了一部分 SQL 执行的功能,并且 SparkSQLEngine 还可以根据运行的整体情况,动态扩缩资源,实现资源的有效利用。

语法校验

在较老的数栈版本,对于 SQL 进行语法校验,引擎端会先把 SQL 发送到 Spark Thrift Server。这个 Spark Thrift Server 是以 local 模式部署,不仅仅需要用于语法校验,其他平台上所有元数据的获取都是通过发送 SQL 到这个 Spark Thrift Server 执行来获取。这种方式弊端较大,为此我们做了一些优化。在 Engine 端以 local 模式启动了一个 Spark 任务,在进行语法校验的时候不再将 SQL 发送到 Spark Thrift Server,而是内部维护了一个 SparkSession,直接对 SQL 进行语法校验。

这种方式虽然可以不需要再跟外部的 Spark Thrift server 强关联,但是会给调度组件带来一定的压力,在实现的过程中 Engine-Plugins 的整体复杂度也增大了不少。

为了优化以上问题,我们做了更进一步的优化,调度组件在启动的时候,提交了一个 Spark 任务 SparkSQLEngine 到 Yarn 上。可以理解为是一个远程的运行在 Yarn 上的 Spark Thrift Server,引擎端时刻监控这个 SparkSQLEngine 的健康状态。这样,每次执行语法校验的时候,引擎端将 SQL 通过 JDBC 的形式发送给 SparkSQLEngine 进行语法校验。

通过上述的优化,使得离线开发平台与 Spark Thrift Server 解耦合,EasyManager 不需要额外部署 Spark Thrift Server,使部署更轻量化。调度侧也不用维护一个 local 模式的 Spark 常驻进程。也为离线开发平台上 Spark SQL 任务交互式查询增强做铺垫。

离线开发平台与 EasyManager 部署的 Spark Thrift Server 解耦合后会有以下好处:

· 能够真正意义上的实现 Spark 多集群多版本共存

· EasyManager 标准部署可以去除 Spark Thrift Server,为一线运维减负

· Spark SQL 语法校验变得更轻量,不用缓存 SparkContext,减少 Engine 的资源占用

Spark 功能优化

随着业务的发展深入,我们发现开源的 Spark 在一些场景并没有对应的功能实现。因此我们在开源 Spark 的基础上开发了更多新的插件,以支持数栈更多的功能应用。

任务诊断

首先,我们对 Spark 的 metric sink 做了增强。Spark 内部提供了各种 Sink,除了 ConsoleSink 之外,还有 CSVSink、JmxSink、MetricsServlet、GraphiteSink、Slf4jSink、StatsdSink 等。在 Spark3.0 之后还新增了 PrometheusServlet,但这些还不能满足我们的需求。

在开发任务诊断功能的时候,我们需要通过把 Spark 内部的指标统一推送到 PushGateway,由 Prometheus Server 周期性的从 PushGateway 中拉取指标,最后通过调用 Prometheus 提供的查询接口可以近实时地查询到 Spark 内部的指标。

file

但是 Spark 并没有实现将内部指标 sink 到 PushGateway。因此我们新增了 spark-prometheus-sink 插件,并且自定义了 PrometheusPushGatewaySink 用于将 Spark 内部的指标 push 到 PushGateway。

file

除此之外,我们还自定义了一个新的指标用来描述 Spark SQL 临时查询展示任务执行进度。具体步骤如下:

· 通过自定义 JobProgressSource 来新增用于描述离线任务进度的指标,将该指标注册到 Spark 内部管理系统中的指标管理系统中

· 自定义 JobProgressListener,并将 JobProgressListener 注册到 Spark 内部管理系统中的 ListenerBus。其中,JobProgressListener 的 onJobStart 方法的逻辑是计算当前 Job 下所有的 Task 数量;onTaskEnd 方法的逻辑是在每个 Task 完成后计算并更新当前离线任务进度;onJobEnd 方法的逻辑是在每个 Job 完成后计算并更新当前离线任务进度

对接商业版 Hadoop 集群

随着袋鼠云客户越来越多,客户的环境也是各不相同。有的客户使用的是开源版本的 Hadoop 集群,也有相当一部分客户使用的是 HDP、CDH、CDP、TDH 等。我们在对接这些客户的集群的时候,开发侧往往需要进行新的适配,运维侧每次部署升级的时候也需要配置额外的参数或者有其他额外的操作。

以 HDP 为例,在对接 HDP 的时候,我们使用的 Spark 是 HDP 自带的 Spark2.3,并且我们还需要在运维侧新增一些参数,并将 HDP 自带的 Spark 的所有 Jar包 移动到指定目录。这些操作其实会给运维带来一定的困惑和麻烦,不同类型的集群,运维需要维护不同的运维文档,部署的过程也比较容易出错。并且我们其实对 Spark 的源码做了功能增强和 bug 的修复,如果使用的是 HDP 自带的 Spark,那么就享受不到我们内部维护的 Spark 带来的所有好处。

为了解决上面这些问题,我们内部的 Spark 对现有市场上已有的、常见的发行商都做了适配。换句话来说,我们内部的 Spark 可以在所有不同的 Hadoop 集群上运行。这样,无论对接哪一种类型的 Hadoop 集群,运维只需要部署同一个 Spark 即可,这大大减轻了运维部署的压力。更重要的是,客户可以直接使用我们内部的 Spark 稳定版本,享受到更多的新特性和更大的性能提升。

Spark3.2 新特性-AQE

较老的数栈版本中,默认的 Spark 版本是 2.1.3,后来我们将 Spark 的版本升级到 2.4.8,从数栈6.0开始,Spark3.2 也可以使用了。这里着重介绍一下 AQE,这也是 Spark3.x 中最重要的新特性。

AQE 概述

Spark3.2 之前,AQE 默认是关闭的,需要通过将 spark.sql.adaptive.enabled 设置为 true,才能开启 AQE。Spark3.2 之后,AQE 默认是开启的,任务在运行过程中只要满足 AQE 的触发条件,即可享受 AQE 带来的优化。

需要注意的是,AQE 的优化只会发生在 shuffle 阶段,如果 SQL 在运行过程中并没有涉及到 shuffle 操作,那么即使 spark.sql.adaptive.enabled 的值为 true,AQE 也不会发挥作用。更准确来说,只有物理执行计划包含 exchange 节点或者包含子查询,AQE 才会生效。

AQE 在运行期间,会收集 shuffle map 阶段所生成的中间文件的信息,并将这些信息进行统计,结合已有的规则动态的调整尚未执行的 Optimized Logical Plan 和 Spark Plan,从而对原来的 SQL 语句进行运行时优化。

file

从 Spark 源码来看,AQE 涉及到以下4个优化规则:

file

我们知道,RBO 是根据一系列的规则(rule)来对 SQL 进行优化,包括谓词下推、列剪枝、常量替换等。这些静态规则本身已经内置在 Spark 中,Spark 在执行 SQL 的过程中,这些 rule 会一一作用到 SQL 中。

AQE 的优势

CBO 这个特性是 Spark2.2 之后才有的,相比于 RBO,CBO 会结合表的统计信息,并根据这些统计信息和代价模型(Cost Model)选择出较为优化的执行计划。

但是,CBO 仅仅支持注册到 Hive Metastore 的表。对于存储在分布式文件系统的 parquet、orc 等文件,CBO 是不支持的。并且,如果 Hive 表缺少元数据信息,CBO 收集统计信息的时候就会收集不到,这可能会导致 CBO 失效。

CBO 的另外一个劣势在于 CBO 在优化之前需要先执行 ANALYZE TABLE COMPUTE STATISTICS 来收集统计信息。该语句在执行过程中如果碰到大表则会较为耗时,收集效率较低。

无论是 CBO 还是 RBO,它们都属于静态优化。在物理执行计划提交后,如果任务在运行过程中,数据量、数据分布情况发生变化,CBO 也不会对已有的物理执行计划进行优化。

与 CBO、RBO 不同的是,AQE 在运行过程中,会对 shuffle map 过程中所产生的中间文件进行分析,动态的调整并优化尚未开始执行的逻辑执行计划和物理执行计划,相对静态优化的 CBO 和 RBO 而言,AQE 的处理能得到更加优化的物理执行计划。

AQE 三大特性

● 自动分区合并

Shuffle 过程分为 Map 阶段和 Reduce 两个阶段,Reduce 阶段会将 Map 阶段产生的中间临时文件拉取到对应的 Executor 下,如果 Map 阶段所处理的数据分布非常不均匀,有很多 key 其实仅仅只有几条数据,数据经过处理后可能会形成比较多的小文件。

为了避免上述情况,可以开启 AQE 的自动分区合并功能,可以避免启动过多的 reduce task 去拉取 Map 阶段生成的小文件。

file

● 自动数据倾斜处理

应用场景主要在 Data Joins 中,当发生数据倾斜,AQE 能够自动检测到倾斜分区,并对倾斜分区按照一定的规则进行拆分。目前,在 Spark3.2 中,对 SortMergeJoin 和 ShuffleHashJoin 都支持自动数据倾斜处理。

● Join 策略调整

AQE 会动态的将 Hash Join、Sort Merge Join,降级调整为 Broadcast Join。

我们知道,Spark 任务一旦开始执行,并行度就已经确定。比如说,shuffle map 阶段,并行度为分区的个数;shuffle reduce 阶段并行度则为 spark.sql.shuffle.partitions 的值,默认为200。如果 Spark 任务在运行的过程中数据量变小导致大部分的分区的大小变小,这时如果仍然启动那么多的线程去处理小的数据集就会导致资源的浪费。

而 AQE 在执行过程会根据 shuffle 后生成的中间临时结果,在一定条件下,通过应用 CoalesceShufflePartitions 规则,结合用户提供的参数自动合并分区,其实就是调整 reducer 的数量。原来一个 reduce 线程只会拉取一个处理后的分区的数据,现在一个 reduce 线程会根据实际情况拉取更多的分区的数据,这样就能减少资源的浪费,提高任务执行效率。 《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057?src=szsm

《数栈产品白皮书》下载地址:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想了解或咨询更多有关大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szcsdn

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/584472.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C/C++ 入门(9)编译链接

个人主页:仍有未知等待探索-CSDN博客 专题分栏:C 目录 一、域 1、分类 2、搜索顺序 二、编译链接 1、代码在形成可执行文件的过程 2、符号表 三、问题 1、带有缺省参数的函数声明和定义分离 一、域 1、分类 域:全局域、局部域、命…

CSS-IN-JS Emotion

为什么会有css-in-js 优点 缺点 使用emotion插件库 npm i emotion/core emotion/styled使用时需要解析css属性 使用方式一: 通过注释告诉babel不讲jsx转化为react.create Element的调用,而是转化为jsx语法。会导致一个警告react未使用。 使用方式二&am…

对虾病害分类数据集889张7类别

数据集类型:图像分类用,不可用于目标检测无标注文件 数据集格式:仅仅包含jpg图片,每个类别文件夹下面存放着对应图片 图片数量(jpg文件个数):889 分类类别数:7 类别名称:["baibanbing","bai…

Vitis HLS 学习笔记--Schedule Viewer 调度查看器

目录 1. 简介 2. Schedule Viewer详解 2.1 视图说明 2.1.1 Operation\Control Step 2.1.2 周期关系图 2.1.3 Schedule Viewer 菜单栏 2.1.4 属性视图 2.2 内容说明 2.2.1 实参(b)解释 2.2.2 实参(a)解释 2.2.3 变量&am…

TCP协议在物联网中的实战

一、TCP协议介绍 网上对TCP协议介绍众多,本人按照自己的理解简单介绍一下。 TCP(Transmission Control Protocol, 传输控制协议)是一种面向连接的、可靠的、基于字节流的传输控制层通信协议。 1.1 协议机制 1.1.1 三次握手 &…

MongoDB安装(windows)

mongodb 的安装(windows) 下载软件 官网下载:https://www.mongodb.com/ 安装 1.双击打开MSI包 2.同意协议 3.选择安装形式 complete:默认安装,不可以修改安装地址 custom:自定义安装【推荐】 4.选择安装路径和…

【c++leetcode】35. Search Insert Position

问题入口 二分搜索 时间复杂度O(logn) class Solution { public:int searchInsert(vector<int>& nums, int target) {int start 0;int end nums.size() - 1;while (start < end){int mid (start end) / 2;if (nums[mid] target){return mid;}else if(nums…

Axure如何调起浏览器的打印功能

Axure如何调起浏览器的打印功能 答&#xff1a;javascript:window.print(); 不明白的继续往下看 应用场景&#xff1a; 原型设计中&#xff0c;页面上的打印按钮&#xff0c;需要模拟操作演示&#xff0c;需要点击指定的按钮时&#xff0c;唤起浏览器的打印功能&#xff08…

QT httpServer多线程后台服务器的例子实现

1.需求 1.1 用户需要其他平台&#xff08;web端&#xff09;调用Qt平台的接口&#xff0c;获取想要的数据并实时显示在网页里&#xff0c;比如实时的温湿度&#xff0c;用户数据等 1.2 用户需要在其他平台&#xff08;web端&#xff09;调用Qt平台的接口&#xff0c;下发数据…

使用 GitHub Actions 实现项目的持续集成(CI)

目录 什么是 GitHub Actions 基础概念 Workflow 文件 Workflow 语法 实例&#xff1a;编译 OpenWrt 什么是 GitHub Actions GitHub Actions 是 GitHub 推出的持续集成&#xff08;Continuous Integration&#xff0c;简称 CI&#xff09;服务它允许你创建自定义工作流&am…

【webrtc】MessageHandler 3: 基于线程的消息处理:以sctp测试为例

消息处理可以用于模拟发包处理G:\CDN\rtcCli\m98\src\net\dcsctp\socket\dcsctp_socket_network_test.cc 这个实现中,onMessage还是仅对了一种消息进行处理,就是接收则模式下,打印带宽。当然,可能程序有多个消息,分别在不同的onmessage中执行?SctpActor:以一个恒定的速率…

【webrtc】MessageHandler 1: 基于线程的消息处理:以10毫秒处理音频为例

基于m98 G:\CDN\rtcCli\m98\src\audio\null_audio_poller.h分发的消息由MessageHandler 类通过其抽象接口OnMessage 实现处理 NullAudioPoller NullAudioPoller 是一个处理audio的消息的分发器 poll 启动:

Adobe Firefly 3.0 AI 图像生成器来了

Adobe 发布其 Midjourney 和 Dall-E 3 竞争对手Firefly 2.0已经半年了。几天前&#xff0c;他们发布了Firefly 3.0&#xff08;目前处于测试阶段&#xff09;&#xff0c;这是他们最新的文本到图像人工智能工具&#xff0c;其中包含一些非常酷的更新。在我们深入了解细节之前&a…

开发一个语音聊天社交app小程序H5需要多少钱?

社交&#xff0c;即时通讯APP系统。如何开发一个社交App||开发一个即时通信应用是一项复杂而充满挑战的任务&#xff0c;需要考虑多个技术、开发时间和功能方面的因素。以下是一个概要&#xff0c;描述了从技术、开发时间和功能角度如何开发这样的应用&#xff1a; 1. 技术要点…

Docker-compose的介绍与用法

Docker-compose Docker Compose 是一个开源的容器编排工具&#xff0c;由 Docker 官方开发。它允许开发者定义一个或多个 Docker 容器作为单个服务&#xff0c;并将这些服务组合成一个项目。这些定义被保存在一个 YAML 文件中&#xff0c;称为 docker-compose.yml。 使用 Dock…

低代码技术在构建质量管理系统中的应用与优势

引言 在当今快节奏的商业环境中&#xff0c;高效的质量管理系统对于组织的成功至关重要。质量管理系统帮助组织确保产品或服务符合客户的期望、符合法规标准&#xff0c;并持续改进以满足不断变化的需求。与此同时&#xff0c;随着技术的不断进步&#xff0c;低代码技术作为一…

Qt下使用7Z源码进行压缩和解压缩

7Z压缩是一款常用的压缩算法和工具&#xff0c;本文主要介绍一款在qt环境下进行编译的压缩方法。 本人测试是可以正常跑通的&#xff0c;具体代码部分请下载&#xff1a;下载链接&#xff0c;提取码&#xff1a;ev9t 7z源码网址&#xff1a;7-Zip 7z简介&#xff1a; 7z 是…

一文带你了解5款高效率软件,建议收藏

​ 人类与99%的动物之间最大差别在于是否会运用工具&#xff0c;借助好的工具&#xff0c;能提升几倍的工作效率。 1. 高速文件复制——TeraCopy ​ TeraCopy是一款高效的文件复制工具&#xff0c;可以大幅度提高文件复制和移动的速度。它支持多线程复制、错误恢复、校验和等…

2024-04学习笔记

1.sql优化-子查询改为外连接 1.改之前 改之前是这样&#xff0c;那针对查出来的每一条数据&#xff0c;都要执行一次箭头所指的函数 执行的sql很慢 2.改之后 改之后是这样&#xff0c;整体做外连接&#xff0c;不用每一条都再执行一次查询 执行时间缩短了好几倍 2.Mybatis中…

maven修改默认编码格式为UTF-8

执行mvn -version查看maven版本信息发现&#xff0c;maven使用的编码格式为GBK。 为什么想到要修改编码格式呢&#xff1f;因为idea中我将文件格式统一设置为UTF-8&#xff08;如果不知道如何修改文件编码&#xff0c;可以参考文末&#xff09;&#xff0c;然后使用maven打包时…