摸鱼大数据——Spark SQL——DataFrame详解一

1.DataFrame基本介绍

 DataFrame表示的是一个二维的表。二维表,必然存在行、列等表结构描述信息
 ​
 表结构描述信息(元数据Schema): StructType对象
 字段: StructField对象,可以描述字段名称、字段数据类型、是否可以为空
 行: Row对象
 列: Column对象,包含字段名称和字段值
 ​
 在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息

如何构建表结构信息数据:

2.DataFrame的构建方式

 方式1: 使用SparkSession的createDataFrame(data,schema)函数创建
     data参数
         1.基于List列表数据进行创建
         2.基于RDD弹性分布式数据集进行创建
         3.基于pandas的DataFrame数据进行创建
     schema参数
         1: 字符串
             格式一 :“字段名1 字段类型,字段名2 字段类型”
             格式二(推荐):“字段名1:字段类型,字段名2:字段类型”
         2: List
             格式: ["字段名1","字段名2"]  
         3: DataType(推荐,用的最多)
            格式一:schema=StructType().add('字段名1',字段类型).add('字段名2',字段类型)
            格式二:schema=StructType([StructField('字段名1',类型),StructField('字段名1',类型)])
  
 方式2: 使用DataFrame的toDF(colNames)函数创建
     DataFrame的toDF方法是一个在Apache Spark的DataFrame API中用来创建一个新的DataFrame的方法。这个方法可以将一个RDD转换为DataFrame,或者将一个已存在的DataFrame转换为另一个DataFrame。在Python中,你可以使用toDF方法来指定列的名字。如果你不指定列的名字,那么默认的列的名字会是_1, _2等等。 
     格式: rdd.toDF([列名])
 ​
 ​
 方式3: 使用SparkSession的read()函数创建
     在 Spark 中,SparkSession 的 read 是用于读取数据的入口点之一,它提供了各种方法来读取不同格式的数据并将其加载到 Spark 中进行处理。
     统一API格式: 
         spark.read
             .format('text|csv|json|parquet|orc|...')  : 读取外部文件的方式
             .option('k','v')   : 选项  可以设置相关的参数 (可选)
             .schema(StructType | String)  :  设置表的结构信息
             .load('加载数据路径')  : 读取外部文件的路径, 支持 HDFS 也支持本地
     简写API格式:
         注意: 以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写
         格式: spark.read.文件读取方式()
 ​
         注意: parquet:是Spark中常用的一种列式存储文件格式和Hive中的ORC差不多, 他俩都是列存储格式
 ​

2.1 createDataFrame()创建

场景:一般用在开发和测试中。因为只能处理少量的数据

2.1.1 基于列表
 # 导包
 import os
 from pyspark.sql import SparkSession
 ​
 # 绑定指定的python解释器
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
 ​
     # 2.创建DF对象
     data = [(1, '张三', 18), (2, '李四', 28), (3, '王五', 38)]
     df1 = spark.createDataFrame(data,schema=['id','name','age'])
     # 展示数据
     df1.show()
     # 查看结构信息
     df1.printSchema()
 ​
     print('---------------------------------------------------------')
     df2 = spark.createDataFrame(data,schema='id int,name string,age int')
     # 展示数据
     df2.show()
     # 查看结构信息
     df2.printSchema()
 ​
     print('---------------------------------------------------------')
     df3 = spark.createDataFrame(data,schema='id:int,name:string,age:int')
     # 展示数据
     df3.show()
 ​
     # 查看结构信息
     df3.printSchema()
 ​
     # 3.关闭资源
     spark.stop()
2.1.2 基于RDD普通方式

场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。

