Spark SQL支持DataFrame操作的数据源

DataFrame提供统一接口加载和保存数据源中的数据,包括:结构化数据、Parquet文件、JSON文件、Hive表,以及通过JDBC连接外部数据源。一个DataFrame可以作为普通的RDD操作,也可以通过(registerTempTable)注册成一个临时表,支持在临时表的数据上运行SQL查询操作。

一、数据源加载保存操作

DataFrame数据源默认文件为Parquet格式,可以通过spark.sql.sources.default参数进行重新修改。
不论何种格式的数据源均采取统一API、read和write进行操作,代码如下:

// 读取parquet格式数据
val df =sqlContext.read.load("file:///$SPARK_HOME/examples/src/main/resources/users.parquet")
// 从DataFrame写数据并保存成Parquet格式
df.write.save("saveusers.parquet")

1,指定选项

Spark支持通过完全限定名称(如org.apache.spark.sql.parquet)指定数据源的附加选项,内置数据源可以使用短名称(json、parquet、jdbc),Spark SQL支持通过format将任何类型的DataFrames转换成其他类型。

val df = sqlContext.read.format("json").load("file:///$SPARK_HOME examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

2,保存模式

可以通过配置SaveMode指定如何处理现有数据,实现保存模式不使用任何锁定,而且不是原子操作;因此,多路数据写入相同位置是不安全的。当执行overwrite时,写入新数据之前原来数据将被删除。
在这里插入图片描述

3,保存持久表

当使用HiveContext时,DataFrames通过saveAsTable命令保存为持久表使用,与registerTempTable命令不同,saveAsTable实现Dataframe的内容,并创建一个指向Hive Metastore中数据的指针。即使Spark程序重新启动,连接相同Metastore的数据不会发生变化。
默认情况下saveAsTable将创建一个“管理表”,这意味着数据的位置将由Metastore控制,当表被删除时,管理表将表数据自动删除。

二、Parquet文件

Parquet是一种支持多种数据处理系统的存储格式,Spark SQL提供了读写Parquet文件,并且自动保存原始数据的模式。

1,Parquet文件优点

(1)高效,Parquet采取列式存储避免读入不需要的数据,具有极好的性能和GC。
(2)方便的压缩和解压缩,并具有极好的压缩比例。
(3)可以直接固化为Parquet文件,也可以直接读取Parquet文件,具有比磁盘更好的缓存效果。
Spark SQL对读写Parquet文件提供支持,方便加载Parquet文件数据到DataFrame,供Spark SQL操作,也可以将DataFrame写入Parquet文件,并自动保留原始Scheme架构。
在外部数据源方面,Spark对Parquet的支持有了很大的加强,更快的metadata discovery和schema merging;同时能够读取其他工具或者库生成的非标准合法的Parquet文件;以及更快、更鲁棒的动态分区插入。

2,加载数据编程

通过sqlContext.implicits._隐式转换一个RDD为DataFrame,并将DataFrame保存为Parquet文件;加载保存的Parquet文件,重新构建一个DataFrame,注册成临时表,供SQL查询使用。

// 创建sqlContextval 
sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 隐式转换为一个DataFrame
import sqlContext.implicits._
// 使用case定义schema,实现Person接口
case class Person(name: String, age: Int)
// 读取文件创建一个MappedRDD,并将数据写入Person模式类,隐式转换为DataFrame
val peopleDF = sc.textFile("file:///$SPARK_HOME/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
// 保存DataFrame,保存为Parquet格式
peopleDF.write.parquet("people.parquet")
// 加载Parquet文件作为DataFrame
val parquetFile = sqlContext.read.parquet("people.parquet")
// 将DataFrame注册为临时表,供SQL查询使用
parquetFile.registerTempTable("parquetTable")
val result = sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19")
result.map(t => "Name: " + t(0)).collect().foreach(println)

3,分区发现(partition discovery)

表分区(table partitioning)是一种常见的优化方法,用于像Hive一样的系统。对于分区表,数据通常存储在不同的目录中,在每个分区目录路径中对分区列的值进行编码。
Parquet数据源能够自动发现和推断分区信息,使用以下目录结构存储以前使用的人口数据到一个分区表,以gender和country作为分区列:

path└──table
        ├── gender=male
        │   ├── ...
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
			├── ...
			├── country=US
			│   └── data.parquet
			├── country=CN
			│   └── data.parquet
			└── ...

通过路径path/table,使用SQLContext.read的parquet或load命令,Spark SQL自动提取分区信息,返回的DataFrame模式如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

分区列的数据类型是自动映射,支持numeric数据类型和string类型自动推断。

4,模式合并(schema merging)

如同ProtocolBuffer、Avro、Thrift,Parquet也支持模式演进,用户可以从一个简单的模式开始,逐步根据需要添加更多的列。通过这种方式,用户最终得到多个不同但是能相互兼容模式的Parquet文件,Parquet数据源能够自动检测这种情况,进而合并这些文件。
由于模式合并是相对昂贵的操作,在很多情况下并非必须,为了提升性能,在1.5.0版本中默认关闭。

// 隐式转换一个RDD为DataFrame
import sqlContext.implicits._
// 创建一个DataFrame,存储数据到一个分区目录
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// 创建一个新DataFrame,存储在一个新的分区目录
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// 读取分区表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// 通过基础DataFrame函数,以树格式打印Schema,包含分区目录下全部的分区列
df3.printSchema()
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key: int (nullable = true)

Parquet数据源自动从文件路径中发现了key这个分区列,并且正确合并了两个不相同但相容的Schema。值得注意的是,如果最后的查询中查询条件跳过了key=1这个分区,Spark SQL的查询优化器会根据这个查询条件将该分区目录剪掉,完全不扫描该目录中的数据,从而提升查询性能。

5,配置

在SQLContext中使用setConf方法,或在运行时使用SQL命令SET key=value,实现对Parquet文件的配置
在这里插入图片描述

三、JSON数据集

Spark SQL可以自动推断出一个JSON数据集的Schema并作为一个DataFrame加载,通过SQLContext.read.json()方法使用JSON文件创建DataFrame。

// 创建sqlContextval 
sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 设置JSON数据集的路径,可以是单个文件或者一个目录
val path= file:///Spark_Home/examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// 打印schema,并显示推断的schema
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// 注册DataFrame作为一个临时表
people.registerTempTable("jsonTable")
// 使用sql运行SQL表达式
val teenagers = sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 13 AND age <= 19")

或者通过转换一个JSON对象的RDD[String]创建DataFrame。

val anotherRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherRDD)

四、Hive表

Spark SQL支持从Hive表中读写数据,然而默认版本Spark组件并不包括Hive大量的依赖关系。Hive支持通过添加-Phive和-Phive-thriftserver标志对Spark重新构建一个包括Hive的新组件,Hive的新组件必须分发到所有的Worker节点上,因为Worker节点需要访问Hive的serialization和deserialization库(SerDes),以便于访问存储在Hive中的数据,所以该Hive集合Jar包必须拷贝到所有的Worker节点。
除了基本的SQLContext,Spark SQL还可以创建一个HiveContext,该HiveContext通过基本的SQLContext提供了一系列的方法集,可以使用更完整的HiveQL解析器查询,访问Hive的UDF,并从Hive表读取数据,以及SerDe支持。
在这里插入图片描述

1,示例数据

新建一个kv1.txt文件,数据如下:

238 val_238
86  val_86
311 val_311
27  val_27
165 val_165
409 val_409
255 val_255
278 val_278
98  val_98

2,创建HiveContext

使用Hive,必须先构建一个继承SQLContext的HiveContext对象,并加入在MetaStore中查找表和使用HiveQL写查询功能的支持;可以在conf目录hive-site.xml文件中添加Hive的配置文件,当运行一个YARN集群时,datanucleus jars和hive-site.xml必须在Driver和全部的Executors启动。
一个简单的方法如下:在spark-submit命令行通过–jars参数和–file参数加载,即使hive-site.xml文件没有配置,仍然可以创建一个HiveContext,并会在当前目录下自动地创建metastore_db和warehouse。

使用Scala语言说明HiveContext创建方式:

// SparkContext实例
val sc: SparkContext = ...
// 通过sc创建HiveContext的实例hiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

3,使用Hive操作数据

使用HiveContext无需单独安装Hive,可以使用spark.sql.dialect选项选择解析查询语句的SQL的特定转化,这个参数可以使用SQLContext上的setConf方法,也可以使用SQL上的SETkey=value命令进行修改。

// 通过HiveContext的sql命令创建表
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
// 加载数据, $SPARK_HOME指Spark文件安装目录,使用“file:// ...”标识的本地文件,使用“hdfs:// ...”标识的HDFS存储系统的文件
hiveContext.sql("LOAD DATA LOCAL INPATH 'file:///$Spark_Home/examples/src/main/resources/kv1.txt' INTO TABLE src")
// HiveQL的查询表达
hiveContext.sql("FROM src SELECT key,value").collect().foreach(println)
// 使用HiveContext创建表命令
CREATE [EXTERNAL] TABLE[IF NOT EXISTS] table_name
(col_name data_type,)
[PARTITIONED BY(col_name data_type,)]
[[ROW FORMAT row_format]]
[STORED AS file_format]
[LOCATION hdfs_path]

4,Spark支持的Hive特性

(1)Hive查询语句,包括:SELECT、GROUP BY、ORDER BY、CLUSTER BY、SORT BY;
(2)Hive运算符,包括:关系运算符(=、<>、、<>、<、>、>=、<=等)、算术运算符(+、-、*、/、%等)、逻辑运算符(AND、&&、OR、||等)、复杂类型构造函数、数据函数(sign、ln、cos等)、字符串函数(instr、length、printf等);
(3)用户自定义函数(UDF);
(4)用户自定义聚合函数(UDAF);
(5)用户定义的序列化格式(SerDes);
(6)连接操作,包括:JOIN、{LEFT|RIGHT|FULL}OUTER JOIN、LEFT SEMI JOIN、CROSS JOIN;
(7)联合操作(Unions);
(8)子查询:SELECT col FROM(SELECT a+b AS col from t1)t2;
(9)抽样(Sampling);
(10)解释(Explain);
(11)分区表(Partitioned tables);
(12)所有的HiveDDL操作函数,包括:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE;
(13)大多数Hive数据类型TINYINT、SMALLINT、INT、BIGINT、BOOLEAN、FLOAT、DOUBLE、STRING、BINARY、TIMESTAMP、DATE、ARRAY<>、MAP<>、STRUCT<>。

五、通过JDBC连接数据库

Spark SQL还包括一个可以通过JDBC从其他数据库读取数据的数据源,并返回一个DataFrame,在Spark SQL很容易处理,或者Join其他的数据源。除了Scala语言,Java或Python语言也很容易操作而不需要提供一个Class Tag。(不同于Spark SQL JDBC server允许其他应用程序使用Spark SQL运行查询。)

在Spark类路径中包含特定数据库的JDBC驱动程序,如通过Spark Shell连接postgresql命令:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
val jdbcDF = sqlContext.load("jdbc", Map(
   "url" -> "jdbc:postgresql:dbserver",
   "dbtable" -> "schema.tablename"))

使用数据源API,加载远程数据库的表作为一个DataFrame和Spark SQL临时表在这里插入图片描述

文章来源:《Spark核心技术与高级应用》 作者:于俊;向海;代其锋;马海平

文章内容仅供学习交流,如有侵犯,联系删除哦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/661.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式安防监控项目——实现真实数据的上传

目录 一、相关驱动开发 二、A9主框架 三、脚本及数据上传实验 https://www.yuque.com/uh1h8r/dqrma0/tx0fq08mw1ar1sor?singleDoc# 《常见问题》 上个笔记的相关问题 一、相关驱动开发 /* mpu6050六轴传感器 */ i2c138B0000 { /* #address-cells <1>…

web实现太极八卦图、旋转动画、定位、角度、坐标、html、css、JavaScript、animation

文章目录前言1、html部分2、css部分3、JavaScript部分4、微信小程序演示前言 哈哈 1、html部分 <div class"great_ultimate_eight_diagrams_box"><div class"eight_diagrams_box"><div class"eight_diagrams"><div class&…

SpringBoot-实用开发篇

SpringBoot开发实用篇开发实用篇中因为牵扯到SpringBoot整合各种各样的技术&#xff0c;所以在整合每一个技术之前&#xff0c;都会做一个快速的普及&#xff0c;这样的话内容整个开发实用篇所包含的内容就会比较多。在学习的时候&#xff0c;如果对某一个技术不是很清楚&#…

硬刚ChatGPT!文心一言能否为百度止颓?中国版ChatGPT“狂飙”的机会在哪儿?

文章目录目录产品背景发展历程科技简介主要功能合作伙伴结语文心一言 &#xff08;英文名&#xff1a;ERNIE Bot&#xff09; *是百度基于文心大模型技术推出的生成式对话产品&#xff0c;被外界誉为“中国版ChatGPT”&#xff0c;将于2023年3月份面向公众开放。 [40] 百度在人…

python自动化办公(二)

上接python自动化办公&#xff08;一&#xff09; 文章目录文件和目录操作使用shutil库文件查找globfnmatchhashlib文件和目录操作 使用shutil库 shutil库也是Python标准库&#xff0c;它可以处理文件、文件夹、压缩包&#xff0c;能实现文件复制、移动、压缩、解压缩等功能。…

Vue基础23之路由第二节

Vue基础23路由路由的query参数src/router/index.jsDetail.vueHomeMessage.vue路由的query参数命名路由src/router/index.jsHomeMessage.vueApp.vue总结路由的params参数src/router/index.jsHomeMessage.vueDetail.vue总结路由 路由的query参数 src/router/index.js //该文件专…

Gehpi的网络布局

Gehpi的网络布局1. 力引导布局2. 辅助布局布局是网络可视化中的重要概念&#xff0c;指将点和边通过某种策略进行排布&#xff0c;应尽可能满足以下4个原则&#xff1a; 节点均匀分布在有限的区域内避免边的交叉和弯曲保持边的长度一致整体布局能反映图内在的特性 Gephi的布局…

卷积神经网络

目录卷积神经网络概述神经网络原理卷积神经网络卷积层怎么控制输出数据&#xff1f;如何抓取特征池化层归一化层全连接层局部感受野权值共享多卷积核池化子采样多卷积层卷积神经网络的训练前向传播BackForward反向传播权值更新过程中的卷积网络结构层的排列规律层的尺寸设置规律…

web3:区块链共识机制系列-POS(Proof of Stake)股权证明算法

web3相关学习一并收录至该博客&#xff1a;web3学习博客目录大全 前情衔接&#xff1a;web3:区块链常见的几大共识机制及优缺点 目录前言算法公式与原理算法公式运作原理以Peer Coin为例缺陷优点缺点特点分类发展历程casper协议1.什么是无成本利益关系问题2.引入casper协议解决…

SpringBoot 动态操作定时任务(启动、停止、修改执行周期)增强版

前段时间编写了一篇博客SpringBoot 动态操作定时任务&#xff08;启动、停止、修改执行周期&#xff0c;该篇博客还是帮助了很多同学。 但是该篇博客中的方法有些不足的地方&#xff1a; 只能通过前端控制器controller手动注册任务。【具体的应该是我们提前配置好我们的任务&am…

selenium(4)-------自动化测试脚本(python)

webdriverAPI 一)定位元素的方式&#xff0c;必问 1.1)id来定位元素&#xff0c;前提是元素必须具有id属性&#xff0c;因为有的元素是没有id的 1.2)name&#xff0c;元素必须有name&#xff0c;并且必须全局唯一 1.3)tagname&#xff0c;元素是一定有的&#xff0c;但是必须全…

