Spark SQL 中DataFrame DSL的使用

在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。

文章链接:

一、单词统计案例引入

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo2DSLWordCount {
  def main(args: Array[String]): Unit = {
    /**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("wc spark sql")
      .getOrCreate()

    /**
     * spark sql和spark core的核心数据类型不太一样
     *
     * 1、读取数据构建一个DataFrame,相当于一张表
     */
    val linesDF: DataFrame = sparkSession.read
      .format("csv") //指定读取数据的格式
      .schema("line STRING") //指定列的名和列的类型,多个列之间使用,分割
      .option("sep", "\n") //指定分割符,csv格式读取默认是英文逗号
      .load("spark/data/words.txt") // 指定要读取数据的位置,可以使用相对路径

    /**
     * DSL: 类SQL语法 api  介于代码和纯sql之间的一种api
     *
     * spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数
     * 如果想要在DSL语法中使用这些函数,需要导入隐式转换
     *
     */
    //导入Spark sql中所有的sql隐式转换函数
    import org.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理
    import sparkSession.implicits._

//    linesDF.select(explode(split($"line","\\|")) as "word")
//      .groupBy($"word")
//      .count().show()

    val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word")
      .groupBy($"word")
      .agg(count($"word") as "counts")

    /**
     * 保存数据
     */
    resultDF
      .repartition(1)
      .write
      .format("csv")
      .option("sep","\t")
      .mode(SaveMode.Overwrite)
      .save("spark/data/sqlout2")

  }

}

注意:show()可以指定两个参数,第一个参数为展现的条数,不指定默认展示前20条数据,第二个参数默认为false,代表的是如果数据过长展示就会不完全,可以指定为true,使得数据展示完整,比如 : show(200,truncate = false)

二、数据源获取

查看官方文档:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多种数据源的获取。

 1、csv-->json

    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("多种类型数据源读取演示")
      .config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个
      .getOrCreate()

    //导入spark sql中所有的隐式转换函数
    import org.apache.spark.sql.functions._
    //导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段
    import sparkSession.implicits._

    /**
     * 读csv格式的文件-->写到json格式文件中
     */
    //1500100967,能映秋,21,女,文科五班
    val studentsDF: DataFrame = sparkSession.read
      .format("csv")
      .schema("id String,name String,age Int,gender String,clazz String")
      .option("sep", ",")
      .load("spark/data/student.csv")

    studentsDF.write
      .format("json")
      .mode(SaveMode.Overwrite)
      .save("spark/data/students_out_json.json")
    

2、json-->parquet


    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("")
      .config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个
      .getOrCreate()

    //导入spark sql中所有的隐式转换函数
    //导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段

    /**
     * 读取json数据格式,因为json数据有键值对,会自动的将健作为列名,值作为列值,不需要手动的设置表结构
     */

    //1500100967,能映秋,21,女,文科五班
    //方式1:
    //    val studentsJsonDF: DataFrame = sparkSession.read
    //      .format("json")
    //      .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")
    //方式2:实际上也是调用方式1,只是更简洁了
    // def json(paths: String*): DataFrame = format("json").load(paths : _*)
    val studebtsReadDF: DataFrame = sparkSession.read
      .json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")

    studebtsReadDF.write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save("spark/data/students_parquet")

3、parquet-->csv

    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("")
      .config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个
      .getOrCreate()

    //导入Spark sql中所有的sql隐式转换函数
    import org.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理
    import sparkSession.implicits._


    /**
     * parquet:压缩的比例由信息熵决定,通俗的说就是数据的重复程度决定
     */

    val studebtsReadDF: DataFrame = sparkSession.read
      .format("parquet")
      .load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")

    studebtsReadDF.write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .save("spark/data/students_csv")

三、DataFrame DSL API的使用

1、select


import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo1Select {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("select函数演示")
      .getOrCreate()

    //导入Spark sql中所有的sql隐式转换函数
    import org.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理
    import sparkSession.implicits._

    val studentsDF: DataFrame = sparkSession.read
      .format("csv")
      .schema("id String,name String,age String,gender String,clazz String")
      .option("sep", ",")
      .load("spark/data/student.csv")

    /**
     * select函数
     */
    //方式1:只能查询原有字段,不能对字段做出处理,比如加减、起别名之类
    studentsDF.select("id", "name", "age")
    //方式2:弥补了方式1的不足
    studentsDF.selectExpr("id","name","age+1 as new_age")
    //方式3:使用隐式转换函数中的$将字段变为一个对象
    val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")
        //3.1使用对象对字段进行处理
//    stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show()       //不可使用未变为对象的字段
    stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age")                 // +是函数,可以等价于该语句
        //3.2可以在select中使用sql函数
    studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))

    
  }
}

