SparkSQL数据源与数据存储综合实践

文章目录

  • 1. 打开项目
  • 2. 查看数据集
    • 2.1 查看JSON格式数据
    • 2.2 查看CSV格式数据
    • 2.3 查看TXT格式数据
  • 3. 添加单元测试依赖
  • 4. 创建数据加载与保存对象
    • 4.1 创建Spark会话对象
    • 4.2 创建加载JSON数据方法
    • 4.3 创建加载CSV数据方法
    • 4.4 创建加载Text数据方法
    • 4.5 创建加载JSON数据扩展方法
    • 4.6 创建加载CSV数据扩展方法
    • 4.7 创建加载Text数据扩展方法
    • 4.8 创建保存文本文件方法
    • 4.9 查看程序完整代码
  • 5. 实战小结

1. 打开项目

  • 打开SparkSQLDataSource项目
    在这里插入图片描述

2. 查看数据集

2.1 查看JSON格式数据

  • 查看users.json文件
    在这里插入图片描述
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}

2.2 查看CSV格式数据

  • 查看users.csv文件
    在这里插入图片描述
name,gender,age
李小玲,,45
童安格,,26
陈燕文,,18
王晓明,,32
张丽华,,29
刘伟强,,40
赵静怡,,22
孙强东,,35

2.3 查看TXT格式数据

  • 查看users.txt文件
    在这里插入图片描述
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35

3. 添加单元测试依赖

  • pom.xml里添加单元测试框架依赖
    在这里插入图片描述
<dependency>                                    
    <groupId>junit</groupId>                    
    <artifactId>junit</artifactId>              
    <version>4.13.2</version>                   
</dependency>                                   
  • 刷新项目依赖
    在这里插入图片描述

4. 创建数据加载与保存对象

  • 创建net.huawei.practice
    在这里插入图片描述
  • practice子包里创建DataLoadAndSave对象
    在这里插入图片描述
  • 创建DataLoadAndSave伴生类
    在这里插入图片描述

4.1 创建Spark会话对象

  • 创建spark常量
    在这里插入图片描述
// 获取或创建Spark会话对象                                  
val spark = SparkSession.builder() // 创建Builder对象  
  .appName("DataLoadAndSave") // 设置应用程序名称          
  .master("local[*]") // 运行模式:本地运行                 
  .getOrCreate() // 获取或创建Spark会话对象                 

4.2 创建加载JSON数据方法

  • 创建loadJSONData()方法
    在这里插入图片描述
