1、map算子
对RDD内的元素进行逐个处理,并返回一个新的RDD,可以使用lambda以及链式编程,简化代码。
注意:再python中的lambda只能有行,如果有多行,要写成外部函数;(T)->U表示要传入一个函数
from pyspark import SparkConf,SparkContext
import os
# pyspark无法自动寻到python的编译器,所以需要我们自己手动配置
os.environ['PYSPARK_PYTHON']='"D:\\softer\\anaconda\\Anacond\\python.exe"'
conf =SparkConf().setMaster("local[*]").setAppName("text_spark")
sc =SparkContext(conf=conf)
rdd1=sc.parallelize(["123,123","123,123"]) # list类型
rdd2=rdd1.map(lambda x:x.split(","))
print(rdd2.collect())
sc.stop()
2、FlatMap算子
整体逻辑与map相同,但多了一个嵌套解除功能
from pyspark import SparkConf,SparkContext
import os
# pyspark无法自动寻到python的编译器,所以需要我们自己手动配置
os.environ['PYSPARK_PYTHON']='"D:\\softer\\anaconda\\Anacond\\python.exe"'
conf =SparkConf().setMaster("local[*]").setAppName("text_spark")
sc =SparkContext(conf=conf)
rdd1=sc.parallelize(["123,123","123,123"]) # list类型
rdd2=rdd1.flatMap(lambda x:x.split(","))
print(rdd2.collect())
sc.stop()
flatMap -> ['123', '123', '123', '123']
map -> [['123', '123'], ['123', '123']](少了一层[ ])
3、reduce算子
功能:对传入的数据进行聚合
from pyspark import SparkConf,SparkContext
import os
# pyspark无法自动寻到python的编译器,所以需要我们自己手动配置
os.environ['PYSPARK_PYTHON']='"D:\\softer\\anaconda\\Anacond\\python.exe"'
conf =SparkConf().setMaster("local[*]").setAppName("text_spark")
sc =SparkContext(conf=conf)
rdd1=sc.parallelize([1,2,3,4,5,6,6]) # list类型
print(rdd1.reduce(lambda x,y: x+y)) # 27
sc.stop()
4、reduceBykey算子
功能:传入数据组,能进行分组,并进行逻辑运算。
from pyspark import SparkConf,SparkContext
import os
# pyspark无法自动寻到python的编译器,所以需要我们自己手动配置
os.environ['PYSPARK_PYTHON']='"D:\\softer\\anaconda\\Anacond\\python.exe"'
conf =SparkConf().setMaster("local[*]").setAppName("text_spark")
sc =SparkContext(conf=conf)
rdd1=sc.parallelize([('k1',10),("k2",20),('k1',30),("k2",40)]) # list类型
rdd2=rdd1.reduceByKey(lambda x,y: x+y)
print(rdd2.collect())
sc.stop()
#[('k1', 40), ('k2', 60)]
5、filter算子
功能:过滤,保留想要的数据,结果为True就对该结果进行返回;
6、distinct算子
功能:对传入的数据进行去重,不需要传入参数,直接调用该方法即可
7、sortBy算子
功能:排序,可自定义排序;func:(T)->U;ascending=False(降序)/True(升序)
numPartition=>分区(可设置为1)