大数据学习18之Spark-SQL

1.概述

1.1.简介

        Spark SQL 是 Apache Spark 用于处理结构化数据的模块。

1.2.历史

1.2.1.Shark

        Hadoop诞生初期,Hive是唯一在Hadoop上运行的SQL-on-Hadoop工具,MR的中间计算过程产生了大量的磁盘落地操作,消耗了大量的I/O,降低了程序的运行效率。

        为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具被开发,Spark-SQL便是其中的一种,Spark-SQL的前身就是Shark。

1.2.2.Hive on Spark*

        Spark的开发较晚,那时候的主流数据仓库便是Hive,为了通用性,Spark就与Hive结合起来了,对于SQL语句的解析都交给Hive来进行处理,并且Spark程序一定程度上替代了Hive底层的MapReduce程序,提高了作业的计算效率。

1.2.3.Spark on Hive*

        随着Spark发展,Shark 对于 Hive 的太多依赖(如采用 Hive 的语法解析器、查询优化器等等)制约了 Spark的 One stack to rule them all 的既定方针,制约了 Spark 各个组件的相互集成,所以就提出了 SparkSQL 项目。

        并且Hive本身的迭代更新速度较慢,就算是现在的最新版本的Hive支持的Spark也才2.x.x,

同时Spark在3.0.0版本时做出了一系列的优化,如果还是依赖于与Hive的化Spark3.0以上的版本的是用不了的,那么Spark的优化就没有意义了。

        为了让Spark的优化变得可用,Spark就自己开发了一套用于SQL操作的模块,由之前的Shark来到了现在的Spark-SQL。

        经过这次的转变,Spark由原来的依赖Hive解析SQL变成了由自己的Spark-SQL模块解析的方式,但是保留了对Hive的元数据访问。

        也就是说,现在的Spark除了元数据外,几乎可以说是一个一栈式大数据框架了。

1.2.4.Hive on Spark vs. Spark on Hive

        Hive on Spark:Hive为主体,在Hive中继承Spark,Hive即存储元数据,也解析SQL语句,只是Hive将引擎从MR更换为Spark由 ,Spark 负责运算工作,但部署较为复杂。

        Spark on Hive:Spark为主体,Hive只负责元数据的存储,由Spark来解析和执行SQL语句,其中SQL语法为Spark-SQL语法,且部署简单。Spark on Hive 的优点在于它提供了更灵活的编程接口,适用于各种数据处理需求。

2.数据模型

2.1. RDD 和 DataFrame

2.1.1.RDD转DataFrame

//创建样例类
scala> case class User(id: Int, name: String, age: Int, gender: Int)
defined class User

//创建 RDD
scala> val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",20, 1)))

//RDD 转 DataFrame
scala> val df = rdd.toDF

2.1.2.DataFrame 转 RDD

//创建 DataFrame
scala> val df = spark.read.json("file:///opt/spark-local/data/user/user.json")

//DataFrame 转 RDD
scala> val rdd = df.rdd


 2.2.RDD 和 Dataset

2.2.1. RDD 转 Dataset

        RDD 和 Dataset 两个都是强类型模型,所以可以相互直接转换。

//创建样例类
scala> case class User(id: Int, name: String, age: Int, gender: Int)

//创建 RDD
scala> val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",20, 1)))

//RDD 转 Dataset。
scala> val ds = rdd.toDS

2.2.2.Dataset 转 RDD

scala> val rdd = ds.rdd

2.3. DataFrame 和 Dataset

2.3.1.DataFrame 转 Dataset

        配合样例类使用 as[类型] 转换为 DataSet。

scala> val df = spark.read.json("file:///opt/yjx/spark-scalocal/data/user/user.json")

scala> val ds = df.as[User]

2.3.2.Dataset 转 DataFrame

//创建 Dataset
scala> case class User(id: Int, name: String, age: Int, gender: Int)defined class User

scala> val list = List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu", 20, 1))
list: List[User] = List(User(1,zhangsan,18,1), User(2,lisi,19,0), User(3,wangwu,20,1))

