目录
- 一、创建DataFrame
- 二、Sql语法
- 三、DSL语法
- 四、RDD与DataFrame互相转换
一、创建DataFrame
在SparkSql中SparkSession是创建DataFrame和执行Sql的入口,创建DataFrame有三种方式:
-
通过Spark的数据源进行创建
-
从一个存在的RDD进行转换
-
从Hive Table进行查询返回
二、Sql语法
Sql语法风格是指我们查询数据的时候使用Sql语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助
注意:
普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表,使用全局临时表时需要全路径访问,如:global_temp.people
对于DataFrame创建一个全局表:
df.createGlobalTempView("people")
通过Sql语句实现查询全表
三、DSL语法
DataFrame提供了一个特定领域语言(domain-specific language,DSL)去管理结构化的数据,可以在Scala,Java,Python和R中使用DSL,使用DSL语法风格不必去创建临时视图了
注意:
当涉及到运算的时候,每列都必须使用$,或者采用引号表达式:单引号+字段名
起别名:
查看age大于等于30的数据:
根据 age 分组,查看数据条数
四、RDD与DataFrame互相转换
在IDEA中开发程序时,如果需要RDD与DF或者DS之间互相操作,那么需要引入:
import spark.implicits._
这里的spark不是scala中的包名,而是创建的sparkSession对象的变量名称,所以必须先创建SparkSession对象再导入,这里的spark对象不能使用var声明,因为scala只支持val修饰的对象的引入,spark-shell中无需导入,自动完成此操作
用var修饰的话,编译不通过
rdd与dataframe互相转换:
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.getOrCreate()
import spark.implicits._
val rdd: RDD[Int] = spark.sparkContext.makeRDD(List(1,2,3,4,5))
val df: DataFrame = rdd.toDF("id")
val rdd2: RDD[Row] = df.rdd
df.show()
println("-" * 100)
rdd2.collect().foreach(println)