Spark 离线开发框架设计与实现

一、背景

随着 Spark 以及其社区的不断发展,Spark 本身技术也在不断成熟,Spark 在技术架构和性能上的优势越来越明显,目前大多数公司在大数据处理中都倾向使用 Spark。Spark 支持多种语言的开发,如 Scala、Java、Sql、Python 等。

Spark SQL 使用标准的数据连接,与 Hive 兼容,易与其它语言 API 整合,表达清晰、简单易上手、学习成本低,是开发者开发简单数据处理的首选语言,但对于复杂的数据处理、数据分析的开发,使用 SQL 开发显得力不从心,维护成本也非常高,使用高级语言处理会更高效。

在日常的数据仓库开发工作中,我们除了开发工作外,也涉及大量的数据回溯任务。对于创新型业务来说,口径变化频繁、业务迅速迭代,数据仓库的回溯非常常见,通过回溯几个月甚至一年是非常普遍的,但传统的回溯任务方式效率极低,而且需要人力密切关注各任务状态。

针对目前现状,我们开发了一套 Spark 离线开发框架,如下表所示,我们例举了目前存在的问题及解决方案。框架的实现不仅让开发变得简单高效,而且对于数据的回溯工作在不需要任何开发的情况下,快速高效地完成大量的回溯工作。

二、框架设计

框架旨在封装重复的工作,让开发变得简单。框架如图 2-1 所示,主要分为三个部分,基础框架、可扩展工具及应用程序,开发者只需关注应用程序即可简单快速实现代码开发。


2.1 基础框架

基础框架中,我们对于所有类型的应用实现代码与配置分离机制,资源配置统一以 XML 文件形式保存并由框架解析处理。框架会根据开发者配置的任务使用资源大小,完成了 SparkSession、SparkContext、SparkConf 的创建,同时加载了常用环境变量,开发了通用的 UDF 函数(如常用的 url 参数解析等)。其中 Application 为所有应用的父类,处理流程如图所示,开发者只需编写关注绿色部分即可。

目前,离线框架所支持的常用环境变量如下表所示。


2.2 可扩展工具

可扩展工具中包含了大量的工具类,服务于应用程序及基础框架,常用有,配置文件解析类,如解析任务资源参数等;数据库工具类,用于读写数据库;日期工具类,用于日期加减、转换、识别并解析环境变量等。服务于应用程序的通用工具模块可统称为可扩展工具,这里不再赘述。

2.3 应用程序

2.3.1 SQL 应用

对于 SQL 应用,只需要创建 SQL 代码及资源配置即可,应用类为唯一类(已实现),有且只有一个,供所有 SQL 应用使用,开发者无需关心。如下配置所示,class 为所有应用的唯一类名,开发者要关心的是 path 中的 sql 代码及 conf 中该 sql 所使用的资源大小。

<?xml version="1.0" encoding="UTF-8"?>
<project name="test">
    <class>com.way.app.instance.SqlExecutor</class>
    <path>sql文件路径</path>
  <!--    sparksession conf   -->
    <conf>
        <spark.executor.memory>1G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>1G</spark.driver.memory>
        <spark.executor.instances>20</spark.executor.instances>
    </conf>
</project>

2.3.2 Java 应用

对于复杂的数据处理,SQL 代码不能满足需求时,我们也支持 Java 程序的编写,与 SQL 不同的是,开发者需要创建新的应用类,继承 Application 父类并实现 run 方法即可,run 方法中开发者只需要关注数据的处理逻辑,对于通用的 SparkSession、SparkContext 等创建及关闭无需关注,框架还帮助开发者封装了代码的输入、输出逻辑,对于输入类型,框架支持 HDFS 文件输入、SQL 输入等多种输入类型,开发者只需调用相关处理函数即可。

如下为一个简单的 Java 数据处理应用,从配置文件可以看出,仍需配置资源大小,但与 SQL 不同的是,开发者需要定制化编写对应的 Java 类(class 参数),以及应用的输入(input 参数)和输出参数(output 参数),此应用中输入为 SQL 代码,输出为 HDFS 文件。从 Test 类实现可以看出,开发者只需三步走:获取输入数据、逻辑处理、结果输出,即可完成代码编写。

