Pyspark dataframe基本内置方法(5)

文章目录

  • Pyspark sql DataFrame
    • 相关文章
    • toDF 设置新列名
    • toJSON row对象转换json字符串
    • toLocallterator 获取迭代器
    • toPandas 转换python dataframe
    • transform dataframe转换
    • union unionALL 并集不去重(按列顺序)
    • unionByName 并集不去重(按列名)
    • unpivot 反转表(宽表转长表)
    • withColumn 添加列操作
    • withColumns 添加多列操作
    • withColumnRenamed 列重命名
    • withColumnsRenamed 多列重命名
    • withMetadata 设置元数据
    • write 存储表
      • write.saveAsTable
      • insertInto

Pyspark sql DataFrame

相关文章

Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)

toDF 设置新列名

列名更新,将会按照新列名顺序的替换原列名返回新dataframe,更新列名数量需要跟原始列名数量一致。

from pyspark.sql.functions import lit

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
data.toDF(*['n1','n2','n3','n5','n4']).show()
+-----+---+---+---+---+
|   n1| n2| n3| n5| n4|
+-----+---+---+---+---+
| ldsx| 12|  1| 男|  1|
|test1| 20|  1| 女|  1|
|test2| 26|  1| 男|  1|
|test3| 19|  1| 女|  1|
|test4| 51|  1| 女|  1|
|test5| 13|  1| 男|  1|
+-----+---+---+---+---+

toJSON row对象转换json字符串

把dataframe的row对象转换为json字符串,返回rdd

data.rdd.first()
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
# data.toJSON()返回rdd类型
data.toJSON().first()
'{"name":"ldsx","age":"12","id":"1","gender":"男","new_id":"1"}'

toLocallterator 获取迭代器

返回一个迭代器,其中包含此DataFrame中的所有行。迭代器将消耗与此DataFrame中最大分区一样多的内存。通过预取,它可能会消耗最多2个最大分区的内存。

d1 = data.toLocalIterator()
d1
<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f55c86e0570>
# 便利迭代器
for i in d1:
    print(i)
    
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
Row(name='test1', age='20', id='1', gender='女', new_id='1')
Row(name='test2', age='26', id='1', gender='男', new_id='1')
Row(name='test3', age='19', id='1', gender='女', new_id='1')
Row(name='test4', age='51', id='1', gender='女', new_id='1')
Row(name='test5', age='13', id='1', gender='男', new_id='1')

toPandas 转换python dataframe

需要python环境安装pandas的前提下使用,且dataframe需要很小,因为所有数据都加载到driver的内存中。

data.toPandas()
type(data.toPandas())
<class 'pandas.core.frame.DataFrame'>
    name age id gender new_id
0   ldsx  12  1      男      1
1  test1  20  1      女      1
2  test2  26  1      男      1
3  test3  19  1      女      1
4  test4  51  1      女      1
5  test5  13  1      男      1

transform dataframe转换

参数为处理函数,返回值必须为dataframe

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+

# 处理函数自定义最后返回了dataframe
def ldsx(spark_df):
    colums = [ str(i)+'_ldsx' for i in range(len(spark_df.columns)) ]
    return spark_df.toDF(*colums)
    
data.transform(ldsx).show()
+------+------+------+------+------+
|0_ldsx|1_ldsx|2_ldsx|3_ldsx|4_ldsx|
+------+------+------+------+------+
|  ldsx|    12|     1|    男|     1|
| test1|    20|     1|    女|     1|
| test2|    26|     1|    男|     1|
| test3|    19|     1|    女|     1|
| test4|    51|     1|    女|     1|
| test5|    13|     1|    男|     1|
+------+------+------+------+------+

union unionALL 并集不去重(按列顺序)

获得新dataframe,unionall别名为union,如果要去重使用distinct方法,不会解析对应的列名合并,是按照列的顺序合并的,硬合

df2 = spark.createDataFrame([(3, 'C'), (4, 'D')], ['id', 'value'])
df1 = spark.createDataFrame([(1, 'A'), (2, 'B'),(3, 'C'),(3, 'C')], ['id', 'value'])
df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  3|    C|
+---+-----+
df2.show()
+---+-----+
| id|value|
+---+-----+
|  3|    C|
|  4|    D|
+---+-----+
df1.union(df2)
DataFrame[id: bigint, value: string]
df1.union(df2).show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  3|    C|
|  3|    C|
|  4|    D|
+---+-----+

# 去重使用distinct
df1.union(df2).distinct().show()
+---+-----+
| id|value|
+---+-----+
|  2|    B|
|  1|    A|
|  3|    C|
|  4|    D|
+---+-----+

unionByName 并集不去重(按列名)

