2023_Spark_实验十五:SparkSQL进阶操作

实验目标
  1. 通过实践掌握Spark SQL中复杂查询(包括子查询、窗口函数、联接等)的实现方式。
  2. 了解如何通过合理的数据分区和缓存策略进行性能优化。
  3. 实现一个基于Spark SQL的ETL数据处理流程,应用相关优化技巧。
实验背景

在本实验中,学员将使用Spark SQL处理一个典型的企业级大数据处理场景:从日志文件和交易数据中提取信息、清洗数据、进行复杂查询,并优化查询性能。

实验内容
  1. 环境准备

    • 配置并启动一个Spark集群,确保每个学员有一个可用的Spark环境。
    • 准备实验数据:模拟一份包含交易记录、用户信息和产品数据的日志文件,以及对应的CSV格式数据文件。
  2. 实验步骤

    • 使用Spark SQL进行数据加载(加载CSV、JSON等数据格式)。
    • 对加载的数据进行基本清洗与转换。
    • 编写并优化SQL查询,使用窗口函数、JOIN操作和子查询。
    • 对查询过程进行性能分析,并采用缓存、分区等优化策略。

实验数据如下:

users.csv

user_id,name,age,gender
1,李静,62,M
2,梁静,64,M
3,梁静,46,M
4,赵伟,59,M
5,徐丽娟,32,F
6,赵伟,23,M
7,王伟,46,F
8,徐涛,63,M
9,梁强,23,M
10,吴晓,18,M
11,周波,53,M

transactions.csv

transaction_id,user_id,amount,transaction_date
1,486,429.85170924871284,2024-10-09
2,102,736.1594138169264,2024-08-19
3,758,958.0420403336467,2024-05-02
4,156,137.85335989595777,2024-10-17
5,436,962.1964461356514,2023-12-28
6,10,472.1597363615911,2024-07-25
7,349,247.35900107583026,2023-11-26
8,901,349.2802498314715,2024-05-26

实验代码
package SparkSQL

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, to_date}
import org.apache.spark.sql.{SparkSession, functions => F}


/**
 * @projectName SparkLearning2023  
 * @package SparkSQL  
 * @className SparkSQL.SparkSQLAdvancedExp  
 * @description SparkSQL进阶案例实验分析用户消费习惯 
 * @author pblh123
  
* @date 2024/11/14 22:24
  
* @version 1.0
  
*/
    
object SparkSQLAdvancedExp {

