spark第三篇sql
- sparksql概述
- sparksql四大特性
- dataframe概述
- 通过读取数据源创建dataFrame
- DataFrame常用操作
- DataSet
- 将RDD转换为DataFrame代码开发
- sparksql 操作hivesql
- sparksql读取mysql表中的数据
- sparksql将结果数据写入到mysql中
sparksql概述
- 1、sparksql发展史
- shark为spark提供了分布式数据仓库系统
- shark依赖于hive的代码、依赖于spark的版本
- 2、sparksql是什么
- sparksql是spark的一个模块,主要用来处理结构化数据。
- 操作sparksql的方式: sql 、dataframe、dataSet
sparksql四大特性
- 1、易整合
- 把sql语句与spark程序进行无缝混合使用
- 采用4种Api:java/scala/python/R
- 2、统一的数据源访问方式
- 可以通过一种相同的方式对接任何外部数据源
- sparkSession.read.文件格式(文件格式的路径)
- 3、兼容hivesql
- 可以在sparksql中写hivesql
- 4、支持标准的数据库连接
- JDBC和ODBC
dataframe概述
-
dataframe是什么
- dataframe前身是schemaRDD,在spark1.3.0之后才出现的
- DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。
-
DataFrame与RDD的区别
- RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。
-
dataFrame和rdd优缺点
- rdd优缺点
- 优点
- 1、编译时类型安全
- 2、具备面向对象编程风格
- 缺点
- 1、序列化和反序列化性能开销大
- 2、频繁创建和销毁对象,造成大量的GC
- 优点
- dataFrame引入schema和off-heap(使用不在java堆中的内存,直接使用操作系统中的内存),决定了rdd缺点,同时丢失了rdd有点。dataframe不在是类型安全和面向对象编程风格。
- rdd优缺点
通过读取数据源创建dataFrame
- 1、读取文本文件创建dataFrame
- sparkSession.read.text(“文本文件格式的路径”)
- 2、读取json文件创建dataFrame
- sparkSession.read.json(“json文件格式的路径”)
- 3、读取parquet文件创建dataFrame
- sparkSession.read.parquet(“parquet文件格式的路径”)
DataFrame常用操作
-
DSL语法风格
-
它就是dataFrame自身提供的API
1、打印DataFrame的schema printlnSchema 2、查看dataFrame中的数据 show 3、取出第一位 first head(N) 取出前N个 4、查看某个字段 peopleDF.select("name").show peopleDF.select(col("name")).show peopleDF.select($"name").show peopleDF.select(peopleDF("name")).show 5、取出多个字段 peopleDF.select("name","age").show 6、让age字段+1 peopleDF.select(col("age")+1).show 7、过滤出年龄大于30的人数 peopleDF.filter($"age" > 30).count
-
-
SQL风格语法
-
通过将一个dataFrame注册成一张表,接下来就可以通过sql语句操作dataFrame
- sparkSession.sql(sql语句)
1、先需要将DataFrame注册成一张临时表 personDF.registerTempTable("t_person") 2、然后通过sparkSession.sql(sql语句)操作DataFrame sparkSession.sql("select * from t_person").show
-
DataSet
- 1、DataSet是什么
- DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及使用了Spark SQL优化的执行引擎。
- 2、DataSet特性
- 继承了RDD的优点
- 编译时类型安全
- 面向对象编程风格
- 继承了RDD的优点
- 3、创建Dataset
- 1、spark.createDataSet(“已经存在的scala集合”)
- 2、spark.createDataSet(“已经存在RDD”)
- 3、已经存在的scala集合调用toDs
- 4、通过dataFrame转换生成 as[强类型]
- 4、dataSet与dataFrame互相转换
- 1、将dataSet转化生成dataFrame
- dataSet.toDF
- 2、将dataFrame转换成dataSet
- dataFrame.as[强类型]
- 1、将dataSet转化生成dataFrame
将RDD转换为DataFrame代码开发
-
导包
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.0.2</version> </dependency>
-
1、 通过定义case class样例类利用反射机制推断Schema
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Column, DataFrame, SparkSession} //todo:利用反射机制(case class )指定dataFrame的schema case class Person(id:Int,name:String,age:Int) object CaseClassSchema { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate() //2、获取sparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //3、读取数据文件 val data: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(x=>x.split(" ")) //4、将rdd与样例类关联 val personRDD: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //5、获取DataFrame //手动导入隐式转换 import spark.implicits._ val personDF: DataFrame = personRDD.toDF //---------------DSL语法-------------start //1、打印dataframe的schema元信息 personDF.printSchema() //2、 显示dataFrame结果数据 personDF.show() personDF.show(2) //3、显示第一条数据 println(personDF.head()) //4、查询name字段结果数据 personDF.select("name").show() personDF.select($"name").show() personDF.select(new Column("name")).show() //5、把age字段结果加1 personDF.select($"id",$"name",$"age",$"age"+1).show() //6、把age 大于30的人过滤出来 personDF.filter($"age" > 30).show() //7、按照age进行分组 personDF.groupBy("age").count().show() //---------------DSL语法-------------end //--------------SQL语法--------------start //把dataFrame注册成一张表 personDF.createTempView("t_person") //通过sparksession调用sql方法 spark.sql("select * from t_person").show() spark.sql("select * from t_person where name='lisi'").show() spark.sql("select * from t_person order by age desc ").show() //--------------SQL语法--------------end //关闭操作 sc.stop() spark.stop() } }
-
2、通过StructType指定schema
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} //todo:将rdd转换成dataFrame,通过StructType来指定schema object SparkSqlSchema { def main(args: Array[String]): Unit = { //1、创建sparkSession val spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate() //2、获取得到sparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //3、读取数据文件 val data: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(_.split(" ")) //4、将rdd与Row类型关联 val rowRDD: RDD[Row] = data.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) //5、指定schema val schema:StructType=StructType( StructField("id", IntegerType, true) :: StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: Nil) val personDF: DataFrame = spark.createDataFrame(rowRDD,schema) //打印schema personDF.printSchema() //显示数据 personDF.show() //dataframe注册成一张表 personDF.createTempView("t_person") spark.sql("select * from t_person").show() //关闭 sc.stop() spark.stop() } }
sparksql 操作hivesql
-
导包
<!--引入 spark-hive依赖--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.0.2</version> </dependency>
-
代码开发
import org.apache.spark.sql.SparkSession //todo:利用sparksql操作hivesql object HiveSupport { def main(args: Array[String]): Unit = { //1、创建sparkSession val spark: SparkSession = SparkSession.builder() .appName("HiveSupport") .master("local[2]") .enableHiveSupport() //开启对hivesql的支持 .getOrCreate() //2、利用sparkSession操作hivesql //2.1 创建hive表 //spark.sql("create table if not exists student(id int,name string,age int) row format delimited fields terminated by ',' ") //2.2 加载数据到hive表中 //spark.sql("load data local inpath './data/student.txt' into table student") //2.3 查询表中数据 spark.sql("select * from student").show() //3、关闭 spark.stop() } }
sparksql读取mysql表中的数据
-
代码开发
import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} //todo:利用sparksql从mysql表中读取数据 object DataFromMysql { def main(args: Array[String]): Unit = { //1、创建sparkSession val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate() //2、通过sparksession读取mysql表中的数据 //定义url val url="jdbc:mysql://192.168.200.100:3306/spark" //定义表名 val tableName="iplocation" //定义相关的属性 val properties=new Properties properties.setProperty("user","root") properties.setProperty("password","123456") val jdbcDataFrame: DataFrame = spark.read.jdbc(url,tableName,properties) //显示schema jdbcDataFrame.printSchema() //打印结果数据 jdbcDataFrame.show() //关闭 spark.stop() } }
-
spark-shell 操作读取mysql表中
-
启动spark-shell脚本
spark-shell \ --master spark://hdp-node-01:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ --jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \ --driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar
-
执行代码
val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.200.100:3306/spark", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocation", "user" -> "root", "password" -> "123456")).load()
-
sparksql将结果数据写入到mysql中
-
代码开发
import java.util.Properties import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} //todo:利用sparksql将结果数据写入到mysql表中 case class Student(id:Int,name:String,age:Int) object Data2Mysql { def main(args: Array[String]): Unit = { //1、创建sparkSession val spark: SparkSession = SparkSession.builder().appName("Data2Mysql").getOrCreate() //2、获取sparkcontext val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //3、读取数据文件 val rdd: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(" ")) //4、将样例类与rdd进行关联 val studentRDD: RDD[Student] = rdd.map(x=>Student(x(0).toInt,x(1),x(2).toInt)) //5、将rdd转换成dataframe import spark.implicits._ val studentDF: DataFrame = studentRDD.toDF //打印结果数据 studentDF.show() //dataFrame注册成一张表 studentDF.createTempView("t_student") //通过sparkSession操作这个表 val result: DataFrame = spark.sql("select * from t_student order by age desc") //把结果数据写入到mysql表中 //定义url val url="jdbc:mysql://192.168.200.100:3306/spark" //定义表名 val tableName=args(1) //定义相关的属性 val properties=new Properties properties.setProperty("user","root") properties.setProperty("password","123456") //mode:指定数据插入模式 //overwrite: 覆盖(事先会创建一张表) //append: 追加(事先会创建一张表) //ignore:忽略(如果当前这个表已经存在,不执行操作) //error:如果当前这个表存在,这个时候就报错 result.write.mode("append").jdbc(url,tableName,properties) //关闭 sc.stop() spark.stop() } }
-
把程序打成jar包 提交到集群中运行
spark-submit --master spark://node1:7077 --class cn.包名.sql.Data2Mysql --executor-memory 1g --total-executor-cores 2 --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar original-spark_class06-2.0.jar /person.txt student100