Spark SQL 概述

Spark SQL 概述

Spark SQL 是 Apache Spark 的一个模块,专门用于处理结构化数据。它集成了 SQL 查询和 Spark 编程的强大功能,使得处理大数据变得更加高效和简便。通过 Spark SQL,用户可以直接在 Spark 中使用 SQL 查询,或者使用 DataFrame 和 DataSet API 进行数据操作。

  • 一、Spark SQL 架构
  • 二、Spark SQL 特点
  • 三、Spark SQL 运行原理
  • 四、Spark SQL API 相关概述
  • 五、Spark SQL 依赖
  • 六、Spark SQL 数据集
    • 1、DataFrame
    • 2、Dataset
    • 3、DataFrame 和 Dataset 的关系
  • 七、Spark Sql 基本用法
    • 1、Scala 创建 SparkSession 对象
    • 2、DataFrame 和 Dataset 的创建方式
    • 3、DataFrame API

一、Spark SQL 架构

Spark SQL 的架构主要由以下几个组件组成:

  1. SparkSession:Spark 应用的统一入口点,用于创建 DataFrame、DataSet 和执行 SQL 查询。
  2. Catalyst 优化器:Spark SQL 的查询优化引擎,负责解析、分析、优化和生成物理执行计划。
  3. DataFrame 和 DataSet API:提供面向对象的编程接口,支持丰富的数据操作方法。
  4. 数据源接口:支持多种数据源,如 HDFS、S3、HBase、Cassandra、Hive 等。
  5. 执行引擎:将优化后的查询计划转换为执行任务,并在分布式集群上并行执行这些任务。

二、Spark SQL 特点

  • 统一数据访问接口:支持多种数据源(如 CSV、JSON、Parquet、Hive、JDBC、HBase 等)并提供一致的查询接口。
  • DataFrame 和 Dataset API:提供面向对象的编程接口,支持类型安全的操作,便于数据处理。
  • Catalyst 优化器:自动将用户的查询转换为高效的执行计划,提升查询性能。
  • 与 Hive 的集成:无缝集成 Hive,能够直接访问现存的 Hive 数据,并使用 Hive 的 UDF 和 UDAF。
  • 高性能:通过 Catalyst 优化器和 Tungsten 执行引擎,实现高效的查询性能和内存管理。
  • 多种操作方式:支持 SQL 和 API 编程两种操作方式,灵活性高。
  • 外部工具接口:提供 JDBC/ODBC 接口供第三方工具借助 Spark 进行数据处理。
  • 高级接口:提供了更高层级的接口,方便地处理数据。

三、Spark SQL 运行原理

在这里插入图片描述

查询解析(Query Parsing):将 SQL 查询解析成抽象语法树(AST)。

逻辑计划生成(Logical Plan Generation):将 AST 转换为未优化的逻辑计划。

逻辑计划优化(Logical Plan Optimization):使用 Catalyst 优化器对逻辑计划进行一系列规则优化。

物理计划生成(Physical Plan Generation):将优化后的逻辑计划转换为一个或多个物理计划,并选择最优的物理计划。

执行(Execution):将物理计划转换为 RDD,并在集群上并行执行。

四、Spark SQL API 相关概述

SparkContext:SparkContext 是 Spark 应用程序的主入口点,负责连接到 Spark 集群,管理资源和任务调度。在 Spark 2.0 之后,推荐使用 SparkSession 取代 SparkContext。

SQLContext:SQLContext 是 Spark SQL 的编程入口点,允许用户通过 SQL 查询或 DataFrame API 进行数据处理。它提供了基本的 Spark SQL 功能。

HiveContext:HiveContext 是 SQLContext 的子集,增加了对 Hive 的集成支持,可以直接访问 Hive 中的数据和元数据,使用 Hive 的 UDF 和 UDAF。

SparkSession:SparkSession 是 Spark 2.0 引入的新概念,合并了 SQLContext 和 HiveContext 的功能,提供了统一的编程接口。SparkSession 是 Spark SQL 的建议入口点,支持使用 DataFrame 和 Dataset API 进行数据处理。

创建 SparkContext 和 SparkSession 的注意事项:如果同时需要创建 SparkContext 和 SparkSession,必须先创建 SparkContext,再创建 SparkSession。如果先创建 SparkSession,再创建 SparkContext,会导致异常,因为在同一个 JVM 中只能运行一个 SparkContext。

五、Spark SQL 依赖

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

六、Spark SQL 数据集

在 Spark SQL 中,数据集主要分为以下几种类型:DataFrame 和 Dataset。它们是处理和操作结构化和半结构化数据的核心抽象。

