SparkSQL数据源与数据存储

文章目录

  • 1. 大数据分析流程
  • 2. Spark SQL数据源
    • 2.1 SparkSQL常见数据源
    • 2.2 SparkSQL支持的文本格式
    • 2.3 加载外部数据源步骤
  • 3. 本地文件系统加载数据
    • 3.1 本地文件系统加载JSON格式数据
      • 3.1.1 概述
      • 3.1.2 案例演示
    • 3.2 本地文件系统加载CSV格式数据
      • 3.2.1 概述
      • 3.2.2 案例演示
    • 3.3 本地文件系统加载TEXT格式数据
      • 3.3.1 概述
      • 3.3.2 案例演示
    • 3.4 本地文件系统加载Parquet格式数据
      • 3.4.1 概述
      • 3.4.2 案例演示
    • 3.5 通用加载文件方式加载各种格式数据
      • 3.5.1 概述
      • 3.5.2 案例演示
  • 4. 大数据存储概述
    • 4.1 数据存储的重要性
    • 4.2 常见的数据持久化外部系统
    • 4.3 大数据计算框架的基石
  • 5. 数据存储核心API使用
    • 5.1 持久化数据到外部文件系统步骤
    • 5.2 将数据帧保存到本地文件
    • 5.3 将数据帧保存到HDFS文件
  • 6. 数据源与数据存储小结

1. 大数据分析流程

  • 在互联网产业中大数据生态体系的主要作用就是存储、处理海量数据为企业创造价值、推动社会进步,数据分析流程存在三个主要流程:
    • 计算系统可以加载外部数据源
    • 资源系统可以为计算系统分配运行资源
    • 计算系统数据分析最终结果可以持久化到外部系统
      在这里插入图片描述
  • 通过图片可以得知存储系统才是大数据计算体系中的基石,学习一个计算框架应该先从如何使用当前计算框架加载外部数据源开始。

2. Spark SQL数据源

  • Spark SQL 是 Apache Spark 的模块之一,提供对结构化数据的查询能力。它支持多种数据源,包括 HDFS、S3、Hive、Parquet、JSON 等,允许用户通过 SQL 语句或 DataFrame API 访问和处理数据。Spark SQL 的优化器可以自动优化查询计划,提高执行效率。此外,它还支持外部数据源的集成,使得在不同存储系统间进行数据交换和分析变得简单快捷。

2.1 SparkSQL常见数据源

  • Hive 数据仓库
  • MySQL 关系型数据库
  • FileSystem 文件系统:本地文件系统、分布式文件系统
  • 由 RDD 生成 SparkSQL 数据源

2.2 SparkSQL支持的文本格式

数据格式描述
csvCSV(字段与字段之间的分隔符为逗号)
jsonJSON(是一种轻量级的数据交换格式,采用完全独立于编程语言的文本格式来存储和表示数据。
简洁和清晰的层次结构、易于人阅读和编写,同时也易于机器解析和生成)
textText(文本数据,字段与字段之间的分隔符没有限制)
parquetParquet(Parquet是一种面向列存储的文件格式,主要用于Hadoop生态系统。
对数据处理框架、数据模型和编程语言无关)

2.3 加载外部数据源步骤

  • 创建 SparkSession 实例对象
  • 通过 SparkSession 实例对象提供的方法加载外部数据

3. 本地文件系统加载数据

3.1 本地文件系统加载JSON格式数据

3.1.1 概述

  • JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,也易于机器解析和生成。在本地文件系统中加载JSON格式数据时,可以使用DataFrameReaderjson()方法或通过format("json")指定格式。

3.1.2 案例演示

  • 在项目根目录创建data目录
    在这里插入图片描述
  • data里创建users.json文件
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}

在这里插入图片描述

  • net.huawei.sql包里创建LoadJSON对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.SparkSession

/**
 * 功能:加载JSON数据
 * 作者:华卫
 * 日期:2025年01月17日
 */
object LoadJSON {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("LoadJSON") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .getOrCreate() // 获取或创建Spark会话对象

    // 使用json()方法加载本地JSON文件
    val df_json = spark.read.json("data/users.json")
    // 显示数据
    df_json.show()

