spark第三章:工程化代码

系列文章目录

spark第一章:环境安装
spark第二章:sparkcore实例
spark第三章:工程化代码


文章目录

  • 系列文章目录
  • 前言
  • 一、三层架构
  • 二、拆分WordCount
    • 1.三层拆分
    • 2.代码抽取
  • 总结


前言

我们上一次博客,完成了一些案例的练习,现在我要要进行一些结构上的完善,上一次的案例中,代码的耦合性非常高,想要修改就十分复杂,而且有很多代码都在重复使用,我们想要把一些重复的代码抽取出来,进而完成解耦合的操作,提高代码的复用。


一、三层架构

大数据的三层架构其中包括
controller(控制层):负责调度各模块
service(服务层):存放逻辑代码
dao(持久层):进行文件交互
现在我们分别给各层创建一个包
在这里插入图片描述
解释一下其中几个
application:项目的启动文件
bean:存放实体类
common:存放这个项目的通用代码
util:存放通用代码(所有项目均可)

二、拆分WordCount

万物皆可WordCount我们就以上次的WordCount为例操作。放一下源代码

object WordCount {
  def main(args: Array[String]): Unit = {
    //  创建 Spark 运行配置对象
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc : SparkContext = new SparkContext(sparkConf)
    // 读取文件 获取一行一行的数据
    val lines: RDD[String] = sc.textFile("datas/word.txt")
    // 将一行数据进行拆分
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // 将数据根据单次进行分组,便于统计
    val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))
    // 对分组后的数据进行转换
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    // 打印输出
    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)

    sc.stop()
  }

}

1.三层拆分

在进行数据抽取之前,我们先进行简单的三层架构拆分
记得把包名路径换成自己的
在这里插入图片描述
WordCountDao.scala
负责文件交互,也就是第一步的读取文件

package com.atguigu.bigdata.spark.core.rdd.framework1.dao

import com.atguigu.bigdata.spark.core.rdd.framework1.application.WordCountApplication.sc


class WordCountDao {

  def readFile(path:String) ={
    sc.textFile(path)
  }
}

WordCountService.scala
负责逻辑运算

package com.atguigu.bigdata.spark.core.rdd.framework1.service

import com.atguigu.bigdata.spark.core.rdd.framework1.dao.WordCountDao

import org.apache.spark.rdd.RDD

class WordCountService {
  private val wordCountDao =new WordCountDao()

  def dataAnalysis(): Array[(String, Int)] ={

    val lines: RDD[String] =wordCountDao.readFile("datas/word.txt")

    val words: RDD[String] = lines.flatMap(_.split(" "))

    val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)

    val array: Array[(String, Int)] = wordToSum.collect()
    array
  }
}

WordCountController.scala
负责调度项目

package com.atguigu.bigdata.spark.core.rdd.framework1.controller

import com.atguigu.bigdata.spark.core.rdd.framework1.service.WordCountService


class WordCountController {
  private val wordCountService =new WordCountService()

  def dispath(): Unit ={
    val array=wordCountService.dataAnalysis()

    array.foreach(println)
  }
}

WordCountApplication.scala
main方法启动项目

package com.atguigu.bigdata.spark.core.rdd.framework1.application

import com.atguigu.bigdata.spark.core.rdd.framework1.controller.WordCountController
import org.apache.spark.{SparkConf, SparkContext}

object WordCountApplication extends App {

  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

  val sc : SparkContext = new SparkContext(sparkConf)


  val controller = new WordCountController()
  controller.dispath()

  sc.stop()
}

在这里插入图片描述

2.代码抽取

接下来我们把一些常用或者会重复实用的代码抽取出来。
创建四个Train,用来抽取四个文件
在这里插入图片描述
TApplication.scala
其中通用代码为环境创建

package com.atguigu.bigdata.spark.core.rdd.framework.common

import com.atguigu.bigdata.spark.core.rdd.framework.util.EnvUtil
import org.apache.spark.{SparkConf, SparkContext}

trait TApplication {

  def start(master: String="local[*]", app: String="Application")(op: =>Unit): Unit ={
    val sparkConf: SparkConf = new SparkConf().setMaster(master).setAppName(app)

    val sc : SparkContext = new SparkContext(sparkConf)
    EnvUtil.put(sc)

    try {
      op
    }catch {
      case ex=>println(ex.getMessage)
    }

    sc.stop()
    EnvUtil.clear()
  }
}

TController.scala
定义调度Train之后由Controller进行重写

package com.atguigu.bigdata.spark.core.rdd.framework.common

trait TController {
  def dispatch():Unit
}

TDao.scala
WordCount通用读取,路径为参数

package com.atguigu.bigdata.spark.core.rdd.framework.common

import com.atguigu.bigdata.spark.core.rdd.framework.util.EnvUtil
import org.apache.spark.rdd.RDD