HTTP 缓存的工作原理

缓存是解决http1.1当中的性能问题主要手段。缓存可能存在于客户端浏览器上&#xff0c;也可以存在服务器上面&#xff0c;当使用过期缓存可能给用户展示的是错误的信息而导致一些bug。 HTTP 缓存&#xff1a;为当前请求复用前请求的响应 • 目标&#xff1a;减少时延&#xff1…

Python+Yolov8目标识别特征检测

Yolov8目标识别特征检测如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01;前言这篇博客针对<<Yolov8目标识别特征检测>>编写代码&#xff0c;代码整洁&#xff0c;规则&#xff0c;易读。 学习与应用推荐…

3分钟看完-丄-Python自动化测试【项目实战解析】经验分享

目录&#xff1a;导读 引言 自动化测试 背景 测试团队 测试体系发展 测试平台 自动化测试现状 现状一&#xff1a; 现状二&#xff1a; 现状三&#xff1a; 现状四&#xff1a; 现状五&#xff1a; 现状六&#xff1a; 失败的背景 失败的经历 失败总结 引言 内…

Java多线程系列--synchronized的原理

原文网址&#xff1a;Java多线程系列--synchronized的原理_IT利刃出鞘的博客-CSDN博客 简介 本文介绍Java的synchronized的原理。 反编译出字节码 Test.java public class Test {private static Object LOCK new Object();public static int main(String[] args) {synchro…