2、where

    /**
     * where函数:过滤数据
     */
    //方式1:直接将sql中的where语句以字符串形式传参
    studentsDF.where("clazz='文科一班' and gender='男'")
    //方式2:使用$列对象形式过滤
    /**
     * 注意在此种方式下:等于和不等于符号与我们平常使用的有所不同
     * 等于:===
     * 不等于:=!=
     */
    studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()

3、groupBy和agg

    /**
     * groupby:分组函数     agg:聚合函数
     * 注意:
     * 1、groupby与agg函数通常都是一起使用
     * 2、分组聚合之后的结果DataFrame中只会包含分组字段与聚合字段
     * 3、分组聚合之后select中无法出现不是分组的字段
     */
    //需求:根据班级分组,求每个班级的人数和平均年龄
    studentsDF.groupBy($"clazz")
      .agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age")
      .show()

4、join

/**
     * 5、join:表关联
     */
    val subjectDF1: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("id String,subject_id String,score Int")
      .load("spark/data/score.csv")

    val subjectDF2: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("sid String,subject_id String,score Int")
      .load("spark/data/score.csv")

    //关联场景1:所关联的字段名字一样
    studentsDF.join(subjectDF1,"id")
    //关联场景2:所关联的字段名字不一样
    studentsDF.join(subjectDF2,$"id"===$"sid","inner")
//    studentsDF.join(subjectDF2,$"id"===$"sid","left").show()

    /**
     * 上面两种关联场景默认inner连接方式(内连接),可以指定参数选择连接方式,比如左连接、右连接、全连接之类
     * * @param joinType Type of join to perform. Default `inner`. Must be one of:
     * *                 `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,
     * *                 `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.
     */

5、开窗

    /**
     * 开窗函数
     * 1、ROW_NUMBER():为分区中的每一行分配一个唯一的序号。序号是根据ORDER BY子句定义的顺序分配的
     * 2、RANK()和DENSE_RANK():为分区中的每一行分配一个排名。RANK()在遇到相同值时会产生间隙,而DENSE_RANK()则不会。
     *
     */

    //需求:统计每个班级总分前三的学生
    val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")

    //方式1:在select中使用row_number() over Window.partitionBy().orderBy()
    stu_scoreDF.groupBy($"clazz", $"id")
      .agg(sum($"score") as "sum_score")
      .select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank")
      .where($"score_rank" <= 3)

    //方式2:使用withcolumn()函数,会新增一列,但是要预先指定列名
    stu_scoreDF
      .repartition(1)
      .groupBy($"clazz", $"id")
      .agg(sum($"score") as "sum_score")
      .withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc))
      .where($"score_rank" <= 3)
      .show()

注意:

      DSL API 不直接对应 SQL 的关键字执行顺序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照构建逻辑查询的方式来组织代码,使其与 SQL 查询的逻辑结构相似。

在构建 Spark DataFrame 转换和操作时,常用流程介绍:

  1. 选择数据源:使用 spark.read 或从其他 DataFrame 派生。
  2. 转换:使用各种转换函数(如 selectfiltermapflatMapjoin 等)来修改 DataFrame。
  3. 聚合:使用 groupBy 和聚合函数(如 sumavgcount 等)对数据进行分组和汇总。
  4. 排序:使用 orderBy 或 sort 对数据进行排序。
  5. 输出:使用 showcollectwrite 等函数将结果输出到控制台、收集到驱动程序或写入外部存储。

四、RDD与DataFrame的转换

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


object RddToDf {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Rdd与Df之间的转换")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    val sparkContext: SparkContext = sparkSession.sparkContext
    val idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv")
      .map(_.split(","))
      .map {
        case Array(id: String, name: String, _, _, _) => (id, name)
      }

    /**
     * Rdd-->DF
     * 因为在Rdd中不会存储文件的结构(schema)信息,所以要指定字段
     */
    val idNameDF: DataFrame = idNameRdd.toDF("id", "name")
    idNameDF.createOrReplaceTempView("idNameTb")

