spark的学习-03

RDD的创建的两种方式:

方式一:并行化一个已存在的集合

方法:parallelize 并行的意思

将一个集合转换为RDD

方式二:读取外部共享存储系统

方法:textFile、wholeTextFile、newAPIHadoopRDD等

读取外部存储系统的数据转换为RDD

RDD的五大特征:

  1. 每个RDD 都由一系列的分区构成

  2. RDD 的转换操作本质上就是对RDD所有分区的并行转换

  3. 每个RDD 都会保存与其他RDD之间的依赖关系:血链机制或者血脉机制

  4. 如果是二元组【KV】类型的RDD,在Shuffle过程中可以自定义分区器,默认是hash分区(hash值取模进行分区)

  5. 可选的,Spark程序运行时,Task的分配可以指定实现本地优先计算:最优计算位置

 
 

RDD的五大特性分别是什么?

a. 每个RDD都可以由多个分区构成

b. 对RDD转换处理本质上是对RDD所有分区的并行转换处理

c. 对每个RDD都会保留与其他RDD之间的依赖关系:血脉机制

d. 可选的,对于KV结构的RDD,在经过Shuffle时,可以干预分区规则,默认是Hash分区

e. 可选的,Spark分配Task时会优先本地计算,尽量将Task分配到数据所在的节点

转换算子:

map:
# map:
list1 = [1, 2, 3, 4, 5]
# 目标是求出集合中各个元素的 3 次方
listRdd = sc.parallelize(list1)
mapRdd = listRdd.map(lambda x: math.pow(x, 3))
mapRdd.foreach(lambda x: print(x))  # foreach是触发算子

flatMap:
# flatMap:
# 目标是根据/切割,得到每个歌名
fileRdd = sc.textFile("../../datas/wordcount/songs.txt")
flatMapRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatMapRdd.foreach(lambda x:print(x))

filter:

过滤算子

# filter :
# 目标是过滤掉不符合的文本
fileRdd2 = sc.textFile("../../datas/wordcount/songs2.txt")
filterRdd = fileRdd2.filter(lambda line: re.split("\s",line)[2] != '-1' and len(re.split("\s",line)) == 4)
filterRdd.foreach(lambda x: print(x))

union:

联合

list2 = [1,2,3,4,5,6,7,8]
list3 = [6,7,8,9,10]
rdd1 = sc.parallelize(list2)
rdd2 = sc.parallelize(list3)

rdd3 = rdd1.union(rdd2)

rdd3.foreach(lambda x: print(x))   # 1 2 3 4 5 6 7 8 6 7 8 9 10
distinct:

去重

rdd4 = rdd3.distinct()
rdd4.foreach(lambda x: print(x))  # 1 2 3 4 5 6 7 8 9 10
分组聚合算子 groupByKey 以及 reduceByKey:

groupByKey只根据key进行分组,但不聚合 reduceByKey根据key进行分组,且进行聚合 (必须进行shuffle,可以指定分区的数量和规则) groupByKey转换算子,只对 KV键值对的RDD 起作用

rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd6 = rdd5.groupByKey()  # ("word",List[10,5])
rdd6.foreach(lambda x: print(x[0], *x[1]))   

rdd7 = rdd5.reduceByKey(lambda total, num: total + num)
rdd7.foreach(print)

重分区算子:repartition、coalesce :

二者都可以将分区变大变小

repartition必须经过shuffle 因为底层代码中 shuffle = True,可以将分区变小或者变大

而coalesce 可以选择经过不经过shuffle,默认情况下不经过,在默认情况下,只能将分区变小,不能将分区变大。假如shuffle=True,也可以将分区变大。

