参考链接
spark入门实战系列--8MLlib spark 实战_mob6454cc68310b的技术博客_51CTO博客https://blog.51cto.com/u_16099212/7454034
Spark和Hadoop的安装-CSDN博客https://blog.csdn.net/weixin_64066303/article/details/138021948?spm=1001.2014.3001.5501
1. spark-shell交互式编程
启动spark-shell
cd /usr/local/spark/
./bin/spark-shell
1.1 该系总共有多少学生
注:我将下载的chapter5-data1.txt文件放在“/home/hadoop/下载”目录下。
val lines = sc.textFile("file:///home/hadoop/下载/chapter5-data1.txt") #读取文件
lines.map(row=>row.split(",")(0)).distinct().count #每一行作为一个字符串,用’,’分割,取第一个元素,distinct去重,count统计有多少数据项
1.2 该系共开设来多少门课程
lines.map(row=>row.split(",")(1)).distinct().count #去第二个元素,去重,统计元素数量
1.3 Tom同学的总成绩平均分是多少
lines.filter(row=>row.split(",")(0)=="Tom") #以','作为分隔符,用filter进行过滤,筛选出第一项是“Tom”的数据项
.map(row=>(row.split(",")(0),row.split(",")(2).toInt)) #把第一项和第三项(姓名+成绩)合在一起构成一个数据项
.mapValues(x=>(x,1)) #去除value,把x变成(x,1),第一项是原始数据,第二项是数字1
.reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)) #针对想对的Key(也就是姓名),来进行运行,运算规则是(x.1+y._1),表示求和,也就是对(x,1)分别进行求和
.mapValues(x=>(x._1/x._2)).collect() #求平均值运算,x._1是原始数据的求和,x._2是1的求和,表示数据项的个数
读取的是字符串,所以需要转Int .
1.4 求每名同学的选修的课程门数
lines.map(row=>(row.split(",")(0),1)).reduceByKey((x,y)=>x+y).collect
首先是将数据变成(姓名,1)的map,然后针对相同key(姓名)的数据进行求和,也就是统计数据项的个数。
1.5 该系DataBase课程共有多少人选修
lines.filter(row=>row.split(",")(1)=="DataBase").count #直接是筛选第二项(课程)是DataBase的数据,然后进行统计个数
1.6 各门课程的平均分是多少
lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()
求平均分的部分和前面是保持一致的,区别就是筛选构成map的时候前面是根据“Tom”来划分,现在是根据第二项的课程来进行划分。
1.7 使用累加器计算共有多少人选了DataBase这门课
val acc=sc.longAccumulator("My Accumulator") #定义一个累加器
# #筛选第二项是DataBase的数据项,构成一个(DataBase,1)的map,用foreach,对values值来进行累加
lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1)).values.foreach(x=>acc.add(x))
#输出累加值
acc.value
2. 编写独立应用程序实现数据去重
2.1创建相关项目
sudo mkdir -p /example/sparkapp4/src/main/scala
cd /example/sparkapp4/src/main/scala
sudo touch A.txt
sudo vim A.txt
sudo touch B.txt
sudo vim B.txt
sudo vim SimpleApp.scala
import java.io.FileWriter
import org.apache.spark.{SparkConf, SparkContext}
object SimpleApp {
def main(args: Array[String]): Unit = {
//配置
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
//读取文件A.txt
val A = sc.textFile("/example/sparkapp4/src/main/scala/A.txt")
//读取文件B.txt
val B = sc.textFile("/example/sparkapp4/src/main/scala/B.txt")
//对两个文件进行合并
val C = A ++ B
//1.用distinct进行去重
//2.以空格来进行分割
//3.根据key排序
val distinct_lines = C.distinct().map(row => (row.split(" ")(0), row.split(" ")(1))).sortByKey()
//将RDD类型的数据转换为数组
val result = distinct_lines.collect()
//将结果输出到C.txt中
val out = new FileWriter("/example/sparkapp4/src/main/scala/C.txt", true)
for (item <- result) {
out.write(item + "\n")
println(item)
}
out.close()
}
}
2.2创建.sbt文件
cd /example/sparkapp3
sudo touch build.sbt
sudo vim build.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.13"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
2.3打包执行
sudo /usr/local/sbt/sbt package
spark-submit --class " SimpleApp " ./target/scala-2.13/simple-project_2.13-1.0.jar 2>&1 | grep "num"