Spark Core------算子介绍

RDD基本介绍

什么是RDD

RDD:英文全称Resilient Distributed Dataset,叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

  • Resilient弹性:RDD的数据可以存储在内存或者磁盘当中,RDD的数据可以分区
  • Distributed分布式:RDD的数据可以分布式存储,可以进行并行计算
  • Dataset数据集:一个用于存放数据的集合

RDD的五大特征

1、(必须的)RDD是由一系列分区组成的
2、(必须的)对RDD做计算,相当于对RDD的每个分区做计算
3、(必须的)RDD之间存在着依赖关系,宽依赖和窄依赖
4、(可选的)对于KV类型的RDD,我们可以进行自定义分区方案
5、(可选的)移动数据不如移动计算,让计算程序离数据越近越好

RDD的五大特点

1、分区:RDD逻辑上是分区的,仅仅是定义分区的规则,并不是直接对数据进行分区操作,因为RDD本身不存储数据。
2、只读:RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
3、依赖:RDD之间存在着依赖关系,宽依赖和窄依赖
4、缓存:如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据
5、checkpoint:与缓存类似的,都是可以将中间某一个RDD的结果保存起来,只不过checkpoint支持持久化保存

如何构建RDD

构建RDD对象的方式主要有两种:

1、通过 textFile(data): 通过读取外部文件的方式来初始化RDD对象,实际工作中经常使用。
2、通过 parallelize(data): 通过自定义列表的方式初始化RDD对象。(一般用于测试)

并行化本地集合方式

from pyspark import SparkConf, SparkContext
import os

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("并行化本地集合创建RDD")
    # 1- 创建SparkContext对象
    conf = SparkConf().setAppName('parallelize_rdd').setMaster('local[1]')
    sc = SparkContext(conf=conf)

    # 2- 数据输入
    # 并行化本地集合得到RDD
    init_rdd = sc.parallelize([1,2,3,4,5], numSlices=6)

    # 3- 数据处理
    # 4- 数据输出
    # 获取分区数
    print(init_rdd.getNumPartitions())
    # 获取具体分区内容
    print(init_rdd.glom().collect())
    # 5- 释放资源
    sc.stop()

读取外部数据源方式

from pyspark import SparkConf, SparkContext
import os

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("读取文件创建RDD")
    # 1- 创建SparkContext对象
    conf = SparkConf().setAppName('textfile_rdd').setMaster('local[1]')
    sc = SparkContext(conf=conf)
    # 2- 数据输入
    # 读取文件得到RDD
    init_rdd = sc.textFile("file:///export/data/gz16_pyspark/01_spark_core/data/content.txt",minPartitions=4)
    # 3- 数据处理
    # 4- 数据输出
    # 获取分区数
    print(init_rdd.getNumPartitions())
    # 获取具体分区内容
    print(init_rdd.glom().collect())
    # 5- 释放资源
    sc.stop()

处理小文件的操作