Schema选择StructType对象来定义DataFrame的“表结构”转换RDD

 # 导包
 import os
 from pyspark.sql import SparkSession
 ​
 # 绑定指定的python解释器
 from pyspark.sql.types import StructType, StringType, StructField
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
     sc = spark.sparkContext
 ​
 ​
     # 2.读取生成rdd
     textRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')
     print(type(textRDD)) # <class 'pyspark.rdd.RDD'>
     etlRDD = textRDD.map(lambda line:line.split(',')).map(lambda l:(l[0],l[1]))
     # 3.定义schema结构信息
     schema1 = StructType().add('name',StringType(),True).add('age',StringType(),True)
     schema2 = StructType([StructField('name',StringType(),True),StructField('age',StringType(),True)])
     schema3 = ['name','age']
     schema4 = 'name string,age string'
     schema5 = 'name:string,age:string'
     # 4.创建DF对象
     dfpeople = spark.createDataFrame(etlRDD,schema5)
     # 5.df展示结构信息
     dfpeople.show()
     dfpeople.printSchema()
     # 6.拓展: 创建临时视图,方便sql查询
     dfpeople.createTempView('peoples')
     r = spark.sql('select * from peoples')
     r.show()
 ​
 ​
     # 7.关闭资源
     sc.stop()
     spark.stop()
 ​
2.1.3 基于RDD反射方式

Schema使用反射方法来推断Schema模式Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,从而推断数据类型。

 # 导包
 import os
 from pyspark.sql import SparkSession
 ​
 # 绑定指定的python解释器
 from pyspark.sql.types import StructType, StringType, StructField, Row
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
     sc = spark.sparkContext
 ​
 ​
     # 2.读取生成rdd
     # 3.定义schema结构信息
     textRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')
     etlRDD_schema = textRDD.map(lambda line:line.split(',')).map(lambda l:Row(name=l[0],age=l[1]))
 ​
     # 4.创建DF对象
     dfpeople = spark.createDataFrame(etlRDD_schema)
 ​
     # 5.df展示结构信息
     dfpeople.show()
     dfpeople.printSchema()
     # 6.拓展: 创建临时视图,方便sql查询
     dfpeople.createTempView('peoples')
     r = spark.sql('select * from peoples')
     r.show()
 ​
 ​
 ​
     # 7.关闭资源
     sc.stop()
     spark.stop()
2.2 toDF()创建

schema模式编码在字符串中,toDF参数用于指定列的名字。如果你不指定列的名字,那么默认的列的名字会是_1, _2等等。

 # 导包
 import os
 from pyspark.sql import SparkSession
 ​
 # 绑定指定的python解释器
 from pyspark.sql.types import StructType, StringType, StructField, Row
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
     sc = spark.sparkContext
 ​
 ​
     # 2.读取生成rdd
     # 3.定义schema结构信息
     textRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')
     etlRDD = textRDD.map(lambda line:line.split(','))
 ​
     # 4.创建DF对象
     dfpeople = etlRDD.toDF(['name','age'])
 ​
     # 5.df展示结构信息
     dfpeople.show()
     dfpeople.printSchema()
     # 6.拓展: 创建临时视图,方便sql查询
     dfpeople.createTempView('peoples')
     r = spark.sql('select * from peoples')
     r.show()
 ​
     # 7.关闭资源
     sc.stop()
     spark.stop()

2.3 read读取外部文件

复杂API

 统一API格式: 
 spark.read
     .format('text|csv|json|parquet|orc|avro|jdbc|.....') # 读取外部文件的方式
     .option('k','v') # 选项  可以设置相关的参数 (可选)
     .schema(StructType | String) #  设置表的结构信息
     .load('加载数据路径') # 读取外部文件的路径, 支持 HDFS 也支持本地

简写API

 请注意: 以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写
 格式: 
     spark.read.读取方式()
     
 例如: 
     df = spark.read.csv(
         path='file:///export/data/_03_spark_sql/data/stu.txt',
         header=True,
         sep=' ',
         inferSchema=True,
         encoding='utf-8',
     )