<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_dwd_hanwuji_click_incr_day_domain">
    <class>com.way.app.instance.ecommerce.Test</class>
    <input>
        <type>table</type>
        <sql>select
            clk_url,
            clk_num
            from test_table
            where event_day='{DATE}'
            and click_pv > 0
            and is_ubs_spam=0
        </sql>
    </input>
    <output>
        <type>afs_kp</type>
        <path>test/event_day={DATE}</path>
    </output>
    <conf>
        <spark.executor.memory>2G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>2G</spark.driver.memory>
        <spark.executor.instances>10</spark.executor.instances>
    </conf>
</project>
package com.way.app.instance.ecommerce;import com.way.app.Application;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.sql.Row;import java.util.Map;import org.apache.spark.api.java.function.FilterFunction;import org.apache.spark.sql.Dataset;public class Test extends Application {    @Override    public void run() {        // 输入        Map<String, String> input = (Map<String, String>) property.get("input");        Dataset<Row> ds = sparkSession.sql(getInput(input)).toDF("url", "num");        // 逻辑处理(简单的筛选出url带有部分站点的日志)        JavaRDD<String> outRdd = ds.filter((FilterFunction<Row>) row -> {            String url = row.getAs("url").toString();            return url.contains(".jd.com")                    || url.contains(".suning.com")                    || url.contains("pin.suning.com")                      || url.contains(".taobao.com")                    || url.contains("detail.tmall.hk")                    || url.contains(".amazon.cn")                    || url.contains(".kongfz.com")                    || url.contains(".gome.com.cn")                    || url.contains(".kaola.com")                    || url.contains(".dangdang.com")                    || url.contains("aisite.wejianzhan.com")                    || url.contains("w.weipaitang.com");        })                .toJavaRDD()                .map(row -> row.mkString("\001"));        // 输出        Map<String, String> output = (Map<String, String>) property.get("output");        outRdd.saveAsTextFile(getOutPut(output));    }}

2.3.3 数据回溯应用

数据回溯应用是为解决快速回溯、释放人力而研发的,使用非常便捷,开发者无需重构任务代码,与 SQL 应用相同,回溯应用类为唯一类(已实现),有且只有一个,供所有回溯任务使用,且支持多种回溯方案。

2.3.3.1 方案设计

在日常回溯过程中发现,一次回溯任务存在严重的时间浪费,无论以何种方式提交任务,都需要经历以下执行环境申请及准备的过程:

  1. 在 client 提交 application,首先 client 向 RS 申请启动 ApplicationMaster

  2. RS 先随机找到一台 NodeManager 启动 ApplicationMaster

  3. ApplicationMaster 向 RS 申请启动 Executor 的资源

  4. RS 返回一批资源给 ApplicationMaster

  5. ApplicationMaster 连接 Executor

  6. 各个 Executor 反向注册给 ApplicationMaster

  7. ApplicationMaster 发送 task、监控 task 执行,回收结果

这个过程占用的时间我们统称为执行环境准备,我们提交任务后,经历如下三个过程:

  1. 执行环境准备

  2. 开始执行代码

  3. 释放资源

执行环境准备通常会有 5-20 分钟的等待时间,以队列当时的资源情况上下波动,失败率为 10% 左右,失败原因由于队列、网络、资源不足等造成的不可抗力因素;代码执行过程通常失败率在 5% 左右,通常由于节点不稳定、网络等因素导致。离线开发框架回溯应用从节省时间和人力两个方面考虑,设计方案图 2-3 所示。

从回溯时间方面来看:将所有回溯子任务的第一、第三步的时间压缩为一次,即环境准备及释放各一次,执行多次回溯代码。若开发者回溯任务为 30 个子任务,则节省的时间为 5-20 分钟乘 29,可见,回溯子任务越多,回溯提效越明显。

从人工介入方面来看,第一,开发者无需额外开发、添加回溯配置即可。第二,离线框架回溯应用启动的任务数量远远小于传统回溯方案,以图 2-3 为例,该回溯任务为串行回溯方式,使用框架后只需关注一个任务的执行状态,而传统方式则需人工维护 N 个任务的执行状态。

最后,我们在使用离线开发框架回溯一个一年的串行任务中,代码的执行只需要 5 分钟左右,我们发现,不使用离线开发框架回溯的任务在最理想的情况下(即最短时间分配到资源、所有子任务均无失败情况、一次可以串行启动 365 天),需要的时间为 2.5 天,但使用离线开发框架回溯的任务,在最坏的情况下(即最长时间分配到资源,任务失败情况出现 10%),只需要 6 个小时就可完成,提效 90% 以上,且基本无需人力关注。

2.3.3.2 功能介绍

断点续回

使用 Spark 计算,我们在享受其计算带来的飞快速度时,难免会遭遇其中的不稳定性,节点宕机、网络连接失败、资源问题带来的任务失败屡见不鲜,回溯任务动辄几个月、甚至一年,任务量巨大,失败后可以继续从断点处回溯显得尤为重要。在离线框架设计中,记录了任务回溯过程中已成功的部分,任务失败重启后会进行断点续回。

回溯顺序

在回溯任务中,通常我们会根据业务需要确定回溯顺序,如对于有新老用户的增量数据,由于当前的日期数据依赖历史数据,所以我们通常会从历史到现在开始回溯。但没有这种需要时,一般来说,先回溯现在可以快速满足业务方对现在数据指标的了解,我们通常会从现在到历史回溯。在离线框架设计中,开发者可根据业务需要选择回溯顺序。

并行回溯

通常,回溯任务优先级低于例行任务,在资源有限的情况下,回溯过程中不能一次性全部开启,以免占用大量资源影响例行任务,所以离线框架默认为串行回溯。当然在资源充分的时间段,我们可以选择适当的并行回溯。离线开发框架支持一定的并发度,开发者在回溯任务时游刃有余。

2.3.3.3 创建一个回溯任务

回溯应用的使用非常方便,开发者无需新开发代码,使用例行的代码,配置回溯方案即可,如下代码所示,

  • class 参数为回溯应用的唯一类,必填参数,所有回溯任务无需变化。

  • type 参数为回溯应用类型,默认为 sql,若应用类型为 java,则 type 值应为 java 类名。

  • path 参数为回溯代码路径,必填参数,无默认值,通常与例行任务代码相同,无需修改。

  • limitdate 参数为回溯的截止日期,必填参数,无默认值。

  • startdate 参数为回溯开始日期,必填参数,无默认值,若任务进入断点续回或开启并行回溯时,则该参数无效。

  • order 参数为回溯顺序,默认为倒序。当值为 1 时为正序,为值为 - 1 时为倒序。

  • distance 参数为回溯步长,框架默认为串行回溯,但也支持并行回溯,该参数主要用于支持并行回溯,当该参数存在且值不为 - 1 时,回溯开始日期取值为基准日期。如启动两个并行任务,任务的执行范围为基准日期至基准日期加步长或 limitdate,若基准日期加步长后日期大于 limitdate,则是取 limitdate,否则反之。

  • file 参数为回溯日志文件,必填参数,无默认值,用于记录已回溯成功的日期,当失败再次重启任务时,startdate 会以日志文件中日期的下一个日期为准。

  • conf 参数与其他应用相同,为本次回溯任务的资源占用配置。

<?xml version="1.0" encoding="UTF-8"?><project name="ecommerce_ads_others_order_retain_incr_day">    <class>com.way.app.instance.ecommerce.Huisu</class>    <type>sql</type>    <path>/sql/ecommerce/ecommerce_ads_others_order_retain_incr_day.sql</path>    <limitdate>20220404</limitdate>    <startdate>20210101</startdate>    <order>1</order>    <distance>-1</distance>    <file>/user/ecommerce_ads_others_order_retain_incr_day/process</file>    <conf>        <spark.executor.memory>1G</spark.executor.memory>        <spark.executor.cores>2</spark.executor.cores>        <spark.executor.instances>30</spark.executor.instances>        <spark.yarn.maxAppAttempts>1</spark.yarn.maxAppAttempts>    </conf></project>
‍

三、使用方式

3.1 使用介绍

使用离线框架方式开发时,开发者只需重点关注数据逻辑处理部分,开发完成打包后,提交执行,对于每一个应用主类相同,如前文所述为 Application 父类,不随应用变化,唯一变化的是父类需要接收的参数,该参数为应用的配置文件的相对路径。

3.2 使用对比

使用离线框架前后对比图如下所示。


四、展望

目前,离线开发框架仅支持 SQL、Java 语言代码的开发,但 Spark 支持的语言远不止这两种,我们需要继续对框架升级支持多语言开发等,让开发者更方便、快速地进行大数据开发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/399801.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

创业者看到这3000多箱磁吸春联滞销面临销毁一定要吸取教训!

2月18日&#xff0c;浙江金华一个工厂&#x1f3ed;3000多箱龙年磁吸春联&#xff0c; 因为滞销&#xff0c;加上春联中含有龙元素和日期而面临报废销毁&#xff0c; 造成了数十万的损失以及大量的资源浪费。 —————————— 而引起广泛的社会讨论&#x1f5e3;️&…

《Solidity 简易速速上手小册》第8章:高级 Solidity 概念(2024 最新版)

文章目录 8.1 高级数据类型和结构8.1.1 基础知识解析更深入的理解实际操作技巧 8.1.2 重点案例&#xff1a;构建一个去中心化身份系统案例 Demo&#xff1a;创建去中心化身份系统案例代码DecentralizedIdentityContract.sol 测试和验证拓展案例 8.1.3 拓展案例 1&#xff1a;管…

C++力扣题目 121--买卖股票的最佳时机 122-- 买卖股票的最佳时机II 123--买卖股票的最佳时机III 188--买卖股票的最佳时机IV

121. 买卖股票的最佳时机 力扣题目链接(opens new window) 给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一个不同的日子 卖出该股票。设计一个算法来计算你所…

基于FPGA的二维DCT变换和逆变换verilog实现,包含testbench

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 数据导入到matlab显示图像 2.算法运行软件版本 vivado2019.2 matlab2022a 3.部分核心程序 timescale 1ns / 1ps // // Company: // Engineer:…

阿里云国际-在阿里云服务器上快速搭建幻兽帕鲁多人服务器

幻兽帕鲁是最近流行的新型生存游戏。该游戏一夜之间变得极为流行&#xff0c;同时在线玩家数量达到了200万。然而&#xff0c;幻兽帕鲁的服务器难以应对大量玩家的压力。为解决这一问题&#xff0c;幻兽帕鲁允许玩家建立专用服务器&#xff0c;其提供以下优势&#xff1a; &am…

如何在Ubuntu部署Emlog,并将本地博客发布至公网可远程访问

文章目录 前言1. 网站搭建1.1 Emolog网页下载和安装1.2 网页测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar临时数据隧道2.2.Cpolar稳定隧道&#xff08;云端设置&#xff09;2.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 3. 公网访问测试总结 前言 博客作为使…

使用Docker部署Docker-Compose-Ui工具并实现公网访问

文章目录 1. 安装Docker2. 检查本地docker环境3. 安装cpolar内网穿透4. 使用固定二级子域名地址远程访问 Docker Compose UI是Docker Compose的web界面。这个项目的目标是在Docker Compose之上提供一个最小的HTTP API&#xff0c;同时保持与Docker Compose CLI的完全互操作性。…

十大顶级电脑分区恢复软件,不用重装系统直接分区磁盘

与分区相关的问题总是令人不愉快&#xff0c;但解决它们并不像看起来那么困难。您只需要使用可用的最佳分区恢复软件&#xff0c;例如本文列出的 10 种解决方案。配备功能强大的分区数据恢复软件&#xff0c;无论分区损坏有多严重&#xff0c;您都应该能够立即恢复数据。 我们如…

【git 使用】使用 git rebase -i 修改任意的提交信息/合并多个提交

修改最近一次的提交信息的方法有很多&#xff0c;可以参考这篇文章&#xff0c;但是对于之前的提交信息进行修改只能使用 rebase。 修改提交信息 假设我们想修改下面这个提交信息&#xff0c;想把【登录】改成【退出登录】步骤如下 运行 git rebase -i head~3 打开了一个文本…

数据结构与算法:栈

朋友们大家好啊&#xff0c;在链表的讲解过后&#xff0c;我们本节内容来介绍一个特殊的线性表&#xff1a;栈&#xff0c;在讲解后也会以例题来加深对本节内容的理解 栈 栈的介绍栈进出栈的变化形式 栈的顺序存储结构的有关操作栈的结构定义与初始化压栈操作出栈操作获取栈顶元…

Fluter学习3 - Dart 空安全

Dart 空安全&#xff1a; 空类型操作符 (?)空值合并操作符 (??)空值断言操作符 (!)延迟初始化 (late) 1、空类型操作符 (?) 当你想要根据一个表达式是否为 null 来执行某个操作时&#xff0c;你可以使用 (?)语法&#xff1a;expression1?.expression2如果 expression1…

关于开放系统互联的一些笔记

最近几天就发几篇计算机方面的基础知识 属于个人归纳整理&#xff0c;便于理解希望对大家有帮助 原文地址&#xff1a;关于开放系统互联的一些笔记 - Pleasure的博客 下面是正文内容&#xff1a; 前言 最近在恶补一些计算机方面的基础知识…… 正文 首先为了能够更透彻的理…

c语言结构体与共用体

前面我们介绍了基本的数据类型 在c语言中 有一种特殊的数据类型 由程序员来定义类型 目录 一结构体 1.1概述 1.2定义结构体 1.3 结构体变量的初始化 1.4 访问结构体的成员 1.5结构体作为函数的参数 1.6指向结构的指针 1.7结构体大小的计算 二共用体 2.1概述 2.2 访…

智慧安防/视频监控汇聚平台EasyCVR如何通过接口调用获取设备录像回看的流地址?

视频云存储/视频融合/安防监控EasyCVR视频汇聚系统可兼容各品牌的IPC、NVR、移动单兵、智能手持终端、移动执法仪、无人机、布控球等设备的接入&#xff0c;支持的接入协议包括&#xff1a;国标GB28181、RTSP/Onvif、RTMP&#xff0c;以及厂家的私有协议与SDK&#xff0c;如&am…

探索在GIS中使用ChatGPT

在创建了一个简单的点击询问 ChatGPT GIS 应用程序之后&#xff0c;我一直在努力想出关于如何在 GIS 应用程序中使用 ChatGPT 和 OpenAI 的更好的主意。后来想到只需要要问问 ChatGPT “如何使用它”&#xff0c;下面是对其中的几个实例。 简单的点击询问应用程序 地理输入和输…

最大划水收益

解法&#xff1a; 双指针、贪心 #include <iostream> #include <vector> using namespace std; #define endl \nint main() {ios::sync_with_stdio(false);cin.tie(0); cout.tie(0);int n, k;cin >> n >> k;vector<int> vec(n, 0);for (int i …

你所在的行业,有必要做小程序么?

引言 在当今数字化飞速发展的时代&#xff0c;企业和行业正面临着不断变化的市场环境。随着移动互联网的崛起&#xff0c;小程序作为一种轻量级、便捷的应用形式&#xff0c;逐渐成为各行各业提升服务效率、拓展市场份额的重要工具。对于你所在的行业&#xff0c;究竟是否有必…

【深度优先搜索】【树】【状态压缩】2791. 树中可以形成回文的路径数

作者推荐 【深度优先搜索】【树】【有向图】【推荐】685. 冗余连接 II 本文涉及知识点 深度优先搜索 树 图论 状态压缩 LeetCode:2791. 树中可以形成回文的路径数 给你一棵 树&#xff08;即&#xff0c;一个连通、无向且无环的图&#xff09;&#xff0c;根 节点为 0 &am…

Javascript怎么输出内容?两种常见方式以及控制台介绍

javascript是一种非常重要的编程语言&#xff0c;在许多网页中它被广泛使用&#xff0c;可以实现许多交互效果和动态效果。输出是javascript中最基本的操作之一&#xff0c;下面将介绍两种常见的输出方式。 一、使用console.log()函数输出 console.log()函数是常用的输出函数…

【牛客面试必刷TOP101】Day24.BM34 判断是不是二叉搜索树和BM35 判断是不是完全二叉树

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;牛客面试必刷TOP101 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01;&#xff01;&…