trait TDao {
  def readFile(path:String): RDD[String] ={
    EnvUtil.take().textFile(path)
  }
}

TService.scala
和Controller类似,由Service重写

package com.atguigu.bigdata.spark.core.rdd.framework.common

trait TService {
  def dataAnalysis():Any
}

在这里插入图片描述
定义环境,确保所有类都能访问sc线程
EnvUtil.scala

package com.atguigu.bigdata.spark.core.rdd.framework.util

import org.apache.spark.SparkContext

object EnvUtil {
  private val scLocal =new ThreadLocal[SparkContext]()

  def put(sc:SparkContext): Unit ={
    scLocal.set(sc)
  }

  def take(): SparkContext = {
    scLocal.get()
  }

  def clear(): Unit ={
    scLocal.remove()
  }
}

修改三层架构
WordCountApplication.scala

package com.atguigu.bigdata.spark.core.rdd.framework.application

import com.atguigu.bigdata.spark.core.rdd.framework.common.TApplication
import com.atguigu.bigdata.spark.core.rdd.framework.controller.WordCountController


object WordCountApplication extends App with TApplication{
  start(){
    val controller = new WordCountController()
    controller.dispatch()
  }

}

WordCountController.scala

package com.atguigu.bigdata.spark.core.rdd.framework.controller

import com.atguigu.bigdata.spark.core.rdd.framework.common.TController
import com.atguigu.bigdata.spark.core.rdd.framework.service.WordCountService


class WordCountController extends TController{
  private val WordCountService = new WordCountService()

  def dispatch(): Unit ={

    val array: Array[(String, Int)] = WordCountService.dataAnalysis()

    array.foreach(println)
  }
}

WordCountDao.scala

package com.atguigu.bigdata.spark.core.rdd.framework.dao

import com.atguigu.bigdata.spark.core.rdd.framework.common.TDao

class WordCountDao extends TDao{

}

WordCountService.scala

package com.atguigu.bigdata.spark.core.rdd.framework.service

import com.atguigu.bigdata.spark.core.rdd.framework.common.TService
import com.atguigu.bigdata.spark.core.rdd.framework.dao.WordCountDao
import org.apache.spark.rdd.RDD

class WordCountService extends TService{
  private val wordCountDao=new WordCountDao()

  def dataAnalysis(): Array[(String, Int)] = {
    val lines: RDD[String] = wordCountDao.readFile("datas/word.txt")

    val words: RDD[String] = lines.flatMap(_.split(" "))

    val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)

    val array: Array[(String, Int)] = wordToSum.collect()
    array
  }

}

再次运行
在这里插入图片描述


总结

对spark项目代码的规范就到这里,确实有点复杂,我也不知道说清楚没有。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/1906.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Android电视盒子最强看电视app-tvbox配置(视频源)教程

今天给大家分享一下安卓tv上最强的看视频神器-tvbox的配置方法 tvbox是一款影视观看类的软件,各种影视资源都是为你免费提供的,还有海量热门影视为你提供电视直播,让你可以实时在线进行观看以及体验一样,超多影视剧内容你感兴趣的…

ChatGPT4已经来了,30秒做一个弹球游戏!

前两周写了关于ChatGPT的文章,折腾了一晚!终于开通了ChatGPT plus版本!ChatGPT_Plus的功能有多强!3分钟写一个贪吃蛇游戏!然后果断的注册了Plus, 事实证明这个决定是对的,现在只有plus 可以抢先尝鲜GPT4, 免…

机器学习---聚类算法

目录【写在前面】1、确认安装有scikit-learn库2、使用 make _ classification ()建立数据集3、使用模型进行分类头文件汇总亲和力传播聚合聚类BIRCH 聚类DBSCAN【本人的毕业设计系统中有用到】K-均值高斯混合模型【写在最后】【写在前面】 sklearn和scikit-learn: …

sql中exists的常用用法

exists中子查询结果集非空,则exists子查询返回true。如果exists子查询结果集为空,则exists子查询返回false。在平常的开发工作中,经常会用到exists,那么它应该如何使用呢?1:查询兴趣爱好为跳舞的同学姓名及…

JMM内存模型

JMM内存模型JMM内存模型定义三大特性原子性可见性有序性volatile语义JMM规则操作系统实现术语缓存一致性要求缓存一致性机制写传播事务串行化重排序as-if-serial 语义(像是有序的)happens-before 原则happens-before 原则的八大子原则内存屏障总结finalf…

C#大型HIS医院LIS管理系统源码

▶ 一、实验室信息管理系统(LIS)是什么? 实验室信息管理系统也就是平时所说的LIS(Laboratory Information System)系统,其主要服务的对象主要是医院检验科工作人员,也是医院信息化建设必…

手撕数据结构—栈

Tips不得不再次提一下这个语法问题,当数组创建的时候,进行初始化的时候,分为全部初始化或者说部分初始化,对于不完全初始化而言,剩下的部分就全部默认为零。现在比如说你想对整型数组的1万个元素把它全部变成-1&#x…

