【pyspark速成专家】5_Spark之RDD编程3

目录

​编辑

六,共享变量

七,分区操作


六,共享变量

当spark集群在许多节点上运行一个函数时,默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。

但是,有时候需要在不同节点或者节点和Driver之间共享变量。

Spark提供两种类型的共享变量,广播变量和累加器。

广播变量是不可变变量,实现在不同节点不同任务之间共享数据。

广播变量在每个机器上缓存一个只读的变量,而不是为每个task生成一个副本,可以减少数据的传输。

累加器主要是不同节点和Driver之间共享变量,只能实现计数或者累加功能。

累加器的值只有在Driver上是可读的,在节点上不可见。

#广播变量 broadcast 不可变,在所有节点可读

broads = sc.broadcast(100)

rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())

print(broads.value)

[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100

#累加器 只能在Driver上可读,在其它节点只能进行累加

total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)

rdd.foreach(lambda x:total.add(x))
total.value

45

# 计算数据的平均值
rdd = sc.parallelize([1.1,2.1,3.1,4.1])
total = sc.accumulator(0)
count = sc.accumulator(0)

def func(x):
    total.add(x)
    count.add(1)
    
rdd.foreach(func)

total.value/count.value

2.6

七,分区操作

分区操作包括改变分区操作,以及针对分区执行的一些转换操作。

glom:将一个分区内的数据转换为一个列表作为一行。

coalesce:shuffle可选,默认为False情况下窄依赖,不能增加分区。repartition和partitionBy调用它实现。

repartition:按随机数进行shuffle,相同key不一定在同一个分区

partitionBy:按key进行shuffle,相同key放入同一个分区

HashPartitioner:默认分区器,根据key的hash值进行分区,相同的key进入同一分区,效率较高,key不可为Array.

RangePartitioner:只在排序相关函数中使用,除相同的key进入同一分区,相邻的key也会进入同一分区,key必须可排序。

TaskContext: 获取当前分区id方法 TaskContext.get.partitionId

mapPartitions:每次处理分区内的一批数据,适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支

mapPartitionsWithIndex:类似mapPartitions,提供了分区索引,输入参数为(i,Iterator)

foreachPartition:类似foreach,但每次提供一个Partition的一批数据

glom

#glom将一个分区内的数据转换为一个列表作为一行。
a = sc.parallelize(range(10),2)
b = a.glom()
b.collect() 

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

coalesce

#coalesce 默认shuffle为False,不能增加分区,只能减少分区
#如果要增加分区,要设置shuffle = true
#parallelize等许多操作可以指定分区数
a = sc.parallelize(range(10),3)  
print(a.getNumPartitions())
print(a.glom().collect())

3
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

b = a.coalesce(2) 
print(b.glom().collect())

[[0, 1, 2], [3, 4, 5, 6, 7, 8, 9]]

repartition

#repartition按随机数进行shuffle,相同key不一定在一个分区,可以增加分区
#repartition实际上调用coalesce实现,设置了shuffle = True
a = sc.parallelize(range(10),3)  
c = a.repartition(4) 
print(c.glom().collect())

