Introduction
Using Spark Core to compute the top 5 values and the maximum/minimum values.
Top 5 order payments
Data
The data fields are: orderid, userid, payment, productid.
Requirement: read the data from a text file and compute the top 5 payment values (the order payment amounts).
// Fields: orderid,userid,payment,productid
1,1193,5234,978300760
2,661,323423,978302109
3,914,34234,978301968
4,3408,45435,978300275
5,2355,543443,978824291
6,1197,35345,978302268
7,1287,553,978302039
8,2804,53453,978300719
9,594,45654,978302268
10,919,3534,978301368
11,595,543,978824268
12,938,454,978301752
13,2398,4656,978302281
14,2918,9874,978302124
15,1035,37455,978301753
16,2791,4353,978302188
17,2687,3353,978824268
18,2018,423,978301777
19,3105,56345,978301713
Code
import org.apache.spark.{SparkConf, SparkContext}

// Compute the top N payment values
object TopValue {
  // Entry point
  def main(args: Array[String]): Unit = {
    // Spark configuration; run locally on a single thread
    val conf = new SparkConf().setAppName("TopValue").setMaster("local[1]")
    val sc = new SparkContext(conf)
    // Set the log level so that only error messages are shown
    sc.setLogLevel("ERROR")
    // Load the text file into an RDD; each element is one line of text
    // with the fields orderid,userid,payment,productid
    val lines = sc.textFile("D:\\workspace\\spark\\src\\main\\Data\\orderTopData1", 3)
    // num is a counter used to number the results; it starts at 0
    var num = 0
    // Filter out empty lines and lines that do not have exactly 4 fields
    val result = lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
      // Extract the payment field from each line
      .map(_.split(",")(2))
      // Convert payment to Int and pair it with an empty string so that sortByKey can be used
      .map(x => (x.toInt, ""))
      // Sort by key in descending order
      .sortByKey(false)
      // Keep only the payment value and use take(5) to fetch the top five values
      .map(x => x._1).take(5)
      // Iterate over the results, numbering and printing each payment value
      .foreach(x => {
        num = num + 1
        println(num + "\t" + x)
      })
  }
}
Run output
D:\Java\jdk1.8.0_131\bin\java.exe "-javaagent:D:\idea\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar=59560:D:\idea\IntelliJ IDEA 2021.1.3\bin" -Dfile.encoding=UTF-8 -classpath "D:\idea\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar" com.intellij.rt.execution.CommandLineWrapper C:\Users\Administrator\AppData\Local\Temp\idea_classpath338812056 TopValue
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/spark/spark-3.2.0-bin-hadoop2.7/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/Maven/Maven_repositories/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/07/17 13:45:01 INFO SparkContext: Running Spark version 3.2.0
23/07/17 13:45:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/17 13:45:03 INFO ResourceUtils: ==============================================================
23/07/17 13:45:03 INFO ResourceUtils: No custom resources configured for spark.driver.
23/07/17 13:45:03 INFO ResourceUtils: ==============================================================
23/07/17 13:45:03 INFO SparkContext: Submitted application: TopValue
23/07/17 13:45:03 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/07/17 13:45:03 INFO ResourceProfile: Limiting resource is cpu
23/07/17 13:45:03 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/07/17 13:45:03 INFO SecurityManager: Changing view acls to: Administrator
23/07/17 13:45:03 INFO SecurityManager: Changing modify acls to: Administrator
23/07/17 13:45:03 INFO SecurityManager: Changing view acls groups to:
23/07/17 13:45:03 INFO SecurityManager: Changing modify acls groups to:
23/07/17 13:45:03 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Administrator); groups with view permissions: Set(); users with modify permissions: Set(Administrator); groups with modify permissions: Set()
23/07/17 13:45:11 INFO Utils: Successfully started service 'sparkDriver' on port 59590.
23/07/17 13:45:11 INFO SparkEnv: Registering MapOutputTracker
23/07/17 13:45:11 INFO SparkEnv: Registering BlockManagerMaster
23/07/17 13:45:12 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/07/17 13:45:12 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/07/17 13:45:12 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/07/17 13:45:12 INFO DiskBlockManager: Created local directory at C:\Users\Administrator\AppData\Local\Temp\blockmgr-20fc95dd-f4f4-4897-93d3-a3091092d923
23/07/17 13:45:12 INFO MemoryStore: MemoryStore started with capacity 623.4 MiB
23/07/17 13:45:12 INFO SparkEnv: Registering OutputCommitCoordinator
23/07/17 13:45:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/07/17 13:45:13 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.56.1:4040
23/07/17 13:45:14 INFO Executor: Starting executor ID driver on host 192.168.56.1
23/07/17 13:45:14 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59606.
23/07/17 13:45:14 INFO NettyBlockTransferService: Server created on 192.168.56.1:59606
23/07/17 13:45:14 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/07/17 13:45:14 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.56.1, 59606, None)
23/07/17 13:45:14 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.56.1:59606 with 623.4 MiB RAM, BlockManagerId(driver, 192.168.56.1, 59606, None)
23/07/17 13:45:14 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.56.1, 59606, None)
23/07/17 13:45:14 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.56.1, 59606, None)
1 543443
2 323423
3 56345
4 53453
5 45654
Process finished with exit code 0
Step-by-step illustration
A line such as 1,1193,5234,978300760 is processed by val lines = sc.textFile() into an element of the RDD lines, as shown in the figure below.
(Figure omitted; source: Lin Ziyu.)
The chained transformations are then applied step by step:
val result = lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
  .map(_.split(",")(2))
  .map(x => (x.toInt, ""))
  .sortByKey(false)
  .map(x => x._1).take(5)
  .foreach(x => {
    num = num + 1
    println(num + "\t" + x)
  })
Note: the figures are from Professor Lin Ziyu.
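As a side note, sorting the whole RDD just to keep five elements does more work than necessary. A minimal alternative sketch, assuming the same lines RDD as in the TopValue code above: RDD.top(n) returns the n largest elements directly (takeOrdered(n) would return the n smallest).
// Sketch only: same filtering as above, then take the five largest payments directly
val top5 = lines
  .filter(line => line.trim().length > 0 && line.split(",").length == 4)
  .map(_.split(",")(2).toInt)
  .top(5)  // the five largest values, already sorted in descending order
// Number and print the results on the driver
top5.zipWithIndex.foreach { case (payment, i) => println((i + 1) + "\t" + payment) }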
Finding the maximum and minimum values
The data is the same as above; the third column (payment) is used.
Code and run output
import org.apache.spark.{SparkConf, SparkContext}

object MaxValue_order {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MaxValues_order").setMaster("local[1]")
    val sc = new SparkContext(conf)
    // Load the order data; each line has the fields orderid,userid,payment,productid
    val lines = sc.textFile("D:\\workspace\\spark\\src\\main\\Data\\orderTopData1", 3)
    // Only show error messages in the log
    sc.setLogLevel("ERROR")
    // Extract the payment field and use the max()/min() actions
    val Max_Value = lines.map(line => line.split(",")(2).toInt).max()
    val Min_Value = lines.map(line => line.split(",")(2).toInt).min()
    println("Max value: " + Max_Value)
    println("Min value: " + Min_Value)
  }
}
Run output
Max value: 543443
Min value: 423
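The code above scans the data twice, once for max() and once for min(). As a side note, both statistics can be computed in a single pass. A minimal sketch, assuming the same lines RDD as above, using the aggregate action to carry a running (max, min) pair:
// Sketch only: compute max and min in one pass over the payments
val payments = lines.map(line => line.split(",")(2).toInt)
val (maxPay, minPay) = payments.aggregate((Int.MinValue, Int.MaxValue))(
  // Fold each payment into the (max, min) accumulator within a partition
  (acc, p) => (math.max(acc._1, p), math.min(acc._2, p)),
  // Merge the per-partition accumulators
  (a, b) => (math.max(a._1, b._1), math.min(a._2, b._2))
)
println("Max value: " + maxPay)
println("Min value: " + minPay)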
Finding the maximum and minimum values of grouped numbers
Requirement:
Read the numbers from a text file and find the maximum and minimum values within each group of numbers.
Data:
213
101
111
123
242
987
999
12
Code:
import org.apache.spark.{SparkConf, SparkContext}

object Max_Min_PaymentValue {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val conf = new SparkConf().setAppName("Max_Min_PaymentValue").setMaster("local[1]")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the text file; each line becomes one element of the RDD
    val lines = sc.textFile("D:\\workspace\\spark\\src\\main\\Data\\number_sort")
    // Filter out empty lines and map each line to a (key, value) pair, where the key is a fixed
    // string and the value is the line parsed as an Int. groupByKey then turns
    // ("key",213), ("key",101), ... into ("key", <213,101,111,123,242,987,999,12>)
    val result = lines.filter(_.trim().length > 0)
      .map(line => ("key", line.trim.toInt)).groupByKey()
      .map(x => {
        // Initialise the minimum and maximum
        var min = Integer.MAX_VALUE
        var max = Integer.MIN_VALUE
        // Iterate over the numbers in the group; x._2 is the value-list of the
        // (key, value-list) pair, i.e. <213,101,111,123,242,987,999,12>
        for (num <- x._2) {
          // Update the maximum
          if (num > max) {
            max = num
          }
          // Update the minimum
          if (num < min) {
            min = num
          }
        }
        // Return the (max, min) tuple for this group
        (max, min)
      }).collect.foreach {
        case (max, min) =>
          // Print the maximum and minimum
          println("Max value: " + max)
          println("Min value: " + min)
      }
  }
}
Output:
D:\Java\jdk1.8.0_131\bin\java.exe "-javaagent:D:\idea\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar=52414:D:\idea\IntelliJ IDEA 2021.1.3\bin" -Dfile.encoding=UTF-8 -classpath "D:\idea\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar" com.intellij.rt.execution.CommandLineWrapper C:\Users\Administrator\AppData\Local\Temp\idea_classpath1231880085 Max_Min_PaymentValue
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/spark/spark-3.2.0-bin-hadoop2.7/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/Maven/Maven_repositories/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/07/17 15:18:03 INFO SparkContext: Running Spark version 3.2.0
23/07/17 15:18:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/17 15:18:05 INFO ResourceUtils: ==============================================================
23/07/17 15:18:05 INFO ResourceUtils: No custom resources configured for spark.driver.
23/07/17 15:18:05 INFO ResourceUtils: ==============================================================
23/07/17 15:18:05 INFO SparkContext: Submitted application: Max_Min_PaymentValue
23/07/17 15:18:05 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/07/17 15:18:05 INFO ResourceProfile: Limiting resource is cpu
23/07/17 15:18:05 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/07/17 15:18:05 INFO SecurityManager: Changing view acls to: Administrator
23/07/17 15:18:05 INFO SecurityManager: Changing modify acls to: Administrator
23/07/17 15:18:05 INFO SecurityManager: Changing view acls groups to:
23/07/17 15:18:05 INFO SecurityManager: Changing modify acls groups to:
23/07/17 15:18:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Administrator); groups with view permissions: Set(); users with modify permissions: Set(Administrator); groups with modify permissions: Set()
23/07/17 15:18:17 INFO Utils: Successfully started service 'sparkDriver' on port 52457.
23/07/17 15:18:17 INFO SparkEnv: Registering MapOutputTracker
23/07/17 15:18:17 INFO SparkEnv: Registering BlockManagerMaster
23/07/17 15:18:17 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/07/17 15:18:17 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/07/17 15:18:17 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/07/17 15:18:17 INFO DiskBlockManager: Created local directory at C:\Users\Administrator\AppData\Local\Temp\blockmgr-fefa1a95-9abb-4f02-930f-e101116537e3
23/07/17 15:18:17 INFO MemoryStore: MemoryStore started with capacity 623.4 MiB
23/07/17 15:18:17 INFO SparkEnv: Registering OutputCommitCoordinator
23/07/17 15:18:18 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/07/17 15:18:19 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.56.1:4040
23/07/17 15:18:19 INFO Executor: Starting executor ID driver on host 192.168.56.1
23/07/17 15:18:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52473.
23/07/17 15:18:19 INFO NettyBlockTransferService: Server created on 192.168.56.1:52473
23/07/17 15:18:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/07/17 15:18:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.56.1, 52473, None)
23/07/17 15:18:19 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.56.1:52473 with 623.4 MiB RAM, BlockManagerId(driver, 192.168.56.1, 52473, None)
23/07/17 15:18:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.56.1, 52473, None)
23/07/17 15:18:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.56.1, 52473, None)
23/07/17 15:18:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 244.0 KiB, free 623.2 MiB)
23/07/17 15:18:22 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.4 KiB, free 623.1 MiB)
23/07/17 15:18:22 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.56.1:52473 (size: 23.4 KiB, free: 623.4 MiB)
23/07/17 15:18:22 INFO SparkContext: Created broadcast 0 from textFile at Max_Min_PaymentValue.scala:7
23/07/17 15:18:23 INFO FileInputFormat: Total input paths to process : 1
23/07/17 15:18:23 INFO SparkContext: Starting job: collect at Max_Min_PaymentValue.scala:9
23/07/17 15:18:23 INFO DAGScheduler: Registering RDD 3 (map at Max_Min_PaymentValue.scala:8) as input to shuffle 0
23/07/17 15:18:23 INFO DAGScheduler: Got job 0 (collect at Max_Min_PaymentValue.scala:9) with 1 output partitions
23/07/17 15:18:23 INFO DAGScheduler: Final stage: ResultStage 1 (collect at Max_Min_PaymentValue.scala:9)
23/07/17 15:18:23 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
23/07/17 15:18:23 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
23/07/17 15:18:23 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at Max_Min_PaymentValue.scala:8), which has no missing parents
23/07/17 15:18:23 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 7.8 KiB, free 623.1 MiB)
23/07/17 15:18:23 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.2 KiB, free 623.1 MiB)
23/07/17 15:18:23 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.56.1:52473 (size: 4.2 KiB, free: 623.4 MiB)
23/07/17 15:18:23 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1427
23/07/17 15:18:23 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at Max_Min_PaymentValue.scala:8) (first 15 tasks are for partitions Vector(0))
23/07/17 15:18:23 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
23/07/17 15:18:24 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.56.1, executor driver, partition 0, PROCESS_LOCAL, 4508 bytes) taskResourceAssignments Map()
23/07/17 15:18:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
23/07/17 15:18:25 INFO HadoopRDD: Input split: file:/D:/workspace/spark/src/main/Data/number_sort:0+37
23/07/17 15:18:27 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1204 bytes result sent to driver
23/07/17 15:18:27 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3114 ms on 192.168.56.1 (executor driver) (1/1)
23/07/17 15:18:27 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/07/17 15:18:27 INFO DAGScheduler: ShuffleMapStage 0 (map at Max_Min_PaymentValue.scala:8) finished in 3.615 s
23/07/17 15:18:27 INFO DAGScheduler: looking for newly runnable stages
23/07/17 15:18:27 INFO DAGScheduler: running: Set()
23/07/17 15:18:27 INFO DAGScheduler: waiting: Set(ResultStage 1)
23/07/17 15:18:27 INFO DAGScheduler: failed: Set()
23/07/17 15:18:27 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at map at Max_Min_PaymentValue.scala:9), which has no missing parents
23/07/17 15:18:27 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 8.9 KiB, free 623.1 MiB)
23/07/17 15:18:27 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.7 KiB, free 623.1 MiB)
23/07/17 15:18:27 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.56.1:52473 (size: 4.7 KiB, free: 623.4 MiB)
23/07/17 15:18:27 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1427
23/07/17 15:18:27 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at map at Max_Min_PaymentValue.scala:9) (first 15 tasks are for partitions Vector(0))
23/07/17 15:18:27 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
23/07/17 15:18:27 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (192.168.56.1, executor driver, partition 0, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
23/07/17 15:18:27 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
23/07/17 15:18:27 INFO ShuffleBlockFetcherIterator: Getting 1 (88.0 B) non-empty blocks including 1 (88.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
23/07/17 15:18:27 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 36 ms
23/07/17 15:18:27 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1397 bytes result sent to driver
23/07/17 15:18:27 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 222 ms on 192.168.56.1 (executor driver) (1/1)
23/07/17 15:18:27 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
23/07/17 15:18:27 INFO DAGScheduler: ResultStage 1 (collect at Max_Min_PaymentValue.scala:9) finished in 0.310 s
23/07/17 15:18:27 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/07/17 15:18:27 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
23/07/17 15:18:27 INFO DAGScheduler: Job 0 finished: collect at Max_Min_PaymentValue.scala:9, took 4.208463 s
Max value: 999
Min value: 12
23/07/17 15:18:27 INFO SparkContext: Invoking stop() from shutdown hook
23/07/17 15:18:27 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
23/07/17 15:18:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/07/17 15:18:27 INFO MemoryStore: MemoryStore cleared
23/07/17 15:18:27 INFO BlockManager: BlockManager stopped
23/07/17 15:18:27 INFO BlockManagerMaster: BlockManagerMaster stopped
23/07/17 15:18:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/07/17 15:18:27 INFO SparkContext: Successfully stopped SparkContext
23/07/17 15:18:27 INFO ShutdownHookManager: Shutdown hook called
23/07/17 15:18:27 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-d878c8c9-c882-40ea-81cf-2e9408af0f36
Process finished with exit code 0
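As a side note, groupByKey materialises the whole value list for each key in memory, which can become expensive when a key has many values. A minimal sketch under the same assumptions (the lines RDD of numbers above), using reduceByKey so that the (max, min) pair is combined incrementally instead of collecting the full group:
// Sketch only: compute (max, min) per key without building the whole value list
val maxMin = lines.filter(_.trim().length > 0)
  .map(line => ("key", (line.trim.toInt, line.trim.toInt)))  // start each number as its own (max, min)
  .reduceByKey((a, b) => (math.max(a._1, b._1), math.min(a._2, b._2)))
maxMin.collect.foreach { case (_, (max, min)) =>
  println("Max value: " + max)
  println("Min value: " + min)
}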
bug 1: reassignment to val, at max = num
Cause: the error "reassignment to val" is raised because the code assigns num to max and min, which were declared as val (immutable) variables; once a val has been assigned it cannot be changed. The fix is to declare max and min as var (mutable) variables so that they can be reassigned.
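A minimal illustration of the difference (hypothetical snippet, not part of the example code):
val a = 0
// a = 1   // does not compile: "reassignment to val"
var b = 0
b = 1       // fine: a var can be reassigned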
bug2:value _1 is not a member of Unit
println("最大值:" + x._1)
bug原因:map
操作的结果没有返回一个值,而是返回了一个Unit
类型的值,即(),
我把这个(max,min)放到了for循环语句内了,放外头就行。
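A minimal illustration of the wrong and the corrected block structure (hypothetical sketch; grouped stands for the RDD[(String, Iterable[Int])] produced by groupByKey() above):
// Wrong: the for loop is the last expression of the block, so map produces RDD[Unit]
val wrong = grouped.map { x =>
  var max = Integer.MIN_VALUE
  for (num <- x._2) if (num > max) max = num
  // nothing returned here, so the block evaluates to ()
}
// Accessing wrong's elements with ._1 then fails with "value _1 is not a member of Unit"

// Right: make the tuple the last expression, so map produces RDD[(Int, Int)]
val right = grouped.map { x =>
  var min = Integer.MAX_VALUE
  var max = Integer.MIN_VALUE
  for (num <- x._2) {
    if (num > max) max = num
    if (num < min) min = num
  }
  (max, min)  // returned value of the block
}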