2.3.1 Text方式读取
 text方式读取文件:
     1- 不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理
     2- 默认生成的列名叫value,数据类型string
     3- 只能够在schema中修改字段value的名称,其他任何内容不能修改
 # 导包
 import os
 from pyspark.sql import SparkSession
 ​
 # 绑定指定的python解释器
 from pyspark.sql.types import StructType, StringType, StructField, Row
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
 ​
     # 2.读取数据
     # 注意: 读取text文件默认只有1列,且列名交value,可以通过schema修改
     df = spark.read\
         .format('text')\
         .schema('info string')\
         .load('file:///export/data/spark_project/spark_sql/data/data1.txt')
 ​
 ​
     # 5.df展示结构信息
     df.show()
     df.printSchema()
     # 6.拓展: 创建临时视图,方便sql查询
     df.createTempView('peoples')
     r = spark.sql('select * from peoples')
     r.show()
 ​
 ​
     # 6.关闭资源
     spark.stop()

2.3.2 CSV方式读取
 csv格式读取外部文件:
     1- 复杂API和简写API都必须掌握
     2- 相关参数作用说明:
         2.1- path:指定读取的文件路径。支持HDFS和本地文件路径
         2.2- schema:手动指定元数据信息
         2.3- sep:指定字段间的分隔符
         2.4- encoding:指定文件的编码方式
         2.5- header:指定文件中的第一行是否是字段名称
         2.6- inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确
 # 导包
 import os
 from pyspark.sql import SparkSession
 ​
 # 绑定指定的python解释器
 from pyspark.sql.types import StructType, StringType, StructField, Row
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
 ​
     # 2.读取数据
     # 注意: csv文件可以识别多个列,可以使用schema指定列名,类型
     # 原始方式
     # df = spark.read\
     #     .format('csv')\
     #     .schema('name string,age int')\
     #     .option('sep',',')\
     #     .option('encoding','utf8')\
     #     .option('header',False)\
     #     .load('file:///export/data/spark_project/spark_sql/data/data1.txt')
     # 简化方式
     df = spark.read.csv(
         schema='name string,age int',
         sep=',',
         encoding='utf8',
         header=False,
         path='file:///export/data/spark_project/spark_sql/data/data1.txt'
     )
 ​
     # 5.df展示结构信息
     df.show()
     df.printSchema()
     # 6.拓展: 创建临时视图,方便sql查询
     df.createTempView('peoples')
     r = spark.sql('select * from peoples')
     r.show()
 ​
 ​
     # 7.关闭资源
     spark.stop()
 ​

2.3.3 JSON方式读取
 json读取数据:
 1- 需要手动指定schema信息。如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充
 2- csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔

json的数据内容

 {'id': 1,'name': '张三','age': 20}
 {'id': 2,'name': '李四','age': 23,'address': '北京'}
 {'id': 3,'name': '王五','age': 25}
 {'id': 4,'name': '赵六','age': 29}

代码实现

 # 导包
 import os
 from pyspark.sql import SparkSession
 ​
 # 绑定指定的python解释器
 from pyspark.sql.types import StructType, StringType, StructField, Row
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
 ​
     # 2.读取数据
     # 注意: json的key和schema指定的字段名不一致,会用null补充,如果没有数据也是用null补充
     # 简化方式
     df = spark.read.json(
         schema='id int,name string,age int,address string',
         encoding='utf8',
         path='file:///export/data/spark_project/spark_sql/data/data2.txt'
     )
 ​
     # 5.df展示结构信息
     df.show()
     df.printSchema()
     # 6.拓展: 创建临时视图,方便sql查询
     df.createTempView('peoples')
     r = spark.sql('select * from peoples')
     r.show()
 ​
     # 关闭资源
     spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/779844.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

服务器BMC基础知识总结

前言 因为对硬件方面不太理解&#xff0c;所以打算先从服务器开始学习&#xff0c;也想和大家一起分享一下&#xff0c;有什么不对的地方可以纠正一下哦&#xff01;谢谢啦&#xff01;互相学习共同成长~ 1.BMC是什么&#xff1f; 官方解释&#xff1a;BMC全名Baseboard Mana…