使用repartition更改分区的数量:
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
# 没有指定分区,走默认,默认分区个数,因为是local 模式,所以跟核数有关,所以 分区数为2
rdd = sc.parallelize(list01)
print(rdd.getNumPartitions())  # getNumPartitions() 获取分区的数量 返回值不是RDD,所以不是转换算子,是触发算子   # 2
# 使用 repartition 将 分区数量改为4 或 1
changeRdd = rdd.repartition(4)  # 经过shuffle过程,将分区数量更改为4
print(changeRdd.getNumPartitions())  # 现在就将rdd 的分区更改为4了   # 4
# 还可以更改为1 (缩小分区)
print(rdd.repartition(1).getNumPartitions())   # 1
使用coalesce 更改分区的数量:

将小分区变为大分区,必须进行shuffle过程 在coalesce的中默认shuffle=Flase,所以我们需要手动更改为True

changeRdd2 = rdd.coalesce(4,shuffle=True)  #
print(changeRdd2.getNumPartitions())  # 4

将大分区改为小分区,在coalesce中可以不进行shuffle过程,所以不需要改为True

print(rdd.coalesce(1).getNumPartitions())  # 1 
排序算子:sortBy、sortByKey:
 fileRdd = sc.textFile("../../datas/c.txt")
    #fileRdd.sortBy(lambda line:line.split(",")[1],ascending=False).foreach(print)

    # sortByKey  对KV类型的RDD进行排序
    rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
    #rdd5.sortByKey(ascending=False).foreach(print)

    # 假如你想根据value排序,怎么办?
    rdd5.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print)  
    # ascending=False降序排序

触发算子:

常见的触发算子:count、foreach、take
# 较为常见的触发算子
# count  foreach  saveAsTextFile
# count
list1 = [1,2,3,4,5,6,7,8,9]
rdd1 = sc.parallelize(list1,2)
print(rdd1.count())  #9

rdd1.foreach(lambda x: print(x))

print(rdd1.take(3))  # [1 2 3]

其他触发算子:

first、take:
# first: 返回RDD集合中的第一个元素
print(rdd1.first())  # 1 

print(rdd1.take(3))  # [1 2 3]
collect:

我们在上面sortBy案例中写到了collect,如果不collect就直接打印结果的话,出来的是各个分区中排序的结果,并不是全局的(sortBy是全局排序的,只不过我们之前有分区,只在分区中排序)

想看到全局的排序,可以直接将分区数量更改为1,或者直接使用collect收集

reduce:

我们在上面的案例中也使用到了reduceByKey转换算子,这个和上面的差不多,只不过reduce只进行聚合,而不需要根据key分组什么的,因为就没有key

print(rdd1.reduce(lambda sum, num:sum + num))  # 45
top 和 takeOrdered:

先对RDD中的所有元素进行升序排序,top返回最大的几个元素、takeOrdered返回最小的几个元素

都不经过shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量

list2 = [2,1,5,79,435,33,576]
rdd2 = sc.parallelize(list2)
print(rdd2.top(3))  # [576, 435, 79]
# takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
print(rdd2.takeOrdered(3))  # [1, 2, 5]

join 方面的算子:

join leftOuterJoin rightOuterJoin fullOuterJoin 都为转换算子

join的过程,必然引发相同key值的数据汇总在一起,引发shuffle 操作

join:
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],
                                numSlices=2)
rdd_singer_music = sc.parallelize(
    [("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),
     ("动力火车", "当")], numSlices=2)

# join
joinRdd = rdd_singer_age.join(rdd_singer_music).foreach(lambda x : print(x))   
# ('周杰伦', (43, '青花瓷'))
# ('蔡依林', (41, '日不落'))
# ('陈奕迅', (47, '孤勇者'))
# ('林子祥', (74, '男儿当自强'))
leftOuterJoin:

和sql中的leftjoin一样,左边的值全出,右边的值有的就显示,没有就显示null

rightOuterJoin 同理

leftJoinRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music).foreach(lambda x:print(x))
#('周杰伦', (43, '青花瓷'))
#('蔡依林', (41, '日不落'))
#('陈升', (63, None))
#('陈奕迅', (47, '孤勇者'))
#('林子祥', (74, '男儿当自强'))
fullOuterJoin:
fullJoinRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music).foreach(lambda x: print(x)) 
# ('动力火车', (None, '当'))
# ('周杰伦', (43, '青花瓷'))
# ('蔡依林', (41, '日不落'))
# ('陈升', (63, None))
# ('陈奕迅', (47, '孤勇者'))
# ('林子祥', (74, '男儿当自强'))