1、DataFrame

Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:

  • 类似于二维表格:DataFrame 类似于传统的关系数据库中的二维表格。
  • Schema(数据结构信息):在 RDD 的基础上加入了 Schema,描述数据结构的信息。
  • 支持嵌套数据类型:DataFrame 的 Schema 支持嵌套的数据类型,如 structmaparray
  • 丰富的 SQL 操作 API:提供更多类似 SQL 操作的 API,便于进行数据查询和操作。

2、Dataset

Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:

  • 强类型:Spark 1.6中引入的一个更通用的数据集合,Dataset 是强类型的,提供类型安全的操作。
  • RDD + Schema:可以认为 Dataset 是 RDD 和 Schema 的结合,既有 RDD 的分布式计算能力,又有 Schema 描述数据结构的信息。
  • 适用于特定领域对象:可以存储和操作特定领域对象的强类型集合。
  • 并行操作:可以使用函数或者相关操作并行地进行转换和操作。

3、DataFrame 和 Dataset 的关系

  • DataFrame 是特殊的 Dataset:DataFrame 是 Dataset 的一个特例,即 DataFrame = Dataset[Row]
  • 数据抽象和操作方式的统一:DataFrame 和 Dataset 统一了 Spark SQL 的数据抽象和操作方式,提供了灵活且强大的数据处理能力。

七、Spark Sql 基本用法

1、Scala 创建 SparkSession 对象

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}

2、DataFrame 和 Dataset 的创建方式

1、从集合创建

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")

1、从文件系统读取

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")

3、从关系型数据库读取

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)

4、从非结构化数据源读取

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")

5、手动创建 Dataset

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)

3、DataFrame API

语法示例一

模拟数据(1000条):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...

需求:哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\\projects\\sparkSql\\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果

结果:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+

语法示例二:视图,sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\\projects\\sparkSql\\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()

语法示例三:join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)

left 左连接,rignt 右连接, full 全外连接,anti左差集,semi左交集

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()

结果

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/792231.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++基础语法:链表和数据结构

前言 "打牢基础,万事不愁" .C的基础语法的学习 引入 链表是最基础的数据集合,对标数组.数组是固定长度,随机访问,链表是非固定长度,不能随机访问.数组查找快,插入慢;链表是插入快,查找慢. 前面推导过"数据结构算法数据集合".想建立一个数据集合,就要设计数…

Python-找客户软件

软件功能 请求代码&#xff1a; 填充表格&#xff1a; 可以search全国各个区县的所有企业信息&#xff0c;过滤手机号、查看是否续存/在业状态。方便找客户。 支持定-制-其他引-留-阮*件&#xff08;XHSS&#xff0c;DYY&#xff0c;KS&#xff0c;Bi-li*Bi-li&#xff09; V*…

嵌入式转行2个星期,一些真心话建议~

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「嵌入式的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01; 嵌入式转行2个星期&…

【计算机方向】中科院三区,国人发文占比>50%,录用容易,认可度不低~

今天小编带来计算机领域SCI快刊的解读&#xff01; 如有相关领域作者有意投稿&#xff0c;可作为重点关注&#xff01; 期刊解析 01 期刊信息 出版商&#xff1a;Springer Singapore ISSN&#xff1a;1672-6529 E-ISSN&#xff1a;2543-2141 期刊官方网站: https://www.sprin…

市面上值得入手的骨传导耳机怎么选?一次给你搞定全方位的选购攻略

随着骨传导耳机市场的日益发展&#xff0c;有很多人使用了一些不合适的骨传导耳机导致听力损伤等问题&#xff0c;这些问题也引起很多人日益关注的。原因大致就是&#xff0c;市面上出现了大量由非专业品牌贴牌和有网红生产的骨传导耳机产品&#xff0c;他们的核心技术的研发和…

CSDN回顾与前行:我的创作纪念日——2048天的技术成长与感悟

CSDN回顾与前行&#xff1a;我的创作纪念日——2048天的技术成长与感悟 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 前言 时光荏苒&#xff0c;岁月如梭。转眼间&#xff0c;从我在CSDN上写下第一篇技术博客《2-6 带头结点的链式表操作…

寻找并可视化交互

「AI秘籍」系列课程&#xff1a; 人工智能应用数学基础 人工智能Python基础 人工智能基础核心知识 人工智能BI核心知识 人工智能CV核心知识 使用特征重要性、弗里德曼 H 统计量和 ICE 图分析相互作用 本文中的代码需要安装 R 语言包 药物的副作用可能取决于你的性别。吸入…

解决浏览器 CORS跨域问题

跨域问题其实就是不同源请求导致 解决跨域问题时&#xff0c;Chrome 插件 requestly 解决 1、Chrome 应用商店 &#xff1a;chrome://extensions/ 搜索 requestly 插件。并添加到扩展程序 2、打开扩展程序&#xff0c;为当前接口设置请求头 在response Header 中设置 Acce…

samba共享windows和ubuntu的文件

