一.RDD
28.RDD_为什么需要RDD
29.RDD_定义
30.RDD_五大特性总述
31.RDD_五大特性1
32.RDD_五大特性2
33.RDD_五大特性3
34.RDD_五大特性4
35.RDD_五大特性5
36.RDD_五大特性总结
37.RDD_创建概述
38.RDD_并行化创建
演示代码:
//
获取当前
RDD
的分区数
@Since
(
"1.6.0"
)
final def
getNumPartitions
:
Int
=
partitions
.
length
//
显示出
RDD
被分配到不同分区的信息
/**Return an RDD created by coalescing all
elements within each partition into an
array.*/
def
glom
():
RDD
[
Array
[
T
]]
1
2
3
4
5
6
package
com
.
itbaizhan
.
rdd
//1.
导入
SparkConf
类、
SparkContext
import
org
.
apache
.
spark
.
rdd
.
RDD
import
org
.
apache
.
spark
.{
SparkConf
,
SparkContext
}
object
CreateByParallelize
{
def
main
(
args
:
Array
[
String
]):
Unit
=
{
//2.
构建
SparkConf
对象。并设置本地运行和程序的
名称
val
conf
=
new
SparkConf
().
setMaster
(
"local[2]"
).
setAppName
(
"CreateRdd1"
)
//3.
构建
SparkContext
对象
val
sc
=
new
SparkContext
(
conf
)
//4.
通过并行化创建
RDD
对象:将本地集合
->
分布式的
RDD
对象
1
2
3
4
5
6
7
8
9
10
11
12
79
//val rdd: RDD[Int] =
sc.parallelize[Int](List(1, 2, 3, 4, 5, 6,
7, 8))
val
rdd
:
RDD
[
Int
]
=
sc
.
parallelize
(
List
(
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8
),
3
)
//5.
输出默认的分区数
//5.1
setMaster("local[*]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println("
默认分区
数:
"+rdd.getNumPartitions)//8,
默认当前系统的
CPU
数
//5.2
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println("
默认分区
数:
"+rdd.getNumPartitions)//2
//5.3
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8),3)
println
(
"
默认分区
数:
"
+
rdd
.
getNumPartitions
)
//3
//6.collect
方法:将
rdd
对象中每个分区的数据,都
发送到
Driver
,形成一个
Array
对象
val
array1
:
Array
[
Int
]
=
rdd
.
collect
()
println
(
"rdd.collect()="
+
array1
.
mkString
(
",
"
))
//7.
显示出
rdd
对象中元素被分布到不同分区的数据信
息
13
14
15
16
17
18
19
20
21
22
23
24
25
80
运行结果:
实时效果反馈
1.
以下关于并行化创建
RDD
的描述错误的是:
A
通过并行化集合创建,将本地集合对象转分布式
RDD
。
B
parallelize()
方法必须传递两个参数。
C
parallelize
没有给定分区数
,
默认分区数等于执行程序的当前
服务器
CPU
核数。
答案:
val
array2
:
Array
[
Array
[
Int
]]
=
rdd
.
glom
().
collect
()
println
(
"rdd.glom().collect()
的内容是
:"
)
/*for(eleArr<- array2){
println(eleArr.mkString(","))
}*/
array2
.
foreach
(
eleArr
=>
println
(
eleArr
.
mkStr
ing
(
","
)))
}
}
26
27
28
29
30
31
32
33
默认分区数:
3
rdd.collect()=1,2,3,4,5,6,7,8
rdd.glom().collect()
的内容是
:
1,2
3,4,5
6,7,8
39.RDD_读取文件创建RDD
40.RDD_读取小文件创建RDD
扩展
wholeTextFiles
适合读取一堆小文件:
//path
指定小文件的路径目录
//minPartitions
最小分区数 可选参数
def
wholeTextFiles
(
path
:
String
,
minPartitions
:
Int
=
defaultMinPartitions
):
RDD
[(
String
,
String
)]
1
2
3
85
代码演示:
package
com
.
itbaizhan
.
rdd
//1.
导入类
import
org
.
apache
.
spark
.
rdd
.
RDD
import
org
.
apache
.
spark
.{
SparkConf
,
SparkContext
}
object
CreateByWholeTextFiles
{
def
main
(
args
:
Array
[
String
]):
Unit
=
{
//2.
构建
SparkConf
对象,并设置本地运行和程序名
称
val
conf
:
SparkConf
=
new
SparkConf
().
setMaster
(
"local[*]"
).
setAppName
(
"WholeTextFiles"
)
//3.
使用
conf
对象构建
SparkContet
对象
val
sc
=
new
SparkContext
(
conf
)
//5.
读取指定目录下的小文件
val
rdd
:
RDD
[(
String
,
String
)]
=
sc
.
wholeTextFiles
(
"data/tiny_files"
)
//(filePath1, "
内容
1"),(filePath2, "
内容
2"),...,(filePathN, "
内容
N")
val
tuples
:
Array
[(
String
,
String
)]
=
rdd
.
collect
()
tuples
.
foreach
(
ele
=>
println
(
ele
.
_1
,
ele
.
_2
))
//6.
获取小文件中的内容
val
array
:
Array
[
String
]
=
rdd
.
map
(
_
.
_2
).
collect
()
println
(
"---------------------------"
)
println
(
array
.
mkString
(
"|"
))
//4.
关闭
sc
对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
86
运行输出结果
:
RDD_
算子概述
定义:
分布式集合
RDD
对象的方法被称为算子
算子分类:
Transformation
转换算子
1
Action
行动算子
2
sc
.
stop
()
}
}
22
23
24
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file1.txt,hello Linux
hello Zookeper
hello Maven
hello hive
hello spark)
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file2.txt,Spark Core
Spark RDD
Spark Sql)
----------------
hello Linux
hello Zookeper
hello Maven
hello hive
hello spark|Spark Core
Spark RDD
Spark Sql
41.RDD_算子概述
42.RDD_转换算子map
43.RDD_转换算子flatmap
44.RDD_转换算子reducebykey
45.RDD_转换算子filter
46.RDD_转换算子distinct
47.RDD_转换算子glom
48.RDD_转换算子groupby
object
RddGroupBy
{
def
main
(
args
:
Array
[
String
]):
Unit
=
{
//2.
构建
SparkConf
对象,并设置本地运行和程序名
称
val
conf
:
SparkConf
=
new
SparkConf
().
setMaster
(
"local[*]"
).
setAppName
(
"groupBy"
)
//3.
使用
conf
对象构建
SparkContet
对象
val
sc
=
new
SparkContext
(
conf
)
//5.
创建
Rdd
val
rdd
:
RDD
[(
Char
,
Int
)]
=
sc
.
parallelize
(
Array
((
'a'
,
1
), (
'a'
,
2
),
(
'b'
,
1
), (
'b'
,
2
), (
'a'
,
3
), (
'a'
,
4
)))
//6.
通过
groupBy
算子对
rdd
对象中的数据进行分组
//groupBy
插入的函数的用意是指定按照谁进行分组
//
分组后的结果是有二元组组成的
RDD
val
gbRdd
:
RDD
[(
Char
,
Iterable
[(
Char
,
Int
)])]
=
rdd
.
groupBy
(
tupEle
=>
tupEle
.
_1
)
//
收集到
Driver
端
val
result1
:
Array
[(
Char
,
Iterable
[(
Char
,
Int
)])]
=
gbRdd
.
collect
()
//(a,CompactBuffer((a,1), (a,2), (a,3),
(a,4))),(b,CompactBuffer((b,1), (b,2)))
println
(
result1
.
mkString
(
","
))
//7.
使用
map
转换算子
//(a,List((a,1), (a,2), (a,3), (a,4))),
(b,List((b,1), (b,2)))
val
result2
:
Array
[(
Char
,
List
[(
Char
,
Int
)])]
=
gbRdd
.
map
(
tup
=>
(
tup
.
_1
,
tup
.
_2
.
toList
)).
collect
()
println
(
result2
.
mkString
(
","
))
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
104
实时效果反馈
1.
以下关于
rdd.groupBy(tupEle => tupEle._1)
的描述错误的是:
A
groupBy
传入的函数的意思是
:
通过这个函数
,
确定按照谁来
分组。
B
groupBy
方法适用于元素为元祖类型的
RDD
,元祖元素的个
数只能为
2
。
C
groupBy
方法适用于元素为元祖类型的
RDD
,元祖元素的个
数
>=2
。
答案:
1=>B