Spark项目通用开发框架

文章目录

  • 1. 大数据项目结构
  • 2. 类说明
    • 2.1 公共接口类
    • 2.2 TaskNameEnum指定每个任务的名称
    • 2.3 TaskRunner中编写任务的业务逻辑
  • 3. 任务执行脚本

每个公司内部都有一套自己的架子,一般新人来了就直接在已有的架子上开发业务。
以下仅仅作为记录下自己使用的架子,不作为任何推荐,也不认为这样的组织结构就是好用的。

1. 大数据项目结构

项目的整体组织结构
在这里插入图片描述


目录说明
annotation自定义注解Runner和Task。
app用来放整个项目的各个任务。
test1和test2是具体开发的业务任务。
baseBaseRunner和BaseTask是两个基础类
enums用来定义任务的别名
FeatureContextApp主类在目录中的位置保持不变,如果移动,会影响扫描task和Runner

2. 类说明

2.1 公共接口类

package com.king.ml.base

import com.king.ml.enums.TaskNameEnum
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
import org.springframework.util.StopWatch

import scala.util.{Failure, Success, Try}

trait BaseTask extends Logging with Serializable {

  def taskName: TaskNameEnum.Value
  def initConf(sparkConf: SparkConf = new SparkConf()): SparkConf = sparkConf
  var runtime: StopWatch = _


  def around(implicit spark: SparkSession, currDate: DateTime = DateTime.now): Unit = {
    before
    Try {
      Class.forName(spark.conf.get("task.runner"))
        .newInstance()
        .asInstanceOf[BaseRunner]
        .run
    } match {
      case Success(_) => after
      case Failure(_) => afterThrowException
    }

  }


  private def before(implicit spark: SparkSession, currDate: DateTime): Unit = {

    val taskName = spark.conf.get("task.runner")
    println("开始执行任务 ...["+taskName+"]")
    runtime = new StopWatch(taskName)
    runtime.start(taskName)

  }

  private def after(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val taskName = spark.conf.get("task.runner")
    runtime.stop()
    println("任务执行结束 ...["+ taskName+"],共耗时:" + runtime.getTotalTimeSeconds +"秒")
  }

  private def afterThrowException(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val taskName = spark.conf.get("task.runner")
    runtime.stop()
    println("任务执行异常 ...[" + taskName + "],共耗时:" + runtime.getTotalTimeSeconds + "秒")
  }
}


通过一个公共的接口记录每个任务执行的具体日志信息。

在这里插入图片描述

2.2 TaskNameEnum指定每个任务的名称

  
object TaskNameEnum extends Enumeration {

  def getEnumType(source:String):TaskNameEnum.Value = {
    val values =TaskNameEnum.values.toList.filter(_.toString.toUpperCase == source.toUpperCase)
    values.length match {
      case 1 => values.head
      case _ => throw new IllegalArgumentException("该任务不存在")
    }
  }

  val Test1 = Value("ods.ods_test1")
  val Test2 = Value("ods.ods_test2")

}

这里的Test1和Test2表示任务的名称。

2.3 TaskRunner中编写任务的业务逻辑

package com.king.ml.app.test1

import com.king.ml.annotation.Runner
import com.king.ml.base.BaseRunner
import com.king.ml.enums.TaskNameEnum
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime


@Runner
class Test1TaskRunner extends BaseRunner{
  override def taskName: TaskNameEnum.Value = TaskNameEnum.Test1

  override def run(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val cnt = spark.table("ods.ods_test1").count()
    println("===>总记录数为:")
    println("===>" + cnt)

  }
}

3. 任务执行脚本

在执行脚本中,任务主程序名不需要改变,只需要给任务传参枚举中任务名的值即可。

spark-submit \
--name 'test-ml' \
--master yarn \
--deploy-mode client \
--conf spark.port.maxRetries=100 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.executor.memoryOverhead=5120 \
--queue root.production \
--driver-memory 2g  --num-executors 2 --executor-memory 2g --executor-cores 1 \
--class com.king.ml.app.FeatureContextApp \
./ml/ml-demo.jar "ods.ods_test1"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/789487.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

KNIME 5.2.5 版本界面切换

1、安装完KNIME后,点击“Create workflow in your local space.” 2、发现是这个样子 4、进行切换。点击“menu”,最后点击“Switch to classic user interfaceto” 5、最终显示结果:

从零开始做题:logtime

题目 给出1个pcapng文件 解题 wireshark打开题目流量包,在TCP流中发现flag.zip压缩包流量,将flag.zip提取到本地,解压的过程中需要解压密码,接着观察流量 import pyshark import re pcapFilePath logtime.pcapng pcapFilter…

Excel第29享:基于sum嵌套sumifs的多条件求和

1、需求描述 如下图所示,现要统计12.17-12.23这一周各个人员的“上班工时(a1)”。 下图为系统直接导出的工时数据明细样例。 2、解决思路 首先,确定逻辑:“对多个条件(日期、人员)进行“工时”…

组件设计原则和度量方法

在日常开发过程中,Spring、Dubbo、Mybatis等都是我们常用的开源框架。当你在使用这些框架时,不可避免需要通过分析源码来理解内部的实现原理。那么,你在翻阅源代码时,有没有想过这些框架的代码结构为什么要这样进行设计和实现呢&a…

爬虫-豆瓣读书排行榜