是否允许缺失列:allowMissingColumns,默认不允许

# 按照列名合并
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+


# 对于不存在列进行填补
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6, 7]], ["col1", "col2", "col3", "col4"])
# allowMissingColumns True默认填补null
df1.unionByName(df2, allowMissingColumns=True).show()

+----+----+----+----+----+
|col0|col1|col2|col3|col4|
+----+----+----+----+----+
|   1|   2|   3|NULL|NULL|
|NULL|   4|   5|   6|   7|
+----+----+----+----+----+

unpivot 反转表(宽表转长表)

ids: 标识列
values:选中的列(LIST)
variableColumnName: 列名
valueColumnName:对应列的值

宽表转长表,一行变多行,除了选中的ids是不变的,但是会把选中的values中的列由列变成行记录,variableColumnName记录了反转前的列名,

valueColumnName 对应 variableColumnName 存储值。

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
# 一行变成三行,id不变 'age','name','gender'由列转行,c_col依次记录'age','name','gender',c_value则记录对应的值
data.unpivot('id',['age','name','gender'],'c_col','c_value').show()
+---+------+-------+
| id| c_col|c_value|
+---+------+-------+
|  1|   age|     12|
|  1|  name|   ldsx|
|  1|gender|     男|
|  1|   age|     20|
|  1|  name|  test1|
|  1|gender|     女|
|  1|   age|     26|
|  1|  name|  test2|
|  1|gender|     男|
|  1|   age|     19|
|  1|  name|  test3|
|  1|gender|     女|
|  1|   age|     51|
|  1|  name|  test4|
|  1|gender|     女|
|  1|   age|     13|
|  1|  name|  test5|
|  1|gender|     男|
+---+------+-------+

withColumn 添加列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

# 使用d1上的列或者用常量列
d1.withColumn('c_value2',d1.c_value).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
|  1|   age|     12|      12|
|  1|  name|   ldsx|    ldsx|
|  1|gender|     男|      男|
|  1|   age|     20|      20|
|  1|  name|  test1|   test1|
|  1|gender|     女|      女|
|  1|   age|     26|      26|
|  1|  name|  test2|   test2|
|  1|gender|     男|      男|
|  1|   age|     19|      19|
|  1|  name|  test3|   test3|
|  1|gender|     女|      女|
|  1|   age|     51|      51|
|  1|  name|  test4|   test4|
|  1|gender|     女|      女|
|  1|   age|     13|      13|
|  1|  name|  test5|   test5|
|  1|gender|     男|      男|
+---+------+-------+--------+
# 使用常量补充列
from pyspark.sql.functions import lit
d1.withColumn('c_value2',lit('ldsx')).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
|  1|   age|     12|    ldsx|
|  1|  name|   ldsx|    ldsx|
|  1|gender|     男|    ldsx|
|  1|   age|     20|    ldsx|
|  1|  name|  test1|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     26|    ldsx|
|  1|  name|  test2|    ldsx|
|  1|gender|     男|    ldsx|
|  1|   age|     19|    ldsx|
|  1|  name|  test3|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     51|    ldsx|
|  1|  name|  test4|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     13|    ldsx|
|  1|  name|  test5|    ldsx|
|  1|gender|     男|    ldsx|
+---+------+-------+--------+
# 使用表达式设置列
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["number"])
df.show()
+------+
|number|
+------+
|     1|
|     2|
|     3|
|     4|
+------+
from pyspark.sql.functions import col, when
df.withColumn("new_number", when(df.number < 3, "Low").otherwise("High")).show()
------+----------+
|number|new_number|
+------+----------+
|     1|       Low|
|     2|       Low|
|     3|      High|
|     4|      High|
+------+----------+

withColumns 添加多列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show()
+---+-----+----+----+
|age| name|age2|age3|
+---+-----+----+----+
|  2|Alice|   4|   5|
|  5|  Bob|   7|   8|
+---+-----+----+----+

# 可使用表达式
df.withColumns({'h1': when(df.age < 2, "Low").otherwise("High"), 'h2': df.age + 3}).show()
+---+-----+----+---+
|age| name|  h1| h2|
+---+-----+----+---+
|  2|Alice|High|  5|
|  5|  Bob|High|  8|
+---+-----+----+---+

withColumnRenamed 列重命名

不存在的列重命名报错,返回新dataframe。

列,重命名列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumnRenamed('age', 'age2').show()
+----+-----+
|age2| name|
+----+-----+
|   2|Alice|
|   5|  Bob|
+----+-----+

withColumnsRenamed 多列重命名

字典,列名的映射

