Spark经典案例分享

Spark经典案例

  1. 链接操作案例
  2. 二次排序案例

链接操作案例

案例需求

数据介绍

代码如下:

package base.charpter7

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @projectName sparkGNU2023  
 * @package base.charpter7  
 * @className base.charpter7.Join  
 * @description ${description}  
 * @author pblh123
 * @date 2023/11/28 17:25
 * @version 1.0
 *
 */
    
object Join {
  def main(args: Array[String]): Unit = {

    //    1. 创建一个sc对象
    if (args.length != 4) {
      println("usage is WordCount <rating> <movie> <output>")
      System.exit(5)
    }
    val murl = args(0)
    val ratingfile = args(1)
    val movingfile = args(2)
    val outputfile = args(3)

    val spark: SparkSession = new SparkSession.Builder()
      .appName(s"${this.getClass.getSimpleName}").master(murl).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    //    2. 代码主体
    //    判断输出路径是否存在,存在则删除
    val conf: Configuration = new Configuration()
    val fs: FileSystem = FileSystem.get(conf)
    if (fs.exists(new Path(outputfile))) {
      println(s"存在目标文件夹$outputfile")
      fs.delete(new Path(outputfile))
      println(s"目标文件夹$outputfile 已删除")
    }
    else println(s"目标文件夹$outputfile 不存在")


    //rating etl
    val ratingrdd: RDD[String] = sc.textFile(ratingfile, 1)
    val rating: RDD[(Int, Double)] = ratingrdd.map(line => {
      val fileds: Array[String] = line.split("::")
      (fileds(1).toInt, fileds(2).toDouble)
    })
    val movieScores: RDD[(Int, Double)] = rating.groupByKey().map(x => {
      val avg = x._2.sum / x._2.size
      (x._1, avg)
    })
    //    move etl
    val movierdd: RDD[String] = sc.textFile(movingfile)
    // movieid,(movieid,title)
    val movieskey: RDD[(Int, (Int, String))] = movierdd.map(line => {
      val fileds: Array[String] = line.split("::")
      (fileds(0).toInt, fileds(1))
    }).keyBy(tup => tup._1)

    // movieid,(movieid,avg_rating)
    val sskey: RDD[(Int, (Int, Double))] = movieScores.keyBy(tup => tup._1)
    // movieid, (movieid,avg_rating),(movieid,title)
    val joinres: RDD[(Int, ((Int, Double), (Int, String)))] = sskey.join(movieskey)
    // movieid,avg_rating,title
    val res: RDD[(Int, Double, String)] = joinres.filter(f => f._2._1._2 > 4.0)
      .map(f => (f._1, f._2._1._2, f._2._2._2))
//    val res: RDD[(Int, Double, String)] = sskey.join(movieskey)
//      .filter(f => f._2._1._2 > 4.0)
//      .map(f => (f._1, f._2._1._2, f._2._2._2))

    res.take(5).foreach(println)
    res.saveAsTextFile(outputfile)


    //  3. 关闭sc,spark对象
    sc.stop()
    spark.stop()
  }
}

运行结果

二次排序案例

需求及数据说明:

代码实现

SecondarySortKey.class 方法

package base.charpter7

/**
 * @projectName sparkGNU2023  
 * @package base.charpter7  
 * @className base.charpter7.SecondarySortKey  
 * @description ${description}  
 * @author pblh123
  
* @date 2023/11/29 17:01
  
* @version 1.0
  
*/
    
class SecondarySortKey(val first:Int, val second:Int) extends Ordered[SecondarySortKey] with Serializable{

  override def compare(that: SecondarySortKey): Int = {
    if (this.first - that.first != 0){
      this.first - that.first
    } else {
      this.second - that.second
    }
    }
}
SecondarySortApp.scala方法
package base.charpter7

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @projectName sparkGNU2023  
 * @package base.charpter7  
 * @className base.charpter7.SecondarySortApp  
 * @description ${description}  
 * @author pblh123
 * @date 2023/11/29 17:04
 * @version 1.0
 *
 */
    