分区算子 mapPartitions -- 转换算子 foreachParition -- 触发算子

mapPartitions:
input_rdd = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), numSlices=2)
# 使用mapPartitions:对每个分区进行处理
def map_partition(part):
    rs = [i * 2 for i in part]
    return rs

# 每个分区会调用一次:将这个分区的数据放入内存,性能比map更好,优化型算子,注意更容易出现内存溢出
map_part_rdd = input_rdd.mapPartitions(lambda part: map_partition(part))

foreachParition:

- 优点:性能快、节省外部连接资源 - 缺点:如果单个分区的数据量较大,容易出现内存溢出 - 场景: -数据量不是特别大,需要提高性能【将整个分区的数据放入内存】 -需要构建外部资源时【基于每个分区构建一份资源】

def save_part_to_mysql(part):
    # 构建MySQL连接
    for i in part:
        # 利用MySQL连接将结果写入MySQL
        print(i)

# 将每个分区的数据直接写入MySQL,一个分区就构建一个连接
map_part_rdd.foreachPartition(lambda part: save_part_to_mysql(part))

Spark的容错机制:(重点)

1、RDD容错机制:persist持久化机制

其中有三个算子: cache 、 persist 、 unpersist

cache:
# 功能:将RDD缓存在内存中
# 本质其实底层还是调用的 persist ,但是只缓存在内存中,如果内存不足的话,缓存就会失败
语法:cache()
persist :

  与cache不同的是,persist 可以自己指定缓存的方式(级别)

# 将RDD缓存在磁盘中
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)

# 将RDD缓存在内存中
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)

# 将RDD优先缓存在内存中,如果内存不足,就缓存在磁盘中
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)

# 使用堆外内存
StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)

# 使用序列化
StorageLevel.MEMORY_AND_DISK_DESER = StorageLevel(True, True, False, True)

常用的有

MEMORY_AND_DISK_2   -- 先缓存内存,如果内存不足就缓存在磁盘
MEMORY_AND_DISK_DESER   -- 使用序列化
unpersist :

功能就是将缓存释放出去

  • unpersist(blocking=True):等释放完再继续下一步 (默认为False)

  • 场景:明确RDD已经不再使用,后续还有很多的代码需要执行,将RDD的数据从缓存中释放,避免占用资源

  • 注意:如果不释放,这个Spark程序结束,也会释放这个程序中的所有内存

总体代码演示:

 # step3: 保存结果
  # 对RDD进行缓存
  rs_rdd.cache() # 只缓存在内存中
  rs_rdd.persist(StorageLevel.MEMORY_AND_DISK)
    # 打印结果:构建RDD
  rs_rdd.foreach(lambda x: print(x))
    # 打印第一行:重新构建一遍
  print(rs_rdd.first())
    # 统计行数:重新构建一遍
    print(rs_rdd.count())

    # todo:3-关闭SparkContext
    time.sleep(10000)
  # 如果这个RDD明确后续代码中不会再被使用,一定要释放缓存
    rs_rdd.unpersist(blocking=True)

# unpersist(blocking=True):等RDD释放完再继续下一步
# blocking = True:阻塞

2、checkPoint 检查点

checkpoint需要在触发算子的前面设置检查点,之后设置的话可能会出现只产生文件夹,而不产生结果的情况

# 创建sc对象
conf = SparkConf().setMaster("local[2]").setAppName("第一个pysparkDemo")
sc = SparkContext(conf=conf)

