Spark:DataFrame介绍及使用

1. DataFrame详解

DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。

  • Row 类型 表示一行数据
    • DataFrame就算是多行构成
# 导入行类Row
from pyspark.sql import Row

# 创建行数据
r1 = Row(1, '张三', 20)

# 行数取取值 按照下标取值
data = r1[0]
print(data)
data1 = r1[1]
print(data1)

# 指定字段创建行数据
r2 = Row(id=2, name='李四', age=22)
# 按照字段取值
data3 = r2['id']
print(data3)
data4 = r2['name']
print(data4)
  • schema表信息
    • 定义DataFrame中的表的字段名和字段类型。
# 导入数据类型
from pyspark.sql.types import *

# 定义schema信息
# 使用StructType类进行定义
# add()方法是指定字段信息
# 第一参数,字段名
# 第二个参数,字段信息
# 第三个参数是否允许为空值  默认是True,允许为空
schema_type = StructType().\
    add('id',IntegerType()).\
    add('name',StringType()).\
    add('age',IntegerType(),False)

2. DataFrame创建

创建datafram数据需要使用一个sparksession的类创建,SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext。

2.1 基本创建

#DataFrame 的基本创建
#Row就是行数据定义的类
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *

#行数据创建
r1 = Row(1,"刘向阳",23,'男')
print(r1)

#行数据下标取值
print(r1[0])
print(r1[1])

#创建行数据时可以指定字段名
r2 = Row(id=2,name='李四',age=20,gender='女')
print(r2)
#使用字段名取值
print(r2['name'])

# 定义元数据
schema = (StructType().add('id', IntegerType()).add('username', StringType()).add('age', IntegerType()).add('gender', StringType()))
print(schema)

# 将元数据和行数据放在一起合成DataFrame
ss = SparkSession.builder.getOrCreate()

# 调用创建df的方法
df = ss.createDataFrame([r1,r2],schema=schema)

# 查看df中数据
df.show()

#查看元数据信息
df.printSchema()

运行结果:
在这里插入图片描述

2.2 RDD和DF之间的转化

  • rdd的二维数据转化为DataFrame
    • rdd.toDF()
      在这里插入图片描述
# rdd 和 dataframe的转化
from pyspark.sql import SparkSession

#创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

#基于ss对象获取sparkContext
sc = ss.sparkContext

#创建rdd , 要使用二维列表指定每行数据
rdd = sc.parallelize([[1,'张三',20,'男'],[2,'李四',20,'男']])

#将rdd转为df
df = rdd.toDF(schema='id int,name string,age int,gender string')

#df数据查看
df.show()
df.printSchema()

#df可以转rdd
res = df.rdd.collect()
print(res)

rdd2 = df.rdd.map(lambda x:x['name'])

res2 = rdd2.collect()
print(res2)

运行结果:
在这里插入图片描述

2.3 pandas和spark之间转化

  • spark的df转为pandas的df
    • toPandas
#pandas 和 spark的dataframe转化
from pyspark.sql import SparkSession
import pandas as pd

ss = SparkSession.builder.getOrCreate()

#创建pandas的df
df_pd = pd.DataFrame(
    {
        'id':[1,2,3,4],
        'name':['张三','李四','王五','赵六'],
        'age':[1,2,3,4],
        'gender':['男','女','女','女']
    }
)
#查看数据
print(df_pd)

#取值
name = df_pd['name'][0]
print(name)
# 将pandas中的df转为spark的df
df_spark = ss.createDataFrame(df_pd)

#查看
df_spark.show()

#取值
row = df_spark.limit(1).first()
print(row['name'])

#将spark的df重新转为pandas的df
df_pandas = df_spark.toPandas()
print(df_pandas)

运行结果:
在这里插入图片描述

2.4 读取文件数据转为df

通过read方法读取数据转为df

  • ss.read
#读取文件转为df
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取不同文件数据转为df
# txt文件
df = ss.read.text('hdfs://node1:8020/data/students.txt')
df.show()

# json 文件
df_json = ss.read.json('hdfs://node1:8020/data/baike_qa_valid.json')
df_json.show()

#orc文件
df_orc = ss.read.orc('hdfs://node1:8020/data/users.orc')
df_orc.show()

#去取csv文件
#header或csv文件中的第一行作为表头字段数据
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv')
df_csv.show()

3. DataFrame基本使用

3.1 SQL语句

使用sparksession提供的sql方法,编写sql语句执行

#使用sql操作dataframe结构化数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',')

#使用sql操作df数据
#将df指定一个临时表名
df_csv.createTempView('stu')

#编写sql字符串语句,支持hivesql语法
sql_str ="""
select * from stu 
"""

#执行sql语句,执行结果返回一个新的df
df_res = ss.sql(sql_str)
df_csv.show()
df_res.show()

3.2 DSL方法