object SecondarySortApp {
  def main(args: Array[String]): Unit = {

    //  1. 创建spark,sc对象
    if (args.length != 2) {
      println("您需要输入二个参数")
      System.exit(5)
    }
    val musrl: String = args(0)
    val spark: SparkSession = new SparkSession.Builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master(musrl)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    //  2. 代码主体
    // 读取一个txt文件
    val inputfile: String = args(1)
    val lines: RDD[String] = sc.textFile(inputfile, 1)
    // 进行二次排序
    val pairRDDwithSort: RDD[(SecondarySortKey, String)] = lines.map(line => {
      val strings: Array[String] = line.split(" ")
      (new SecondarySortKey(strings(0).toInt, strings(1).toInt), line)
    })
    val pairRDDwithSort2: RDD[(SecondarySortKey, String)] = pairRDDwithSort.sortByKey(false)
    val sortedRes: RDD[String] = pairRDDwithSort2.map(sortedline => sortedline._2)
    sortedRes.collect().foreach(println)

    //  3. 关闭sc,spark对象
    sc.stop()
    spark.stop()
  }
}

配置参数

运行效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/201067.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

springcloud==openfeign

单独使用 创建一个服务端 import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.Path…

如何在代码中启动与关闭ROS节点

在ROS开发中&#xff0c;节点的管理是很重要的一部分&#xff0c;其中有一些节点大部分时候用不到&#xff0c;只会在特定情况下被启动&#xff08;比如建图节点&#xff09;同时这些节点在使用完后还需要被关闭&#xff0c;因此我们就需要在程序中对这些节点进行启动与关闭的管…

【C++】继承(上) 继承的基本概念 | 子类的默认成员函数

一、继承 概念 继承(inheritance)是一种面向对象编程的概念&#xff0c;它允许一个类&#xff08;称为子类或派生类&#xff09;继承另一个类&#xff08;称为父类或基类&#xff09;的特征和行为。子类可以获得父类的成员函数和变量&#xff0c;而不需要重新编写它们。子类还…

【GraphQL】什么是Prisma?

本页提供了Prisma及其工作原理的高级概述。 什么是Prisma&#xff1f; Prisma是一个开源的下一代ORM。它由以下部分组成&#xff1a; Prisma客户端&#xff1a;Node.js和TypeScript的自动生成和类型安全查询生成器Prisma迁移&#xff1a;迁移系统Prisma Studio:GUI&#xff0…

柯桥学英语,商务外贸英语,BEC中级写作冲刺干货

think of… as 把……认为 eager to… 渴望 look forward to Ving 期待/盼望…… accept…as 接受……为 be certain of 对……确信 in contact with 与……接触 in accordance with 与……相符/一致 remind…of 提醒……关于 be advantageous to 有利于…… assure…of使……放…

mysql8报sql_mode=only_full_group_by(存储过程一直报)

1&#xff1a;修改数据库配置(重启失效) select global.sql_mode;会打印如下信息 ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION里面包含 ONLY_FULL_GROUP_BY&#xff0c;那么就重新设置&#xff0c;在数据库中输入以下代码&#xff0c;去掉ONLY_FULL_GROU…

WordPress 外链跳转插件

WordPress 外链跳转插件是本站开发的一款WordPress插件&#xff0c;能对文中外链添加一层过滤&#xff0c;有效防止追踪&#xff0c;以及提醒用户。 类似于知乎、CSDN打开其他链接的提示。 后台可以设置白名单 学习资料源代码&#xff1a;百度网盘 密码&#xff1a;123

电子学会C/C++编程等级考试2022年09月(三级)真题解析

C/C++等级考试(1~8级)全部真题・点这里 第1题:课程冲突 小 A 修了 n 门课程, 第 i 门课程是从第 ai 天一直上到第 bi 天。 定义两门课程的冲突程度为 : 有几天是这两门课程都要上的。 例如 a1=1,b1=3,a2=2,b2=4 时, 这两门课的冲突程度为 2。 现在你需要求的是这 n 门课…

Adobe Illustrator绘图解决卡顿问题

最近在用AI做矢量图&#xff0c;但是遇到了一个很难搞的问题&#xff0c;当我们需要分辨率较高的图片的时候&#xff0c;Python用Matplotlib生成的pdf时dpi参数会设置为600及以上&#xff0c;但是样子的话就造成了pdf文件过大以及AI卡顿&#xff0c;比如&#xff0c;下午生成的…

多文件夹图片预处理:清除空值、重置大小、分割训练集