动态矢量瓦片缓存库方案

目录 前言 二、实现步骤 1.将数据写入postgis数据库 2.将矢量瓦片数据写入缓存库 3.瓦片接口实现 4.瓦片局部更新接口实现 总结 前言 矢量瓦片作为webgis目前最优秀的数据格式&#xff0c;其主要特点就是解决了大批量数据在前端渲染时出现加载缓慢、卡顿的问题&#xff0…

LeetCode 112. 路径总和

LeetCode 112. 路径总和 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。判断该树中是否存在 根节点到叶子节点 的路径&#xff0c;这条路径上所有节点值相加等于目标和 targetSum 。如果存在&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 叶…

Python笔记 -- 文件和异常

文章目录1、文件1.1、with关键字1.2、逐行读取1.3、写入模式1.4、多行写入2、异常2.1、try-except-else2.2、pass1、文件 1.1、with关键字 with关键字用于自动管理资源 使用with可以让python在合适的时候释放资源 python会将文本解读为字符串 # -*- encoding:utf-8 -*- # 如…

Linux操作系统基础的常用命令

1&#xff0c;Linux简介Linux是一种自由和开放源码的操作系统&#xff0c;存在着许多不同的Linux版本&#xff0c;但它们都使用了Linux内核。Linux可安装在各种计算机硬件设备中&#xff0c;比如手机、平板电脑、路由器、台式计算机。1.1Linux介绍Linux出现于1991年&#xff0c…

操作技巧 | 在Revit中借用CAD填充图案的方法

在建模过程中&#xff0c;有时需要达到多种填充效果&#xff0c;而CAD中大量的二维填充图案&#xff0c;便是最直接的资源之一。 使用 填充图案之前 使用 填充图案之后 其中要用到主要命令便是对表面填充图案的添加与编辑 简单效果 如下 模型填充与绘图填充 区别 模型填…