DataFrame详解

清洗相关的API

清洗相关的API:

1.去重API: dropDupilcates

2.删除缺失值API: dropna

3.替换缺失值API: fillna

去重API: dropDupilcates

dropDuplicates(subset):删除重复数据

1.用来删除重复数据,如果没有指定参数subset,比对行中所有字段内容,如果全部相同,则认为是重复数据,会被删除

2.如果有指定参数subset,只比对subset中指定的字段范围

删除缺失值API: dropna

dropna(thresh,subset):删除缺失值数据.

1.如果不传递参数,只要任意一个字段值为null,就会删除整行数据

2.如果只指定了subset,那么空值的检查,就只会限定在subset指定范围内

3.如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除

 替换缺失值API: fillna

fillna(value,subset):替换缺失值数据

1.value:必须要传递参数,指定填充缺失值的数据

2.subset:限定缺失值的替换范围

注意:

        value如果不是字典,那么就只会替换字段类型匹配的空值

        最常用的是value传递字典形式

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('API的清洗')
    # 创建Sparksession对象
    spark = SparkSession \
        .builder \
        .appName('api_etl_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
        sep=',',
        header=True,
        inferSchema=True,
        encoding='utf8'
    )
    # 查看数据
    init_df.show()
    init_df.printSchema()
    # 数据处理
    print('=' * 50)
    # 去重API:  dropDuplicates
    init_df.dropDuplicates().show()
    # 指定字段去重
    init_df.dropDuplicates(subset=['id', 'name']).show()

    print('=' * 50)
    # 删除缺失值的API:  dropna
    init_df.dropna().show()
    # 指定字段删除
    init_df.dropna(subset='name').show()
    init_df.dropna(subset=['name', 'age', 'address']).show()
    init_df.dropna(thresh=1, subset=['name', 'age', 'address']).show()
    init_df.dropna(thresh=2, subset=['name', 'age', 'address']).show()
    print('=' * 50)
    # 替换缺失值API
    init_df.fillna(9999).show()
    # value传递字典形式
    init_df.fillna(value={'id': 9999, 'name': '刘亦菲', 'address': '北京'}).show()
    # 释放资源
    spark.stop()

Spark SQL的Shuffle分区设置

Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行

Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小

调整shuffle分区的数量:

方案一(不推荐):直接修改spark的配置文件spark-defaults.conf,全局设置,默认值为200

修改设置 spark.sql.shuffle.partitions 20

方案二(常用,推荐使用):在客户端通过指令submit命令提交的时候动态设置shuffle的分区数量,部署上线的时候,基于spark-submit提交运行的时候

        "./spark-submit --conf "spark.sql.shuffle.partitions=20"

方案三(比较常用):在代码中设置,主要在测试环境中使用,一般部署上线的时候,会删除,优先级也是最高的,一般的使用场景是数据量未来不会发生太大的波动

sparksession.conf.set("spark.sql.shuffle.partitions",20)

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('直接基于DataFrame来处理')
    spark = SparkSession \
        .builder \
        .config("spark.sql.shuffle.partitions", 1) \
        .appName('dataFrame_world_count_demo') \
        .master('local[*]') \
        .getOrCreate()

    # 数据输入
    # text方式读取hdfs上的文件
    init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
    # # 查看数据
    # init_df.show()
    # # 打印dataframe表结构信息
    # init_df.printSchema()
    # 创建临时视图
    init_df.createTempView('words')
    # 数据处理
    """
    sparksql方式处理数据-子查询
    1.先切分每一行的数据
    2.使用炸裂函数获得一个word单词列
    3.使用子查询方式聚合统计每个单词出现的次数
    """
    spark.sql("""select word,count(*) as cnt 
    from (select explode(split(value,' ')) as word from words)
    group by word order by cnt desc
    """).show()
    """
       sparksql方式处理数据-侧视图
       1.先切分每一行的数据
       2.使用炸裂函数获得一个word单词列
       3.使用侧视图方式聚合统计每个单词出现的次数
       炸裂函数配合侧视图使用如下:
       格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)
       侧视图名 as 字段名
       """
    spark.sql("""select word,count(*) as cnt
    from words w 
    lateral view explode(split(value,' ')) t as word
    group by word order by cnt desc
    """).show()

    print('=' * 50)
    """
           DSL方式处理数据-方式一
           1.先切分每一行的数据
           2.使用炸裂函数获得一个word单词列
           3.调用API聚合统计单词个数再排序
    """
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').count().orderBy('count', ascending=False).show()

    """
           DSL方式处理数据-方式二
           1.先切分每一行的数据
           2.使用炸裂函数获得一个word单词列
           3.调用API聚合统计单词个数再排序
           4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
    """
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word'),
    ).orderBy('cnt', ascending=False).show()

    """
    DSL方式处理数据-方式三
        withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
        withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
    """
    init_df.withColumn(
        'word',
        F.explode(F.split('value', ' '))
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word')
    ).orderBy('cnt', ascending=False).show()

    # 数据输出
    # 是否资源
    spark.stop()

数据写出操作