DSL方法是df提供的数据操作函数
使用方式:

  • df.方法()
  • 可以进行链式调用
  • df.方法().方法().方法()
  • 方法执行后返回一个新的df保存计算结果
  • new_df = df.方法()

spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据。
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法执行完成后会得到一个处理后的新的df

#使用DSL方法操作dataframe
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1/data/students.csv', header=True,sep=',')

#使用DSL方法对df数据进行操作
df2 = df_csv.select('id','name')

#查看结果
df2.show()

#第二种指定字段的方式
df3 = df_csv.select(df_csv.age,df_csv.gender)

#给字段起别名
df4 = df_csv.select(df_csv.age.alias('new_age'),df_csv.gender)
df4.show()

#修改字段类型
df_csv.printSchema()
df5 = df_csv.select(df_csv.age.cast('int'),df_csv.gender)
df5.printSchema()

#where 的数据过滤
age = 20
df6 = df_csv.where(f'age > {age}')
df6.show()

#过滤年龄大于20并且性别为女性的学生信息
df7 = df_csv.where(f'age > 20 and gender = "女" ')
df7.show()

#使用第二种字段判断方式
df8 = df_csv.where(df_csv.age == age)
df8.show()

#分组聚合计算
df9 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age')
df9.show()

#分组后过滤where 聚合计算时只能一次计算一个聚合数据
df10 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age').where('sum(age) > 80')
df10.show()

#排序
df11 = df_csv.orderBy('age')  #默认排序
df11.show()

df12 = df_csv.orderBy('age',ascending=False)  #降序
df12.show()

#分页
df13 = df_csv.limit(5)
df13.show()

#转为rdd
res = df_csv.rdd.collect()[5:10]
print(res)
df_new = ss.createDataFrame(res)
df_new.show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/892775.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

中文情感分析课程设计

中文情感分析 中文情感分析的实质是文本分类问题,本项目分别采用CNN和BI-LSTM两种模型解决文本分类任务,并用于情感分析,达到不错的效果。 两种模型在小数据集上训练,在验证集的准确率、号回率及F1因子均接近90% 项目设计的目标…

短链接能有多短?颠覆你的认知

在我们平时的网络活动中,经常会遇到需要将长链接缩短的情况。有细心的小伙伴会发现,平时收到的短信里面都会携带一个很短的链接,这就是将长链接缩短之后的效果。 缩短链接的主要目的有两个:一是使链接更加简洁美观;二…

基于SSM的网上拍卖平台

文未可获取一份本项目的java源码和数据库参考。 1. 选题背景 网络在人们的日常生活所占的比重越来越重,人们对网络信息的依赖性也越来越高。为用户提供良好的网络服务,可以给用户带来便捷的同时,也为网络服务开发商带来了客观的收益。当前&…

4-20mA采集卡 USB温度采集卡 USB热电偶采集 USB5601多功能采集卡

阿尔泰科技 型号:USB5601 概述: 产品外形图: 外形尺寸图: 主要指标: 8 路差分模拟量采集、8 路隔离数字量输入和 8 路隔离数字量输出 要了解更多技术和产品知识关注我吧!

最大公共子序列c++

最大公共子序列c 概念基本的概念 递归算法代码优化map基础优化代码 概念 基本的概念 子序列: 由原序列中若干个元素组成,元素可以不连续,但和原序列的顺序一致。最长公共子序列: 一个序列即是甲序列的子序列,也是乙序…

DNDC模型下载与安装;土壤碳储量;点尺度和区域尺度模拟;气象数据、土地数据、土壤数据处理、农田减排潜力分析、温室气体排放分析等

实现美丽中国建设目标,“双碳”行动将会发挥非常重要的作用。碳循环的精确模拟是实现“双碳”行动的关键。DNDC(Denitrification-Decomposition,反硝化-分解模型)是目前国际上最为成功的模拟生物地球化学循环的模型之一&#xff0…

spark:Structured Streaming介绍

文章目录 1. Structured Streaming介绍1.1 实时计算和离线计算1.1.1 实时计算1.1.2 离线计算 1.2 有界和无界数据 2. 简单使用3. 编程模型4. 数据处理流程4.1 读取数据Source4.1.1 文件数据处理 4.2 计算操作 Operation4.3 数据输出 Sink4.3.1 输出模式4.3.2 指定输出位置4.3.3…

Html/Vue浏览器下载并重命名文件