  def main(args: Array[String]): Unit = {


    val spark = SparkSession.builder().appName("Spark SQL Advanced Operations")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "tmp/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    // 设置日志级别为ERROR,避免冗余日志信息
    spark.sparkContext.setLogLevel("ERROR")

    // 加载CSV文件
    val dfUsers = spark.read.option("header", "true").csv("datas/sparksqldemo/users/users.csv")
    val dfTransactions = spark.read.option("header", "true").csv("datas/sparksqldemo/transactions/transactions.csv")

    // 执行操作,查看缓存是否有效
    dfUsers.show(3,0)
    dfTransactions.show(3,false)
    dfUsers.printSchema()
    dfTransactions.printSchema()

    // 去除重复数据
    val dfUsersClean = dfUsers.dropDuplicates()
    val dfTransactionsClean = dfTransactions.dropDuplicates()
    // 填充空值
    val dfUsersFilled = dfUsersClean.na.fill(Map("age" -> "0", "gender" -> "unknown"))
    // 使用filter去掉amount为null或空字符串的行
    val dfTransactionsNoNull = dfTransactionsClean
      .filter(dfTransactionsClean("amount").isNotNull && dfTransactionsClean("amount") =!= "")

    // 列转换:将年龄转换为整数类型
    val dfUsersWithAge = dfUsersFilled.withColumn("age", dfUsersFilled("age").cast("int"))
    val dfTransactioncc = dfTransactionsNoNull
      .withColumn("amount", dfTransactionsNoNull("amount").cast("double"))
      .withColumn("transaction_date", to_date(col("transaction_date"), "yyyy-MM-dd"))
    dfUsersWithAge.printSchema()
    dfTransactioncc.printSchema()

    // 使用缓存缓存数据
    val dfUsersCached = dfUsersWithAge.cache()
    val dfTransactionsCached = dfTransactioncc.persist()
    // 执行操作,查看缓存是否有效
    dfUsersCached.show(3,0)
    dfTransactionsCached.show(3,0)

//    将dataframe注册成临时视图,供sparksql使用
    dfUsersWithAge.createOrReplaceTempView("user")
    dfTransactioncc.createOrReplaceTempView("trans")

    // 获取每个用户的总交易金额(子查询)
    val totalSpentQuery =
      """
        |SELECT user_id,
        |(SELECT SUM(amount) FROM trans WHERE trans.user_id = user.user_id) AS total_spent
        |FROM user
        |order by total_spent desc""".stripMargin
    val dfTotalSpent = spark.sql(totalSpentQuery)
    dfTotalSpent.show(3,0)

    // 定义窗口函数:按金额排序
    // col("amount").desc 降序排序,col("amount") 升序排序
    val windowSpec = Window.partitionBy("user_id").orderBy(col("amount").desc)
    // 为每个用户根据交易金额排序
    val dfWithRank = dfTransactioncc.withColumn("rank", F.row_number().over(windowSpec))
      .select("user_id", "transaction_id", "amount", "rank")
    dfWithRank.show(3,0)


    // 使用窗口函数为每个用户的交易记录分配行号
    val dfWithRank2 = dfTransactioncc.withColumn("rank", row_number().over(windowSpec))
    // 筛选出每个用户前5条记录
    val top5Df = dfWithRank2.filter(col("rank") <= 5)
      .select("user_id", "transaction_id", "amount", "rank")
    // 显示结果
    top5Df.show(10,0)

    // 内联接(JOIN)操作:将用户与Top5交易数据联接
    val dfUserTransactions = dfUsersWithAge.join(top5Df, "user_id")
    dfUserTransactions.show(3,0)

    // 查看查询的执行计划
    dfUserTransactions.explain(true)
    dfUserTransactions.coalesce(1).write.parquet("datas/sparksqldemo/usersTrans")

//    关闭spark
    spark.stop()
  }

}
代码执行过程图

    // 加载CSV文件
    val dfUsers = spark.read.option("header", "true").csv("datas/sparksqldemo/users/users.csv")
    val dfTransactions = spark.read.option("header", "true").csv("datas/sparksqldemo/transactions/transactions.csv")

    // 执行操作,查看缓存是否有效
    dfUsers.show(3,0)
    dfTransactions.show(3,false)
    dfUsers.printSchema()
    dfTransactions.printSchema()

    // 列转换:将年龄转换为整数类型
    val dfUsersWithAge = dfUsersFilled.withColumn("age", dfUsersFilled("age").cast("int"))
    val dfTransactioncc = dfTransactionsNoNull
      .withColumn("amount", dfTransactionsNoNull("amount").cast("double"))
      .withColumn("transaction_date", to_date(col("transaction_date"), "yyyy-MM-dd"))
    dfUsersWithAge.printSchema()
    dfTransactioncc.printSchema()

    val dfTotalSpent = spark.sql(totalSpentQuery)
    dfTotalSpent.show(3,0)

    // 定义窗口函数:按金额排序
    // col("amount").desc 降序排序,col("amount") 升序排序
    val windowSpec = Window.partitionBy("user_id").orderBy(col("amount").desc)
    // 为每个用户根据交易金额排序
    val dfWithRank = dfTransactioncc.withColumn("rank", F.row_number().over(windowSpec))
      .select("user_id", "transaction_id", "amount", "rank")
    dfWithRank.show(3,0)

    // 使用窗口函数为每个用户的交易记录分配行号
    val dfWithRank2 = dfTransactioncc.withColumn("rank", row_number().over(windowSpec))
    // 筛选出每个用户前5条记录
    val top5Df = dfWithRank2.filter(col("rank") <= 5)
      .select("user_id", "transaction_id", "amount", "rank")
    // 显示结果
    top5Df.show(10,0)

    // 内联接(JOIN)操作:将用户与Top5交易数据联接
    val dfUserTransactions = dfUsersWithAge.join(top5Df, "user_id")
    dfUserTransactions.show(3,0)

    // 查看查询的执行计划
    dfUserTransactions.explain(true)