[[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]

#repartition按随机数进行shuffle,相同key不一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])  
c = a.repartition(2)
print(c.glom().collect())

[[('a', 1), ('a', 2), ('c', 3)], [('a', 1)]]

partitionBy

#partitionBy按key进行shuffle,相同key一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])  
c = a.partitionBy(2)
print(c.glom().collect())

mapPartitions

#mapPartitions可以对每个分区分别执行操作
#每次处理分区内的一批数据,适合需要按批处理数据的情况
#例如将数据写入数据库时,可以极大的减少连接次数。
#mapPartitions的输入分区内数据组成的Iterator,其输出也需要是一个Iterator
#以下例子查看每个分区内的数据,相当于用mapPartitions实现了glom的功能。
a = sc.parallelize(range(10),2)
a.mapPartitions(lambda it:iter([list(it)])).collect()

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

mapPartitionsWithIndex

#mapPartitionsWithIndex可以获取两个参数
#即分区id和每个分区内的数据组成的Iterator
a = sc.parallelize(range(11),2)

def func(pid,it):
    s = sum(it)
    return(iter([str(pid) + "|" + str(s)]))
    [str(pid) + "|" + str]
b = a.mapPartitionsWithIndex(func)
b.collect()

#利用TaskContext可以获取当前每个元素的分区
from pyspark.taskcontext import TaskContext
a = sc.parallelize(range(5),3)
c = a.map(lambda x:(TaskContext.get().partitionId(),x))
c.collect()

[(0, 0), (1, 1), (1, 2), (2, 3), (2, 4)]

foreachPartitions

#foreachPartition对每个分区分别执行操作
#范例:求每个分区内最大值的和
total = sc.accumulator(0.0)

a = sc.parallelize(range(1,101),3)

def func(it):
    total.add(max(it))
    
a.foreachPartition(func)
total.value

199.0

aggregate

#aggregate是一个Action操作
#aggregate比较复杂,先对每个分区执行一个函数,再对每个分区结果执行一个合并函数。
#例子:求元素之和以及元素个数
#三个参数,第一个参数为初始值,第二个为分区执行函数,第三个为结果合并执行函数。
rdd = sc.parallelize(range(1,21),3)
def inner_func(t,x):
    return((t[0]+x,t[1]+1))

def outer_func(p,q):
    return((p[0]+q[0],p[1]+q[1]))

rdd.aggregate((0,0),inner_func,outer_func)

(210, 20)

aggregateByKey

#aggregateByKey的操作和aggregate类似,但是会对每个key分别进行操作
#第一个参数为初始值,第二个参数为分区内归并函数,第三个参数为分区间归并函数

a = sc.parallelize([("a",1),("b",1),("c",2),
                             ("a",2),("b",3)],3)
b = a.aggregateByKey(0,lambda x,y:max(x,y),
                            lambda x,y:max(x,y))
b.collect()

[('b', 3), ('a', 2), ('c', 2)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/642603.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

电商公司需不需要建数字档案室呢

建立数字档案室对于电商公司来说是非常有必要的。以下是一些原因: 1. 空间节约:数字档案室可以将纸质文件转化为电子文件,节省了大量存储空间。这对于电商公司来说尤为重要,因为他们通常会有大量的订单、客户信息和供应商合同等文…

OpenHarmony系统使用gdb调试init

前言 OpenAtom OpenHarmony(简称“OpenHarmony”)适配新的开发板时,启动流程init大概率会出现问题,其为内核直接拉起的第一个用户态进程,问题定位手段只能依赖代码走读和增加调试打印,初始化过程中系统崩溃…

封装了一个iOS中间放大的collectionView layout

效果图如下所示 原理:就是首先确定一个放大和缩小系数和原大小对应的基准位置,然后根据距离每个布局属性到视图中心的距离和基准点到中心的距离的差距/基准点到中心的距离, 计算出每个布局属性的缩放系数 下面是代码 // // LBHorizontalCe…

数据库--数据库基础(一)

目录 第一章 绪论 一.数据库的基本概念 1. 数据库的4个基本概念 2、数据库系统的特点 二.数据库和文件 三.数据模型 1.概念模型 2.逻辑模型(物理模型) 2.1关系模型 四.数据库系统的三级模式结构: 五数据库的二级映像功能与数据独立性 第二章 关系数据库…

web学习笔记(五十六)

目录 1.绑定类名和style 1.1 绑定类名 1.1.1 绑定单个类名 1.1.2 绑定多个类名 1.2 style相关知识 2. vue的响应式原理 3. v-once 4.本地搭建Vue单页应用 4.1 安装Vue脚手架 4.2 安装对应的包文件 4.3 运行项目 1.绑定类名和style 1.1 绑定类名 1.1.1 绑定单个类名…

【Unitydemo制作】音游制作—模式玩法的实现

👨‍💻个人主页:元宇宙-秩沅 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 秩沅 原创 👨‍💻 收录于专栏:就业…

Redis(十三) 事务

文章目录 前言事务的特性Redis事务的执行原理Redis中使用事务WATCH UNWATCH实现乐观锁 前言 前面我们学习 MySQL 的时候,肯定也学习了事务。事务是什么?给大家举个例子:假如我给朋友微信转账,我给他转了 100 块钱,当我…

5.18 TCP机械臂模拟

#include <netinet/tcp.h>//包含TCP选项的头文件 #include <arpa/inet.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <linux/input.h>//读取输入事件 #include <sys/types.h> #include <sys/stat.h&…

C++vector的简单模拟实现

文章目录 目录 文章目录 前言 一、vector使用时的注意事项 1.typedef的类型 2.vector不是string 3.vector 4.算法sort 二、vector的实现 1.通过源码进行猜测vector的结构 2.初步vector的构建 2.1 成员变量 2.2成员函数 2.2.1尾插和扩容 2.2.2operator[] 2.2.3 迭代器 2…

Linux基础(五):常用基本命令

从本节开始&#xff0c;我们正式进入Linux的学习&#xff0c;通过前面的了解&#xff0c;我们知道我们要以命令的形式使用操作系统&#xff08;使用操作系统提供的各类命令&#xff0c;以获得字符反馈的形式去使用操作系统。&#xff09;&#xff0c;因此&#xff0c;我们是很有…

纯代码如何实现WordPress搜索包含评论内容?

WordPress自带的搜索默认情况下是不包含评论内容的&#xff0c;不过有些WordPress网站评论内容比较多&#xff0c;而且也比较有用&#xff0c;所以想要让用户在搜索时也能够同时搜索到评论内容&#xff0c;那么应该怎么做呢&#xff1f; 网络上很多教程都是推荐安装SearchWP插…

shelll 正则表达式

sort sort命令对行内容进行排序 sort语法&#xff1a; 1.sort &#xff08;选项&#xff09; 参数 2.cat file | sort 选项 选项&#xff1a; -n 按照数字进行排序 -r 反向排序 -k 指定排序 -f 忽略大小写 会将小写字母转化成大写字母来比较 -b 忽略每行前面的空格 .........…

CentOS上升级glibc2.17至glibc2.31

glibc是Linux系统中的重要组件之一。在CentOS中&#xff0c;glibc通常是作为系统的默认C标准库使用的&#xff0c;因为它是许多软件的基础库。在CentOS中&#xff0c;glibc的版本通常与CentOS版本一起发布。因为CentOS通常会优先选择稳定性而不是最新性&#xff0c;所以CentOS使…

【linux】docker下nextcloud安装人脸识别插件

一、插件源码地址&#xff1a; GitCode - 开发者的代码家园 二、插件官网地址&#xff1a; Releases - Face Recognition - Apps - App Store - Nextcloud 三、插件安装教程&#xff1a; 1、查看本地nextcloud版本号 http://ipAddress:8080/settings/admin/overview 2、找…

基于51单片机的火灾检测设计(仿真+程序+原理图+论文报告+讲解视频)

基于51单片机的火灾检测设计 基于51单片机的火灾检测设计&#xff08;仿真程序原理图论文报告&#xff09;功能要求仿真图&#xff1a;原理图&#xff1a;源程序&#xff1a;论文/报告&#xff1a;资料清单&#xff1a; 基于51单片机的火灾检测设计&#xff08;仿真程序原理图论…

论文阅读--CLIPasso

让计算机把真实图片抽象成简笔画&#xff0c;这个任务很有挑战性&#xff0c;需要模型捕获最本质的特征 以往的工作是找了素描的数据集&#xff0c;而且抽象程度不够高&#xff0c;笔画是固定好的&#xff0c;素描对象的种类不多&#xff0c;使得最后模型的效果十分受限 之所以…

在ubuntu中关于驱动得问题:如何将nouveau驱动程序加入黑名单和安装NVIDIA显卡驱动

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、nouveau驱动程序加入黑名单二、安装NVIDIA显卡驱动 一、nouveau驱动程序加入黑名单 (1) 打开黑名单列表文件 终端输入&#xff1a; sudo gedit /etc/modprobe…

共享单车(八):数据库

实现后台数据库访问模块的框架&#xff0c;能够实现验证请求并响应&#xff08;支持数据库操作&#xff09;。 数据库设计 class SqlTabel //负责数据库表的创建 { public:SqlTabel(std::shared_ptr<MysqlConnection> sqlconn) :sqlconn_(sqlconn) {}bool CreateUserI…

Mysql之基本架构

1.Mysql简介 mysql是一种关系型数据库&#xff0c;由表结构来存储数据与数据之间的关系&#xff0c;同时为sql(结构化查询语句)来进行数据操作。 sql语句进行操作又分为几个重要的操作类型 DQL: Data Query Language 数据查询语句 DML: Data Manipulation Language 添加、删…

Windows下安装部署rocketmq

1.1.下载安装rocketmq 下载 | RocketMQ 下载完后解压到自定义目录&#xff0c;MQ解压路径\rocketmq-all-4.6.0-bin-release&#xff1b;&#xff08;Windows10系统解压路径不要出现空格&#xff09; 1.2.配置环境变量 配置环境变量&#xff0c;变量名&#xff1a;ROCKETM…