7.spark sql编程

目录

  • 概述
  • RDD ,Datasets,DataFrames 之间的区别
    • Datasets , DataFrames和 RDD
  • 入门
    • people.json
    • SparkSession
    • 创建 DataFrames
    • DataFrame 操作
    • 编程方式运行 sql 查询
    • 创建 Datasets
    • DataFrames 与 RDDs 互相转换
      • 使用反射推断模式
        • 编码问题
      • 编程指定 Schema
        • 官方文档的代码不全问题
  • 结束

概述

spark 版本为 3.2.4,注意 RDDDataFrame 的代码出现的问题及解决方案

本文目标如下:

  • RDD ,Datasets,DataFrames 之间的区别
  • 入门
    • SparkSession
    • 创建 DataFrames
    • DataFrame 操作
    • 编程方式运行 sql 查询
    • 创建 Datasets
    • DataFramesRDDs 互相转换
      • 使用反射推断模式
      • 编程指定 Schema

参考 Spark 官网

相关文章链接如下

文章链接
spark standalone环境安装地址
Spark的工作与架构原理地址
使用spark开发第一个程序WordCount程序及多方式运行代码地址
RDD编程指南地址
RDD持久化地址

RDD ,Datasets,DataFrames 之间的区别

Datasets , DataFrames和 RDD

Dataset 是一个分布式的数据集合,DatasetSpark 1.6 中添加的一个新接口,它增益了 RDD (强类型,可以使用 lambda 函数的能力) 和 Spark sql 优化执行引擎的优势。Dataset 可以由JVM对象构建,然后使用函数转换(map、flatMap、filter等)进行操作。数据集API有Scala和Java版本。Python不支持数据集API。

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的DataFrame APIScalaJavaPythonR中可用。在Scala API中,DataFrame只是Dataset[Row]的一个类型别名。而在Java API中,用户需要使用Dataset<Row>来表示DataFrame

DataFrame=RDD+SchemaRDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

入门

Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了更多关于正在执行的数据结构信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与SparkSQL进行交互,包括SQLDataset API。计算结果时,使用相同的执行引擎,与用于表示计算的API/语言无关。方便用户切换不同的方式进行操作

people.json

people.json文件准备
在这里插入图片描述

SparkSession

Spark sql 中所有功能入口点是 SparkSession类。创建一个基本的 SparkSession,只需使用 SparkSession.builder()

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

创建 DataFrames

使用 SparkSession,通过存在的RDDhive 表,或其它的Spark data sources 程序创建 DataFrames

val df = spark.read.json("/tmp/people.json")
df.show()

执行如下图
在这里插入图片描述

DataFrame 操作

使用数据集进行结构化数据处理的基本示例如下

// 需要引入 spark.implicits._ 才可使用 $
// This import is needed to use the $-notation
import spark.implicits._
// 打印schema 以树格式
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// 仅显示 name 列
// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// 显示所有,age 加1
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// 过滤 人的 age 大于 21
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// 按 age 分组统计
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

spark-shell 执行如下图
在这里插入图片描述
在这里插入图片描述

编程方式运行 sql 查询

df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

执行如下:

scala> df.createOrReplaceTempView("people")

scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

创建 Datasets

Datasets类似于RDD,不是使用Java序列化或Kryo,而是使用专门的编码器来序列化对象,以便通过网络进行处理或传输。使用的格式允许Spark执行许多操作,如过滤、排序和哈希,而无需将字节反序列化为对象。

case class Person(name: String, age: Long)

// 为 case classes 创建编码器
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()

// 为能用类型创建编码器,并提供 spark.implicits._ 引入 
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// 通过定义类,将按照名称映射,DataFrames 能被转成 Dataset 
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "/tmp/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

执行如下:

scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+


scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> primitiveDS.map(_ + 1).collect()
res1: Array[Int] = Array(2, 3, 4)

scala> val path = "/tmp/people.json"
path: String = /tmp/people.json

scala> val peopleDS = spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleDS.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

DataFrames 与 RDDs 互相转换

Spark SQL支持两种不同的方法将现有RDD转换为Datasets

  • 第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码,当知道 schema 结构的时间,会有更好的效果。
  • 第二种方法是通过编程接口,构造 schema,然后将其应用于现有的RDD。虽然此方法更详细,直至运行时,才能知道他们的字段和类型,用于构造 Datasets

使用反射推断模式

代码如下:

object RddToDataFrameByReflect {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RddToDataFrameByReflect")
      .master("local")
      .getOrCreate()

    // 用于从RDD到DataFrames的隐式转换
    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._

    // Create an RDD of Person objects from a text file, convert it to a Dataframe
    val peopleDF = spark.sparkContext
      .textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