通过Samba服务器实现Windows与Ubuntu之间的文件共享是一个常见的需求&#xff0c;下面是实现这一目标的详细步骤&#xff1a; 一、Ubuntu开启Samba服务器 安装Samba&#xff1a; 打开终端&#xff0c;使用以下命令安装Samba服务&#xff1a; sudo apt update sudo apt install…

辐射神经场算法——Instant-NGP / Mipi-NeRF 360 / 3D Gaussian Splatting

辐射神经场算法——Instant-NGP / Mipi-NeRF 360 / 3D Gaussian Splatting 1. Instant-NGP1. MultiResolution Hash Encoding1.2 Accelerated Ray Marching1.3 实验结果 2. Mip-NeRF 3602.1 场景参数化2.2 在线蒸馏2.3 失真正则化2.4 实验结果 3. 3D Gaussian Splatting3.1 Dif…

Monaco 使用 DefinitionProvider

DefinitionProvider 可以弹出方法定义&#xff0c;效果如下&#xff0c;按住 command 鼠标左键&#xff0c;弹出方法说明。 点击时 Monaco Editor 会调用注册函数&#xff0c;注册函数返回文件地址和需要显示的位置&#xff0c;实现代码如下 return monaco.languages.register…

自主研发接口测试框架

测试任务&#xff1a;将以前完成的所有的脚本统一改写为unitest框架方式 1、需求原型 1.1 框架目录结构 V1.0&#xff1a;一般的设计思路分为配置层、脚本层、数据层、结果层&#xff0c;如下图所示 V 2.0&#xff1a;加入驱动层testdriver 1.2 框架各层需要完成的工作 1、配…

Swiper轮播图实现

如上图&#xff0c;列表左右滚动轮播&#xff0c;用户鼠标移动到轮播区域&#xff0c;动画停止&#xff0c;鼠标移开轮播继续。 此例子实现技术框架是用的ReactCSS。 主要用的是css的transform和transition来实现左右切换动画效果。 React代码&#xff1a; import React, { us…

WEB-INF 泄露-RoarCTF-2019-EasyJava(BUUCTF)

题目页面 点开help 这里存在文件下载漏洞&#xff0c;参数选择POST传参&#xff08;使用HackBar插件&#xff09; 查看文件内容 下载存有web信息的XML文件&#xff0c;这里补充一点知识点 WEB-INF主要包含一下文件或目录&#xff1a; /WEB-INF/web.xml&#xff1a;Web应用程序…

关于思维和智能体模型的思考(1)

思维的本质&#xff1a;它的能力似乎源自于那些智能体之间复杂的交错关联 --马文 明斯基 最近阅读美国马文 明斯基写的书《心智社会》&#xff0c;觉得忽然开朗。他对人类思维&#xff0c;智能&#xff0c;智能体等概念做了十分优雅的解读。 个人觉得&#xff0c;他利…

【银河麒麟高级服务器操作系统】数据中心系统异常卡死分析处理建议

了解银河麒麟操作系统更多全新产品&#xff0c;请点击访问&#xff1a;https://product.kylinos.cn 1.服务器环境以及配置 【机型】浪潮NF5280M5 处理器&#xff1a; Intel 内存&#xff1a; 1T 【内核版本】 4.19.90-24.4.v2101.ky10.x86_64 【OS镜像版本】 银河麒麟…

element UI时间组件两种使用方式

加油&#xff0c;新时代打工&#xff01; 组件官网&#xff1a;https://element.eleme.cn/#/zh-CN/component/date-picker 先上效果图&#xff0c;如下&#xff1a; 第一种实现方式 <div class"app-container"><el-formref"submitForm":model&q…

滑动窗口,最长子序列最好的选择 -> O(N)

最近在学校上短学期课程&#xff0c;做程序设计题&#xff0c;一下子回忆起了大一学数据结构与算法的日子&#xff01; 这十天我会记录一些做题的心得&#xff0c;今天带来的是对于最长子序列长度题型的解题框架&#xff1a;滑动窗口 本质就是双指针算法&#xff1a; 通过le…

VSCode神仙插件——通义灵码 (AI编程助手)

1、安装&登录插件 安装时,右下角会有弹窗,让你登录该软件 同意登录后,会跳转浏览器页面 VSCode右下角出现如下图标即登录成功 2、使用 (1)点击左侧栏中的如下图标,打开通义灵码,可以进行智能问答 (2) 选中代码,右键 但是,上述所有的操作会在左侧问答栏中提供答案,并无法直…

2024最适合小白的Midjourney教程,值得收藏!

一、Midjourney 的提示词 1、提示可以包括一个或多个图像 URL、多个文本短语以及一个或多个参数 1&#xff09;Image Prompts&#xff08;图像提示&#xff09;&#xff1a;可以将图像 URL 添加到提示中以影响最终结果的样式和内容。图像 URL 始终出现在提示的前面。文件应以.…