SparkSQL编程入口和模型
SparkSQL编程模型
主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。
1)SQL:SQL不用多说,就和Hive操作一样,但是需要清楚一点的是,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。同时支持,通用sql和hivesql。
2)DSL(DataFrame&DataSet):在支持SQL编程的同时,方便大家使用函数式编程的思想,类似sparkcore的编程模式,sparksql也支持DSL(Domain Specified Language,领域专用语言,或者特定领域语言),即通过DataFrame和Dataset来支持类似RDD的编程。
DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。
Dataset是在spark1.6.2开始出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。
一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。
RDD V.S. DataFrame V.S. Dataset
RDD
弹性分布式数据集,是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式,更简单些说,RDD就是一种数据结构,里面包含了数据和操作数据的方法。从字面上就能看出的几个特点:
1)弹性:数据可完全放内存或完全放磁盘,也可部分存放在内存,部分存放在磁盘,并可以自动切换。RDD出错后可自动重新计算(通过血缘自动容错)。可checkpoint(设置检查点,用于容错),可persist或cache(缓存),里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整。
2)分布式:RDD中的数据可存放在多个节点上。
3)数据集:即数据的集合,相对于DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)调整。
DataFrame
理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)。
假设RDD中的两行数据长这样,如图-5所示。
图-5 rdd数据
那么DataFrame中的数据长这样,如图-6所示。
图-6 dataframe数据
从上面两个图可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...)。
有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。
不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。
Dataset:相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束,下图-7是官网对于dataset的表述。
图-7 dataset
假设RDD中的两行数据如同-5所示,那么Dataset中的数据长这样,如图-8所示。
图-8 dataset数据
或者也可以如图-9所示,其中每行数据是个Object。
图-9 dataset数据
使用Dataset API的程序,会经过Spark SQL的优化器进行优化(优化器叫什么还记得吗?)
目前仅支持Scala、Java API,尚未提供Python的API(所以一定要学习Scala),相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,实在是不方便,这也是引入Dataset的一个重要原因。
使用DataFrame的代码中json文件中并没有score字段,但是能编译通过,但是运行时会报异常!如图-10代码所示。
图-10 dataframe编码
而使用Dataset实现,会在IDE中就报错,出错提前到了编译之前,如下图-11所示。
图-11 dataset编码
SparkSession
在SparkSQL中的编程模型,不再是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器(Builder方式)模式创建SparkSession。
SparkSQL基本编程
SparkSQL编程初体验
1)SparkSession的构建:
val spark = SparkSession.builder()
.appName("SparkSQLOps")
.master("local[*]")
//.enableHiveSupport()//支持hive的相关操作
.getOrCreate()
2)基本编程:
object SparkSQLOps {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("SparkSQLOps")
.master("local[*]")
//.enableHiveSupport()//支持hive的相关操作
.getOrCreate()
//加载数据
val pdf:DataFrame = spark.read.json("file:///E:/data/spark/sql/people.json")
//二维表结构
pdf.printSchema()
//数据内容 select * from tbl
pdf.show()
//具体的查询 select name, age from tbl
pdf.select("name", "age").show()
//导入sparksession中的隐式转换操作,增强sql的功能
import spark.implicits._
pdf.select($"name",$"age").show()
//列的运算,给每个人的年龄+10 select name, age+10,height-1 from tbl
pdf.select($"name",$"height" - 1, new Column("age").+(10)).show()
//起别名
select name, age+10 as age,height-1 as height from tbl
pdf.select($"name",($"height" -1).as("height")).show()
//做聚合统计 统计不同年龄的人数
select age, count(1) counts from tbl group by age
pdf.select($"age").groupBy($"age").count().show()
//条件查询 获取年龄超过18的用户
//pdf.select("name", "age", "height").where($"age".>(18)).show()
pdf.select("name", "age", "height").where("age > 18").show()
//sql风格
//pdf.registerTempTable()
//在spark2.0之后处于维护状态,使用createOrReplaceTempView
/*
从使用范围上说,分为global和非global
global是当前SparkApplication中可用,非global只在当前SparkSession中可用
从创建的角度上说,分为createOrReplace和不Replace
createOrReplace会覆盖之前的数据
create不Replace,如果视图存在,会报错
*/
pdf.createOrReplaceTempView("people")
spark.sql(
"""
|select
| age,
| count(1) as countz
|from people
|group by age
""".stripMargin).show
spark.stop()
}
}
SparkSQL编程模型的操作
DataFrame的构建方式
在Spark SQL中SparkSession是创建DataFrames和执行SQL的入口,创建DataFrames有三种方式,一种是可以从一个存在的RDD进行转换,还可以从Hive Table进行查询返回,或者通过Spark的数据源进行创建。
从Spark数据源进行创建:
package chapter1
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object Create_DataFrame {
def main(args: Array[String]): Unit = {
//创建程序入口
val spark = SparkSession.builder()
.appName("createDF")
.master("local[*]")
.getOrCreate()
//调用sparkContext
val sc: SparkContext = spark.sparkContext
//设置控制台日志输出级别
sc.setLogLevel("WARN")
//从数据源创建DataFrame
val personDF = spark.read.json("resources/people.json")
//展示数据
personDF.show()
}
}
从RDD进行转换:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object Create_DataFrame1 {
def main(args: Array[String]): Unit = {
//创建程序入口
val spark= SparkSession.builder()
.appName("createDF")
.master("local[*]")
.getOrCreate()
//调用sparkContext
val sc: SparkContext = spark.sparkContext
//设置控制台日志输出级别
sc.setLogLevel("WARN")
//导包
import spark.implicits._
//加载数据
val file: RDD[String] = sc.textFile("E:\\资料\\data\\person.txt")
//按照分隔符进行切分
val spliFile: RDD[Array[String]] = file.map(line=>line.split(" "))
//指定字段类型
val personRDD: RDD[(Int, String, Int)] = spliFile.map(line=>(line(0).toInt,line(1),line(2).toInt))
//调用toDF方法指定列名
val personDF: DataFrame = personRDD.toDF("id","name","age")
//展示数据
personDF.show()
//释放资源
spark.stop()
sc.stop()
}
}
通过反射创建DataFrame:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
case class person(id:Int,name:String,age:Int)
object createDataFrame2 {
def main(args: Array[String]): Unit = {
//创建程序入口
val spark = SparkSession.builder()
.appName("createDF")
.master("local[*]")
.getOrCreate()
//调用sparkContext
val sc: SparkContext = spark.sparkContext
//设置控制台日志输出级别
sc.setLogLevel("WARN")
//导包
import spark.implicits._
//加载数据
val file: RDD[String] = sc.textFile("E:\\资料\\data\\person.txt")
//按照分隔符进行切分
val spliFile: RDD[Array[String]] = file.map(line=>line.split(" "))
//指定字段类型
val personRDD: RDD[person] = spliFile.map(line=>person(line(0).toInt,line(1),line(2).toInt))
//调用toDF方法指定列名
val personDF: DataFrame = personRDD.toDF()
//展示数据
personDF.show()
//释放资源
spark.stop()
sc.stop()
}
}
动态编程:
/*
使用动态编程的方式构建DataFrame
Row-->行,就代表了二维表中的一行记录,jdbc中的resultset,就是java中的一个对象
*/
val row:RDD[Row] = spark.sparkContext.parallelize(List(
Row(1, "李伟", 1, 180.0),
Row(2, "汪松伟", 2, 179.0),
Row(3, "常洪浩", 1, 183.0),
Row(4, "麻宁娜", 0, 168.0)
))
//表对应的元数据信息
val schema = StructType(List(
StructField("id", DataTypes.IntegerType, false),
StructField("name", DataTypes.StringType, false),
StructField("gender", DataTypes.IntegerType, false),
StructField("height", DataTypes.DoubleType, false)
))
val df = spark.createDataFrame(row, schema)
df.printSchema()
df.show()
说明,这里学习三个新的类:
1)Row:代表的是二维表中的一行记录,或者就是一个Java对象。
2)StructType:是该二维表的元数据信息,是StructField的集合。
3)StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)。
Dataset的构建方式
Dataset是DataFrame的升级版,创建方式和DataFrame类似,但有不同。
//dataset的构建
object SparkSQLDatasetOps {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("SparkSQLDataset")
.master("local[*]")
.getOrCreate()
//dataset的数据集
val list = List(
new Student(1, "王盛芃", 1, 19),
new Student(2, "李金宝", 1, 49),
new Student(3, "张海波", 1, 39),
new Student(4, "张文悦", 0, 29)
)
import spark.implicits._
val ds = spark.createDataset[Student](list)
ds.printSchema()
ds.show()
spark.stop()
}
}
case class Student(id:Int, name:String, gender:Int, age:Int)
在编码中需要注意的是,如果导入spark.implicits隐式转换或者数据类型不是case class,便会出现如图-12所示的bug。
图-12 dataset编码注意的问题
在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过。
RDD和DataFrame以及DataSet的互相转换
RDD→DataFrame:
def beanRDD2DataFrame(spark:SparkSession): Unit = {
val stuRDD:RDD[Student] = spark.sparkContext.parallelize(List(
new Student(1, "王盛芃", 1, 19),
new Student(2, "李金宝", 1, 49),
new Student(3, "张海波", 1, 39),
new Student(4, "张文悦", 0, 29)
))
val sdf =spark.createDataFrame(stuRDD, classOf[Student])
sdf.printSchema()
sdf.show()
}
RDD→Dataset:
Def rdd2Dataset(spark:SparkSession): Unit = {
val stuRDD = spark.sparkContext.parallelize(List(
Student(1, "王盛芃", 1, 19),
Student(2, "李金宝", 1, 49),
Student(3, "张海波", 1, 39),
Student(4, "张文悦", 0, 29)
))
import spark.implicits._
val ds:Dataset[Student] = spark.createDataset[Student](stuRDD)
ds.show()
}
case class Student(id:Int, name:String, gender:Int, age:Int)
RDD转换为DataFrame和Dataset的时候可以有更加简单的方式,如下:
import spark.implicits._
rdd.toDF()
rdd.toDS()
DataFrame→RDD:
val rdd:RDD[Row] = df.rdd
rdd.foreach(row => {
val id = row.getInt(0)
val name = row.getString(1)
val gender = row.getInt(2)
val height = row.getAs[Double]("height")
println(s"id=${id},name=$name,gender=$gender,height=$height")
})
Dataset→RDD:
val stuDS: Dataset[Student] = list2Dataset(spark)
val stuRDD:RDD[Student] = stuDS.rdd
stuRDD.foreach(println)
Dataset→DataFrame:
val stuDS: Dataset[Student] = list2Dataset(spark)
//dataset --->dataframe
val df:DataFrame = stuDS.toDF()
df.show()
DataFrame→Dataset:无法直接将DataFrame转化为Dataset,需要通过as方法添加泛型。