    // The columns of a row in the result can be accessed by field index
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()

    // or by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
  }

  case class Person(name: String, age: Long)
}

执行如下图:
在这里插入图片描述

编码问题

关于 Spark 官网 上复杂类型编码问题,直接加下面一句代码

teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))

报以下图片错误
在这里插入图片描述
将原有代码改变如下:

 // 没有为 Dataset[Map[K,V]] 预先定义编码器,需要自己定义
 // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
 implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
 // 也可以如下操作
 // Primitive types and case classes can be also defined as
 // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

 // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
 teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))
 // Array(Map("name" -> "Justin", "age" -> 19))

在这里插入图片描述
通过这一波操作,就可以理解什么情况下,需要编码器,以及编码器的作用

编程指定 Schema

代码如下:

object RddToDataFrameByProgram {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    import org.apache.spark.sql.Row

    import org.apache.spark.sql.types._

    // 加上此解决报错问题
    import spark.implicits._

    // Create an RDD
    val peopleRDD = spark.sparkContext.textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Convert records of the RDD (people) to Rows
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))

    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)

    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    results.map(attributes => "Name: " + attributes(0)).show()
  }
}

执行如下图
在这里插入图片描述

官方文档的代码不全问题

Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
results.map(attributes => "Name: " + attributes(0)).show()

在这里插入图片描述
加下以下代码

// 加上此解决报错问题
import spark.implicits._

如下图解决
在这里插入图片描述

结束

spark sql 至此结束,如有问题,欢迎评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/120114.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

idea使用gradle教程 (idea gradle springboot)2024

这里白眉大叔,写一下我工作时候idea怎么使用gradle的实战步骤吧 ----windows 环境----------- 1-本机安装gradle 环境 (1)下载gradle Gradle需要JDK的支持,安装Gradle之前需要提前安装JDK8及以上版本 https://downloads.gra…

Python - 面向现实世界的人脸复原 GFP-GAN 简介与使用

目录 一.引言 二.GFP-GAN 简介 1.GFP-GAN 数据 2.GFP-GAN 架构 3.GFP-GAN In Wave2Lip 三.GFPGAN 实践 1.环境搭建 2.模型下载 3.代码测试 4.测试效果 四.总结 一.引言 近期 wav2lip 大火,其通过语音驱动唇部动作并对视频质量进行修复,其中…

【微信小程序】新版获取手机号码实现一键登录(uniapp语法)(完整版附源码)

需求 如图,点击按钮,获取用户手机号实现一键登录,当然,用户也可以自行输入其他手机号进行登录 问题 要想获取用户手机号并不复杂,但由于近几年微信小程序获取手机号的api进行了更新,当前很多帖子使用的…

VB.NET—DataGridView控件教程详解

目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步: 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件,它提供了一个可视化的表格控件,允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…

Rust教程5:泛型和特征

文章目录 泛型函数特征特征泛型 Rust系列&#xff1a;初步⚙所有权⚙结构体和枚举类⚙函数进阶 泛型函数 Rust采纳了C中的泛型机制&#xff0c;并且形式上也几乎借鉴了C&#xff0c;示例如下 fn add<T: std::ops::Add<Output T>>(a:T, b:T) -> T {a b } fn…

Java智慧工地管理平台可视化大数据建造工地APP源码

建筑行业是国民经济的重要物质生产部门和支柱产业之一&#xff0c;同时&#xff0c;建筑业也是一个安全事故多发的高危行业。如何加强施工现场安全管理、降低事故发生频率、杜绝各种违规操作和不文明施工、提高建筑工程质量&#xff0c;是摆在各级政府部门、施工企业面前的一道…

一文学会Scala【Scala一站式学习笔记】

文章目录 为什么要学习Scala语言什么是Scala如何快速掌握Scala语言Scala环境安装配置Scala命令行 Scala的基本使用变量数据类型操作符if 表达式语句终结符循环高级for循环 Scala的集合体系集合SetListMapArrayArrayBuffer数组常见操作Tuple总结 Scala中函数的使用函数的定义函数…

Python+Selenium+Unittest 之selenium12--WebDriver操作方法2-鼠标操作1(ActionChains类简介)

在我们平时的使用过程中&#xff0c;会使用鼠标去进行很多操作&#xff0c;比如鼠标左键点击、双击、鼠标右键点击&#xff0c;鼠标指针悬浮、拖拽等操作。在selenium中&#xff0c;我们也可以去实现常用的这些鼠标操作&#xff0c;这时候就需要用到selenium中的ActionChains类…

Android transform旋转rotate圆角矩形图roundedCorners,Kotlin