fileRdd = sc.textFile("../../datas/wordcount/sogou.tsv")
mapRdd = (fileRdd.filter(lambda line: len(re.split("\s+", line)) == 6) \
          .map(lambda line: (re.split("\s+", line)[0], re.split("\s+", line)[1], re.split("\s+", line)[2][1:-1])))

sc.setCheckpointDir("../datas/chk/chk1")

mapRdd.checkpoint()
# checkpoint需要在触发算子的前面设置检查点,之后设置的话可能会出现只产生文件夹,而不产生结果的情况

print(mapRdd.count())

time.sleep(100)

sc.stop()

容错机制面试题:

RDD的cache、persist持久化机制和checkpoint检查点机制有什么区别?

  • 存储位置

    • persist:将RDD缓存在内存或者磁盘中

    • chk:将RDD的数据存储在文件系统磁盘中

  • 生命周期

    • persist:当代码中遇到了unpersist或者程序结束,缓存就会被自动清理

    • chk:检查点的数据是不会被自动清理的,只能手动删除

  • 存储内容

    • persist:会保留RDD的血脉关系,如果缓存丢失,可以通过血脉进行恢复

    • chk:会斩断RDD的血脉关系,不会保留RDD的血脉关系的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/910457.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python练习10

Python日常练习 题目: 编写程序,输出如下所示图案。 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 要求: 使用for循环的方式完成 --------------------------------------------------------- 注意: …

【云原生开发】K8S多集群资源管理平台架构设计

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…

「iOS」——知乎日报一二周总结

知乎日报仿写 前言效果Manager封装网络请求线程冲突问题下拉刷新添加网络请求的图片通过时间戳和日期格式化获取时间 总结 前言 前两周内容的仿写,主要完成了首页的仿写,进度稍慢。 效果 Manager封装网络请求 知乎日报的仿写需要频繁的申请网络请求&am…

【ArcGISPro】汉化嵌入的NoteBook界面

效果展示 下载Notebook汉化包 pip install jupyterlab-language-pack-zh-CN 添加环境变量 LANG zh_CN.UTF8 汉化结果

长亭那个检测能力超强的 WAF,出免费版啦

告诉你们一个震撼人心的消息,那个检测能力超强的 WAF——长亭雷池,他推出免费社区版啦,体验地址见文末。 八年前我刚从学校毕业,在腾讯做安全研究,看到宇森在 BlackHat 上演讲的议题 《永别了,SQL 注入》 …

了解bootstrap改造asp.net core MVC的样式模板

我们都知道,在使用默认的asp.net core MVC模板建立项目的时候,里面的样式是已经事先被写好了的。一般来说都在css目录下的site.css和bootstrap.css及下面的bootstrap.min.css中。我们打开bootstrap这些样式文件,里面有大量的样式类的定义&…

scratch计算台阶 2024年9月scratch四级真题 中国电子学会 图形化编程 scratch四级真题和答案解析

目录 scratch计算台阶 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、p…

数据结构之二叉树前序,中序,后序习题分析(递归图)

1.比较相同的树 二叉树不能轻易用断言,因为树一定有空 2.找结点值 3.单值二叉树 4.对称二叉树 5.前序遍历

mysql死锁查看和解决

文章目录 一、MySQL死锁排查1、查看正在进行中的事务2、查看正在锁的事务3、查看等待锁的事务4、查询是否锁表5、查看最近死锁的日志6、杀死死锁 本文是基于Mysql8.0进行讲解。 本文只讲解如何查询数据库中是否有死锁及死锁的解决,若想了解更多关于死锁的信息&…

【STM32笔记】定时器(TIM1)无法工作

目录 项目场景: 问题描述 原因分析: 解决方案: 项目场景: 编译环境:keil uverison5 初次学习stm32(stm32f103xx)控制舵机(mg90s)时,舵机不转动&#xf…

Spark 程序开发与提交:本地与集群模式全解析