统一的输出语法:

对应的简写API格式如下,以CSV为例:
init_df.write.csv(
    path='存储路径',
    mode='模式',
    header=True,
    sep='\t',
    encoding='UTF-8'
)

输出到本地文件

常用参数说明:
    1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
    2- mode:当输出目录中文件已经存在的时候处理办法
        2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
        2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
        2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
        2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path     
                    file:xxx already exists.
        
    3- sep:字段间的分隔符
    4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
    5- encoding:文件输出的编码方式

 

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('数据输出本地文件')
    # 创建Sparksession对象
    spark = SparkSession \
        .builder \
        .appName('api_etl_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
        sep=',',
        header=True,
        inferSchema=True,
        encoding='utf8'
    )
    # 数据处理
    result = init_df.where('age>20')
    # 数据查看
    result.show()
    result.printSchema()

    # 数据输出
    # 以csv格式输出,简写API
    result.write.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/output',
        mode='append',
        header=True,
        sep=',',
        encoding='utf8'
    )

    # 以json方式输出到本地文件系统,复杂API
    result.write \
        .format('json') \
        .option('encoding', 'utf8') \
        .mode('overwrite') \
        .save('file:///export/data/pyspark_projects/02_spark_sql/data/output_json')

数据输出到数据库

数据库的驱动包, 一般都是一些Jar包

如何放置【mysql-connector-java-5.1.41.jar】驱动包呢?  
    1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
        目录位置: /export/server/spark/jars
    
    2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
        目录位置:
            /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
    
    3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
        hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars
        

    请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars  ....

将中文输出到了数据表中乱码
解决办法:
1- 数据库连接要加上:useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码character set utf8

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('API的清洗')
    # 创建Sparksession对象
    spark = SparkSession \
        .builder \
        .appName('api_etl_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
        sep=',',
        header=True,
        inferSchema=True,
        encoding='utf8'
    )
    # 数据处理
    result = init_df.where('age>20')
    # 数据查看
    result.show()
    result.printSchema()

    # 数据输出
    # 以csv格式输出,简写API
    result.write.jdbc(
        url='jdbc:mysql://node1:3306/day06?useUnicode=true&characterEncoding=utf-8',
        table='student',
        mode='append',
        properties={'user': 'root', 'password': '123456'}
    )

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/310109.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式数据库原理及技术题目汇总(上)

题目汇总 选择 1.(单选题,3.0分)以下说法中不正确的是(B )。 A.HIVE中create table命令使用时,表类型可以存储为ORC。 B.HIVE中create table命令使用时,必须包含row format delimited。 C.HIVE中create table命令使用时若含r…

儿童护眼台灯什么品牌好?儿童护眼台灯品牌排行

台灯大家都不陌生,但使用它的人有多少呢,准确使用的人又有多少呢,我们就是为了照明才会去买台灯,而时间久了,你就会眼睛刺痛,那就是没有选对台灯和没有正确使用台灯,还是建议大家买具有护眼功能…

首次落地零担快运!商用车自动驾驶跑出交付加速度

即将迈入2024年,还活着的自动驾驶玩家,身上有两个显著标签:选对了细分赛道、会玩。 10月以来,Cruise宣布在美国德州奥斯汀、休斯顿、亚利桑那州凤凰城和加州旧金山全面停止所有自动驾驶出租车队运营服务,通用汽车计划…

OSS 上传的操作

OSS 上传的操作&#xff1a; 依赖包&#xff1a; <!-- 阿里云OSS --><dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId><version>3.15.1</version></dependency> 配置文…

基于Java SSM框架实现音乐推荐网站项目【项目源码+论文说明】

基于java的SSM框架实现音乐推荐网站演示 摘要 中国风音乐推介网站近年来已成为风靡全球的新兴艺术形式。国内涌现出了大批优秀、有才华的爱好者和许多经久不衰的经典作品。中国风音乐推介网站的兴起打破了音乐界格局,也突破了原有分类唱法发展中的瓶颈,为声乐艺术的发展开辟了…

为啥领导都爱说“我只看结果”?

点击下方“JavaEdge”&#xff0c;选择“设为星标” 第一时间关注技术干货&#xff01; 免责声明~ 任何文章不要过度深思&#xff01; 万事万物都经不起审视&#xff0c;因为世上没有同样的成长环境&#xff0c;也没有同样的认知水平&#xff0c;更「没有适用于所有人的解决方案…

SpringCloud 之HttpClient、HttpURLConnection、OkHttpClient切换源码

承接上文&#xff0c;之前已经分析过OpenFegin 的创建、发送请求源码了&#xff0c;接下来&#xff0c;分析下底层的HttpClient、HttpURLConnection、OkHttpClient切换从源码级别来看是如何做到的。 Spring Cloud OpenFegin&#xff08;创建、发送请求&#xff09;源码 Http…

创建mysql普通用户

一、创建mysql普通用户的原因&#xff1a; 权限控制&#xff1a;MySQL的权限系统允许您为每个用户分配特定的权限。通过创建普通用户&#xff0c;您可以根据需要为每个用户分配特定的数据库和表权限&#xff0c;而不是将所有权限授予一个全局管理员用户。这有助于提高数据库的…