Android transform旋转rotate圆角矩形图roundedCorners&#xff0c;Kotlin import android.graphics.Bitmap import android.os.Bundle import android.util.Log import android.widget.ImageView import androidx.appcompat.app.AppCompatActivity import com.bumptech.glide.…

【IO多路转接】pollepoll

文章目录 1 :peach:poll:peach:1.1 :apple:poll函数接口:apple:1.2 :apple:poll接口的使用:apple:1.3 :apple:poll的优缺点:apple: 2 :peach:epoll:peach:2.1 :apple:epoll函数接口:apple:2.1.1 :lemon:epoll_create:lemon:2.1.2 :lemon:epoll_ctl:lemon:2.1.3 :lemon:epoll_wa…

pcie对phy的skew要求

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球入口 pcie的设计中有这样一条要求&#xff0c;所有但phy/tx*_clk pin的clock skew要小于skew要求。 这里提供一下实现方法&#xff0c;如果你有更好的办法可以在评论区留言或者私信…

利用maven的dependency插件将项目依赖从maven仓库中拷贝到一个指定的位置

https://maven.apache.org/plugins/maven-dependency-plugin/copy-dependencies-mojo.html 利用dependency:copy-dependencies可以将项目的依赖从maven仓库中拷贝到一个指定的位置。 使用默认配置拷贝依赖 如果直接执行mvn dependency:copy-dependencies&#xff0c;是将项目…

IP地址与MAC地址(硬件地址)的区别

IP地址和硬件地址都是用于标识网络设备的地址&#xff0c;但它们的作用和使用方式不同。IP地址是用于在网络中唯一标识一个设备的逻辑地址它是由网络协议栈分配的&#xff0c;可以动态地分配和改变。而硬件地址是设备的物理地址&#xff0c;也称为MAC地址&#xff0c;是由设备制…

TCP/IP的基础知识

文章目录 TCP/IP的基础知识硬件&#xff08;物理层&#xff09;网络接口层&#xff08;数据链路层&#xff09;互联网层&#xff08;网络层&#xff09;TCP/IP的具体含义传输层应用层&#xff08;会话层以上的分层&#xff09;TCP/IP分层模型与通信示例发送数据包的一个例子接收…

什么是微服务?与分布式又有什么区别?

什么是微服务&#xff0c;我们先从传统的单体结构进行了解&#xff0c;对两者进行对比。 单体结构 单体结构是一种传统的软件架构模式&#xff0c;它将应用程序划分为一组相互依赖的模块和组件。这些模块和组件通常都是构建在同一个平台上的&#xff0c;并且紧密耦合在一起。…

一种可以实现安全便捷文件摆渡的跨网文件安全交换软件

为了保护数据的安全性和完整性&#xff0c;很多企业都采用了内外网物理隔离的方式&#xff0c;防止核心数据泄露或被恶意篡改。然而&#xff0c;这也给企业内部或与外部合作伙伴之间的文件交换带来了很多不便和挑战。如何在保证数据安全的前提下&#xff0c;实现跨网文件的快速…

【h5 uniapp】 滚动 滚动条,数据跟着变化

uniapp项目 需求&#xff1a; 向下滑动时&#xff0c;数据增加&#xff0c;上方的日历标题日期也跟着变化 向上滑动时&#xff0c;上方的日历标题日期跟着变化 实现思路&#xff1a; 初次加载目前月份的数据 以及下个月的数据 this.getdate()触底加载 下个月份的数据 onReach…

缓冲流详解

缓冲流概述 缓冲流也称为高效流、或者高级流。之前学习的字节流可以称为原始流。 作用&#xff1a;缓冲流自带缓冲区、可以提高原始字节流、字符流读写数据的性能。 字节缓冲流 字节缓冲流性能优化原理&#xff1a; 字节缓冲输入流自带了8KB缓冲池&#xff0c;以后我们直接…

计算机找不到MSVCR120.dll,MSVCR120.dll丢失的三种解决方法

在计算机使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“MSVCR120.dll丢失”。这个错误通常出现在运行某些程序时&#xff0c;导致程序无法正常启动。那么&#xff0c;如何解决MSVCR120.dll丢失的问题呢&#xff1f;小编将详细介绍解决方法&#…

第二证券:北交所30%的涨跌幅限制?

随着我国股市的不断发展&#xff0c;股市生意的涨跌幅束缚也成为了一个备受注重的论题。在北交所&#xff0c;股票的涨跌幅束缚为30%&#xff0c;这一束缚是否合理呢&#xff1f;本文将从多个角度进行剖析。 首先&#xff0c;涨跌幅束缚对于股市的安稳起着重要的效果。股票价格…