Html/Vue浏览器下载并重命名文件 row是上方图片的数据对象 download(row) {const link document.createElement(a);link.style.display none;// 设置下载地址link.setAttribute(href, row.url);// 设置文件名(这里可以重新设置名字,下载之后的文件就是你重新命名…

【数据结构与算法】链表(上)

记录自己所学&#xff0c;无详细讲解 无头单链表实现 1.项目目录文件 2.头文件 Slist.h #include <stdio.h> #include <assert.h> #include <stdlib.h> struct Slist {int data;struct Slist* next; }; typedef struct Slist Slist; //初始化 void SlistI…

死锁和活锁和线程饥饿

死锁 死锁是指两个或两个以上的线程在执行过程中&#xff0c;因争夺资源而造成的一种相互等待的现象&#xff0c;若无外力作用&#xff0c;它们都将无法推进下去。 原因&#xff1a;两个或两个以上的线程共同访问两个或多个相同的资源&#xff08;如对象锁&#xff09;&#…

Unity使用TriangleNet参考

TriangleNet下载如下&#xff1a; TriangleNet 效果如下&#xff1a; 代码参考如下&#xff1a; using System.Collections.Generic; using UnityEngine; using TriangleNet.Geometry;public class TestTriangleNet : MonoBehaviour {[SerializeField]Material material;voi…

耳夹式耳机哪个品牌音质好?五大优质音质的耳夹式耳机!

随着健康理念的传播&#xff0c;运动健身成为大众追求高品质生活的重要部分。传统耳机在运动场景下有局限性&#xff0c;稳定性差、清洁不便&#xff0c;影响运动体验。这时&#xff0c;耳夹式耳机出现&#xff0c;以独特设计和传音方式重塑运动音乐享受&#xff0c;无需入耳&a…

游戏推荐业务中基于 sentinel 的动态限流实践

作者&#xff1a;来自 vivo 互联网服务器团队- Gao Meng 本文介绍了一种基于 sentinel 进行二次开发的动态限流解决方案&#xff0c;包括什么是动态限流、为什么需要引入动态限流、以及动态限流的实现原理。 一、背景 1.1 当前的限流方案 随着互联网的发展及业务的增长&…

Flythings学习(四)串口通信

文章目录 1 串口编程基本步骤1.1 打开串口1.2 配置串口 1.3 读串口1.4 发送串口1.5 关闭串口 2 综合使用3 如何在软件上保证串口稳定通信4 flythings中的串口通讯5 协议接收部分使用和修改方法6 通讯协议数据怎么和UI控件对接 1 串口编程基本步骤 串口通信有5个步骤 1.打开串口…

Telegram——Bot 机器人/小程序入门指南

一、Bot 介绍 在 TG 中,机器人可以用于接收和发送消息、管理群组(在有权限的情况下可以封禁用户、删除消息、置顶消息等)、通过API进行编程操作、使用 Inline 查询功能在不同的聊天室中提供查询服务、创建自定义键盘按钮、发出账单并收款、接入小程序游戏等。 然而,Bot 默…

单片机中断概念以及示例

中断允许控制寄存器 CPU对中断系统所有中断以及某个中断源的开放和屏蔽是由中断允许寄存器IE控制的。 EX0(IE.0)&#xff0c;外部中断0允许位&#xff1b;EX01&#xff0c;打开外部中断0中断&#xff1b;EX00关闭外部中断0中断。 ET0(IE.1)&#xff0c;定时/计数器T0中断允许…

沪尚茗居装修秘籍:嵌入式蒸烤箱,让厨房生活更精彩

在装修厨房时&#xff0c;选择一款合适的嵌入式蒸烤箱不仅能提升烹饪效率&#xff0c;还能为厨房增添一份现代感。沪尚茗居深知用户对厨房电器的需求&#xff0c;从实际出发&#xff0c;为用户推荐选购嵌入式蒸烤箱的实用技巧&#xff0c;让厨房生活更加美好。    首先&…

Real-World Image Variation by Aligning Diffusion Inversion Chain

https://proceedings.neurips.cc/paper_files/paper/2023/file/61960fdfda4d4e95fa1c1f6e64bfe8bc-Paper-Conference.pdfhttps://rival-diff.github.io/ 问题引入 针对的是image varation generation这个任务&#xff0c;tuning free的&#xff1b; methods C C C表示condit…

【closerAI ComfyUI】电商模特一键换装解决方案来了!细节到位无瑕疵!再加上flux模型加持,这个工作流不服不行!

不得了了兄弟们。这应该是电商界的福音&#xff0c;电商模特一键换装解决方案来了&#xff01;细节到位无瑕疵&#xff01;再加上flux模型加持&#xff0c;这个工作流不服不行&#xff01; 这期我们主要讨论如何使用stable diffusion comfyUI 制作完美无瑕疵的换装工作流。** …

《Linux从小白到高手》综合应用篇:详解Linux系统调优之服务器硬件优化

List item 本篇介绍Linux服务器硬件调优。硬件调优主要包括CPU、内存、磁盘、网络等关键硬件组。 1. CPU优化 选择适合的CPU&#xff1a; –根据应用需求选择多核、高频的CPU&#xff0c;以满足高并发和计算密集型任务的需求。CPU缓存优化&#xff1a; –确保CPU缓存&#x…