    // 关闭会话对象
    spark.stop()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述

3.2 本地文件系统加载CSV格式数据

3.2.1 概述

  • CSV(Comma-Separated Values)是一种常用的表格数据存储格式,数据以纯文本形式存储,字段间用逗号分隔。加载CSV格式数据时,可以使用DataFrameReadercsv()方法或通过format("csv")指定格式。

3.2.2 案例演示

  • data里创建users.csv文件
    在这里插入图片描述
name,gender,age
李小玲,女,45
童安格,男,26
陈燕文,女,18
王晓明,男,32
张丽华,女,29
刘伟强,男,40
赵静怡,女,22
孙强东,男,35
  • net.huawei.sql包里创建loadCSV对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.csv.MultiLineCSVDataSource.inferSchema
import org.json4s.scalap.scalasig.ClassFileParser.header

/**
 * 功能:加载CSV数据
 * 作者:华卫
 * 日期:2025年01月17日
 */
object LoadCSV {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("LoadCSV") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .getOrCreate() // 获取或创建Spark会话对象

    // 使用csv()方法加载本地CSV文件
    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data/users.csv")

    // 显示数据
    df_csv.show()

    // 关闭会话对象
    spark.stop()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述

3.3 本地文件系统加载TEXT格式数据

3.3.1 概述

  • TEXT格式数据通常指纯文本文件,每行数据作为一个字符串处理。加载TEXT格式数据时,可以使用DataFrameReadertext()方法或通过format("text")指定格式。

3.3.2 案例演示

  • data里创建users.txt文件
    在这里插入图片描述
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35
  • net.huawei.sql包里创建LoadText对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 功能:加载TEXT数据
 * 作者:华卫
 * 日期:2025年01月17日
 */
object LoadText {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("LoadTEXT") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .getOrCreate() // 获取或创建Spark会话对象

    // 使用text()方法加载本地TEXT文件
    val df_text = spark.read.text("data/users.txt")

    // 显示数据
    df_text.show()

    // 关闭会话对象
    spark.stop()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述

3.4 本地文件系统加载Parquet格式数据

3.4.1 概述

  • Parquet 是一种列式存储格式,广泛用于大数据处理。相比行式存储(如 CSV),Parquet 具有高效压缩、高性能查询和广泛兼容性(支持 Spark、Hive 等)。在 Spark 中,可通过 parquet()format("parquet") 加载 Parquet 文件,适合大规模数据存储与处理。

3.4.2 案例演示

  • 将CSV格式数据转换成Parquet格式数据
  • net.huawei.sql包里创建CSVToParquet对象
    在这里插入图片描述
package net.huawei.sql

/**
 * 功能:将CSV转成Parquet
 * 作者:华卫
 * 日期:2025年01月17日
 */
import org.apache.spark.sql.SparkSession

object CSVToParquet {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("CSV To Parquet") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .getOrCreate() // 获取或创建Spark会话对象

    // 读取CSV文件
    val df = spark.read
      .option("header", true) // 第一行作为列名
      .option("inferSchema", true) // 自动推断数据类型
      .csv("data/users.csv") // CSV文件路径

    // 打印Schema和数据
    println("===模式===")
    df.printSchema()
    println("===数据===")
    df.show()

    // 将DataFrame保存为Parquet文件
    df.write.parquet("data/users.parquet")
    println("成功生成users.parquet文件~")

    // 关闭Spark会话对象
    spark.stop()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述
  • 查看users.parquetParquet文件是二进制格式,无法直接查看,但可以通过Spark或其他工具读取。
    在这里插入图片描述
  • net.huawei.sql包里创建LoadParequet对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.SparkSession

/**
 * 功能:加载Parquet数据
 * 作者:华卫
 * 日期:2025年01月17日
 */
object LoadParquet {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("LoadParquet") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .getOrCreate() // 获取或创建Spark会话对象

    // 使用parquet()方法加载本地Parquet文件
    val df_parquet = spark.read.parquet("data/users.parquet")

    // 显示数据
    df_parquet.show()
    // 显示数据结构
    df_parquet.printSchema()