    sparkSession.sql("select id,name from idNameTb").show()


    /**
     * DF-->Rdd
     */
    val idNameRdd2: RDD[Row] = idNameDF.rdd
    idNameRdd2.foreach(println)
    
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/639989.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Proteus仿真小技巧(隔空连线)

用了好几天Proteus了.总结一下使用的小技巧. 目录 一.隔空连线 1.打开添加网络标号 2.输入网络标号 二.常用元件 三.运行仿真 四.总结 一.隔空连线 引出一条线,并在末尾点一下. 1.打开添加网络标号 选择添加网络标号, 也可以先点击按钮,再去选择线(注意不要点端口) 2.…

【调试笔记-20240525-Windows-配置 QEMU/x86_64 运行 OpenWrt-23.05 发行版并搭建 WordPress 博客网站】

调试笔记-系列文章目录 调试笔记-20240525-Windows-配置 QEMU/x86_64 运行 OpenWrt-23.05 发行版并搭建 WordPress 博客网站 文章目录 调试笔记-系列文章目录调试笔记-20240525-Windows-配置 QEMU/x86_64 运行 OpenWrt-23.05 发行版并搭建 WordPress 博客网站 前言一、调试环境…

【EXCEL_VBA_基础知识】15 使用ADO操作外部数据

课程来源&#xff1a;王佩丰老师的《王佩丰学VBA视频教程》&#xff0c;如有侵权&#xff0c;请联系删除&#xff01; 目录 1. 使用ADO链接外部数据源 2. 常用SQL语句&#xff08;Execute(SQL语句)&#xff09; 2.1 查询数据、查询某几个字段、带条件查询、合并两表数据、插…

OpenWrt 23.05 安装中文语言包 教程 软路由实测 系列三

1 web 登录 #更改阿里云下载源&#xff0c;可参考第一篇文章:OpenWrt U盘安装使用 详细教程 x86/64平台 软路由实测 系列一-CSDN博客

【Linux】icmp_seq=1 Destination Host Unreachable

执行ping 命令提示&#xff1a;From 192.168.XX.XX icmp_seq1 Destination Host Unreachable 这个错误消息通常表示以下几种情况之一&#xff1a; 网络连接问题&#xff1a;目标主机可能没有连接到网络&#xff0c;或者网络中的某个路由器无法将数据包转发到目标主机。 目标主…

Vue从入门到实战Day11

一、为什么要学Vue3 Vue3官网&#xff1a;简介 | Vue.js 1. Vue3的优势 2. Vue2选项式API vs Vue3组合式API 示例&#xff1a; 二、create-vue搭建Vue3项目 1. 认识create-vue create-vue是Vue官方新的脚手架工具&#xff0c;底层切换到了vite(下一代构建工具)&#xff0c;为…

【OpenGL实践-09】用图元作图笔记

文章目录 一、说明二、下列程序使用库三、OpenGL图元盘点四、图元解析4.1 线段4.2 表面surface4.3 三角形表面surface 五、图元作图示例5.1 三角链和圆环GL_TRIANGLE_STRIP5.2 三角链和圆环 六、三维物体渲染6.1 直纹面6.2 旋转面 七、GLSL程序优化代码7.1 顶点着色器7.2 几何着…

AI视频教程下载:全面掌握ChatGPT和LangChain开发AI应用(附源代码)

这是一门深入的课程&#xff0c;涉及ChatGPT、LangChain和Python。打造专注于现实世界AI集成的AI应用&#xff0c;课件附有每一节涉及到的源代码。 **你将学到什么&#xff1a;** - 将ChatGPT集成到LangChain的生产风格应用中 - 使用LangChain组件构建复杂的文本生成管道 - …

AI助力农田作物智能化激光除草,基于轻量级YOLOv8n开发构建农田作物场景下常见20种杂草检测识别分析系统

随着科技的飞速发展&#xff0c;人工智能&#xff08;AI&#xff09;技术在各个领域的应用愈发广泛&#xff0c;其中农业领域也不例外。近年来&#xff0c;AI助力农田作物场景下智能激光除草的技术成为了农业领域的一大亮点&#xff0c;它代表着农业智能化、自动化的新趋势。智…

【小tips】当机器里面有多个版本的gcc时,该如何切换当前的gcc版本?

背景切换gcc版本 背景 有时候因为项目需求&#xff0c;可能不同的项目需要不同的gcc版本&#xff0c;所以机器上会安装多个版本的gcc&#xff0c;那我们如何切换到想要使用的版本&#xff1f; 切换gcc版本 比如我的机器上有两个版本的gcc&#xff1a; 我当前的版本是gcc-4…

node版本管理nvm详细教程

安装 nvm 之前先清理node相关的所有配置&#xff0c;如环境变量、.npmrc文件、node_cache、node_global 等 一、下载nvm 任选一处下载即可 官网&#xff1a;Releases coreybutler/nvm-windows (github.com) 码云&#xff1a;nvm下载仓库: nvm下载仓库 百度网盘&#xff1…

strcpy函数及其模拟实现

1. 前言 在本文中&#xff0c;我将带着各位读者从了解strcpy函数&#xff0c;到会用strcpy函数去实现我们编程时的需求&#xff0c;最后再来自己模拟实现一个strcpy函数。 “毕竟只有自己做的&#xff0c;自己才敢放心食用“&#x1f602;&#x1f602;&#x1f602; 2. strc…

开源网页视频会议,WebRTC音视频功能比较

1. 概述 OpenAI 发布了新一代旗舰生成模型 GPT-4o,这是一款真正的多模态大模型,可以「实时对音频、视觉和文本进行推理」。 支持与 AI 实时语音对话,且响应时间达到毫秒级;交互中可识别人类情绪并以相应的情感做出回应;多语言能力的提升,WebRTC 成为大模型关键能力。 视频会议…

系统架构师-考试-基础题-错题集锦2

108.总线-全双工、半双工&#xff1a; 109.软件配置管理-产品配置&#xff1a; 产品配置&#xff1a;指一个产品在其生命周期各个阶段所产生的各种形式和各种版本的文档、计算机程序、部件及数据的集合。 注意&#xff1a;选项中的需求规格说明、设计说明等均可归属于文档。 …

远动通讯屏柜的组成及各装置的作用

远动通讯屏柜的组成及各装置的作用 远动通讯屏是基于公共电网安全而投入的远方监控遥控设备&#xff1b;主要由远动装置、通讯管理机、交换机、调制解调器、GPS对时装置、数字通道防雷器、模拟通道防雷器、插线板、空气开关、屏柜及附件等设备组成、标配尺寸2260*800*600&…

软考之信息系统管理知识点(3)

流水线&#xff1a;是指在程序执行时多条指令重叠进行操作的一种准并行处理实现技术。各种部件同时处理是针对不同指令而言的&#xff0c;它们可同时为多条指令的不同部分进行工作&#xff0c;以提高各部件的利用率和指令的平均执行速度。 编译得过程 关系数据库是表的集合 …

初步学习pygame,使用pygame搭建简单的窗口效果

在VSCode上使用pygame 第一步&#xff1a;创建 Python 虚拟环境 打开 VSCode 中的 Terminal&#xff08;在菜单栏中选择 View > Terminal&#xff09;使用 cd 命令切换到你的项目文件夹输入以下命令来创建一个新的虚拟环境&#xff1a; python3 -m venv env这将在你的项目…

计算机毕业设计hadoop+spark微博舆情大数据分析 微博爬虫可视化 微博数据分析 微博采集分析平台 机器学习(大屏+LSTM情感分析+爬虫)

电商数据建模 一、分析背景与目的 1.1 背景介绍 电商平台数据分析是最为典型的一个数据分析赛道&#xff0c;且电商数据分析有着比较成熟的数据分析模型&#xff0c;比如&#xff1a;人货场模型。此文中我将通过分析国内最大的电商平台——淘宝的用户行为&#xff0c;来巩固数…

element ui 的密码输入框点击显示隐藏密码时,图标随之改变

场景图&#xff1a; 原理&#xff1a; 通过修改el-input框的type属性&#xff0c;来设置显示或者隐藏。从而改变图标地址。 <el-input class"passwordinput" :type"pwdObj.pwdType" ref"pwdInput" placeholder"密码"v-model"…

Unity LayerMask避坑笔记

今天使用Physics2D.OverlapAreaNonAlloc进行物理检测时候&#xff0c;通过LayerMask.NameToLayer传入了int值的LayerMask&#xff0c;结果一直识别不到&#xff0c;经过Debug才找到问题&#xff0c;竟是LayerMask的“值”传输有问题&#xff0c;记录一下。 直接贴代码输出结果&…