spark第三篇sql

spark第三篇sql

        • sparksql概述
        • sparksql四大特性
        • dataframe概述
        • 通过读取数据源创建dataFrame
        • DataFrame常用操作
        • DataSet
        • 将RDD转换为DataFrame代码开发
        • sparksql 操作hivesql
        • sparksql读取mysql表中的数据
        • sparksql将结果数据写入到mysql中

sparksql概述
  • 1、sparksql发展史
    • shark为spark提供了分布式数据仓库系统
    • shark依赖于hive的代码、依赖于spark的版本
  • 2、sparksql是什么
    • sparksql是spark的一个模块,主要用来处理结构化数据。
    • 操作sparksql的方式: sql 、dataframe、dataSet
sparksql四大特性
  • 1、易整合
    • 把sql语句与spark程序进行无缝混合使用
    • 采用4种Api:java/scala/python/R
  • 2、统一的数据源访问方式
    • 可以通过一种相同的方式对接任何外部数据源
    • sparkSession.read.文件格式(文件格式的路径)
  • 3、兼容hivesql
    • 可以在sparksql中写hivesql
  • 4、支持标准的数据库连接
    • JDBC和ODBC
dataframe概述
  • dataframe是什么

    • dataframe前身是schemaRDD,在spark1.3.0之后才出现的
    • DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。
  • DataFrame与RDD的区别

    • RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。

    在这里插入图片描述

  • dataFrame和rdd优缺点

    • rdd优缺点
      • 优点
        • 1、编译时类型安全
        • 2、具备面向对象编程风格
      • 缺点
        • 1、序列化和反序列化性能开销大
        • 2、频繁创建和销毁对象,造成大量的GC
    • dataFrame引入schema和off-heap(使用不在java堆中的内存,直接使用操作系统中的内存),决定了rdd缺点,同时丢失了rdd有点。dataframe不在是类型安全和面向对象编程风格。
通过读取数据源创建dataFrame
  • 1、读取文本文件创建dataFrame
    • sparkSession.read.text(“文本文件格式的路径”)
  • 2、读取json文件创建dataFrame
    • sparkSession.read.json(“json文件格式的路径”)
  • 3、读取parquet文件创建dataFrame
    • sparkSession.read.parquet(“parquet文件格式的路径”)
DataFrame常用操作
  • DSL语法风格

    • 它就是dataFrame自身提供的API

      1、打印DataFrame的schema
         printlnSchema
       2、查看dataFrame中的数据
         show
       3、取出第一位
         first
         head(N) 取出前N个
       4、查看某个字段
         peopleDF.select("name").show
         peopleDF.select(col("name")).show
         peopleDF.select($"name").show
         peopleDF.select(peopleDF("name")).show
       5、取出多个字段
         peopleDF.select("name","age").show
       6、让age字段+1
         peopleDF.select(col("age")+1).show
       7、过滤出年龄大于30的人数
         peopleDF.filter($"age" > 30).count
      
  • SQL风格语法

    • 通过将一个dataFrame注册成一张表,接下来就可以通过sql语句操作dataFrame

      • sparkSession.sql(sql语句)
      1、先需要将DataFrame注册成一张临时表
      	personDF.registerTempTable("t_person")
      	
      2、然后通过sparkSession.sql(sql语句)操作DataFrame
      	sparkSession.sql("select * from t_person").show
      
DataSet
  • 1、DataSet是什么
    • DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及使用了Spark SQL优化的执行引擎。
  • 2、DataSet特性
    • 继承了RDD的优点
      • 编译时类型安全
      • 面向对象编程风格
  • 3、创建Dataset
    • 1、spark.createDataSet(“已经存在的scala集合”)
    • 2、spark.createDataSet(“已经存在RDD”)
    • 3、已经存在的scala集合调用toDs
    • 4、通过dataFrame转换生成 as[强类型]
  • 4、dataSet与dataFrame互相转换
    • 1、将dataSet转化生成dataFrame
      • dataSet.toDF
    • 2、将dataFrame转换成dataSet
      • dataFrame.as[强类型]
