目录
四,常用PairRDD的转换操作
五,缓存操作
四,常用PairRDD的转换操作
PairRDD指的是数据为长度为2的tuple类似(k,v)结构的数据类型的RDD,其每个数据的第一个元素被当做key,第二个元素被当做value.
reduceByKey
#reduceByKey对相同的key对应的values应用二元归并操作
rdd = sc.parallelize([("hello",1),("world",2),
("hello",3),("world",5)])
rdd.reduceByKey(lambda x,y:x+y).collect()
[('hello', 4), ('world', 7)]
groupByKey
#groupByKey将相同的key对应的values收集成一个Iterator
rdd = sc.parallelize([("hello",1),("world",2),("hello",3),("world",5)])
rdd.groupByKey().collect()
[('hello', <pyspark.resultiterable.ResultIterable at 0x119c6ae48>),
('world', <pyspark.resultiterable.ResultIterable at 0x119c6a860>)]
sortByKey
#sortByKey按照key排序,可以指定是否降序
rdd = sc.parallelize([("hello",1),("world",2),
("China",3),("Beijing",5)])
rdd.sortByKey(False).collect()
[('world', 2), ('hello', 1), ('China', 3), ('Beijing', 5)]
join
#join相当于根据key进行内连接
age = sc.parallelize([("LiLei",18),
("HanMeiMei",16),("Jim",20)])
gender = sc.parallelize([("LiLei","male"),
("HanMeiMei","female"),("Lucy","female")])
age.join(gender).collect()
[('LiLei', (18, 'male')), ('HanMeiMei', (16, 'female'))]
leftOuterJoin和rightOuterJoin
#leftOuterJoin相当于关系表的左连接
age = sc.parallelize([("LiLei",18),
("HanMeiMei",16)])
gender = sc.parallelize([("LiLei","male"),
("HanMeiMei","female"),("Lucy","female")])
age.leftOuterJoin(gender).collect()
[('LiLei', (18, 'male')), ('HanMeiMei', (16, 'female'))]
#rightOuterJoin相当于关系表的右连接
age = sc.parallelize([("LiLei",18),
("HanMeiMei",16),("Jim",20)])
gender = sc.parallelize([("LiLei","male"),
("HanMeiMei","female")])
age.rightOuterJoin(gender).collect()
[('LiLei', (18, 'male')), ('HanMeiMei', (16, 'female'))]
cogroup
#cogroup相当于对两个输入分别goupByKey然后再对结果进行groupByKey
x = sc.parallelize([("a",1),("b",2),("a",3)])
y = sc.parallelize([("a",2),("b",3),("b",5)])
result = x.cogroup(y).collect()
print(result)
print(list(result[0][1][0]))
[('a', (<pyspark.resultiterable.ResultIterable object at 0x119c6acc0>, <pyspark.resultiterable.ResultIterable object at 0x119c6aba8>)), ('b', (<pyspark.resultiterable.ResultIterable object at 0x119c6a978>, <pyspark.resultiterable.ResultIterable object at 0x119c6a940>))]
[1, 3]
subtractByKey
#subtractByKey去除x中那些key也在y中的元素
x = sc.parallelize([("a",1),("b",2),("c",3)])
y = sc.parallelize([("a",2),("b",(1,2))])
x.subtractByKey(y).collect()
[('c', 3)]
foldByKey
#foldByKey的操作和reduceByKey类似,但是要提供一个初始值
x = sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
x.foldByKey(1,lambda x,y:x*y).collect()
[('a', 3), ('b', 10)]
五,缓存操作
如果一个rdd被多个任务用作中间量,那么对其进行cache缓存到内存中对加快计算会非常有帮助。
声明对一个rdd进行cache后,该rdd不会被立即缓存,而是等到它第一次被计算出来时才进行缓存。
可以使用persist明确指定存储级别,常用的存储级别是MEMORY_ONLY和EMORY_AND_DISK。
如果一个RDD后面不再用到,可以用unpersist释放缓存,unpersist是立即执行的。
缓存数据不会切断血缘依赖关系,这是因为缓存数据某些分区所在的节点有可能会有故障,例如内存溢出或者节点损坏。
这时候可以根据血缘关系重新计算这个分区的数据。
#cache缓存到内存中,使用存储级别 MEMORY_ONLY。
#MEMORY_ONLY意味着如果内存存储不下,放弃存储其余部分,需要时重新计算。
a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a
print(mean_a)
#persist缓存到内存或磁盘中,默认使用存储级别MEMORY_AND_DISK
#MEMORY_AND_DISK意味着如果内存存储不下,其余部分存储到磁盘中。
#persist可以指定其它存储级别,cache相当于persist(MEMORY_ONLY)
from pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a
a.unpersist() #立即释放缓存
print(mean_a)