一、SparkSQL 概述
1. 概念
Spark SQL 是 Spark 用于结构化数据 (structured data) 处理的 Spark 模块,使用 SQL 的方式简化 RDD 的开发
2. Hive VS SparkSQL
- Hive 是早期唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具,但是 MapReduce 计算过程中大量的中间磁盘落地过程消耗了大量的 I/O,降低的运行效率
- Shark 是为了提高 SQL-on-Hadoop的效率而产生的 SQL-on-Hadoop 工具,是基于 Hive 所开发的,它修
改了 Hive 中的内存管理、物理计划和执行三个模块,并使之能运行在 Spark 引擎上 - 由于 Shark 对于 Hive 存在太多依赖(如采用 Hive 的语法解析器、查询优化器等等),制约了其发展,SparkSQL 由此应运而生,它抛弃了原有 Shark 的代码,但汲取了 Shark 的一些优点,如内存列存储(In-Memory Columnar Storage)、 Hive 兼容性等
- 数据兼容方面 SparkSQL 不但兼容 Hive,还可以从 RDD、parquet 文件、JSON 文件中获取数据,未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据
- 性能优化方面 除了采取 In-Memory Columnar Storage、byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等
- 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展
- Shark 停止开发后,发展出了两个支线,其中 SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive,只是兼容 Hive;而 Hive on Spark 是一个 Hive 的发展计划,该计划将 Spark 作为 Hive 的底层引擎之一,由此 Hive 将不再受限于一个引擎,可以采用 Map-Reduce、Tez、Spark 等引擎
3. SparkSQL 特点
- 易整合:无缝的整合了 SQL 查询和 Spark 编程
- 统一的数据访问:使用相同的方式连接不同的数据源
- 兼容 Hive:在已有的仓库上直接运行 SQL 或者 HiveQL
- 标准数据连接:通过 JDBC 或者 ODBC 来连接
4. 两大数据模型
- DataFrame:一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格
- DataSet:是 Spark 1.6 中添加的一个新抽象,是 DataFrame 的一个扩展
二、SparkSQL 核心编程
1. 上下文对象
- SparkCore 中执行应用程序前构建的上下文环境对象为 SparkContext
- SparkSQL 中执行应用程序前构建的上下文环境对象为:
- 老版本:SQLContext,用于 Spark 自己提供的 SQL 查询;HiveContext,用于连接 Hive 的查询
- 新版本:SparkSession,实质上是 SQLContext 和 HiveContext 的组合,内部封装了 SparkContext
2. DataFrame
2.1 介绍
- DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。与 RDD 的主要区别在于,DataFrame 带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型
- 与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)
- DataFrame 是为数据提供了 Schema 的视图,可以把它当做数据库中的一张表来看待
- DataFrame 是懒执行的,但性能上比 RDD 要高,主要原因:优化的执行计划,即查询计划通过 Spark catalyst optimiser 进行优化
2.2 创建
2.2.1 从 Spark 数据源创建
/**
基本语法:SparkSession.read.[data_format]
可用的数据源格式:csv format jdbc json load option options orc parquet schema table text textFile
*/
// 从一个 json 文件中创建 DataFrame
val df = sparkSession.read.json("data/user.json")
// 展示 DataFrame 数据
df.show()
2.2.2 从 RDD 创建
/**
基本语法:RDD.toDF(col1, col2, ...)
注意:
1.必须先创建 SparkSession 对象,使用 val 修饰且命名为 spark
2.引入:import spark.implicits._ (此处的 spark 为 SparkSession 对象)
*/
val rdd = sc.makeRDD(List(1,2,3,4))
val df = rdd.toDF("id")
df.show()
//使用样例类
case class User(val name: String, val age: Int)
val rdd1 = sc.makeRDD(List(
("tom", 18), ("jerry", 17)
)).map(t => User(t._1, t._2))
rdd1.toDF.show
// DataFrame 转换为 RDD
val rdd2: RDD[org.apache.spark.sql.Row] = df.rdd
2.2.3 从 Hive Table 创建
/**
基本语法:SparkSession.sql("select * from hive_table")
*/
val df = sparkSession.sql("select * from user")
df.show()
2.3 SQL 语法
// 1.创建临时视图
// 1.1 session 范围有效
df.createTempView("user")
df.createOrReplaceTempView("user")
// 1.2 应用范围有效
df.createGlobalTempView("emp")
df.createOrReplaceGlobalTempView("emp")
// 2.使用 SQL 语法查询视图
sparkSession.sql("select * from user").show()
// 访问全局视图需要加上 global_temp 前缀
sparkSession.newSession().sql("select * from global_temp.emp").show()
2.4 DSL 语法
domain-specific language,可以直接在 DataFrame 中管理结构化的数据
// 1.查看 DataFrame 的 Schema 信息
df.printSchema()
// 2.查看某列数据
df.select("username").show()
// 2.1 查看某列计算后的数据:涉及到运算的时候,每列都必须使用 $ 或者单引号表达式
// 注意:必须先创建 SparkSession 对象,使用 val 修饰且命名为 spark,然后 import spark.implicits._
df.select($"age" + 1).show()
df.select($"username", $"age" + 1).show()
df.select('username, 'age + 1).show()
// 3.按条件查询数据
df.filter('age > 20).show()
// 4.分组计数
df.groupBy("age").count().show()
3. DataSet
3.1 介绍
- DataSet 是具有强类型的数据集合,需要提供对应的类型信息,比如 DataSet[Car],DataSet[Person]
- DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象
- 用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性
- 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet 中的字段名称
- DataFrame 是 DataSet 的特列,DataFrame = DataSet[Row] ,所以可以通过 as 方法将 DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序
3.2 创建
3.2.1 从序列创建
/**
基本语法:List/Seq.toDS()
*/
// 1.基本类型序列创建
val list = List(1,2,3,4)
val ds = list.toDS()
ds.show()
// 2.样例类序列创建
case class Person(name: String, age: Int)
val list1 = List(Person("tom", 20), Person("jerry", 18))
val ds1 = list1.toDS()
ds1.show()
3.2.2 从 RDD 创建
/**
基本语法:RDD.toDS(),最好使用样例类类型的 RDD
DataSet 转成 RDD:DataSet.rdd()
*/
case class Person(name: String, age: Int)
val rdd = sc.makeRDD(Person("tom", 20), Person("jerry", 18))
val ds = rdd.toDS()
ds.show()
val rdd1 = ds.rdd()
3.2.3 从 DataFrame 创建
/**
基本语法:DataFrame.as[Class]
DataSet 转成 DataFrame:DataSet.toDF
*/
val df = sc.makeRDD(List(
("tom", 20), ("jerry", 18)
)).toDF("name", "age")
df.show()
// 定义样例类型
case class Person(name: String, age: Int)
val ds = df.as[Person]
ds.show()
val df1 = ds.toDF()
df1.show()
4. RDD/DataFrame/DataSet 的关系
- RDD 产生于 Spark1.0 版本,DataFrame 产生于 Spark1.3 版本,Dataset 产生于 Spark1.6 版本
- 三者都是 Spark 平台下的分布式弹性数据集,为处理超大型数据提供便利,都有惰性机制,都会根据 Spark 的内存情况自动缓存运算,都有 partition 的概念
- RDD 不支持 SparkSQL 操作,而 DataFrame 与 DataSet 均支持 SparkSQL 的操作;DataFrame 其实是 DataSet 的一个特例