将RDD转换为DataFrame代码开发
  • 导包

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    
  • 1、 通过定义case class样例类利用反射机制推断Schema

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    //todo:利用反射机制(case class )指定dataFrame的schema
    
    case class Person(id:Int,name:String,age:Int)
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
         //1、创建SparkSession
          val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
         //2、获取sparkContext
          val sc: SparkContext = spark.sparkContext
          sc.setLogLevel("WARN")
         //3、读取数据文件
          val data: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(x=>x.split(" "))
         //4、将rdd与样例类关联
          val personRDD: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
         //5、获取DataFrame
          //手动导入隐式转换
          import spark.implicits._
          val personDF: DataFrame = personRDD.toDF
    
          //---------------DSL语法-------------start
         //1、打印dataframe的schema元信息
          personDF.printSchema()
         //2、 显示dataFrame结果数据
         personDF.show()
         personDF.show(2)
         //3、显示第一条数据
         println(personDF.head())
         //4、查询name字段结果数据
        personDF.select("name").show()
        personDF.select($"name").show()
        personDF.select(new Column("name")).show()
         //5、把age字段结果加1
        personDF.select($"id",$"name",$"age",$"age"+1).show()
         //6、把age 大于30的人过滤出来
        personDF.filter($"age" > 30).show()
         //7、按照age进行分组
        personDF.groupBy("age").count().show()
    
        //---------------DSL语法-------------end
    
        //--------------SQL语法--------------start
         //把dataFrame注册成一张表
          personDF.createTempView("t_person")
        //通过sparksession调用sql方法
          spark.sql("select * from t_person").show()
          spark.sql("select * from t_person where name='lisi'").show()
          spark.sql("select * from t_person order by age desc ").show()
        //--------------SQL语法--------------end
    
        //关闭操作
        sc.stop()
        spark.stop()
    
      }
    }
    
    
  • 2、通过StructType指定schema

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    //todo:将rdd转换成dataFrame,通过StructType来指定schema
    object SparkSqlSchema {
      def main(args: Array[String]): Unit = {
           //1、创建sparkSession
           val spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()
           //2、获取得到sparkContext
            val sc: SparkContext = spark.sparkContext
            sc.setLogLevel("WARN")
           //3、读取数据文件
            val data: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(_.split(" "))
           //4、将rdd与Row类型关联
            val rowRDD: RDD[Row] = data.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
           //5、指定schema
            val schema:StructType=StructType(
                                           StructField("id", IntegerType, true) ::
                                           StructField("name", StringType, false) ::
                                           StructField("age", IntegerType, false) :: Nil)
    
           val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
    
            //打印schema
          personDF.printSchema()
            //显示数据
          personDF.show()
    
          //dataframe注册成一张表
          personDF.createTempView("t_person")
    
         spark.sql("select * from t_person").show()
    
         //关闭
        sc.stop()
        spark.stop()
    
      }
    }
    
    
sparksql 操作hivesql
  • 导包

            <!--引入 spark-hive依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.0.2</version>
            </dependency>
    
  • 代码开发

    import org.apache.spark.sql.SparkSession
    
    //todo:利用sparksql操作hivesql
    object HiveSupport {
      def main(args: Array[String]): Unit = {
          //1、创建sparkSession
         val spark: SparkSession = SparkSession.builder()
                                                .appName("HiveSupport")
                                                .master("local[2]")
                                                .enableHiveSupport()   //开启对hivesql的支持
                                                .getOrCreate()
        //2、利用sparkSession操作hivesql
            //2.1 创建hive表
              //spark.sql("create table if not exists student(id int,name string,age int) row format delimited fields terminated by ',' ")
            //2.2 加载数据到hive表中
              //spark.sql("load data local inpath './data/student.txt' into table student")
            //2.3 查询表中数据
              spark.sql("select * from student").show()
    
        //3、关闭
          spark.stop()
      }
    }
    
    
sparksql读取mysql表中的数据
  • 代码开发

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql从mysql表中读取数据
    object DataFromMysql {
      def main(args: Array[String]): Unit = {
         //1、创建sparkSession
          val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    
        //2、通过sparksession读取mysql表中的数据
          //定义url
          val url="jdbc:mysql://192.168.200.100:3306/spark"
          //定义表名
          val tableName="iplocation"
          //定义相关的属性
          val properties=new Properties
           properties.setProperty("user","root")
           properties.setProperty("password","123456")
          val jdbcDataFrame: DataFrame = spark.read.jdbc(url,tableName,properties)
        //显示schema
        jdbcDataFrame.printSchema()
        //打印结果数据
        jdbcDataFrame.show()
    
        //关闭
        spark.stop()
      }
    }
    
    
  • spark-shell 操作读取mysql表中

    • 启动spark-shell脚本

      spark-shell \
      --master spark://hdp-node-01:7077 \
      --executor-memory 1g \
      --total-executor-cores  2 \
      --jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
      --driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar
      
    • 执行代码

      val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.200.100:3306/spark", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocation", "user" -> "root", "password" -> "123456")).load()
      