浮动和定位

目录​​​​​​​ &#x1f333;浮动 &#x1f340;去浮动 &#x1f343;方法一 &#x1f343;方法二 &#x1f343;方法三 &#x1f333;定位 &#x1f340;相对定位 &#x1f340;绝对定位 &#x1f340;固定定位 &#x1f333;转义字符 浮动 浮动会脱离文档流.导…

部署vue项目的常见问题汇总--许锅锅

文章目录 vue项目的常见问题版本问题/控制台指令识别问题【node和npm的版本要对应】加载慢的问题【设置镜像即可】设置淘宝镜像cmd窗口内容 npm/cnpm install的问题 npm WARN deprecated core-js3.6.5: core-js&#xff1c;3.23.3 is no longer maintained and not recommended…

计算机毕业设计------SSH宿舍管理系统

项目介绍 本项目分为三种角色&#xff1a;系统管理员、楼宇管理员、学生&#xff1b; 系统管理员主要功能如下&#xff1a; 楼宇管理员管理、学生管理、楼宇管理、宿舍管理、学生入住登记、学生寝室调换、学生迁出登记、学生缺勤记录、修改密码、退出登录 楼宇管理员主要功能…

新接入荣耀 SDK,混淆出包,登录提示框显示不全

荣耀联运客户端 SDK 刚出来不就&#xff0c;看文档第一个对外版本也就是 2023 年 8 月&#xff0c;所以最近开始接入&#xff0c;中间也遇到了一些麻烦折腾了不少时间。 什么意思呢 正常的登录提示 UI 是这样的&#xff0c;能够完整的显示 UI 部分&#xff08;此前是通过定制…

计算机毕业设计-----SSH校园精品课程网前后台

项目介绍 本项目是很不错的一个校园精品课程网源码&#xff0c;前台和后台源码都有&#xff0c;分为管理员与学生两种角色&#xff1b; 前台功能&#xff1a;网站首页&#xff0c;校园新闻&#xff0c;课程中心&#xff0c;资源下载&#xff0c;互动交流&#xff0c;个人中心…

借助GPT理解 “ Android中 点击弹框外部 取消弹框”

在平常的开发工作中 或 阅读技术博客/书籍 时&#xff0c;难免会遇到我们不懂的知识点&#xff0c;网络上搜索的资料 需要有准确性&#xff0c;系统性&#xff0c;可实操性。 这样的资料查询很费时间且还不一定能找到&#xff0c;但是如果借助训练过的的gpt&#xff0c;就会省下…

视频转gif的在线转换怎么操作?告别繁琐,轻松搞定

视频转gif的在线转换怎么操作&#xff1f;在当今社交媒体盛行的时代&#xff0c;GIF动图已经成为了我们表达自我、分享生活的重要方式。但是&#xff0c;很多小伙伴可能还在为如何将心爱的视频片段转为GIF而烦恼。今天&#xff0c;我们就来一起学习如何将视频轻松转换为GIF的在…

09-责任链模式-C语言实现

责任链模式&#xff1a;Avoid coupling the sender of a request to its receiver by giving more than one object a chance to handle the request.Chain the receiving objects and pass the request along the chain until an object handles it.&#xff08;使多个对象都有…

基于ssm文化遗产的保护与旅游开发论文

摘 要 信息数据从传统到当代&#xff0c;是一直在变革当中&#xff0c;突如其来的互联网让传统的信息管理看到了革命性的曙光&#xff0c;因为传统信息管理从时效性&#xff0c;还是安全性&#xff0c;还是可操作性等各个方面来讲&#xff0c;遇到了互联网时代才发现能补上自古…

适合游泳的骨传导耳机,推荐四款高质量游泳耳机!

游泳是一项全身性的运动&#xff0c;对于锻炼身体和塑形都很有帮助&#xff0c;但是游泳的时候往往会因为水的阻力而感到动作笨拙&#xff0c;同时也会感到枯燥无味。而一款好的游泳耳机则能够让你在游泳的过程中享受音乐或者其他的音频内容&#xff0c;增加游泳的趣味性&#…

chrony 时间同步

一.chrony简介 chrony 的优势&#xff1a; ① 更快的同步&#xff0c;从而最大程度减少了时间和频率误差&#xff0c;对于并非全天 24 小时运行的虚拟计算机而言非常有用。 相对于NTP来说&#xff0c;chrony性能更好 NTP是网络时间协议(Network Time Protocol)&#xff0c;它…

十大开放式耳机品牌哪个好?开放式耳机详细评测攻略分享,精华篇

相信很多朋友在选购开放式耳机的时候&#xff0c;发现市场上的开放式耳机品牌琳琅满目&#xff0c;完全不清楚哪家的产品更胜一筹&#xff0c;有些人甚至跟风购入&#xff0c;因此很容易买到不合适自己的而踩雷。 其实这种耳机我很早之前就有在使用&#xff0c;我对于耳机数码类…