简介SpringBoot

目录 一、简介SpringBoot 二、SpringBoot项目的创建与使用 1、创建SpringBoot项目 2、使用SpringBoot项目 三、 SpringBoot中的配置文件 .properties配置文件 读取配置文件信息 .yml配置文件 读取配置文件信息 四、SpringBoot中的日志文件 1、日志文件简介 2、…

(数据结构)八大排序算法

目录一、常见排序算法二、实现1. 直接插入排序2.🌟希尔排序3. 选择排序4.🌟堆排序5. 冒泡排序7. 🌟快速排序7.1 其他版本的快排7.2 优化7.3 ⭐非递归7. 🌟归并排序7.1 ⭐非递归8. 计数排序三、总结1. 分析排序 (Sorting) 是计算机…

网络安全的特性

0x00 前言 网络安全的特性包括,机密性,完整性,可用性,真实性和不可否认性。详细的内容可以参考如下的内容。 Xmind资源请下载~ 0x01 机密性 机密性(Confidentiality) 意味着阻止未经授权的实体&#x…

【springcloud 微服务】Spring Cloud Alibaba Sentinel使用详解

目录 一、前言 二、分布式系统遇到的问题 2.1 服务可用性问题 2.1.1 单点故障 2.1.2 流量飙升 2.1.3 容错机制 2.2 服务雪崩问题 三、 服务可用性解决方案 3.1 服务容错机制 3.1.1 超时机制 3.1.2 服务限流 3.1.3 隔离 3.2 服务熔断 3.2.1 什么是服务熔断 3…

springcloud学习总结

springcloud 构建微服务项目步骤 导入依赖编写配置文件开启这个功能 Enablexxx配置类 于2023年2月24日下午17点38分开始学习于2023年3月17日晚上20点26分学完总结代码地址:https://gitee.com/liang-weihao/StudySpringcloud学习笔记地址:https://www.…

JavaEE简单示例——基于注解的AOP实现

简单介绍: 之前我们介绍了关于XML的面向切面的编程,通过配置文件的方法,在不修改源代码的情况下完成了对已有方法的增强 除了基于XML配置文件的方式,我们还可以使用更简单的,基于注解的方式。 每一次,我们…

【DBC专题】-12-不同类型报文(应用/诊断/网关/测量标定)在DBC中配置,以及在Autosar各模块间的信号数据流向

点击返回「Autosar从入门到精通-实战篇」总目录 案例背景(共18页精讲):该篇博文将告诉您: 1)Autosar中,不同类型报文(App应用,UDS/OBD诊断,NM网络管理报文,XCP测量标定)的信号数据流向; 2)CAN …

【IoT】嵌入式驱动开发:IIC子系统

IIC有三种接口实现方式 三种时序对比: 图1 IIC子系统组成 图2 图3 IIC操作流程 设备端 1.i2c_get_adapter 2.i2c_new_device(相当于register设备) 3.I2c_put_adapter 驱动端 1.填充i2c_driver 2.i2c_add_driver(相当于register驱动) 3.在probe中建立访问方式 client相…

蓝桥杯刷题冲刺 | 倒计时22天

作者:指针不指南吗 专栏:蓝桥杯倒计时冲刺 🐾马上就要蓝桥杯了,最后的这几天尤为重要,不可懈怠哦🐾 文章目录1.选数异或2.特殊年份1.选数异或 题目 链接: 选数异或 - 蓝桥云课 (lanqiao.cn) 给定…

C++修炼之筑基期第一层——认识类与对象

文章目录🌷专栏导读🌷什么是面向对象?🌷类的引入🌷什么是类🌷类的定义方式🌷类的访问限定符与封装🌺访问限定符🌺封装🌷类的作用域🌷类的实例化&a…

基于STM32的ADC采样及各式滤波实现(HAL库,含VOFA+教程)

前言:本文为手把手教学ADC采样及各式滤波算法的教程,本教程的MCU采用STM32F103ZET6。以HAL库的ADC采样函数为基础进行教学,通过各式常见滤波的实验结果进行分析对比,搭配VOFA工具直观的展示滤波效果。ADC与滤波算法都是嵌入式较为…

今天,我终于学懂了C++中的引用

文章目录一、前言二、概念介绍三、引用的五大特性1、引用在定义时必须初始化2、一个变量可以有多个引用3、一个引用可以继续有引用4、引用一旦引用一个实体,再不能引用其他实体5、可以对任何类型做引用【变量、指针....】四、引用的两种使用场景1、做参数a.案例一&a…

vue大型商城系统中遇到的问题(上)

一:创建仓库1.领导创建git仓库(参考————这篇文章),新手下载git2.打开cmd终端,将git仓库拉到本地3.进入文件目录,查看分支(新手向——为什么需要创建分支,查看---)4.创…