sparksql将结果数据写入到mysql中
  • 代码开发

    import java.util.Properties
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql将结果数据写入到mysql表中
    
    case class Student(id:Int,name:String,age:Int)
    object Data2Mysql {
      def main(args: Array[String]): Unit = {
         //1、创建sparkSession
          val spark: SparkSession = SparkSession.builder().appName("Data2Mysql").getOrCreate()
         //2、获取sparkcontext
          val sc: SparkContext = spark.sparkContext
            sc.setLogLevel("WARN")
         //3、读取数据文件
          val rdd: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(" "))
         //4、将样例类与rdd进行关联
          val studentRDD: RDD[Student] = rdd.map(x=>Student(x(0).toInt,x(1),x(2).toInt))
         //5、将rdd转换成dataframe
         import spark.implicits._
          val studentDF: DataFrame = studentRDD.toDF
    
          //打印结果数据
          studentDF.show()
    
          //dataFrame注册成一张表
        studentDF.createTempView("t_student")
    
          //通过sparkSession操作这个表
        val result: DataFrame = spark.sql("select * from t_student order by age desc")
          //把结果数据写入到mysql表中
          //定义url
          val url="jdbc:mysql://192.168.200.100:3306/spark"
        //定义表名
         val tableName=args(1)
        //定义相关的属性
        val properties=new Properties
        properties.setProperty("user","root")
        properties.setProperty("password","123456")
        //mode:指定数据插入模式
        //overwrite: 覆盖(事先会创建一张表)
        //append: 追加(事先会创建一张表)
        //ignore:忽略(如果当前这个表已经存在,不执行操作)
        //error:如果当前这个表存在,这个时候就报错
        result.write.mode("append").jdbc(url,tableName,properties)
    
        //关闭
        sc.stop()
        spark.stop()
        }  
       }
    
  • 把程序打成jar包 提交到集群中运行

    
    spark-submit --master spark://node1:7077 --class cn.包名.sql.Data2Mysql --executor-memory 1g --total-executor-cores 2 --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar original-spark_class06-2.0.jar /person.txt student100
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/696734.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HTML静态网页成品作业(HTML+CSS)—— 环保主题介绍网页(5个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有5个页面。 二、作品演示 三、代…

MQ解决的问题

系统中MQ能解决哪些问题&#xff1f; 1.不同语言的程序使用MQ通信 2.分布式&#xff0c;微服务&#xff0c;之间的通信&#xff0c;实现服务质检解耦 3.高并发实现销峰作用 4.实现异步&#xff0c;提高用户体验。

“程序员职业素养全解析:技能、态度与价值观的融合“

文章目录 每日一句正能量前言专业精神专业精神的重要性技术执着追求的故事结论 沟通能力沟通能力的重要性团队合作意识实际工作中的沟通案例结论 持续学习持续学习的重要性学习方法进步经验结论 后记 每日一句正能量 梦不是为想象&#xff0c;而是让我们继续前往。 前言 在数字…

Python数据分析与机器学习在电子商务推荐系统中的应用

文章目录 &#x1f4d1;引言一、推荐系统的类型二、数据收集与预处理2.1 数据收集2.2 数据预处理 三、基于内容的推荐3.1 特征提取3.2 计算相似度3.3 推荐物品 四、协同过滤推荐4.1 基于用户的协同过滤4.2 基于物品的协同过滤 五、混合推荐与评估推荐系统5.1 结合推荐结果5.2 评…

docker 下载镜像发现超时,加速加速方法

报错原因有可能旧的不能用了&#xff01;&#xff01;&#xff01;换下面的&#xff01;&#xff01;&#xff01; cat /etc/docker/daemon.json "registry-mirrors": ["https://bhu1x6ya.mirror.aliyuncs.com"] 编辑完成后执行以下命令重启docker即可&a…

企业官网:过时了,但又没完全过时

作为一名互联网冲浪级选手&#xff0c;我经常会看到一些有趣的产品。 这两年比较让我感兴趣的产品有「飞聊」、「即刻」及其旗下的「橙 App」等等&#xff0c;然后我就想上它们的官网看看。 虽然现在 app 是主流&#xff0c;但我非常不喜欢下载 app&#xff0c;一是麻烦&…

基于STM32开发的智能空气质量监控系统

⬇帮大家整理了单片机的资料 包括stm32的项目合集【源码开发文档】 点击下方蓝字即可领取&#xff0c;感谢支持&#xff01;⬇ 点击领取更多嵌入式详细资料 问题讨论&#xff0c;stm32的资料领取可以私信&#xff01; 目录 引言环境准备智能空气质量监控系统基础代码实现&…

【吊打面试官系列-Mysql面试题】MySQL_fetch_array 和 MySQL_fetch_object 的区别是什么 ?

大家好&#xff0c;我是锋哥。今天分享关于 【MySQL_fetch_array 和 MySQL_fetch_object 的区别是什么 &#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; MySQL_fetch_array 和 MySQL_fetch_object 的区别是什么 &#xff1f; 以下是 MySQL_fetch_array 和 MySQL…

高考志愿填报的技巧和方法

高考过后&#xff0c;最让家长和学生需要重视的就是怎样填报志愿。高考完和出成绩之前有一段很长的时间&#xff0c;而成绩出来之后往往报考的时间非常的紧张。在很短的时间内&#xff0c;高考的学生和他的家长要综合高考的成绩&#xff0c;考虑院校&#xff0c;专业&#xff0…

【9】openssl 代码调试

0x01 前言 最近在学习密码学&#xff0c;但是国密算法(SM2&#xff0c;SM3,SM4,SM9)的细节都在openssl项目里&#xff0c;当然一些国际算法也在。想着看下代码执行过程和理论结合起来。中间走了一些弯路&#xff0c;做个笔记。 0x02 openssl安装 一开始认为是不是直接下载好的…

万向节锁死(Gimbal Lock)

Gimbal Lock是一个常见的3D动画问题,主要由旋转顺序引起的。我来详细解释一下它的成因: 在三维空间中,任何旋转都可以分解为绕X,Y,Z三个轴的欧拉旋转(Euler Rotation)。每个轴的旋转是按照一定顺序进行的,比如XYZ或ZYX等。 理论上,通过这三个旋转值的组合,可以达到任意的空间…

14. RTCP 协议

RTCP 协议概述 RTCP&#xff08;Real-time Transport Control Protocol 或 RTP Control Protocol 或简写 RTCP&#xff09;&#xff0c;实时传输控制协议&#xff0c;是实时传输协议&#xff08;RTP&#xff09;的一个姐妹协议。 注&#xff1a;RTP 协议和 RTP 控制协议&#…

Policy-Based Reinforcement Learning(1)

之前提到过Discount Return&#xff1a; Action-value Function &#xff1a; State-value Function: &#xff08;这里将action A积分掉&#xff09;这里如果策略函数很好&#xff0c;就会很大&#xff1b;反之策略函数不好&#xff0c;就会很小。 对于离散类型&#xff1a; …

QPS,平均时延和并发数

我们当前有两个服务A和B&#xff0c;想要知道哪个服务的性能更好&#xff0c;该用什么指标来衡量呢&#xff1f; 1. 单次请求时延 一种最简单的方法就是使用同一请求体同时请求两个服务&#xff0c;性能越好的服务时延越短&#xff0c;即 R T 返回结果的时刻 − 发送请求的…

error 12154 received logging on to the standby报错处理

错误 处理方法 该参数不是主库的servicename &#xff08;低级错误&#xff09; SQL> alter system set log_archive_dest_2 SERVICEstandby ASYNC VALID_FOR(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAMEstandby; System altered. 观察主库日志: 备库日志: 该问题会影…

SpringBoot 配置事务

SpringBoot 在启动时已经加载了事务管理器&#xff0c;所以只需要在需要添加事务的方法/类上添加Transactional即可生效&#xff0c;无需额外配置。 TransactionAutoConfiguration 事务的自动配置类解析&#xff1a; SpringBoot 启动时加载/META-INF/spring/org.springframewor…

月薪6万,想离职...

大家好&#xff0c;我是无界生长&#xff0c;国内最大AI付费社群“AI破局俱乐部”初创合伙人。这是我的第 39 篇原创文章——《月薪6万&#xff0c;想离职...》 是的&#xff0c;你没有看错&#xff0c;我月薪6万&#xff0c;却想离职&#xff0c;很不可思议吧&#xff1f;周围…

matlab使用教程(95)—显示地理数据

下面的示例说明了多种表示地球地貌的方法。此示例中的数据取自美国商务部海洋及大气管理局 (NOAA) 国家地理数据中心&#xff0c;数据通告编号为 88-MGG-02。 1.关于地貌数据 数据文件 topo.mat 包含地貌数据。topo 是海拔数据&#xff0c;topomap1 是海拔的颜色图。 load t…

UART基本定义、三种编程方式、freertos内怎么用、怎么封装

文章目录 串口基本概念串口的三种编程方式uart编程查询方式不常用、其他两个方式用的多中断方式&#xff1a;代码原理 DMA方式&#xff1a;配置DMA原理代码 效率最高的UART编程方式&#xff1a;是什么&#xff1f;操作 在freertos里面调用uart应该怎么做&#xff1f;代码 面向对…

【PL理论】(16) 形式化语义:语义树 | <Φ, S> ⇒ M | 形式化语义 | 为什么需要形式化语义 | 事实:部分编程语言的设计者并不会形式化语义

&#x1f4ad; 写在前面&#xff1a;本章我们将继续探讨形式化语义&#xff0c;讲解语义树&#xff0c;然后我们将讨论“为什么需要形式化语义”&#xff0c;以及讲述一个比较有趣的事实&#xff08;大部分编程语言设计者其实并不会形式化语义的定义&#xff09;。 目录 0x00…