【聚星文社 绘唐3】MJ版一键AI工具使用文档

MJ版一键AI工具使用文档 绘唐地址下载 欢迎使用MJ版一键AI工具&#xff01;这个工具可以帮助您快速生成各种类型的文本&#xff0c;包括文章、对话、代码等等。 使用方法&#xff1a; 登录&#xff1a;首先&#xff0c;您需要登录到您的MJ版账户。如果您还没有账户&#xff0…

Spring AOP源码篇二之 代理工厂ProxyFactory学习

了解AspectJ表达式以及PointCut、Advice、Advisor后&#xff0c;继续学习Spring AOP代理工厂 AspectJ表达式参考&#xff1a;Spring AOP之AspectJ表达式-CSDN博客 PointCut、Advice、Advisor参考&#xff1a;Spring AOP源码篇一之 PointCut、Advice、Advisor学习-CSDN博客 简单…

从零开始实现大语言模型(四):简单自注意力机制

1. 前言 理解大语言模型结构的关键在于理解自注意力机制(self-attention)。自注意力机制可以判断输入文本序列中各个token与序列中所有token之间的相关性&#xff0c;并生成包含这种相关性信息的context向量。 本文介绍一种不包含训练参数的简化版自注意力机制——简单自注意…

STM32-PWR和WDG看门狗

本内容基于江协科技STM32视频学习之后整理而得。 文章目录 1. PWR1.1 PWR简介1.2 电源框图1.3 上电复位和掉电复位1.4 可编程电压监测器1.5 低功耗模式1.6 模式选择1.7 睡眠模式1.8 停止模式1.9 待机模式1.10 库函数 2. WDG看门狗2.1 WDG简介2.2 IWDG框图2.3 IWDG键寄存器2.4 …

ACM ICPS独立出版 | 2024年第三届计算与人工智能国际会议(ISCAI 2024)

会议简介 Brief Introduction 2024年第三届计算与人工智能国际会议(ISCAI 2024) 会议时间&#xff1a;2024年11月22 -24日 召开地点&#xff1a;中国大理 大会官网&#xff1a;www.iscai.org 2024年第三届计算与人工智能国际会议(ISCAI 2024)将围绕“计算与人工智能”的最新研究…

排序 -- 冒泡排序和快速排序

一、 交换排序 1、基本思想 所谓交换&#xff0c;就是根据序列中两个记录键值的比较结果来对换这两个记录在序列中的位置&#xff0c;交换排序的特点是&#xff1a;将键值较大的记录向序列的尾部移动&#xff0c;键值较小的记录向序列的前部移动。 2、常见的交换排序 1、冒泡…

Java Selenium入门程序

需求&#xff1a;使用chrome浏览器打开百度首页 1.配置浏览器驱动 &#xff08;1&#xff09;下载浏览器驱动&#xff0c;浏览器版本需与驱动版本一致&#xff1b; &#xff08;2&#xff09;编辑系统环境变量-->编辑Path-->填入浏览器驱动路径&#xff1a; 2.maven工…

【反悔贪心 反悔堆】1642. 可以到达的最远建筑

本文涉及知识点 反悔贪心 反悔堆 LeetCode1642. 可以到达的最远建筑 给你一个整数数组 heights &#xff0c;表示建筑物的高度。另有一些砖块 bricks 和梯子 ladders 。 你从建筑物 0 开始旅程&#xff0c;不断向后面的建筑物移动&#xff0c;期间可能会用到砖块或梯子。 当…

刷题之删除有序数组中的重复项(leetcode)

删除有序数组中的重复项 这题简单题&#xff0c;双指针&#xff0c;一个指针记录未重复的数的个数&#xff0c;另一个记录遍历的位置。 以下是简单模拟&#xff0c;可以优化&#xff1a; class Solution { public:int removeDuplicates(vector<int>& nums) {int l0…