df.withColumnsRenamed({'age':'new_age','name':'new_name'}).show()
+-------+--------+
|new_age|new_name|
+-------+--------+
|      2|   Alice|
|      5|     Bob|
+-------+--------+

withMetadata 设置元数据

更新元数据,返回新dataframe

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
# 查看列的元数据
df.schema['age'].metadata
{}
# 设置元数据
df_meta = df.withMetadata('age', {'foo': 'bar'})
df_meta.schema['age'].metadata
{'foo': 'bar'}

write 存储表

write.saveAsTable

当追加插入的时候dataframe只需要scheam一致,会自动匹配

  • name: str, 表名

  • format: Optional[str] = None, 格式类型 hive,parquet…

  • mode: Optional[str] = None, 写入方式

    1. append:将this:class:DataFrame的内容附加到现有数据中,数据格式需要一致。
    2. “overwrite”:覆盖现有数据,数据格式不重要了,已此次覆盖为准。
    3. errorerrorifeists:如果数据已经存在,则抛出异常。
    4. ‘ignore’:如果数据已经存在,则自动忽略此操作。
  • partitionBy: Optional[Union[str, List[str]]] = None, 分区列表

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
# 覆盖重写
df.write.saveAsTable('ldsx_test','parquet','overwrite',['age'])

# 追加写入
df.write.saveAsTable('ldsx_test','parquet','append',['age'])

# 另一种写法
df.write.format('parquet').mode('append').partitionBy(['age']).saveAsTable('ldsx_test')

在这里插入图片描述

在这里插入图片描述

insertInto

不会对scheam进行校验,按位置插入

d2.show()
+-----+----+
|name1|age1|
+-----+----+
|ldsx1|   2|
|ldsx2|   3|
+-----+----+
d2.write.insertInto('ldsx_test')
d2.schema
StructType([StructField('name1', StringType(), True), StructField('age1', LongType(), True)])

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/882686.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

力扣234 回文链表 Java版本

文章目录 题目描述代码 题目描述 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为 回文链表 。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true 示例 2&…

Mac电脑上最简单安装Python的方式

背景 最近换了一台新的 MacBook Air 电脑&#xff0c;所有的开发软件都没有了&#xff0c;需要重新配环境&#xff0c;而我现在最常用的开发程序就是Python。这篇文章记录一下我新Mac电脑安装Python的全过程&#xff0c;也给大家一些思路上的提醒。 以下是我新电脑的配置&…

初识模版!!

初识模版 1.泛型编程1.1 如何实现一个交换函数呢&#xff08;使得所有数据都可以交换&#xff09;&#xff1f;1.2 那可以不可以让编译器根据不同的类型利用该模子来生成代码呢&#xff1f; 2.模版类型2.1 模版概念2.2 函数模版的原理2.3 函数模板的实例化2.4 模板参数的匹配原…

如何在openEuler上安装和配置openGauss数据库

本文将详细介绍如何在openEuler 22.03 LTS SP1上安装和配置openGauss数据库&#xff0c;包括数据库的启动、停止、远程连接配置等关键步骤。 1、安装 使用OpenEuler-22.03-LTS-SP1-x64版本的系统&#xff0c;通过命令行安装openGauss数据库。 1.1、确保系统软件包索引是最新…

2024最受欢迎的3款|数据库管理和开发|工具

1.SQLynx&#xff08;原SQL Studio&#xff09; 概述&#xff1a; SQLynx是一个原生基于Web的SQL编辑器&#xff0c;由北京麦聪软件有限公司开发。它最初被称为SQL Studio&#xff0c;后改名为SQLynx&#xff0c;支持企业的桌面和Web数据库管理。SQLynx支持所有流行的数据库&a…

lettuce引起的Redis command timeout异常

项目使用Lettuce&#xff0c;在自己的环境下跑是没有问题的。在给客户做售前压测时&#xff0c;因为客户端环境比较恶劣&#xff0c;service服务和中间件服务不在同一机房。服务启动后不一会就会出现Redis command timeout异常。 经过差不多两周的追查&#xff0c;最后没办法把…

Fyne ( go跨平台GUI )中文文档-Fyne总览(二)

本文档注意参考官网(developer.fyne.io/) 编写, 只保留基本用法 go代码展示为Go 1.16 及更高版本, ide为goland2021.2​​​​​​​ 这是一个系列文章&#xff1a; Fyne ( go跨平台GUI )中文文档-入门(一)-CSDN博客 Fyne ( go跨平台GUI )中文文档-Fyne总览(二)-CSDN博客 Fyne…

本地生活商城开发搭建 同城O2O线上线下推广

