SparkSql读取数据的方式

一、读取普通文件 

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

1、代码演示最普通的文件读取方式: 

from pyspark.sql import SparkSession
import os


if __name__ == '__main__':
	# 构建环境变量
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取sparkSession对象
	spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
		"spark.sql.shuffle.partitions", 2).getOrCreate()

	df01 = spark.read.json("../../datas/resources/people.json")
	df01.printSchema()
	df02 = spark.read.format("json").load("../../datas/resources/people.json")
	df02.printSchema()
	df03 = spark.read.parquet("../../datas/resources/users.parquet")
	df03.printSchema()
	#spark.read.orc("")
	df04 = spark.read.format("orc").load("../../datas/resources/users.orc")
	df04.printSchema()
	df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")
	df05.printSchema()

	df06 = spark.read.load(
		path="../../datas/resources/people.csv",
		format="csv",
		sep=";"
	)
	df06.printSchema()

	spark.stop()

二、 通过jdbc读取数据库数据

先在本地数据库或者linux数据库中插入一张表:

CREATE TABLE `emp`  (
  `empno` int(11) NULL DEFAULT NULL,
  `ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `mgr` int(11) NULL DEFAULT NULL,
  `hiredate` date NULL DEFAULT NULL,
  `sal` decimal(7, 2) NULL DEFAULT NULL,
  `comm` decimal(7, 2) NULL DEFAULT NULL,
  `deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的数据:

CREATE TABLE `dept`  (
  `deptno` int(11) NULL DEFAULT NULL,
  `dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

查询时会报如下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

接着放驱动程序: 

Python环境放入MySQL连接驱动

  • 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
  • 如果是Linux上:注意集群模式所有节点都要放。

第一种情况:

假如你是windows环境:

我的最终的路径是在这里:

第二种情况:linux环境下,按照如下方式进行

# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars

# 上传jar包:mysql-connector-java-5.1.32.jar

代码演示: 

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType


if __name__ == '__main__':
	# 获取sparkSession对象
	# 设置 任务的环境变量
	os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
	# 得到sparkSession对象
	spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()
	

	dict = {"user":"root","password":"root"}
	jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)
	jdbcDf.show()
	# jdbc的另一种写法
	jdbcDf2 = spark.read.format("jdbc") \
		.option("driver", "com.mysql.cj.jdbc.Driver") \
		.option("url", "jdbc:mysql://localhost:3306/spark") \
		.option("dbtable", "spark.dept") \
		.option("user", "root") \
		.option("password", "root").load()
	jdbcDf2.show()


	# 关闭
	spark.stop()

三、 读取table中的数据【hive】

  • 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
    • 存储:数据仓库:Hive:将HDFS文件映射成表
    • 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
  • 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的

1)集群环境操作hive 

需要启动的服务: 

先退出base环境:conda deactivate
启动服务:
启动hdfs:  start-dfs.sh  因为hive的数据在那里存储着
启动yarn:  start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore

修改配置: 

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml

在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata01:9083</value>
    </property>
</configuration>

接着将该文件进行分发:
xsync.sh hive-site.xml

操作sparkSQL: 

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。

进入后,通过内置对象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+

>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德枫|
|  2|  刘浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

2)开发环境如何编写代码,操作hive:

代码实战:

from pyspark.sql import SparkSession
import os



if __name__ == '__main__':
	# 构建环境变量
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
	# 防止在本地操作hdfs的时候,出现权限问题
	os.environ['HADOOP_USER_NAME'] = 'root'

	# 获取sparkSession对象
	spark = SparkSession \
		.builder \
		.appName("HiveAPP") \
		.master("local[2]") \
		.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
		.config('hive.metastore.uris', 'thrift://bigdata01:9083') \
		.config("spark.sql.shuffle.partitions", 2) \
		.enableHiveSupport() \
		.getOrCreate()

	spark.sql("select * from yhdb.student").show()

    spark.read.table("yhdb.student").show()

	spark.stop()

不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/913408.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LSTM+LightGBM+Catboost的stacking融合模型

基本介绍 针对目前大部分数据同时具有特征连续和特征不连续的特点&#xff0c;将神经网络模型如LSTM和回归树模型如XGboost,基于stacking集成学习原理进行融合 附有模型评价指标R2、RMSE、MAE、MSE&#xff0c;代码包含注释&#xff0c;可以直接运行。 融合过程 在机器学习中…

重学 Android 自定义 View 系列:动手实现专属 TextView

前言 前面一篇介绍了自定义View的基础概念(皮毛)&#xff0c;接下来全部是自定义View实战&#xff0c;让我们一起开启自定义View之旅吧&#xff01; 1. 实现目标 本篇将实现一个自定义的TextView&#xff0c;通过自定义属性让我们可以配置文本内容、颜色、字体大小。主要是掌…

多用户商城系统的功能及设计和开发

多用户商城系统的功能及设计与开发&#xff08;基于 PHP MySQL&#xff09; 在现代电子商务平台的开发中&#xff0c;PHP MySQL 是一对非常流行且高效的技术栈。PHP作为服务器端脚本语言&#xff0c;结合MySQL数据库&#xff0c;可以高效地处理多用户商城系统的各种需求。本…

丹摩征文活动|快速上手 CogVideoX-2b:智谱清影 6 秒视频生成部署教程

文章目录 一、生成视频效果 二、CogVideoX 技术新起点三、CogVideoX 上手部署3.1 创建丹摩实例3.2 配置环境和依赖3.3 模型与配置文件3.4 运行3.5 问题与处理方法 四、CogVideoX-2b 用创新点燃未来 一、生成视频效果 A street artist, clad in a worn-out denim jacket and a c…

实现 think/queue 日志分离

当我们使用think/queue包含了比较多的不同队列,日志会写到runtime/log目录下,合并写入的,不好排查问题,我们遇到一个比较严重的就是用了不同用户来执行,权限冲突了,导致部分队列执行不了. 为了解决以上问题,本来希望通过Log::init设置不同日志路径的,但是本地测试没生效,于是用…

Ubuntu24.04安装Perforce服务

安装 参考链接:https://www.perforce.com/manuals/p4sag/Content/P4SAG/install.linux.packages.install.html Perforce是一款收费的版本控制管理工具,当然其中也有一些免费的教学版本,应需要下载。 下载网址: https://www.perforce.com/downloads/helix-core-p4d安装前…

使用 GitHub Actions 部署到开发服务器的详细指南

使用 GitHub Actions 部署到开发服务器的详细指南 在本篇博客中&#xff0c;我们将介绍如何使用 GitHub Actions 实现自动化部署&#xff0c;将代码从 GitHub 仓库的 dev 分支自动部署到开发服务器。通过这种方式&#xff0c;可以确保每次在 dev 分支推送代码时&#xff0c;服…

Logrus入门

Logrus入门 1. 下载 go get github.com/sirupsen/logrus2. logrus常用方法 logrus.Debugln("Debugln") logrus.Infoln("Infoln") logrus.Warnln("Warnln") logrus.Errorln("Errorln") logrus.Println("Println")// 输出如…

告别重启大法,CPU飙高问题如何排查详细教程以及解决方案

文章目录 0 前言1.确定问题进程2.获取线程信息3.转换线程ID为十六进制4.获取线程堆栈5.分析代码6.性能分析工具7. 查看GC日志8.检查系统资源总结 0 前言 本篇是本人认为最实用的一篇&#xff0c;在日常开发运维工作中&#xff0c;经常遇到CPU较高的情况&#xff0c;一开始时还不…

在 Jupyter Notebook 中使用 Matplotlib 进行交互式可视化的教程

在 Jupyter Notebook 中使用 Matplotlib 进行交互式可视化的教程 引言 数据可视化是数据分析的重要组成部分&#xff0c;能够帮助我们更直观地理解数据。Matplotlib 是 Python 中最流行的绘图库之一&#xff0c;而 Jupyter Notebook 则是进行数据分析和可视化的理想环境。本文…

数据库SQL——什么是实体-联系模型(E-R模型)?

目录 什么是实体-联系模型&#xff1f; 1.实体集 2.联系集 3.映射基数 一对一&#xff08;1:1&#xff09; 一对多&#xff08;1:n&#xff09; 多对一&#xff08;n:1&#xff09; 多对多&#xff08;m:n&#xff09; 全部参与&#xff1a; 4.主码 弱实体集&#xf…

机器学习4_支持向量机_核函数——MOOC

目录 核函数的定义 核函数以及低维到高维的映射 的相互关系 例1&#xff1a;已知 求 K 例2&#xff1a;已知核函数 K 求 映射 的例子 核函数 K 求 映射 是一一对应的关系 支持向量机优化问题 K 满足交换性和半正定性 内积的形式 例如&#xff1a;可以证明 核函数…

LRU-LFU缓存算法

文章目录 缓存算法LRU缓存算法LFU缓存算法定义实现方法一&#xff1a;哈希表平衡二叉树方法二&#xff1a;双哈希表哈希链表方法三&#xff1a;双哈希表 缓存算法 LRU缓存算法 https://labuladong.online/algo/data-structure/lru-cache/ LRU&#xff08;Least Recently Use…

斯坦福泡茶机器人DexCap源码解析:涵盖收集数据、处理数据、模型训练三大阶段

前言 因为我司「七月在线」关于dexcap的复现/优化接近尾声了&#xff0c;故准备把dexcap的源码也分析下。​下周则分析下iDP3的源码——为队伍「iDP3人形的复现/优化」助力 最开始&#xff0c;dexcap的源码分析属于此文《DexCap——斯坦福李飞飞团队泡茶机器人&#xff1a;带…

DICOM标准:DICOM医学影像中的覆盖层(Overlay)概念详解

引言 DICOM&#xff08;数字成像和通信医学&#xff09;标准在医学影像的存储、传输和交换中起着关键作用。覆盖层&#xff08;Overlay&#xff09;作为DICOM标准中的一个重要组成部分&#xff0c;用于在医学影像上叠加图形信息&#xff0c;如注释、标记、测量结果等。本文将深…

Windows搭建流媒体服务并使用ffmpeg推流播放rtsp和rtmp流

文章目录 搭建流媒体服务方式一安装mediamtx启动meidamtx关闭meidamtx 方式二安装ZLMediaKit启动ZLMediaKit关闭ZLMediaKit 安装FFmpeg进行推流使用FFmpeg进行rtmp推流使用VLC播放rtmp流停止FFmpeg的rtmp推流使用FFmpeg进行rtsp推流使用VLC播放rtmp流停止FFmpeg的rtsp推流 本文…

[ Linux 命令基础 5 ] Linux 命令详解-网络管理命令

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

深入浅出WebSocket(实践聊天室demo)

文章目录 什么是WebSocket?WebSocket连接过程WebSocket与Http的区别重连机制完整代码使用方法心跳机制实现聊天室demo(基于Socket.io)参考文章、视频小广告~什么是WebSocket? WebSocket 是一种在单个TCP连接上进行全双工通信的协议(计算机网络应用层的协议) 在 WebSocket A…

时序预测 | 改进图卷积+informer时间序列预测,pytorch架构

时序预测 | 改进图卷积informer时间序列预测&#xff0c;pytorch架构 目录 时序预测 | 改进图卷积informer时间序列预测&#xff0c;pytorch架构预测效果基本介绍参考资料 预测效果 基本介绍 改进图卷积informer时间序列预测代码 CTR-GC卷积,informer&#xff0c;CTR-GC 图卷积…

vue+Leaflet.PM插件实现创建和编辑几何图形(点、线、面、圆等)

场景 VueLeaflet实现加载OSM显示地图&#xff1a;https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/122317394在上面加载显示OSM的基础上&#xff0c;使用Leaflet.pm插件实现在页面上绘制、编辑、剪切、移动几何元素。Leaflet.pm插件 用于创建和编辑几何图层的插件可…