STL--求交集,并集,差集(set_intersection,set_union,set_difference)

set_intersection(重要) 求两个有序的序列的交集. 函数声明如下: template<class InputIterator1, class InputIterator2, class OutputIterator>OutputIterator set_intersection(InputIterator1 _First1, //容器1开头InputIterator1 _Last1, //容器2结尾(不包含)Inp…

ChatGPT4深度解析:探索智能对话新境界

大模型chatgpt4分析功能初探 目录 1、探测目的 2、目标变量分析 3、特征缺失率处理 4、特征描述性分析 5、异常值分析 6、相关性分析 7、高阶特征挖掘 1、探测目的 1、分析chat4的数据分析能力&#xff0c;提高部门人效 2、给数据挖掘提供思路 3、原始数据&#xf…

Navicat终于免费了, 但是这个结果很奇葩

个人用下载地址: 点呀 好家伙, 每个机构最多5个用户, 对于正在审计的公司…

DAY1: 实习前期准备

文章目录 VS Code安装的插件C/CCMakeGitHub CopilotRemote-SSH收获 VS Code 下载链接&#xff1a;https://code.visualstudio.com 安装的插件 C/C 是什么&#xff1a;C/C IntelliSense, debugging, and code browsing. 为什么&#xff1a;初步了解如何在VS Code里使用C输出…

Vulnhub-Os-hackNos-1(包含靶机获取不了IP地址)

https://download.vulnhub.com/hacknos/Os-hackNos-1.ova #靶机下载地址 题目&#xff1a;要找到两个flag user.txt root.txt 文件打开 改为NAT vuln-hub-OS-HACKNOS-1靶机检测不到IP地址 重启靶机 按住shift 按下键盘字母"E"键 将图中ro修改成…

筛选Github上的一些优质项目

每个项目旁都有标签说明其特点&#xff0c;如今日热捧、多模态、收入生成、机器人、大型语言模型等。 项目涵盖了不同的编程语言和领域&#xff0c;包括人工智能、语言模型、网页数据采集、聊天机器人、语音合成、AI 代理工具集、语音转录、大型语言模型、DevOps、本地文件共享…

7-6 每日升学消息汇总

复旦附中清北比例大涨&#xff0c;从统计数据来看&#xff0c;今年复附的清北人数将创历史新高&#xff0c;达到前所未有年进43人。离上海7月9号中考出分&#xff0c;还有3天。小道消息说&#xff0c;画狮的数游天下又回来了&#xff0c;目前还未官方消息。2024第二届国际数学夏…

安卓虚拟位置修改1.25beta支持路线模拟、直接定位修改

导语:更新支持安卓14/15&#xff0c;支持路线模拟、直接定位修改&#xff0c;仅支持单一版本 无root需根据教程搭配下方链接所提供的虚拟机便可进行使用 有root且具备XP环境可直接真机运行 如你有特殊需求 重启问题设置打开XP兼容 针对具有虚拟机检测的软件 建议如下 度娘搜索…

多表查询sql

概述&#xff1a;项目开发中,在进行数据库表结构设计时,会根据业务需求及业务模块之间的关系,分析并设计表结构,由于业务之间相互关联,所以各个表结构之间也存在着各种联系&#xff0c;分为三种&#xff1a; 一对多多对多一对一 一、多表关系 一对多 案例&#xff1a;部门与…

在CMD中创建虚拟环境并在VSCode中使用和管理

1. 使用Conda创建虚拟环境 在CMD或Anaconda Prompt中执行以下代码以创建一个新的虚拟环境&#xff1a; conda create -n my_env python 3.8 这样会创建一个名为 my_env 的环境&#xff0c;并在Anaconda环境目录下生成一个相应的文件夹&#xff0c;包含该虚拟环境所需的所有…