任务清单:

1. 获取近三个月消费额大于所有用户同期消费平均值的用户的前三笔销售额清单

2. 将每个月每个用户的的总消费额数据存储到MySQL中表userTmonth保存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/916119.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PaoluGPT——窥视未知

上一题已经得到一个flag&#xff0c;还有一个flag 根据题目信息&#xff0c;说明还有一些聊天记录是没有公开的&#xff0c;另一个flag就在这些未公开的聊天记录中 下载题目附件看看&#xff0c;发现里面有个main.py&#xff1a; 可以看到有两条SQL查询语句&#xff0c;猜测应该…

初识C++ (三)

如果很迷茫的话&#xff0c;就学习吧 引用 一. 引用的概念 “引用(Reference)是 C 相对于C语言的又一个扩充。引用可以看做是数据的一个别名,通过这个别名和原来的名字都能够找到这份数据。 具体是什么意思呢&#xff1f; 我们这里来举个例子 比如&#xff1a;李逵&#xff0…

南京观海微电子----驱动电路中误导通及应对方法

驱动电路中的误导通 功率器件如 MOSFET、IGBT 可以看作是一个受门极电压控制的开关。当门极电压大于开通阈值时&#xff0c;功率器件就会 被开通&#xff0c;而当门极电压低于开通阈值时就会被关断。但是在实际的应用中&#xff0c;由于器件及外围线路寄生参数的影响&#xff0…

C++ —— 哈希详解 - 开散列与闭散列

目录 1. 哈希的概念 1.1 直接定址法 1.2 哈希冲突 1.3 负载因子 1.4 哈希函数 1.4.1 除法散列法/除留余数法 1.4.2 乘法散列法 1.4.3 全域散列法 1.5 处理哈希冲突 1.5.1 开放定址法&#xff08;闭散列&#xff09; 1. 线性探测&#xff08;挨着查找&#xff09; 2.…

NVR批量管理软件EasyNVR大华NVR管理平台如何在Linux环境下部署?

随着视频监控技术的不断进步&#xff0c;NVR&#xff08;网络视频录像机&#xff09;批量管理软件在维护公共安全、提升管理效能方面发挥着越来越重要的作用。EasyNVR作为一款功能强大的NVR批量管理软件&#xff0c;凭借其高效的视频处理能力、灵活的设备接入能力和智能分析功能…

js技能提升——手搓图片组展示——基础积累

// 图片组展示[{name:,src:}]showAltPics(picList[], index0) {if (picList.length 0) {layer.msg("图片路径不对&#xff0c;请重试&#xff01;", { time: 2000 });return false;}//解析展示let inPicListHtml ;let indexPic picList[index];for (let i 0; i &…

<项目代码>YOLOv8 番茄识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

Llama架构及代码详解

Llama的框架图如图&#xff1a; 源码中含有大量分布式训练相关的代码&#xff0c;读起来比较晦涩难懂&#xff0c;所以我们对llama自顶向下进行了解析及复现&#xff0c;我们对其划分成三层&#xff0c;分别是顶层、中层、和底层&#xff0c;如下&#xff1a; Llama的整体组成…

冒泡选择法(c基础)

适合对象c语言初学者。 冒泡选择法 作用对一个数组进行排序。&#xff08;介绍一下数组(c基础)(详细版)-CSDN博客&#xff09; 核心要点 1: 数组元素个数 sz 2: 比较后的交换。 核心思路 进行&#xff08;sz - 1&#xff09;趟&#xff0c;每一趟把最大数的放到末尾。其…

深入浅出《钉钉AI》产品体验报告