    // 关闭会话对象
    spark.stop()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述

3.5 通用加载文件方式加载各种格式数据

3.5.1 概述

  • 通过DataFrameReaderformat()load()方法,可以灵活地加载不同格式的数据文件。这种方式不仅适用于JSON格式,还可以用于CSV、TEXT、Parquet等其他格式。

3.5.2 案例演示

  • net.huawei.sql包里创建LoadData对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.SparkSession

/**
 * 功能:加载各种格式数据
 * 作者:华卫
 * 日期:2025年01月17日
 */
object LoadData {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("LoadData") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .getOrCreate() // 获取或创建Spark会话对象

    //使用format()和load()方法加载本地JSON文件
    val df_json = spark.read.format("json")
      .load("data/users.json")
    //使用format()和load()方法加载本地CSV文件
    val df_csv = spark.read.format("csv")
      .option("header", true)
      .option("inferSchema",true)
      .load("data/users.csv")
    //使用format()和load()方法加载本地TEXT文件
    val df_text = spark.read.format("text")
      .load("data/users.txt")
    //使用format()和load()方法加载本地Parquet文件
    val df_parquet = spark.read.format("parquet")
      .load("data/users.parquet")

    // 显示数据
    println("===显示加载的JSON数据===")
    df_json.show()
    println("===显示加载的CSV数据===")
    df_csv.show()
    println("===显示加载的TEXT数据===")
    df_text.show()
    println("===显示加载的Parquet数据===")
    df_parquet.show()

    // 关闭会话对象
    spark.stop()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述

4. 大数据存储概述

4.1 数据存储的重要性

  • 在大数据生态系统中,存储系统是核心组成部分。无论是数据采集、数据处理,还是数据分析,都离不开高效、可靠的存储系统。存储系统不仅需要保存原始数据,还需要存储经过分析后的有价值的结果,以供各部门使用。

4.2 常见的数据持久化外部系统

  • 文件系统:包括本地文件系统和分布式文件系统(如HDFS)。文件系统适合存储大规模的非结构化或半结构化数据。
  • 关系型数据库:适用于结构化数据的存储和高效查询,常用于事务处理和复杂查询。
  • Hive数据仓库:基于Hadoop的数据仓库工具,适合大规模数据的批处理和分析。
  • 其他存储系统:如果以上系统不能满足业务需求,我们可以将DataFrame或DataSet转换为RDD,利用RDD支持的多种外部存储系统。

4.3 大数据计算框架的基石

  • 存储系统是大数据计算框架的基石。一个计算框架首先需要从存储系统中加载数据,形成可处理的数据模型(如DataFrameDataSetRDD)。基于这些数据模型,我们可以进行各种数据分析操作。最终,分析结果需要持久化到外部存储系统,以便各部门使用。

5. 数据存储核心API使用

5.1 持久化数据到外部文件系统步骤

  • 创建SparkSession实例对象
  • 通过SparkSession实例对象提供的方法加载外部数据
  • 数据分析
  • 对数据分析结果进行持久化

5.2 将数据帧保存到本地文件

  • net.huawei.sql包里创建SaveData对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.SparkSession

/**
 * 功能:保存数据到本地文件
 * 作者:华卫
 * 日期:2025年01月17日
 */
// 声明用户样例类
case class User(name: String, gender: String, age: Long)

object SaveData {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("SaveData") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .getOrCreate() // 获取或创建Spark会话对象

    // 导入隐式转换
    import spark.implicits._
    // 基于序列创建数据帧
    val userDF = Seq(
      User("陈燕文", "女", 20),
      User("张小文", "男", 27),
      User("王丽霞", "女", 18)
    ).toDF()
    // 显示数据
    userDF.show()
    // 保存数据到本地文件
    userDF.write.mode("overwrite").save("log/users.parquet")
    println("users.parquet保存成功~")
    userDF.write.mode("overwrite").csv("log/users.csv")
    println("users.csv保存成功~")
    userDF.write.mode("overwrite").json("log/users.json")
    println("users.json保存成功~")

    // 关闭会话对象
    spark.stop()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述
  • 查看保存在本地的各种格式的数据文件
    在这里插入图片描述

5.3 将数据帧保存到HDFS文件