// 加载JSON数据方法                                       
def loadJSONData(filePath: String): DataFrame = {   
  spark.read.json(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadJSONData()方法
    在这里插入图片描述
@Test                                                      
def testLoadJSONData(): Unit = {                           
  // 加载JSON数据                                              
  val df = DataLoadAndSave.loadJSONData("data/users.json") 
  // 显示数据                                                  
  df.show()                                                
}                                                          
  • 运行testLoadJSONData()测试方法,查看结果
    在这里插入图片描述

4.3 创建加载CSV数据方法

  • 创建loadCSVData()方法
    在这里插入图片描述
// 加载CSV数据方法                                           
def loadCSVData(filePath: String): DataFrame = {       
  spark.read                                           
    .option("header", "true")                          
    .option("inferSchema", "true")                     
    .csv(filePath)                                     
}                                                      
  • 在伴生类里创建单元测试方法testLoadCSVData()方法
    在这里插入图片描述
@Test                                                       
def testLoadCSVData(): Unit = {                             
  // 加载CSV数据                                                
  val df = DataLoadAndSave.loadCSVData("data/users.csv")    
  // 显示数据                                                   
  df.show()                                                 
}                                                           
  • 运行testLoadCSVData()测试方法,查看结果
    在这里插入图片描述

4.4 创建加载Text数据方法

  • 创建loadTextData()方法
    在这里插入图片描述
// 加载TEXT数据方法                                       
def loadTextData(filePath: String): DataFrame = {   
  spark.read.text(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadTextData()方法
    在这里插入图片描述
  • 运行testLoadTextData()测试方法,查看结果
    在这里插入图片描述

4.5 创建加载JSON数据扩展方法

  • 创建loadJSONDataExpand()方法
    在这里插入图片描述
// 加载JSON数据扩展方法                                         
def loadJSONDataExpand(filePath: String): DataFrame = { 
  spark.read.format("json").load(filePath)              
}                                                       
  • 在伴生类里创建单元测试方法testLoadJSONDataExpand()方法
    在这里插入图片描述
  • 运行testLoadJSONDataExpand()测试方法,查看结果
    在这里插入图片描述

4.6 创建加载CSV数据扩展方法

  • 创建loadCSVDataExpand()方法
    在这里插入图片描述
// 加载CSV数据扩展方法                                            
def loadCSVDataExpand(filePath: String): DataFrame = {    
  spark.read.format("csv")                                
    .option("header", "true")                             
    .option("inferSchema", "true")                        
    .load(filePath)                                       
}                                                         
  • 在伴生类里创建单元测试方法testLoadCSVDataExpand()方法
    在这里插入图片描述
  • 运行testLoadCSVDataExpand()测试方法,查看结果
    在这里插入图片描述

4.7 创建加载Text数据扩展方法

  • 创建loadTextDataExpand()方法
    在这里插入图片描述
//  加载TEXT数据扩展方法                                          
def loadTextDataExpand(filePath: String): DataFrame = {   
  spark.read.format("text").load(filePath)                
}                                                         
  • 在伴生类里创建单元测试方法testLoadTextDataExpand()方法
    在这里插入图片描述
  • 运行testLoadTextDataExpand()测试方法,查看结果
    在这里插入图片描述

4.8 创建保存文本文件方法

  • 创建saveTextFile()方法
    在这里插入图片描述
// 保存数据到文本文件方法                                                   
def saveTextFile(inputPath: String, outputPath: String): Unit = {
  // 加载文本数据                                                      
  val df = spark.read.format("text").load(inputPath)             
  // 保存文本数据                                                      
  df.write.mode("overwrite").format("text").save(outputPath)     
}                                                                
  • 在伴生类里创建单元测试方法testSaveTextFile()方法
    在这里插入图片描述
  • 运行testSaveTextFile()测试方法,查看结果
    在这里插入图片描述

4.9 查看程序完整代码

package net.huawei.practice

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.Test

/**
 * 功能:数据加载与保存
 * 作者:华卫
 * 日期:2025年01月18日
 */
object DataLoadAndSave {
  // 获取或创建Spark会话对象
  val spark = SparkSession.builder() // 创建Builder对象
    .appName("DataLoadAndSave") // 设置应用程序名称
    .master("local[*]") // 运行模式:本地运行
    .getOrCreate() // 获取或创建Spark会话对象

  // 加载JSON数据方法
  def loadJSONData(filePath: String): DataFrame = {
    spark.read.json(filePath)
  }

  // 加载CSV数据方法
  def loadCSVData(filePath: String): DataFrame = {
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(filePath)
  }

  // 加载TEXT数据方法
  def loadTextData(filePath: String): DataFrame = {
    spark.read.text(filePath)
  }

  // 加载JSON数据扩展方法
  def loadJSONDataExpand(filePath: String): DataFrame = {
    spark.read.format("json").load(filePath)
  }

  // 加载CSV数据扩展方法
  def loadCSVDataExpand(filePath: String): DataFrame = {
    spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(filePath)
  }

  //  加载TEXT数据扩展方法
  def loadTextDataExpand(filePath: String): DataFrame = {
    spark.read.format("text").load(filePath)
  }

  // 保存数据到文本文件方法
  def saveTextFile(inputPath: String, outputPath: String): Unit = {
    // 加载文本数据
    val df = spark.read.format("text").load(inputPath)
    // 保存文本数据
    df.write.mode("overwrite").format("text").save(outputPath)
  }
}

// 伴生类
class DataLoadAndSave {
  @Test
  def testLoadJSONData(): Unit = {
    // 加载JSON数据
    val df = DataLoadAndSave.loadJSONData("data/users.json")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadCSVData(): Unit = {
    // 加载CSV数据
    val df = DataLoadAndSave.loadCSVData("data/users.csv")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadTextData(): Unit = {
    // 加载TEXT数据
    val df = DataLoadAndSave.loadTextData("data/users.txt")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadJSONDataExpand(): Unit = {
    // 加载JSON数据
    val df = DataLoadAndSave.loadJSONDataExpand("data/users.json")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadCSVDataExpand(): Unit = {
    // 加载CSV数据
    val df = DataLoadAndSave.loadCSVDataExpand("data/users.csv")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadTextDataExpand(): Unit = {
    // 加载TEXT数据
    val df = DataLoadAndSave.loadTextDataExpand("data/users.txt")
    // 显示数据
    df.show()
  }

  @Test
  def testSaveTextFile(): Unit = {
    // 保存数据到文本文件
    DataLoadAndSave.saveTextFile("data/users.txt", "result/users")
  }
}

5. 实战小结

  • 在本次实战中,我们通过SparkSQLDataSource项目深入学习了如何使用Spark SQL加载和保存不同格式的数据。首先,我们查看了JSON、CSV和TXT格式的数据集,并通过DataLoadAndSave对象实现了数据的加载与保存功能。我们创建了多个方法,如loadJSONData()loadCSVData()loadTextData(),分别用于加载不同格式的数据,并通过单元测试验证了这些方法的正确性。此外,我们还扩展了数据加载方法,使用format()方法灵活加载数据,并实现了数据保存功能,如saveTextFile()方法,将数据保存为文本文件。通过本次实战,我们掌握了Spark SQL处理多种数据格式的基本操作,为后续的数据处理和分析打下了坚实基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/956832.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙Harmony json转对象(1)

案例1 运行代码如下 上图的运行结果如下: 附加1 Json_msg interface 案例2 import {JSON } from kit.ArkTS; export interface commonRes {status: numberreturnJSON: ESObject;time: string } export interface returnRes {uid: stringuserType: number; }Entry Component …

Maven私服-Nexus3安装与使用

写在前面 安装简单&#xff0c;此博客主要是为了记录下怎么使用&#xff0c;以及一些概念性的东西 安装配置 下载 下载对应版本&#xff08;科学上网&#xff09; https://help.sonatype.com/en/download-archives—repository-manager-3.html 设置端口 /etc/nexus-defaul…

MindAgent:基于大型语言模型的多智能体协作基础设施

2023-09-18 &#xff0c;加州大学洛杉矶分校&#xff08;UCLA&#xff09;、微软研究院、斯坦福大学等机构共同创建的新型基础设施&#xff0c;目的在评估大型语言模型在游戏互动中的规划和协调能力。MindAgent通过CuisineWorld这一新的游戏场景和相关基准&#xff0c;调度多智…

【k8s面试题2025】2、练气初期

在练气初期&#xff0c;灵气还比较稀薄&#xff0c;只能勉强在体内运转几个周天。 文章目录 简述k8s静态pod为 Kubernetes 集群移除新节点&#xff1a;为 K8s 集群添加新节点Kubernetes 中 Pod 的调度流程 简述k8s静态pod 定义 静态Pod是一种特殊类型的Pod&#xff0c;它是由ku…

K8S-Pod资源清单的编写,资源的增删改查,镜像的下载策略

1. Pod资源清单的编写 1.1 Pod运行单个容器的资源清单 ##创建工作目录 mkdir -p /root/manifests/pods && cd /root/manifests/pods vim 01-nginx.yaml ##指定api版本 apiVersion: v1 ##指定资源类型 kind: Pod ##指定元数据 metadata:##指定名称name: myweb ##用户…

编辑器Vim基本模式和指令 --【Linux基础开发工具】

文章目录 一、编辑器Vim 键盘布局二、Linux编辑器-vim使用三、vim的基本概念正常/普通/命令模式(Normal mode)插入模式(Insert mode)末行模式(last line mode) 四、vim的基本操作五、vim正常模式命令集插入模式从插入模式切换为命令模式移动光标删除文字复制替换撤销上一次操作…

深度学习 DAY1:RNN 神经网络及其变体网络(LSTM、GRU)

实验介绍 RNN 网络是一种基础的多层反馈神经网络&#xff0c;该神经网络的节点定向连接成环&#xff0c;其内部状态可以展示动态时序行为。相比于前馈神经网络&#xff0c;该网络内部具有很强的记忆性&#xff0c;它可以利用它内部的记忆来处理任意时序的输入序列&#xff0c;…

svn tag

一般发布版本前&#xff0c;需要在svn上打个tag。步骤如下&#xff1a; 1、空白处右击&#xff0c;选择TortoiseSVN->Branch/tag; 2、填写To path&#xff0c;即tag的路基以及tag命名&#xff08;一般用版本号来命名&#xff09;&#xff1b;填写tag信息&#xff1b;勾选cr…

Astropay之坑

大家可能知道 Astropay 原来在日本也有业务&#xff0c;后来突然有一天业务关掉了&#xff0c;那里面的用户的钱当然也就取不出来了嘛。 我合计那就那么放着呗&#xff0c;等以后你们重返日本的时候我再去取嘛。 嗨&#xff0c;最近收到几个邮件&#xff0c;可把我气笑了。 简…

(7)(7.2) 围栏

文章目录 前言 1 通用设置 2 围栏类型 3 破坏栅栏行动 4 使用 RC 通道辅助开关启用栅栏 5 自动高度规避 6 在任务规划器中启用围栏 7 用于遥控飞行训练 8 MAVLink 支持 前言 ArduPilot 支持基于本机的圆柱形&#xff08;“TinCan”&#xff09;和多边形和/或圆柱形、…

ARP 表、MAC 表、路由表、跨网段 ARP

文章目录 一、ARP 表1、PC2、路由器 - AR22203、交换机 - S57004、什么样的设备会有 ARP 表&#xff1f; 二、MAC 表什么样的设备会有 MAC 表&#xff1f; 三、路由表什么样的设备会有路由表&#xff1f; 四、抓取跨网段 ARP 包 所谓 “透明” 就是指不用做任何配置 一、ARP 表…

信号与系统学习(二)

1.3信号的分类&#xff1a;能量与功率信号&#xff0c;因果与反因果 1.能量信号和功率信号 将信号f(t)施加与1Ω电阻上&#xff0c;它所消耗的瞬时功率为|f(t)|&#xff0c;在区间&#xff08;-∞&#xff0c;∞&#xff09;的能量和平均功率定义为 能量有限信号&#xff1a;…

k8s的CICD实施项目

环境需求&#xff1a; 目前领导需要做一个需求&#xff0c;临时把我从运维岗位&#xff0c;把我调度到到专家组让我主导cicd的项目实施 目前环境资源 k8s环境&#xff0c;28台服务器&#xff0c;上面是k8s集群&#xff0c;要实施一个测试环境的cicd以及一个生产环境的cicd gitl…

python轻量级框架-flask

简述 Flask 是 Python 生态圈中一个基于 Python 的Web 框架。其轻量、模块化和易于扩展的特点导致其被广泛使用&#xff0c;适合快速开发 Web 应用以及构建小型到中型项目。它提供了开发 Web 应用最基础的工具和组件。之所以称为微框架&#xff0c;是因为它与一些大型 Web 框架…

uniapp——App 监听下载文件状态,打开文件(三)

5 实现下载文件并打开 这里演示&#xff0c;导出Excel 表格 文章目录 5 实现下载文件并打开DEMO监听下载进度效果图为什么 totalSize 一直为0&#xff1f; 相关Api&#xff1a; downloader DEMO 提示&#xff1a; 请求方式支持&#xff1a;GET、POST&#xff1b;POST 方式需要…

Java设计模式—观察者模式

观察者模式 目录 观察者模式1、什么是观察者模式&#xff1f;2、观察者模式优缺点及注意事项&#xff1f;3、观察者模式实现&#xff1f;4、手写线程安全的观察者模式&#xff1f; 1、什么是观察者模式&#xff1f; - 实例&#xff1a;现实生活中很多事物都是依赖存在的&#x…

大象机器人发布首款穿戴式数据采集器myController S570,助力具身智能数据收集!

myController S570 具有较高的数据采集速度和远程控制能力&#xff0c;大大简化了人形机器人的编程。 myController S570 是一款可移动的轻量级外骨骼&#xff0c;具有 14 个关节、2 个操纵杆和 2 个按钮&#xff0c;它提供高数据采集速度&#xff0c;出色的兼容性&#xff0c…

模型部署工具01:Docker || 用Docker打包模型 Build Once Run Anywhere

Docker 是一个开源的容器化平台&#xff0c;可以让开发者和运维人员轻松构建、发布和运行应用程序。Docker 的核心概念是通过容器技术隔离应用及其依赖项&#xff0c;使得软件在不同的环境中运行时具有一致性。无论是开发环境、测试环境&#xff0c;还是生产环境&#xff0c;Do…

二、点灯基础实验

嵌入式基础实验第一个就是点灯&#xff0c;地位相当于编程界的hello world。 如下为LED原理图&#xff0c;要让相应LED发光&#xff0c;需要给I/O口设置输出引脚&#xff0c;低电平&#xff0c;二极管才会导通 2.1 打开初始工程&#xff0c;编写代码 以下会实现BLINKY常亮&…

推荐一个开源的轻量级任务调度器!TaskScheduler!

大家好&#xff0c;我是麦鸽。 这次推荐一款轻量级的嵌入式任务调度器&#xff0c;目前已经有1.4K的star&#xff0c;这个项目比较轻量化&#xff0c;只有5个源文件&#xff0c;可以作为学习的一个开源项目。 核心文件 项目概述&#xff1a; 这是一个轻量级的协作式多任务处理&…