同城本地化商城目前如火如荼&#xff0c;不少朋友咨询本地生活同城平台怎么开发&#xff0c;今天商淘云与大家分享同城O2O线上商城的设计和开发。 本地生活商城一般会涉及到区域以及频道类&#xff0c;一般下单需要支持用户定位、商家定位&#xff0c;这样利于用户可以快速找到…

Leetcode 反转链表

使用递归 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* ListNode(int val, ListNode next) { this.val val; this.next next; }* }*/ class S…

音频3A——初步了解音频3A

文章目录 前言一、3A使用的场景和原理1.AEC2.AGC3.ANS/ANR4.硬件3A和软件3A的区别1&#xff09;层级不同2&#xff09;处理顺序不同3&#xff09;优缺点 5.处理过程 二、3A带来的问题三、开源3A算法总结 前言 在日常的音视频通话过程中&#xff0c;说话的双端往往会面对比较复…

Davinci 大数据可视化分析

Davinci 大数据可视化分析 一、Davinci 架构设计1.1 Davinci定义1.2 Davinci 应用场景 二、Davinci 安装部署2.1 部署规划2.2 前置环境准备2.3 Davinci部署2.3.1 物料准备2.3.2 安装配置 2.4 环境变量配置2.5 初始化数据库2.5.1 创建数据库及用户 2.5.2 建表2.6 初始化配置 三、…

Java反射机制入门:解锁运行时类信息的秘密

反射技术&#xff1a; 其实就是对类进行解剖的技术 类中有什么&#xff1f;构造方法 成员方法成员变量 结论&#xff1a;反射技术就是把一个类进行了解剖&#xff0c;然后获取到 构造方法、成员变量、成员方法 反射技术的应用案例&#xff1a; idea框架技术&#xff1a;Spr…

网络安全-ssrf

目录 一、环境 二、漏洞讲解 三、靶场讲解 四、可利用协议 4.1 dict协议 4.2 file协议 4.3 gopher协议 五、看一道ctf题吧&#xff08;长亭的比赛&#xff09; 5.1环境 5.2开始测试 ​编辑 一、环境 pikachu&#xff0c;这里我直接docker拉取的&#xff0c;我只写原…

基于vue框架的传统文化传播网站设计与实现f7r43(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;用户,文化类型,传统文化 开题报告内容 基于Vue框架的传统文化传播网站设计与实现开题报告 一、研究背景 在全球化加速的今天&#xff0c;各国文化相互交融&#xff0c;但也面临着传统文化被边缘化的风险。中国拥有五千年文明史&#…

【通俗易懂介绍OAuth2.0协议以及4种授权模式】

文章目录 一.OAuth2.0协议介绍二.设计来源于生活三.关于令牌与密码的区别四.应用场景五.接下来分别简单介绍下四种授权模式吧1.客户端模式1.1 介绍1.2 适用场景1.3 时序图 2.密码模式2.1 介绍2.2 适用场景2.3时序图 3.授权码模式3.1 介绍3.2 适用场景3.3 时序图 4.简化模式4.1 …

数据的表示和存储 第3讲 C语言中的整数

深耕AI ​互联网行业 算法研发工程师 概括 本讲主要介绍了C语言中的整数表示。 无符号整数能够表示的最大值比带符号整数要大。带符号整数使用补码来表示&#xff0c;补码的运算系统是一种模运算系统&#xff0c;能够实现加减运算的统一。在C语言中&#xff0c;如果一个表达式…

利用F.interpolate()函数进行插值操作

函数简介 功能&#xff1a; 利用插值方法&#xff0c;对输入的张量数组进行上\下采样操作&#xff0c;换句话说就是科学合理地改变数组的尺寸大小&#xff0c;尽量保持数据完整。 torch.nn.functional.interpolate(input, sizeNone, scale_factorNone, modenearest, align_c…

【赵渝强老师】K8s的DaemonSets控制器

DaemonSet控制器相当于在节点上启动了一个守护进程。通过使用DaemonSet可以确保一个Pod的副本运行在 Node节点上。如果有新的Node节点加入集群&#xff0c;DaemonSet也会自动给新加入的节点增加一个Pod的副本&#xff1b;反之&#xff0c;当有Node节点从集群中移除时&#xff0…

EdgeRoute_镜像烧录

1. EdgeRouter 概述 EdgeRouter Lite 是由 Ubiquiti Networks 公司生产的一款高性能网络路由器&#xff0c;适用于家庭和小型办公环境。它的尺寸为200 x 90 x 30 mm&#xff0c;重量为345克&#xff0c;配备了双核500 MHz的MIPS64处理器&#xff0c;并带有硬件加速功能&#x…

MySQL_数据类型简介

课 程 推 荐我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448;入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448;虚 拟 环 境 搭 建 &#xff1a;&#x1…