Spark 的介绍与搭建:从理论到实践-CSDN博客 Spark 的Standalone集群环境安装与测试-CSDN博客 PySpark 本地开发环境搭建与实践-CSDN博客 目录 一、本地开发与远程提交测试 (一)问题背景 (二)解决方案 集群环境准…

利用亚马逊AWS IoT核心和MQTT进行数据采集的综合指南

论文标题:A Comprehensive Guide on Data Acquisition Utilizing Amazon AWS IOT Core and MQTT 中文标题:利用亚马逊AWS IoT核心和MQTT进行数据采集的综合指南 作者信息: Tanishq. I. KohliPradip R. Selokar 两位作者均来自印度那格浦尔…

鸿萌数据迁移服务: 企业服务器整机在线热迁移, 实现不停机业务转移

天津鸿萌科贸发展有限公司从事数据安全服务二十余年,致力于为各领域客户提供专业的数据存储、数据恢复、数据备份、数据迁移等解决方案与服务,并针对企业面临的数据安全风险,提供专业的相关数据安全培训。 鸿萌数据迁移业务为众多企业顺利高效…

软件工程笔记一

目录 软件的概念、特性和分类 软件与程序 软件的特性 软件的分类 软件危机与软件工程 软件危机 如何摆脱软件危机? 软件工程概念的提出 什么是软件工程? 软件工程的若干定义 系统工程的目标 软件工程的基本原理 软件工程的目标 软件的质量特性 软件生存…

使用VS Code 安装VUE.js开发环境的搭建并创建第一个项目

初步掌握VUE.js开发环境的搭建并创建第一个项目的操作方法和实验步骤 题目 安装Visual Studio Code。安装VS Code汉化插件。安装Vue官方支持插件。使用VS Code运行第一个HTML页面。安装Node.js并验证其版本。验证npm版本。配置npm的下载镜像源。配置Yarn的下载镜像源。使用Vi…

Java | Leetcode Java题解之第541题反转字符串II

题目&#xff1a; 题解&#xff1a; class Solution {public String reverseStr(String s, int k) {int n s.length();char[] arr s.toCharArray();for (int i 0; i < n; i 2 * k) {reverse(arr, i, Math.min(i k, n) - 1);}return new String(arr);}public void reve…

【物联网技术】ESP8266 WIFI模块STA、AP、STA+AP、TCP/UDP透传工作模式介绍与AT指令介绍

前言:本文对ESP8266 WIFI模块STA、AP、STA+AP、TCP/UDP透传工作模式进行介绍;以及AT指令介绍,包括基础AT指令,WIFI功能AT指令、TCP/IP相关AT指令、常用AT指令实例进行介绍。 ESP8266 WIFI模块的接线及固件烧写可参考我的这篇博客:正点原子ATK-ESP8266 WIFI模块接线及固件…

【大数据学习 | kafka】kafka的数据存储结构

以上是kafka的数据的存储方式。 这些数据可以在服务器集群上对应的文件夹中查看到。 [hexuanhadoop106 __consumer_offsets-0]$ ll 总用量 8 -rw-rw-r--. 1 hexuan hexuan 10485760 10月 28 22:21 00000000000000000000.index -rw-rw-r--. 1 hexuan hexuan 0 10月 28 …

软件测试面试题——移动端

一、常用的adb命令有哪些&#xff1f; 命令含义adb devices展示当前电脑连接的设备&#xff0c;如果电脑上有多个手机&#xff0c;需要adb -s指定对应设备adb install xxx.apk直接安装xxx.apk到手机中&#xff0c;注意&#xff1a;必须打开手机设置里的USB安装adb install -r …

软考教材重点内容 信息安全工程师 第1章 网络信息安全概述

第 1 章 网络信息安全概述 1.1.1 网络信息安全相关概念 狭义上的网络信息安全特指网络信息系统的各组成要素符合安全属性的要求&#xff0c;即机密性、完整性、可用性、抗抵赖性、可控性。 广义上的网络信息安全是涉及国家安全、城市安全、经济安全、社会安全、生产安全、人身安…