1. 引言 随着人工智能技术的迅猛发展&#xff0c;企业协同办公领域迎来了新的变革。钉钉作为阿里巴巴集团旗下的企业级通讯与协同办公平台&#xff0c;推出了钉钉AI助理&#xff0c;旨在提高工作效率&#xff0c;优化用户体验。本报告将对钉钉AI助理进行全面的产品体验分析&am…

【GPTs】Gif-PT:DALL·E制作创意动图与精灵动画

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 &#x1f4af;GPTs指令&#x1f4af;前言&#x1f4af;Gif-PT主要功能适用场景优点缺点 &#x1f4af;小结 &#x1f4af;GPTs指令 中文翻译&#xff1a; 使用Dalle生成用户请求的精灵图动画&#…

一文看懂什么1688跨境(专业解析)

1688跨境是什么? 最近火出圈的一个新词 今天老余带大家走近1688跨境 首先为什么会出现1688跨境&#xff1f; 因为&#xff1a; 新型服务商崛起&#xff0c;反向海淘成趋势 在过去三年&#xff0c;1688涌现了一批新型的服务商: 他们帮助海外B类买家到1688采购&#xff…

SpringBoot+Vue3实现数据可视化大屏

前端工程的地址:UserManagerFront: 数据可视化前端 (gitee.com) 效果展示&#xff0c;可以展现出来了&#xff0c;样式可能还有一些丑。 后端代码 后端主要是拿到数据并对数据进行处理&#xff0c;按照前端需要的格式进行返回即可。 import com.njitzx.entity.Student; impor…

<项目代码>YOLOv8 手机识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

算法定制LiteAIServer摄像机实时接入分析平台烟火识别检测算法

在公共安全领域&#xff0c;火灾的及时发现与处理是保障人民群众生命财产安全的重要手段。传统的烟火检测手段受限于人工巡查的局限&#xff0c;难以做到全天候、无遗漏的监控。然而&#xff0c;随着人工智能技术的飞速发展&#xff0c;LiteAIServer摄像机实时接入分析平台烟火…

JMeter与大模型融合应用之JMeter日志分析服务化实战应用

JMeter与大模型融合应用之JMeter日志分析服务化 引言 在当今的互联网时代,网站和应用程序的性能直接影响到用户的体验和业务的成功。为了保证系统的稳定性和高效性,性能测试成为了软件开发过程中的一个重要环节。在这其中,Apache JMeter作为一款开源的性能测试工具,凭借其…

【Pikachu】任意文件上传实战

将过去和羁绊全部丢弃&#xff0c;不要吝惜那为了梦想流下的泪水。 1.不安全的文件上传漏洞概述 不安全的文件上传漏洞概述 文件上传功能在web应用系统很常见&#xff0c;比如很多网站注册的时候需要上传头像、上传附件等等。当用户点击上传按钮后&#xff0c;后台会对上传的…

STM32 ADC --- 单通道采样

STM32 ADC — 单通道采样 文章目录 STM32 ADC --- 单通道采样cubeMX配置代码修改&#xff1a;应用 使用cubeMX生成HAL工程 需求&#xff1a;有多个通道需要进行ADC采样&#xff0c;实现每次采样只采样一个通道&#xff0c;且可以随时采样不同通道的功能。 cubeMX配置 这里我们…

influxDB 时序数据库安装 flux语法 restful接口 nodjsAPI

安装 Install InfluxDB | InfluxDB OSS v2 Documentation Debian和Ubuntu用户可以用apt-get包管理来安装最新版本的InfluxDB。 对于Ubuntu用户&#xff0c;可以用下面的命令添加InfluxDB的仓库&#xff0c;添加之后即可apt-get 安装influxdb2 wget -q https://repos.influx…

【轻量化】YOLOv10 更换骨干网络之 MobileNetv4 | 模块化加法!非 timm 包!

之前咱们在这个文章中讲了timm包的加法,不少同学反馈要模块化的加法,那么这篇就讲解下模块化的加法,值得注意的是,这样改加载不了mobilebnetv4官方开源的权重了~ 论文地址:https://arxiv.org/pdf/2404.10518 代码地址:https://github.com/tensorflow/models/blob/master…