  • net.huawei.sql包里创建SaveDataToHDFS对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.SparkSession

/**
 * 功能:保存数据到HDFS
 * 作者:华卫
 * 日期:2025年01月17日
 */
object SaveDataToHDFS {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("SaveDataToHDFS") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .config("dfs.client.use.datanode.hostname", "true") // 设置HDFS节点名称
      .getOrCreate() // 获取或创建Spark会话对象

    // 导入隐式转换
    import spark.implicits._
    // 基于序列创建数据帧
    val userDF = Seq(
      User("陈燕文", "女", 20),
      User("张小文", "男", 27),
      User("王丽霞", "女", 18)
    ).toDF()
    // 显示数据
    userDF.show()
    // 保存数据到HDFS文件
    userDF.write.mode("overwrite").json("hdfs://bigdata1:9000/log/users.json")
    println("hdfs://bigdata1:9000/log/users.json保存成功~")

    // 关闭会话对象
    spark.stop()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述

  • 执行命令:hdfs dfs -ls /log/users.json
    在这里插入图片描述

  • 执行命令:hdfs dfs -cat /log/users.json/*
    在这里插入图片描述

6. 数据源与数据存储小结

  • 在大数据生态系统中,数据源和数据存储是核心组成部分。Spark SQL 支持多种数据源,包括 HDFS、S3、Hive、Parquet、JSON 等,能够灵活加载和处理结构化数据。通过 DataFrameReaderDataFrameWriter,用户可以轻松地从本地文件系统或分布式文件系统加载数据,并将分析结果持久化到外部存储系统。常见的存储格式如 CSV、JSON、Parquet 各有优势:CSV 适合人类阅读,JSON 灵活易用,而 Parquet 则以高效的列式存储和压缩性能著称。数据存储不仅是数据处理的起点,也是分析结果的归宿,选择合适的存储格式和系统对提升数据处理效率至关重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/956411.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LLM - 大模型 ScallingLaws 的 CLM 和 MLM 中不同系数(PLM) 教程(2)

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/145188660 免责声明:本文来源于个人知识与公开资料,仅用于学术交流,欢迎讨论,不支持转载。 Scalin…

AI agent 在 6G 网络应用,无人机群控场景

AI agent 在 6G 网络应用,无人机群控场景 随着 6G 时代的临近,融合人工智能成为关键趋势。借鉴 IT 行业 AI Agent 应用范式,提出 6G AI Agent 技术框架,包含多模型融合、定制化 Agent 和插件式环境交互理念,构建了涵盖四层结构的框架。通过各层协同实现自主环境感知等能力…

信息奥赛一本通 1168:大整数加法

这道题是一道大整数加法,涉及到高精度的算法,比如说有两个数要进行相加,1111111111111111111111111111111111111112222222222222222222222222222222,那么如果这两个数很大的话我们常用的数据类型是不能进行计算的,那么…

基于YOLOv4与Tkinter的口罩识别系统

往期精彩 基于YOLOv11的番茄成熟度实时检测系统设计与实现 用YOLOv11检测美国手语:挥动手腕的科技魔法 基于YOLOv11模型PyQt的实时鸡行为检测系统研究 OpenCV与YOLO在人脸识别中的应用研究(论文源码) 计算机视觉:农作物病虫害检测系统:基于Y…

机器学习:监督学习与非监督学习

监督学习是利用带有标签的数据进行训练,模型通过学习输入和输出之间的关系来进行预测。也就是说,数据集中既有输入特征,也有对应的输出标签,模型的目标是找到从输入到输出的映射关系。 而无监督学习则使用没有标签的数据进行训练,模型的任务是发现数据中的内在结构或模式…

【unity进阶篇】不同Unity版本对应的C# 版本和API 兼容级别(Api Compatibility Level)选择

考虑到每个人基础可能不一样,且并不是所有人都有同时做2D、3D开发的需求,所以我把 【零基础入门unity游戏开发】 分为成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要讲解C#的基础语法,包括变量、数据类型、运算符、…

H3CNE-13-静态路由(二)

1.路由优先级 路由类型DirectOSPFStaticRIP管理距离01060100 2.路由度量 配置示例: 配置接口IP、静态路由(去包、回包) 3.静态路由之路由备份 RTB: ip route-static 192.168.1.0 24 10.0.12.1 ip route-ststic 192.168.1.0 24 20.0.12.1 …

【数据分享】1929-2024年全球站点的逐年平均气温数据(Shp\Excel\无需转发)

气象数据是在各项研究中都经常使用的数据,气象指标包括气温、风速、降水、湿度等指标,其中又以气温指标最为常用!说到气温数据,最详细的气温数据是具体到气象监测站点的气温数据!本次我们为大家带来的就是具体到气象监…

[Qualcomm]Qualcomm MDM9607 SDK代码下载操作说明

登录Qualcomm CreatePoing Qualcomm CreatePointhttps://createpoint.qti.qua

PID控制算法原理,并用python实现演示

PID算法控制运用在哪些地方? PID:比列(Proportion),积分(Integral),微分(Differential) PID算法可以用来控制温度,压强,流量,化学成分,速度等等。汽车的定速巡航;伺服驱…

C语言之文本加密程序设计

🌟 嗨,我是LucianaiB! 🌍 总有人间一两风,填我十万八千梦。 🚀 路漫漫其修远兮,吾将上下而求索。 文本加密程序设计 摘要:本文设计了一种文本加密程序,旨在提高信息安…

数字图像处理:实验二

任务一: 将不同像素(32、64和256)的原图像放大为像素大 小为1024*1024的图像(图像自选) 要求:1)输出一幅图,该图包含六幅子图,第一排是原图,第 二排是对应放大…

latin1_swedish_ci(latin1 不支持存储中文、日文、韩文等多字节字符)

文章目录 1、SHOW TABLE STATUS WHERE Name batch_version;2、latin1_swedish_ci使用场景注意事项修改字符集和排序规则修改表的字符集和排序规则修改列的字符集和排序规则修改数据库的默认字符集和排序规则 3、ALTER TABLE batch_version CONVERT TO CHARACTER SET utf8mb4 C…

基于微信小程序的安心陪诊管理系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…

如何将自己本地项目开源到github上?

环境: LLMB项目 问题描述: 如何将自己本地项目开源到github上? 解决方案: 步骤 1: 准备本地项目 确保项目整洁 确认所有的文件都在合适的位置,并且项目的 README.md 文件已经完善。检查是否有敏感信息&#xff0…

CSS笔记01

黑马程序员视频地址: 前端Web开发HTML5CSS3移动web视频教程https://www.bilibili.com/video/BV1kM4y127Li?vd_source0a2d366696f87e241adc64419bf12cab&spm_id_from333.788.videopod.episodes 目录 引入方式 CSS特性 继承性 层叠性 优先级 Emmet写法 …

【机器学习】制造业转型:机器学习如何推动工业 4.0 的深度发展

我的个人主页 我的领域:人工智能篇,希望能帮助到大家!!!👍点赞 收藏❤ 引言 在当今科技飞速发展的时代,制造业正经历着前所未有的变革,工业4.0的浪潮席卷而来。工业4.0旨在通过将…

【游戏设计原理】72 - 学习曲线

学习曲线的观点本质上强调了玩家在游戏中逐渐掌握新技能的过程,旨在通过设计合适的难度和反馈机制,确保玩家在学习的过程中感受到挑战,同时又不会感到过于困难或无聊。 1. 学习曲线的定义和重要性 学习曲线反映了玩家在完成某个任务时&…

【Linux】进程优先级与进程切换

🔥个人主页🔥:孤寂大仙V 🌈收录专栏🌈:Linux 🌹往期回顾🌹:【Linux】进程状态 🔖流水不争,争的是滔滔不 一、进程优先级是什么二、查看系统进程三…

windows 极速安装 Linux (Ubuntu)-- 无需虚拟机

1. 安装 WSL 和 Ubuntu 打开命令行,执行 WSL --install -d ubuntu若报错,则先执行 WSL --update2. 重启电脑 因安装了子系统,需重启电脑才生效 3. 配置 Ubuntu 的账号密码 打开 Ubuntu 的命令行 按提示,输入账号,密…