常规处理小文件的办法:
1- 大数据框架提供的现有的工具或者命令
	1.1- hadoop fs -getmerge /input/small_files/*.txt /output/merged_file.txt
	1.2- hadoop archive -archiveName myhar.har -p /small_files /big_files
2- 可以通过编写自定义的代码,将小文件读取进来,在代码中输出的时候,输出形成大的文件
wholeTextFiles: 读取小文件。
	1-支持本地文件系统和HDFS文件系统。参数minPartitions指定最小的分区数。
	2-通过该方式读取文件,会尽可能使用少的分区数,可能会将多个小文件的数据放到同一个分区中进行处理。
    3-一个文件要完整的存放在一个元组中,也就是不能将一个文件分成多个进行读取。文件是不可分割的。
    4-RDD分区数量既受到minPartitions参数的影响,同时受到小文件个数的影响

RDD分区数量如何确定

在这里插入图片描述

1- RDD的分区数量,一般设置为机器CPU核数的2-3倍。为了充分利用服务器的硬件资源

2- RDD的分区数据量受到多个因素的影响,例如:机器CPU的核数、调用的算子、算子中参数的设置、集群的类型等。RDD具体有多少个分区,直接通过getNumPartitions查看

3- 当初始化SparkContext对象的时候,其实就确定了一个参数spark.default.parallelism,默认为CPU的核数。如果是本地集群,就取决于local[num]中设置的数字大小;如果是集群,默认至少有2个分区

4- 通过parallelize来构建RDD,如果没有指定分区数,默认就取spark.default.parallelism参数值;如果指定了分区数,也就是numSlices参数,那么numSlices的优先级会更高一些,最终RDD的分区数取该参数的值。

5- 通过textFile来构建RDD
	5.1- 首先确认defaultMinPartition参数的值。该参数的值,如果没有指定textFile的minPartition参数,那么就根据公式min(spark.default.parallelism,2);如果有指定textFile的minPartition参数,那么就取设置的值
	5.2- 再根据读取文件所在的文件系统的不同,来决定最终RDD的分区数:
		5.2.1- 本地文件系统: RDD分区数 = max(本地文件分片数,defaultMinPartition)
		5.2.2- HDFS文件系统: RDD分区数 = max(文件block块的数量,defaultMinPartition)

RDD相关算子

RDD算子: 指的是RDD对象中提供了非常多的具有特殊功能的函数, 我们将这些函数称为算子(函数/方法/API)
相关的算子的官方文档: https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.html#rdd-apis

RDD算子的分类

整个RDD算子, 共分为两大类:

Transformation(转换算子): 
	返回值: 是一个新的RDD
	特点: 转换算子只是定义数据的处理规则,并不会立即执行,是lazy(惰性)的。需要由Action算子触发
	
Action(动作算子):
	返回值: 要么没有返回值None,或者返回非RDD类型的数据
	特点: 动作算子都是立即执行。执行的时候,会将它上游的其他算子一同触发执行

相关转换算子:
在这里插入图片描述
相关的动作算子:
在这里插入图片描述

RDD的转换算子

(单)值类型算子

  • map算子:
    • 格式:rdd.map(fn)
      说明: 主要根据传入的函数,对数据进行一对一的转换操作,传入一行,返回一行
输入: init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
需求: 数字加一后返回
代码: init_rdd.map(lambda num:num+1).collect()
结果: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  • groupBy算子:
    • 格式: groupBy(fn)
    • 说明: 根据用户传入的自定义函数,对数据进行分组操作
输入: init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
需求: 将数据分成奇数和偶数
代码: init_rdd.groupBy(lambda num:"偶数" if num%2==0 else "奇数").mapValues(list).collect()
结果: [('偶数', [0, 2, 4, 6, 8]), ('奇数', [1, 3, 5, 7, 9])]
总结: mapValues(list)将数据类型转成List列表
  • filter算子:
    • 格式:filter(fn)
    • 说明:根据用户传入的自定义函数对数据进行过滤操作。自定义函数的返回值类型是bool类型。True表示满足过滤条件,会将数据保留下来;False会将数据丢弃掉
输入: init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
需求: 过滤掉数值<=3的数据
代码: init_rdd.filter(lambda num:num>3).collect()
结果: [4, 5, 6, 7, 8, 9]
  • flatMap算子:
    • 格式:rdd.flatMap(fn)
    • 说明:在map算子的基础上,加入一个压扁的操作, 主要适用于一行中包含多个内容的操作,实现一转多的操作
输入: init_rdd = sc.parallelize(['张三 李四 王五','赵六 周日'])
需求: 将姓名一个一个的输出
代码: init_rdd.flatMap(lambda line:line.split()).collect()
结果: ['张三', '李四', '王五', '赵六', '周日']
说明: split()默认会按照空白字符对内容进行切分处理。例如:空格、制表符、回车。还是推荐明确指定你所需要分割的符号。

双值类型算子

  • union(并集) 和intersection(交集)
    • 格式: rdd1.union(rdd2) rdd1.intersection(rdd2)
输入: rdd1 = sc.parallelize([3,3,2,6,8,0])
	 rdd2 = sc.parallelize([3,2,1,5,7])

并集: rdd1.union(rdd2).collect()
结果: [3, 3, 2, 6, 8, 0, 3, 2, 1, 5, 7]
说明: union取并集不会对重复出现的数据去重

对并集的结果进行去重: rdd1.union(rdd2).distinct().collect()
结果: [8, 0, 1, 5, 2, 6, 3, 7]
说明: distinct()是转换算子,用来对RDD中的元素进行去重处理

交集: rdd1.intersection(rdd2).collect()
结果: [2, 3]
说明: 交集会对结果数据进行去重处理

key-value数据类型算子

  • groupByKey()

    • 格式: rdd.groupByKey()
    • 说明: 对键值对类型的RDD中的元素按照键key进行分组操作。只会进行分组
输入: rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c01','赵六'),('c03','田七'),('c03','周八'),('c02','李九')])
需求: 对学生按照班级分组统计
代码: rdd.groupByKey().mapValues(list).collect()
结果: [('c01', ['张三', '赵六']), ('c02', ['李四', '王五', '李九']), ('c03', ['田七', '周八'])]
  • reduceByKey()
    • 格式: rdd.reduceByKey(fn)
    • 说明: 根据key进行分组,将一个组内的value数据放置到一个列表中,对这个列表基于fn进行聚合计算操作
输入: rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c01','赵六'),('c03','田七'),('c03','周八'),('c02','李九')])
需求: 统计每个班级学生人数
代码: rdd.map(lambda tup:(tup[0],1)).reduceByKey(lambda agg,curr:agg+curr).collect()
结果: [('c01', 2), ('c02', 3), ('c03', 2)]
  • sortByKey()算子:
    • 格式:rdd.sortByKey(ascending=True|False)
    • 说明: 根据key进行排序操作,默认按照key进行升序排序,如果需要降序,设置 ascending 参数的值为False
输入: rdd = sc.parallelize([(10,2),(15,3),(8,4),(7,4),(2,4),(12,4)])
需求: 根据key进行排序操作,演示升序
代码: rdd.sortByKey().collect()
结果: [(2, 4), (7, 4), (8, 4), (10, 2), (12, 4), (15, 3)]

需求: 根据key进行排序操作,演示降序
代码: rdd.sortByKey(ascending=False).collect()
结果: [(15, 3), (12, 4), (10, 2), (8, 4), (7, 4), (2, 4)]


输入: rdd = sc.parallelize([('a01',2),('A01',3),('a011',2),('a03',2),('a021',2),('a04',2)])
需求: 根据key进行排序操作,演示升序
代码: rdd.sortByKey().collect()
结果: [('A01', 3), ('a01', 2), ('a011', 2), ('a021', 2), ('a03', 2), ('a04', 2)]
总结: 对字符串类型的key进行排序的时候,按照ASCII码表进行排序。大写字母排在小写字母的前面;如果前缀一样,短的排在前面,长的排在后面。

RDD的动作算子

  • collect() 算子:

    • 格式: collect()

    • 作用: 收集各个分区的数据,将数据汇总到一个大的列表返回

  • reduce() 算子:

    • 格式: reduce(fn)
    • 作用: 根据用户传入的自定义函数,对数据进行聚合操作。该算子是Action动作算子;而reduceByKey是Transformation转换算子。
输入: rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
需求: 统计所有元素之和是多少
代码: 
def mysum(agg,curr):
	print(f"中间临时聚合结果{agg},当前遍历到的元素{curr}")
	return agg+curr

rdd.reduce(mysum)

rdd.reduce(lambda agg,curr:agg+curr)
结果: 
中间临时聚合结果6,当前遍历到的元素7
中间临时聚合结果13,当前遍历到的元素8
中间临时聚合结果21,当前遍历到的元素9
中间临时聚合结果30,当前遍历到的元素10
中间临时聚合结果1,当前遍历到的元素2
中间临时聚合结果3,当前遍历到的元素3
中间临时聚合结果6,当前遍历到的元素4
中间临时聚合结果10,当前遍历到的元素5
中间临时聚合结果15,当前遍历到的元素40
55

说明: 初始化的时候,agg,表示中间临时聚合结果,默认取列表中的第一个元素值,curr表示当前遍历到的元素,默认取列表中的第二个元素的值。
  • first() 算子:
    • 格式: rdd.first()
    • 说明: 取RDD中的第一个元素。不会对RDD中的数据排序
输入: rdd = sc.parallelize([3,1,2,4,5,6,7,8,9,10])
需求: 获取第一个元素
代码: rdd.first()
结果: 3

take() 算子

  • 格式: rdd.take(N)
  • 说明: 取RDD中的前N元素。不会对RDD中的数据排序
输入: rdd = sc.parallelize([3,1,2,4,5,6,7,8,9,10])
需求: 获取前3个元素
代码: rdd.take(3)
结果: [3, 1, 2]
说明: 返回结果是List列表。必须要传递参数N,而且不能是负数。
  • top()算子:
    • 格式: top(N,[fn])
    • 说明: 对数据集进行倒序排序操作,如果kv(键值对)类型,针对key进行排序,获取前N个元素
    • fn: 可以自定义排序,按照谁来排序
输入: rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
需求: 获取前3个元素
代码: rdd.top(3)
结果: [10, 9, 8]

输入: rdd = sc.parallelize([('c01',5),('c02',8),('c04',1),('c03',4)])
需求: 按照班级人数降序排序,取前2个
代码: rdd.top(2,key=lambda tup:tup[1])
结果: [('c02', 8), ('c01', 5)]

需求: 按照班级人数升序排序,取前2个
代码: rdd.top(2,key=lambda tup:-tup[1])
结果: [('c04', 1), ('c03', 4)]
  • count() 算子
    • 说明:统计RDD中一共有多少个元素
输入: rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
需求: 获取一共有多少个元素
代码: rdd.count()
结果: 10
  • foreach() 算子
    • 格式: foreach(fn)
    • 作用: 遍历RDD中的元素,对元素根据传入的函数进行自定义的处理
输入: rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
需求: 对数据进行遍历打印
代码: rdd.foreach(lambda num:print(num))
结果: 
6
7
8
9
10
1
2
3
4
5
说明: 
	1- foreach()算子对自定义函数不要求有返回值,另外该算子也没有返回值
	2- 因为底层是多线程运行的,因此输出结果分区间可能是乱序
	3- 该算子,一般用来对结果数据保存到数据库或者文件中

在这里插入图片描述

RDD的重要算子

基本算子

在这里插入图片描述

分区算子

分区算子:针对整个分区数据进行处理的算子。

  • mapPartitions和foreachPartition

    说明:map和foreach算子都有对应的分区算子。分区算子适用于有反复消耗资源的操作,例如:文件的打开和关闭、数据库的连接和关闭等,能够减少操作的次数。

输入: rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],3)
查看分区情况: rdd.glom().collect()
结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
演示: map和mapPartitions
需求: 对数字加一
================================map==================================
自定义函数: 
def my_add(num):
	print(f"传递进来的数据{num}")
	return num+1

rdd.map(my_add).collect()
结果: 
传递进来的数据4
传递进来的数据5
传递进来的数据6
传递进来的数据1
传递进来的数据2
传递进来的数据3
传递进来的数据7
传递进来的数据8
传递进来的数据9
传递进来的数据10
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]


请问: my_add被调用了几次?
回答: 10
弊端: 会导致消耗资源的操作反复多次的执行,非常消耗资源
def my_add(num):
	# 打开数据库连接
	
	# 将数据保存到数据库
	
	# 关闭数据库连接
	print(f"传递进来的数据{num}")
	return num+1


=============================mapPartitions===========================
自定义函数: 
def my_add(list):
	print("输入的参数",list)
	
	new_list = []
	
	for i in list:
		new_list.append(i + 1)
	return new_list

rdd.mapPartitions(my_add).collect()
结果: 
输入的参数 <itertools.chain object at 0x7ff21ae9d940>
输入的参数 <itertools.chain object at 0x7ff21ae9d940>
输入的参数 <itertools.chain object at 0x7ff21ae94e50>
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
演示: foreach和foreachPartition
需求: 遍历打印
==============================foreach================================
自定义函数: 
def my_print(num):
	print(f"传递进来的数据{num}")
	print(num)
	
rdd.foreach(my_print)
结果:
传递进来的数据1
1
传递进来的数据2
2
传递进来的数据3
3
传递进来的数据4
4
传递进来的数据5
5
传递进来的数据6
6
传递进来的数据7
7
传递进来的数据8
8
传递进来的数据9
9
传递进来的数据10
10

==========================foreachPartition===========================
自定义函数: 
def my_print(list):
	print(f"传递进来的数据{list}")
	
	for i in list:
		print(i)
	
rdd.foreachPartition(my_print)

结果:
传递进来的数据<itertools.chain object at 0x7ff21ae9d2b0>
1
2
3
传递进来的数据<itertools.chain object at 0x7ff21ae9d2b0>
4
5
6
传递进来的数据<itertools.chain object at 0x7ff21ae94a60>
7
8
9
10
总结: 

1- map和foreach算子都有对应的分区算子,分别是mapPartitions和foreachPartition

2- 分区算子适用于有反复消耗资源的操作,例如:文件的打开和关闭、数据库的连接和关闭等,能够减少操作的次数。

3- 如果没有反复消耗资源的操作,调用两类算子,效果一样

重分区算子

重分区算子:对RDD的分区重新进行分区操作的算子,也就是改变RDD分区数的算子。

  • repartition算子
    • 格式:repartition(num)
    • 作用:改变RDD分区数。既能够增大RDD分区数,也能够减小RDD分区数。但是都会导致发生Shuffle过程。
输入: rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],3)
查看分区情况: rdd.glom().collect()
结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

增大分区: rdd.repartition(5).glom().collect()
结果: [[], [1, 2, 3], [7, 8, 9, 10], [4, 5, 6], []]

减少分区: rdd.repartition(2).glom().collect()
结果: [[1, 2, 3, 7, 8, 9, 10], [4, 5, 6]]
  • coalesce算子
    • 格式:coalesce(num,shuffle=True|False)
    • 作用:改变RDD分区数。但是,默认只能减小RDD分区数,不能增大,减小过程中不会发生Shuffle过程。如果想增大分区,需要将参数shuffle设置为True,但是会导致Shuffle过程。
输入: rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],3)
查看分区情况: rdd.glom().collect()
结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

减少分区: rdd.coalesce(2).glom().collect()
结果: [[1, 2, 3], [4, 5, 6, 7, 8, 9, 10]]

增大分区: rdd.coalesce(5).glom().collect()
结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

将参数2设置为True,再增大分区: rdd.coalesce(5,shuffle=True).glom().collect()
结果: [[], [1, 2, 3], [7, 8, 9, 10], [4, 5, 6], []]

将参数2设置为True,再减小分区: rdd.coalesce(2,shuffle=True).glom().collect()
结果: [[1, 2, 3, 7, 8, 9, 10], [4, 5, 6]]

在这里插入图片描述

repartition 和 coalesce总结:

1- 这两个算子都是用来改变RDD的分区数

2- repartition 既能够增大RDD分区数,也能够减小RDD分区数。但是都会导致发生Shuffle过程。

3- 默认只能减小RDD分区数,不能增大,减小过程中不会发生Shuffle过程。如果想增大分区,需要将参数shuffle设置为True,但是会导致Shuffle过程。

4- repartition 底层实际上是调用了coalesce算子,并且将shuffle参数设置为了True
  • partitionBy算子
    • 格式:partitionBy(num,[fn])
    • 作用:该算子主要是用来改变key-value键值对数据类型RDD的分区数的。num表示要设置的分区数;fn参数是可选,用来让用户自定义分区规则。
注意:
默认情况下,根据key进行Hash取模分区。
如果对默认分区规则不满意,可以传递参数fn来自定义分区规则。
但是自定义分区规则函数需要满足两个条件,
条件一:分区编号的数据类型需要是int类型;
条件二:传递给自定义分区函数的参数是key
输入: rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)],5)
查看分区情况: rdd.glom().collect()
结果: [[(1, 1), (2, 2)], [(3, 3), (4, 4)], [(5, 5), (6, 6)], [(7, 7), (8, 8)], [(9, 9), (10, 10)]]


需求: 增大分区,尝试分为20个分区
代码: rdd.partitionBy(20).glom().collect()
结果: [[], [(1, 1)], [(2, 2)], [(3, 3)], [(4, 4)], [(5, 5)], [(6, 6)], [(7, 7)], [(8, 8)], [(9, 9)], [(10, 10)], [], [], [], [], [], [], [], [], []]

需求: 减少分区,尝试分为2个分区
代码: rdd.partitionBy(2).glom().collect()
结果: [[(2, 2), (4, 4), (6, 6), (8, 8), (10, 10)], [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]]


需求: 将 key>5 放置在一个分区,剩余放置到另一个分区
代码: rdd.partitionBy(2,partitionFunc=lambda key:0 if key>5 else 1).glom().collect()
结果: [[(6, 6), (7, 7), (8, 8), (9, 9), (10, 10)], [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]]
注意: 分区编号的数据类型需要是int类型

聚合算子

  • 单值类型的聚合算子
    • reduce(fn1):根据传入函数对数据进行聚合处理
    • fold(defaultAgg,fn1):根据传入函数对数据进行聚合处理,同时支持给agg设置初始值
    • aggregate(defaultAgg, fn1, fn2):根据传入函数对数据进行聚合处理。defaultAgg设置agg的初始值,fn1对各个分区内的数据进行聚合计算,fn2 负责将各个分区的聚合结果进行汇总聚合
输入: rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],3)
查看分区情况: rdd.glom().collect()
结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
需求: 求和计算, 求所有数据之和

================================reduce================================
代码: 
def my_sum(agg,curr):
	return agg+curr
	
rdd.reduce(my_sum)
结果: 55


================================fold================================
代码: 
def my_sum(agg,curr):
	return agg+curr
	
rdd.fold(5,my_sum)
结果: 75


================================aggregate================================
代码: 
def my_sum_1(agg,curr):
	return agg+curr
	
def my_sum_2(agg,curr):
	return agg+curr
	
rdd.aggregate(5,my_sum_1,my_sum_2)
结果: 75

在这里插入图片描述

总结:
reduce、fold、aggregate算子都能实现聚合操作。reduce的底层是fold,fold底层是aggregate。

在工作中,如果能够通过reduce实现的,就优先选择reduce;否则选择fold,实在不行就选择aggregate
  • KV类型的聚合函数
    相关的算子:
    • reduceByKey(fn1)
    • foldByKey(defaultAgg, fn1)
    • aggregateByKey(defaultAgg, fn1, fn2);

以上三个与单值是一样的,只是在单值的基础上加了分组的操作而已,针对每个分组内的数据进行聚合而已。另外有一个:groupByKey() 仅分组,不聚合统计

问题:groupByKey() + 聚合操作 和  reduceByKey()  都可以完成分组聚合统计,谁的效率更高一些?  

reduceByKey(),因为底层会进行局部的聚合操作,会减小后续处理的数据量

在这里插入图片描述
在这里插入图片描述

关联算子

关联函数,主要是针对kv类型的数据,根据key进行关联操作

相关的算子:

  • join:实现两个RDD的join关联操作
  • leftOuterJoin:实现两个RDD的左关联操作
  • rightOuterJoin:实现两个RDD的右关联操作
  • fullOuterJoin:实现两个RDD的满外(全外)关联操作
输入:
rdd1 = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c01','赵六'),('c03','田七'),('c03','周八'),('c02','李九'),('c04','老张')])
	
rdd2 = sc.parallelize([('c01','1'),('c02','2'),('c03','3'),('c05','5')])


================================join================================
代码: rdd1.join(rdd2).collect()
结果: 
[
   	('c01', ('张三', '1')), 
   	('c01', ('赵六', '1')), 
   	('c02', ('李四', '2')), 
   	('c02', ('王五', '2')), 
   	('c02', ('李九', '2')), 
   	('c03', ('田七', '3')), 
   	('c03', ('周八', '3'))
]

================================leftOuterJoin================================
代码: rdd1.leftOuterJoin(rdd2).collect() 
结果: 
[
	('c04', ('老张', None)), 
	('c01', ('张三', '1')), 
	('c01', ('赵六', '1')), 
	('c02', ('李四', '2')), 
	('c02', ('王五', '2')),
    ('c02', ('李九', '2')),
    ('c03', ('田七', '3')), 
    ('c03', ('周八', '3'))
]

	

================================rightOuterJoin================================
代码: rdd1.rightOuterJoin(rdd2).collect()
结果: 
[
	('c05', (None, '5')), 
	('c01', ('张三', '1')), 
	('c01', ('赵六', '1')), 
	('c02', ('李四', '2')), 
	('c02', ('王五', '2')),
    ('c02', ('李九', '2')), 
    ('c03', ('田七', '3')), 
    ('c03', ('周八', '3'))
]
		

================================fullOuterJoin================================
代码: rdd1.fullOuterJoin(rdd2).collect()
结果: 
[
	('c04', ('老张', None)), 
	('c05', (None, '5')),
    ('c01', ('张三', '1')), 
    ('c01', ('赵六', '1')), 
    ('c02', ('李四', '2')),
    ('c02', ('王五', '2')), 
    ('c02', ('李九', '2')),
    ('c03', ('田七', '3')),
    ('c03', ('周八', '3'))
]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/301967.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue简体繁体互转无需做字库

第一种方法 vue-i18n 需要自己写字库库很麻烦,而且不支持后端传值 第二种 opencc 这个库前端去使用的时候 数据较多的情况非常慢.影响使用 第三种 language-hk-loader npm i language-hk-loader 从其他博客中看到的一种,很方便不需要写字库,但是在打包的时候去整体的去翻译…

Samtec技术Demo | 优秀的PCIe 6.0性能:Rohde Schwarz公司VNA验证Samtec高速电缆

【摘要/前言】 Design-Con 2023的现场产品演示展现了Samtec Flyover高速电缆组件和Rohde & SchwarzZNA矢量网络分析仪的杰出PCIe 6.0性能。 在本期的视频中&#xff0c;Rohde & Schwarz公司的VNA产品规划经理Greg Vaught和Samtec公司的技术营销经理Matt Burns带领我们…

PyTorch|构建自己的卷积神经网络--池化操作

在卷积神经网络中&#xff0c;一般在卷积层后&#xff0c;我们往往进行池化操作。实现池化操作很简单&#xff0c;pytorch中早已有相应的实现。 nn.MaxPool2d(kernel_size ,stride ) 这种池化叫做最大池化。 最大池化原理很简单&#xff0c;就是一个filter以一定的stride在原…

led手电筒照明线性恒流驱动芯片推荐:SM2123EGL双通道可调光

LED手电筒照明线性恒流驱动芯片是一种专门用于LED手电筒的照明系统的关键组件。它采用了线性恒流驱动技术&#xff0c;可以确保LED手电筒在不同电池电压和温度变化下&#xff0c;保持恒定的亮度输出&#xff0c;提高了LED手电筒的稳定性和可靠性。 LED手电筒照明线性恒流驱动芯…

3.6 QUERYING DEVICE PROPERTIES

我们关于将执行资源分配给区块的讨论提出了一个重要问题。我们如何确定可用资源的数量&#xff1f;当CUDA应用程序在系统上执行时&#xff0c;它如何确定设备中的SM数量以及可以分配给每个SM的块和线程数量&#xff1f;可能与执行CUDA应用程序相关的其他资源尚未讨论。一般来说…

Python 自学(六) 之函数

目录 1. python函数的基本结构 P168 2. python函数的可变参数(不定长) *parameter P169 3. python函数的返回值(单个或多个) P173 4. python的匿名函数 lambda P177 1. python函数的基本结构 P168 2. python函数的可变参数(不定…

Samtec卓越应用 | SEARAY:最大限度提高设计灵活性和密度

【摘要/前言】 SEARAY™是Samtec 的高速、高密度栅格阵列连接器系列。SEARAY™为设计人员提供了大量的设计灵活性&#xff0c;远远超过业内任何其他阵列产品。 【灵活性】 SEARAY™ 是一种 1.27 毫米 X 1.27 毫米的栅格。它是一种开放式引脚字段栅格阵列&#xff0c;即引脚不…

找不到api-mswin-crt-runtime-|1-1-0.dll的修复方法解析

在计算机使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“api-ms-win--crt-runtime-|1-1-0.dll丢失”。这个错误通常发生在Windows操作系统中&#xff0c;它表示一个关键的动态链接库文件丢失或损坏。这个问题可能会导致某些应用程序无法正常运行&…

docker/华为云cce 部署nacos 2.3.0 集群模式

镜像地址 https://hub.docker.com/r/nacos/nacos-server 版本 nacos/nacos-server:v2.3.0-slim 关键环境变量 使用mysql数据源 变量值备注MODEcluster启用集群模式MYSQL_SERVICE_DB_NAME数据库名MYSQL_SERVICE_USER数据库用户名MYSQL_SERVICE_PASSWORD数据库密码SPRING_D…

【愚公系列】2023年12月 HarmonyOS教学课程 049-Stage模型(AbilityStage组件容器)

&#x1f3c6; 作者简介&#xff0c;愚公搬代码 &#x1f3c6;《头衔》&#xff1a;华为云特约编辑&#xff0c;华为云云享专家&#xff0c;华为开发者专家&#xff0c;华为产品云测专家&#xff0c;CSDN博客专家&#xff0c;CSDN商业化专家&#xff0c;阿里云专家博主&#xf…

护眼灯哪个品牌最好?2024年十大护眼灯品牌排行榜

由于科技水平的提高和电子产品的普及&#xff0c;儿童青少年的近视率正逐年攀升&#xff0c;出现低龄化现象&#xff0c;面对眼健康问题的严峻形势&#xff0c;我们应该还有爱眼意识、加强眼健康知识普及&#xff01;现在呢&#xff0c;护眼台灯被越来越多的人发现了&#xff0…

【设计模式】解释器模式

一起学习设计模式 目录 前言 一、概述 二、结构 三、案例实现 四、优缺点 五、使用场景 总结 前言 【设计模式】——行为型模式。 一、概述 如上图&#xff0c;设计一个软件用来进行加减计算。我们第一想法就是使用工具类&#xff0c;提供对应的加法和减法的工具方法。 …

C++类与对象基础(5)——日期类的实现

对于实现日期类中需要用到的例如&#xff1a;构造函数&#xff0c;析构函数&#xff0c;运算符重载等内容&#xff0c;已经在前面几篇文章中进行介绍&#xff0c;故本文只给出关于类和对象中日期类的代码实现&#xff0c;对于代码的原理不给予详细的解释&#xff1a; 1.头文件…

[VUE]1-创建vue工程

目录 基于脚手架创建前端工程 1、环境要求 2、操作过程 3、工程结构 4、启动前端服务 &#x1f343;作者介绍&#xff1a;双非本科大三网络工程专业在读&#xff0c;阿里云专家博主&#xff0c;专注于Java领域学习&#xff0c;擅长web应用开发、数据结构和算法&#xff0c…

Commander One for Mac:强大的双窗格文件管理器,让你的工作效率倍增!

Commander One for Mac是一款功能强大的文件管理工具&#xff0c;具有以下主要功能&#xff1a; 双窗格设计&#xff1a;主界面分为两个窗格&#xff0c;用户可以在左侧窗格中导航和浏览文件系统的目录结构&#xff0c;在右侧窗格中查看文件和文件夹的内容。文件操作&#xff…

Java:结束本机端口被占用进程

前言 在实际开发当中我们&#xff0c;往往在idea中将某个服务的启动给关闭了&#xff0c;但是在nacos的某个服务上&#xff0c;我们却可以看到本地别名服务还是在上面挂载着本地再次启动的时候就提示【端口被占用】&#xff0c;今天就说一下如何解决这个问题 操作 点击即可预…

C++_命令行操作

命令行操作 介绍第一步编译 源码第二部 找到exe 可执行文件第三步看图操作代码测试源码测试结果 介绍 本文介绍命令行操作 1.argc 表示当前输入 参数个数 2.argv 表示当前输入 字符串内容 第一步编译 源码 #include<iostream> #include<string>using namespace st…

el-table 展开行表格,展开的内容高度可以变化时,导致的固定列错位的问题

问题描述 一个可展开的表格&#xff08;列设置了type“expand”&#xff09;&#xff0c;并且展开后的内容高度可以变化&#xff0c;会导致后面所有行的固定列错位&#xff0c;图如下&#xff0c;展示行中是一个树形表格&#xff0c;默认不展示子级&#xff0c;点击树形表格的…

R4S软路由如何在iStoreOS后配置远程桌面本地电脑公网地址

文章目录 简介一、配置远程桌面公网地址二、家中使用永久固定地址 访问公司电脑**具体操作方法是&#xff1a;** 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站&#xff0c; 通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家…

斑马斑马跳

欢迎来到程序小院 斑马斑马跳 玩法&#xff1a;行走的斑马&#xff0c;点击鼠标左键斑马左右跳动&#xff0c;左右两侧有大树&#xff0c;和移动的小鸟&#xff0c; 撞到大树和小鸟游戏结束&#xff0c;统计分数&#xff0c;快去斑马跳吧^^。开始游戏https://www.ormcc.com/pl…