scala> val ds = list.toDS

//Dataset 转 DataFrame
scala> val df = ds.toDF

3. IDEA 开发 SparkSQL

        创建普通 Maven 项目,添加以下依赖。

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.2</version>
</dependency>

3.1.DataFrame

object DataFrameDemo {
case class User(id: Int, name: String, age: Int, gender: Int)
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrameDemo")
// 根据配置对象初始化 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
import spark.implicits._
// ==================== 业务处理 ====================
// RDD 转 DataFrame
val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",
20, 1)))
val df1: DataFrame = rdd.toDF()
df1.show()
// 直接创建 DataFrame
val df2 = spark.read.json("data/user/user.json")
df2.show()
// 创建临时表
df2.createOrReplaceTempView("t_user")
// 编写 SQL
lazy val sql = "SELECT id, name, age, gender FROM t_user ORDER BY age DESC"
// 执行 SQL
spark.sql(sql).show()
// ==================== 关闭连接 ====================
spark.stop
}

3.2.Dataset

def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("DatasetDemo")
// 根据配置对象初始化 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
import spark.implicits._
// ==================== 业务处理 ====================
// RDD 转 Dataset
val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",
20, 1)))
val ds1: Dataset[User] = rdd.toDS()
ds1.show()
// 创建 DataFrame
val df: DataFrame = spark.read.json("data/user/user.json")
// 通过 DataFrame 使用 as[类型] 转换为 DataSet
val ds2: Dataset[User] = df.as[User]
ds2.show()
// 创建临时表
ds2.createOrReplaceTempView("t_user")
// 编写 SQL
lazy val sql = "SELECT id, name, age, gender FROM t_user ORDER BY age DESC"
// 执行 SQL
spark.sql(sql).show
// ==================== 关闭连接 ====================
spark.stop
}

4.DSL 领域特定语言

        DSL 为 Domain Specific Language 的缩写,翻译过来为领域特定语言。简单理解就是 Spark 独有的结构化数据操作语法。

        此处不做赘述。

5.自定义函数

5.1.UDF用户定义普通函数

案例: 

object UDFDemo {
case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double,
comm: Double, deptno: Int)
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("UDFDemo")
// 根据配置对象初始化 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
import spark.implicits._
// ==================== 业务处理 ====================
// 数据准备
val df: DataFrame = spark.read
.option("header", "true")
.option("sep", ",")
.option("inferSchema", "true")
.csv("data/scott/emp.csv")
val emp: Dataset[Emp] = df.as[Emp]
emp.createOrReplaceTempView("emp")
// 注册 UDF 函数
val prefix_name = spark.udf.register("prefix_name", (name: String) => {
"Hello: " + name
})
// 在 SQL 中使用
val sql =
"""
|SELECT ename, prefix_name(ename) AS new_name FROM emp
|""".stripMargin
spark.sql(sql).show(5)
// 在 DSL 中使用
emp.select('job, prefix_name('job).as("new_job")).show(5)
// ==================== 关闭连接 ====================
spark.stop
}
}

5.2.UDAF用户定义聚合函数

案例:

object UDAFDemo03_Spark3 {
case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double,
comm: Double, deptno: Int)
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("UDAFDemo02")
// 根据配置对象初始化 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
import spark.implicits._
// ==================== 业务处理 ====================
// 数据准备
val df: DataFrame = spark.read
.option("header", "true")
.option("sep", ",")
.option("inferSchema", "true")
.csv("data/scott/emp.csv")
val emp: Dataset[Emp] = df.as[Emp]
emp.createOrReplaceTempView("emp")
// 注册 UDAF 函数(强类型自定义 UDAF 在 Spark 3.0.0 中的使用方式)
val my_avg = spark.udf.register("my_avg", functions.udaf(MyAvg))
// 在 SQL 中使用
val sql =
"""
|SELECT my_avg(sal) AS avg_sal FROM emp
|""".stripMargin
spark.sql(sql).show()
// 在 DSL 中使用
emp.select(my_avg('sal).as("avg_sal")).show()
// ==================== 关闭连接 ====================
spark.stop
}
// 缓存区数据的结构 Buff(求和, 计数)
case class Buff(var sum: Double, var count: Long)
/**
* 自定义 UDAF 聚合函数:计算薪资的平均值
* IN:输入数据的类型
* BUFF:缓存区数据的类型
* OUT:返回值数据的类型
*/
object MyAvg extends Aggregator[Double, Buff, Double] {
// 初始化缓冲区 Buff(求和, 计数)
override def zero: Buff = Buff(0D, 0L)
// 根据输入的数据更新缓冲区的数据
override def reduce(b: Buff, in: Double): Buff = {
// 累加每次输入的数据
b.sum += in
// 计数器每次 +1
b.count += 1
// 返回缓冲区对象
b
}
// 合并缓冲区
override def merge(b1: Buff, b2: Buff): Buff = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// 计算最终结果
override def finish(b: Buff): Double = b.sum / b.count
// 缓冲区数据的编码处理
// Encoders.product 是进行 Scala 元组和 case 类转换的编码器
//override def bufferEncoder: Encoder[Buff] = Encoders.product
// 或者
override def bufferEncoder: Encoder[Buff] = Encoders.kryo(classOf[Buff])
// 输出数据的编码处理
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

5.3.UDTF用户定义表创建函数

        先添加依赖:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20220924</version>
</dependency>

     案例:

/**
* 数据:{"movie": [{"movie_name": "肖申克的救赎", "movie_type": "犯罪" }, {"movie_name": "肖申克的救赎",
"movie_type": "剧情" }]}
* 需求:从一行 JSON 格式数据中取出 movie_name 和 movie_type 两个 Key 及其对应的 Value。K-V 输出的格式为:
* movie_name movie_type
* 肖申克的救赎 犯罪
* 肖申克的救赎 剧情
*/
class MyUDTF extends GenericUDTF {
// 实例化 UDTF 对象,判断传入参数的长度以及数据类型
// 和 Hive 的自定义 UDTF 不一样的是,Spark 使用的是已经过时的 initialize(ObjectInspector[] argOIs)
override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
// 获取入参
// 参数校验,判断传入参数的长度以及数据类型
if (argOIs.length != 1) throw new UDFArgumentLengthException("参数个数必须为 1")
if (ObjectInspector.Category.PRIMITIVE != argOIs(0).getCategory) {
/*
UDFArgumentTypeException(int argumentId, String message)
异常对象需要传入两个参数:
int argumentId:参数的位置,ObjectInspector 中的下标
String message:异常提示信息
*/
throw new UDFArgumentTypeException(0, "参数类型必须为 String")
}
// 自定义函数输出的字段和类型
// 创建输出字段名称的集合
val columNames = new util.ArrayList[String]
// 创建字段数据类型的集合
val columType = new util.ArrayList[ObjectInspector]
columNames.add("movie_name")
columType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
columNames.add("movie_type")
columType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
ObjectInspectorFactory.getStandardStructObjectInspector(columNames, columType)
}
// 处理数据
override def process(objects: Array[AnyRef]): Unit = {
val outline = new Array[String](2)
if (objects(0) != null) {
val jsonObject = new JSONObject(objects(0).toString)
val jsonArray: JSONArray = jsonObject.getJSONArray("movie")
var i = 0
while ( {
i < jsonArray.length
}) {
outline(0) = jsonArray.getJSONObject(i).getString("movie_name")
outline(1) = jsonArray.getJSONObject(i).getString("movie_type")
// 将处理好的数据通过 forward 方法将数据按行写出
forward(outline)
i += 1
}
}
}
override def close(): Unit = {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/921981.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

医学AI公开课·第一期|Machine LearningTransformers in Med AI

小罗碎碎念 从这周开始&#xff0c;我计划每个周末录一个视频&#xff0c;分享一些医学人工智能领域的进展。 作为第一期视频&#xff0c;我打算介绍一下机器学习和Transformer在医学AI领域中的应用。 为了准备这期视频&#xff0c;总共做了24页PPT&#xff08;三部分内容&…

小白投资理财 - 解读威廉指标 WR

小白投资理财 - 解读威廉指标 WR WR 指标WR 指标特点WR 指标解读WR 与其他指标的结合实战案例&#xff1a;WR 计算WR 的优缺点WR 和 Williams Fractals 的主要区别总结 上篇《小白投资理财 - 解读威廉分形指标 Williams Fractals》&#xff0c;今天我们来了解另外一个威廉指标 …

前端速通(HTML)

1. HTML HTML基础&#xff1a; 什么是HTML&#xff1f; 超文本&#xff1a; "超文本"是指通过链接连接不同网页或资源的能力。HTML支持通过<a>标签创建超链接&#xff0c;方便用户从一个页面跳转到另一个页面。 标记语言&#xff1a; HTML使用一组预定义的标签…

电商一件发货软件闲管家使用教程

闲鱼闲管家是一款专为闲鱼卖家设计的电脑版工作台&#xff0c;旨在帮助卖家更高效地管理其在闲鱼平台上的业务。以下是关于闲鱼闲管家的一些主要特点和功能&#xff1a; 主要特点&#xff1a; 多账号管理&#xff1a;支持同时管理多达30个闲鱼账号&#xff0c;方便大型卖家或…

第一个autogen与docker项目

前提条件&#xff1a;在windows上安装docker 代码如下&#xff1a; import os import autogen from autogen import AssistantAgent, UserProxyAgentllm_config {"config_list": [{"model": "GLM-4-Plus","api_key": "your api…

JavaEE 【知识改变命运】02 多线程(1)

文章目录 线程是什么&#xff1f;1.1概念1.1.1 线程是什么&#xff1f;1.1.2 为什么要有线程1.1.3 进程和线程的区别1.1.4 思考&#xff1a;执行一个任务&#xff0c;是不是创建的线程或者越多是不是越好&#xff1f;&#xff08;比如吃包子比赛&#xff09;1.1.5 ) Java 的线程…

LeetCode 力扣 热题 100道(八)相交链表(C++)

给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 图示两个链表在节点 c1 开始相交&#xff1a; 题目数据 保证 整个链式结构中不存在环。 注意&#xff0c;函数返回结果后&…

全面解析 JMeter 后置处理器:概念、工作原理与应用场景

在性能测试中&#xff0c;Apache JMeter是一个非常流行的工具&#xff0c;它不仅能够模拟大量用户进行并发访问&#xff0c;还提供了丰富的扩展机制来满足各种复杂的测试需求。后置处理器&#xff08;Post-Processor&#xff09;是JMeter中非常重要的组件之一&#xff0c;用于在…

java八股-SpringCloud微服务-Eureka理论

文章目录 SpringCloud架构Eureka流程Nacos和Eureka的区别是&#xff1f;CAP定理Ribbon负载均衡策略自定义负载均衡策略如何实现&#xff1f;本章小结 SpringCloud架构 Eureka流程 服务提供者向Eureka注册服务信息服务消费者向注册中心拉取服务信息服务消费者使用负载均衡算法挑…

每天五分钟机器学习:支持向量机数学基础之超平面分离定理

本文重点 超平面分离定理(Separating Hyperplane Theorem)是数学和机器学习领域中的一个重要概念,特别是在凸集理论和最优化理论中有着广泛的应用。该定理表明,在特定的条件下,两个不相交的凸集总可以用一个超平面进行分离。 定义与表述 超平面分离定理(Separating Hy…

day05(单片机高级)PCB基础

目录 PCB基础 什么是PCB&#xff1f;PCB的作用&#xff1f; PCB的制作过程 PCB板的层数 PCB设计软件 安装立创EDA PCB基础 什么是PCB&#xff1f;PCB的作用&#xff1f; PCB&#xff08;Printed Circuit Board&#xff09;&#xff0c;中文名称为印制电路板&#xff0c;又称印刷…

电脑自动关机时间如何定?Wise Auto Shutdown 设置关机教程

在日常使用电脑的过程中&#xff0c;有时我们需要让电脑在特定的时间自动关机&#xff0c;比如在下载大文件完成后、执行长时间的任务结束时&#xff0c;或者只是单纯想在某个预定时间让电脑自动关闭以节省能源。这时候&#xff0c;Wise Auto Shutdown 这款软件就能派上大用场了…

Lucene(2):Springboot整合全文检索引擎TermInSetQuery应用实例附源码

前言 本章代码已分享至Gitee: https://gitee.com/lengcz/springbootlucene01 接上文。Lucene(1):Springboot整合全文检索引擎Lucene常规入门附源码 如何在指定范围内查询。从lucene 7 开始&#xff0c;filter 被弃用&#xff0c;导致无法进行调节过滤。 TermInSetQuery 指定…

使用Kubernetes部署第一个应用

目录 前提条件 启动集群 部署 nginx 应用 创建 YAML 文件 应用 YAML 文件 查看部署结果 理解Pods 相关命令 公布应用程序 问题背景 Kubernetes Service&#xff08;服务&#xff09;概述 服务和标签 为Deployment 创建一个 Service 伸缩应用程序 Scaling&#x…

使用 Maven 创建 jar / war 项目

使用 Maven 创建 jar 项目 maven-archetype-quickstart 这个Archetype&#xff0c;基本内容包括&#xff1a; 一个包含junit依赖声明的 pom.xml 、src/main/java主代码目录及一个名为App的类 、src/test/java测试代码目录及一个名为 AppTest的测试用例maven-archetype-webapp 一…

HDR视频技术之四:HDR 主要标准

HDR 是 UHD 技术中最重要维度之一&#xff0c;带来新的视觉呈现体验。 HDR 技术涉及到采集、加工、传输、呈现等视频流程上的多个环节&#xff0c;需要定义出互联互通的产业标准&#xff0c;以支持规模化应用和部署。本文整理当前 HDR 应用中的一些代表性的国际标准。 1 HDR 发…

Ubuntu中使用多版本的GCC

我的系统中已经安装了GCC11.4&#xff0c;在安装cuda时出现以下错误提示&#xff1a; 意思是当前的GCC版本过高&#xff0c;要在保留GCC11.4的同时安装GCC9并可以切换&#xff0c;可以通过以下步骤实现&#xff1a; 步骤 1: 安装 GCC 9 sudo apt-get update sudo apt-get ins…

dubbo-go框架介绍

框架介绍 什么是 dubbo-go Dubbo-go 是 Apache Dubbo 的 go 语言实现&#xff0c;它完全遵循 Apache Dubbo 设计原则与目标&#xff0c;是 go 语言领域的一款优秀微服务开发框架。dubbo-go 提供&#xff1a; API 与 RPC 协议&#xff1a;帮助解决组件之间的 RPC 通信问题&am…

DataGear 5.2.0 发布,数据可视化分析平台

DataGear 企业版 1.3.0 已发布&#xff0c;欢迎体验&#xff01; http://datagear.tech/pro/ DataGear 5.2.0 发布&#xff0c;图表插件支持定义依赖库、严重 BUG 修复、功能改进、安全增强&#xff0c;具体更新内容如下&#xff1a; 重构&#xff1a;各模块管理功能访问路径…

2023年3月GESPC++一级真题解析

一、单选题&#xff08;每题2分&#xff0c;共30分&#xff09; 题目123456789101112131415答案BAACBDDAADBCDBC 1.以下不属于计算机输入设备的有&#xff08; &#xff09;。 A &#xff0e;键盘 B &#xff0e;音箱 C &#xff0e;鼠标 D &#xff0e;传感器 【答案】 …