→ 清理空值 防止出现cannot identify image file 参考Python数据清洗----删除读取失败图片__简单版_python用pil读取图片出错删除掉-CSDN博客 import os import shutil import warnings import cv2 import iofrom PIL import Image warnings.filterwarnings("error&qu…

UE 事件分发机制(二) day10

自定义事件分发机制 自建事件分发机制与结构 Unreal推荐的游戏逻辑开发流程 基于 Unreal推荐的游戏逻辑开发流程&#xff0c;一般我们的整体规划也就是这样 大致结构类图 创建接口类与管理类以及所需函数 新建一个Unreal接口类作为接口 然后创建一个蓝图函数库的基类 Ev…

Python基础:推导式(Comprehensions)详解

1. 推导式概念 Python推导式&#xff08;comprehensions&#xff09;是一种简洁而强大的语法&#xff0c;用于从已存在的数据&#xff08;列表、元组、集合、字典等&#xff09;中创建新的数据结构。推导式包括&#xff1a; 列表推导式元组推导式字典推导式集合推导式 2. 列表…

mybatis参数输入 #{}和${}

1、建库建表 CREATE DATABASE mybatis-example;USE mybatis-example;CREATE TABLE t_emp(emp_id INT AUTO_INCREMENT,emp_name CHAR(100),emp_salary DOUBLE(10,5),PRIMARY KEY(emp_id) );INSERT INTO t_emp(emp_name,emp_salary) VALUES("tom",200.33); INSERT INTO…

基于ssm亚盛汽车配件销售业绩管理系统

摘 要 如今的信息时代&#xff0c;对信息的共享性&#xff0c;信息的流通性有着较高要求&#xff0c;因此传统管理方式就不适合。为了让亚盛汽车配件销售信息的管理模式进行升级&#xff0c;也为了更好的维护亚盛汽车配件销售信息&#xff0c;亚盛汽车配件销售业绩管理系统的开…

快速操控鼠标行为!Vue鼠标按键修饰符让你事半功倍

&#x1f3ac; 江城开朗的豌豆&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 &#x1f4dd; 个人网站 :《 江城开朗的豌豆&#x1fadb; 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! ⭐ 专栏简介 欢迎来到前端入门之旅&#xff01;这个…

网络入门---网络编程预备知识

目录标题 ifconfigip地址和mac地址的区别端口号pid和端口号UDP和TCP的初步了解网络字节序socket套接字 ifconfig 通过指令ifconfig便可以查看到两个网络接口&#xff1a; 我们当前使用的是一个linux服务器并是一个终端设备&#xff0c;所以他只需要一个接口用来入网即可&…

甘草书店记:2023年10月15日 星期日 「等待也是人生的大事」

我常说&#xff0c;最好的人生是刚刚好。 财富不可少&#xff0c;也不必多&#xff0c;够用就好。爱情不要晚&#xff0c;也不要早&#xff0c;恰好就好。 可是人生活在社会中、自然中&#xff0c;不会万事由己。所以&#xff0c;等待是人生的必修课。 书店的装修设计和LOGO…

Tomcat及JDK下载安装(Linux系统)

前言 Tomcat是一个开源的Web应用服务器&#xff0c;由Apache软件基金会管理和维护。它的主要功能是处理来自客户端的HTTP请求&#xff0c;生成并返回响应结果。Tomcat不仅可以实现Java Servlet和JavaServer Pages&#xff08;JSP&#xff09;等Web编程模型的支持&#xff0c;也…

STM32开发学习(地址映射)

LED灯代码&#xff1a; #define PERIPH_BASE ((unsigned int)0x40000000)#define AHB1PERIPH_BASE (PERIPH_BASE 0x00020000)#define GPIOF_BASE (AHB1PERIPH_BASE 0x1400)#define GPIOF_MODER *(unsigned int*)(GPIOF_BASE0x00) #define GPIOF_BSRR *(uns…

使用自动化测试获取手机短信验证码

目前在职测试开发,,写一些脚本,个人认为这职业不科学不应该有的职业,测试就是测试,开发就是开发,运维还是老鸟,这行业总能折腾些莫名其妙的东西出来,刚做这行时学的第一门语言是bash shell, 去新去单位上班直接写了个一键搭建测试环境的测试脚本,本来不想干测试了,好好做微信小…