我们使用sparksql进行编程,编程的过程我们需要创建dataframe对象,这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象。
但是sparksql给大家提供了多种便捷读取数据的方式。
//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDataFrame(rdd,schema)
//更便捷的使用方式
sqlSc.read.text|orc|parquet|jdbc|csv|json
df.write.text|orc|parquet|jdbc|csv|json
write写出存储数据的时候也是文件夹的,而且文件夹不能存在。
- csv是一个介于文本和excel之间的一种格式,如果是文本打开用逗号分隔的。
- text文本普通文本,但是这个文本必须只能保存一列内容。
以上两个文本都是只有内容的,没有列的。
- json是一种字符串结构,本质就是字符串,但是存在kv,例子 {"name":"zhangsan","age":20}
多平台解析方便,带有格式信息。
- orc格式一个列式存储格式,hive专有的。
- parquet列式存储,顶级项目
以上都是列式存储问题,优点(1.列式存储,检索效率高,防止冗余查询 2.带有汇总信息,查询特别快 3.带有轻量级索引,可以跳过大部分数据进行检索),他们都是二进制文件,带有格式信息。
jdbc 方式,它是一种协议,只要符合jdbc规范的服务都可以连接,mysql,oracle,hive,sparksql
整体代码:
package com.hainiu.spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Properties
object TestMovieWithSql {
def main(args: Array[String]): Unit = {
//??movie???
//1.id middle=name last=type
val conf = new SparkConf()
conf.setAppName("movie")
conf.setMaster("local[*]")
conf.set("spark.shuffle.partitions","20")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
import sqlSc.implicits._
//deal data
val df = sc.textFile("data/movies.txt")
.flatMap(t => {
val strs = t.split(",")
val mid = strs(0)
val types = strs.reverse.head
val name = strs.tail.reverse.tail.reverse.mkString(" ")
types.split("\\|").map((mid, name, _))
}).toDF("mid", "mname", "type")
df.limit(1).show()
val df1 = sc.textFile("data/ratings.txt")
.map(t=>{
val strs = t.split(",")
(strs(0),strs(1),strs(2).toDouble)
}).toDF("userid","mid","score")
df1.limit(1).show()
import org.apache.spark.sql.functions._
val df11 = df.join(df1, "mid").groupBy("userid", "type")
.agg(count("userid").as("cnt"))
.withColumn("rn", row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc)))
.where("rn = 1")
.select("userid", "type")
val df22 = df.join(df1, "mid").groupBy("type", "mname")
.agg(avg("score").as("avg"))
.withColumn("rn", row_number().over(Window.partitionBy("type").orderBy($"avg".desc)))
.where("rn<4")
.select("type", "mname")
val df33 = df11.join(df22, "type")
//spark3.1.2?? spark2.x
// df33.write.csv()
df33.write
.format("csv")
.save("data/csv")
// df33.write.
// csv("data/csv")
// df33.write.json("data/json")
// df33.write.parquet("data/parquet")
// df33.write.orc("data/orc")
// val pro = new Properties()
// pro.put("user","root")
// pro.put("password","hainiu")
// df33.write.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro)
}
}
为了简化存储的计算方式:
package com.hainiu.spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object TestSink {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("test sink")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
import sqlSc.implicits._
import org.apache.spark.sql.functions._
val df = sc.textFile("data/a.txt")
.map(t=>{
val strs = t.split(" ")
(strs(0),strs(1),strs(2),strs(3))
}).toDF("id","name","age","gender")
.withColumn("all",concat_ws(" ",$"id",$"name",$"age",$"gender"))
.select("all")
// df.write.csv("data/csv")
// df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
// .save("data/csv")
// df.write.parquet("data/parquet")
// df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
// .save("data/parquet")
// df.write.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2")
// .save("data/json")
df.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2")
.save("data/text")
}
}
读取数据代码:
package com.hainiu.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.Properties
object TestReadData {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("movie")
conf.setMaster("local[*]")
conf.set("spark.shuffle.partitions", "20")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
// sqlSc.read.text("data/text").show()
// sqlSc.read.csv("data/csv").show()
//
// sqlSc.read.parquet("data/parquet").show()
// sqlSc.read.json("data/json").show()
sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").load("data/text").show()
sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2").load("data/csv").show()
sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2").load("data/json").show()
sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2").load("data/parquet").show()
sqlSc.read.orc("data/orc").show()
val pro = new Properties()
pro.put("user","root")
pro.put("password","hainiu")
sqlSc.read.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro).show()
}
}