获取数据 requests库 获取数据环节需要用到requests库。安装方式也简单 pip install requests 爬取页面豆瓣读书 Top 250 用requests库来访问 import requests res requests.get(https://book.douban.com/top250/) 解析: 导入requests库调用了requests库中的…

昇思14天

ResNet50图像分类 1. ResNet50图像分类概述 ResNet50是一种用于图像分类的深度卷积神经网络。图像分类是计算机视觉的基本应用,属于有监督学习范畴。ResNet50通过引入残差结构,解决了深层网络中的退化问题,使得可以训练非常深的网络。 2. …

看到指针就头疼?这篇文章让你对指针有更全面的了解!

文章目录 1.什么是指针2.指针和指针类型2.1 指针-整数2.2 指针的解引用 3.野指针3.1为什么会有野指针3.2 如何规避野指针 4.指针运算4.1 指针-整数4.2 指针减指针4.3 指针的关系运算 5.指针与数组6.二级指针7.指针数组 1.什么是指针 指针的两个要点 1.指针是内存中的一个最小单…

Apache中使用CGI

Apache24 使用Visual Studio 2022 // CGI2.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 // #include <stdio.h> #include <stdlib.h>#include <stdio.h>void main() {//设置HTML语言printf("Content-type:text/html\n\n&q…

Ansys Zemax|什么是点扩散函数( PSF )

附件下载 联系工作人员获取附件 概览 这篇文章讲述了&#xff1a; 什么是点扩散函数&#xff1f; 点列图 快速傅里叶变换计算的点扩散函数&#xff08;FFT PSF&#xff09; 惠更斯算法计算的点扩散函数&#xff08;Huygens PSF&#xff09; 如何使用非序列模式下的透镜和…

地下水环评(一级)实践技术及Modflow地下水数值模拟

主要围绕的环评导则&#xff0c;结合不同行业类别&#xff0c;实例讲解地下水环境影响评价的原则、内容、工作程序、方法。包括数据处理分析、数值模型构建以及环评报告编写等。涉及地下水流场绘制软件&#xff08;Surfer&#xff09;的操作流程及数据处理、地下水数值模拟软件…

JVM:类的生命周期

文章目录 一、介绍二、加载阶段三、连接阶段1、验证阶段2、准备阶段3、解析阶段 四、初始化阶段 一、介绍 类的生命周期描述了一个类加载、连接&#xff08;验证、准备和解析&#xff09;、初始化、使用、卸载的整个过程。 二、加载阶段 加载&#xff08;Loading&#xff09…

【论文速读】| JADE:用于大语言模型的基于语言学的安全评估平台

本次分享论文&#xff1a;JADE : A Linguistics-based Safety Evaluation Platform for Large Language Models 基本信息 原文作者&#xff1a;Mi Zhang, Xudong Pan, Min Yang 作者单位&#xff1a;Whitzard-AI, System Software and Security Lab Fudan University 关键…

JavaWeb__正则表达式

目录 1. 正则表达式简介2. 正则表达式体验2.1 验证2.2 匹配2.3 替换2.4 全文查找2.5 忽略大小写2.6 元字符使用2.7 字符集合的使用2.8 常用正则表达式 1. 正则表达式简介 正则表达式是描述字符模式的对象。正则表达式用于对字符串模式匹配及检索替换&#xff0c;是对字符串执行…

用SurfaceView实现落花动画效果

上篇文章 Android子线程真的不能刷新UI吗&#xff1f;(一&#xff09;复现异常 中可以看出子线程更新main线程创建的View&#xff0c;会抛出异常。SurfaceView不依赖main线程&#xff0c;可以直接使用自己的线程控制绘制逻辑。具体代码怎么实现了&#xff1f; 这篇文章用Surfa…

vue 中 使用腾讯地图 (动态引用腾讯地图及使用签名验证)

在设置定位的时候使用 腾讯地图 选择地址 在 mounted中引入腾讯地图&#xff1a; this.website.mapKey 为地图的 key // 异步加载腾讯地图APIconst script document.createElement(script);script.type text/javascript;script.src https://map.qq.com/api/js?v2.exp&…

C++11中重要的新特性之 lambda表达式 Part two

序言 在上一篇文章中&#xff0c;我们主要介绍了 C11 中的新增的关键词&#xff0c;以及 范围for循环 这类语法糖的使用和背后的逻辑。在这篇文章中我们会继续介绍一个特别重要的新特性分别是 lambda表达式 。 1. lambda表达式 1.1 lambda的定义 C11 中的 lambda表达式 是一种…

APB总线协议

一、APB总线介绍 关于总线的一些概念&#xff1a; 总线&#xff1a;计算机内部和计算机之间传输数据的共用通道。 总线位宽&#xff1a;总线能够一次性传送的二进制数据位数&#xff0c;例如8bit、16bit、32bit、64bit等。 总线工作频率&#xff1a;即时钟频率&#xff08;时…

PHP实现用户认证与权限管理的全面指南

目录 引言 1. 数据库设计 1.1 用户表&#xff08;users&#xff09; 1.2 角色表&#xff08;roles&#xff09; 1.3 权限表&#xff08;permissions&#xff09; 1.4 用户角色关联表&#xff08;user_roles&#xff09; 1.5 角色权限关联表&#xff08;role_permissions…

【内网渗透】内网渗透学习之域渗透常规方法

域渗透常规方法和思路 1、域内信息收集1.1、获取当前用户信息1.1.1、获取当前用户与域 SID1.1.2、查询指定用户的详细信息 1.2、判断是否存在域1.2、查询域内所有计算机1.3、查询域内所有用户组列表1.4、查询所有域成员计算机列表1.5、获取域密码信息1.6、获取域信任信息1.7、查…

最短路径算法:Dijkstra算法探险记

想象一下,你是一只小蚂蚁,名字叫小明。你住在一个大大的花园里,这个花园有很多小路,小路之间还有交叉点,就像是一个迷宫一样。现在,你接到了一个任务:找到从你家到花园里一个特定地方(比如一块超级大的糖果)的最短路径! 第一步:画出地图 首先,我们需要一张地图来…