03-240605-Spark笔记

03-240605

1. 行动算子-1

  • reduce

聚合

格式:

def reduce(f: (T, T) => T): T

例子:

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)
​
        val rdd = sc.makeRDD(List(1,2,3,4))
​
        // TODO - 行动算子
​
        //reduce
        val i: Int = rdd.reduce(_+_)
        println(i)

输出结果:

10

  • collect

采集

格式:

def collect(): Array[T]

例子:

        // collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
        val ints: Array[Int] = rdd.collect()
        println(ints.mkString(","))

输出结果:

1,2,3,4

  • count

计数

格式:

def count(): Long

例子:

        // count : 数据源中数据的个数
        val cnt = rdd.count()
        println(cnt)

运行结果:

4

  • first

获取数据源的第一个数据

格式:

def first(): T

例子:

        // first : 获取数据源中数据的第一个
        val first = rdd.first()
        println(first)

输出结果:

1

  • take

获取数据源的N个数据

格式:

def take(num: Int): Array[T]

例子:

        // take : 获取N个数据
        val ints: Array[Int] = rdd.take(3)
        println(ints.mkString(","))

输出结果:

1,2,3

  • takeOrdered

数据排序后.再取第N个数据

格式:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

例子:

        // takeOrdered : 数据排序后,取N个数据
        val rdd1 = sc.makeRDD(List(4,2,3,1))
        val ints1: Array[Int] = rdd1.takeOrdered(3)
        println(ints1.mkString(","))

输出结果:

1,2,3

  • aggregate

给定初始值,初始值参与分区内与分区间的计算

格式:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

例子:

        val rdd = sc.makeRDD(List(1,2,3,4),2)
        //10 + 13 + 17 = 40
        // aggregateByKey : 初始值只会参与分区内计算
        // aggregate : 初始值会参与分区内计算,并且和参与分区间计算
        val result = rdd.aggregate(10)(_+_._+_)

        println(result)

输出结果:

40

  • fold

折叠操作,aggregate的简化版操作

格式:

def fold(zeroValue: T)(op: (T, T) => T): T

例子:

        //10 + 13 + 17 = 40
        // aggregateByKey : 初始值只会参与分区内计算
        // aggregate : 初始值会参与分区内计算,并且和参与分区间计算
        //val result = rdd.aggregate(10)(_+_, _+_)
        val result = rdd.fold(10)(_+_)

        println(result)

输出结果:

40

  • countByKey 与 countByValue

都是统计每种Key或者Value出现的个数

格式:

def countByKey(): Map[K, Long]

例子:

image-20240604213641365

        val rdd = sc.makeRDD(List(
            ("a", 1),("a", 2),("a", 3)
        ))

       //val intToLong: collection.Map[Int, Long] = rdd.countByValue()
        //println(intToLong)
        val stringToLong: collection.Map[String, Long] = rdd.countByKey()
        println(stringToLong)

输出结果:

Map(a -> 3)

  • WordCount 不同的实现方式:

运用9种不同的方式实现WordCount

  1. 使用groupBy:

    // groupBy
    def wordcount1(sc : SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val group: RDD[(String, Iterable[String])] = words.groupBy(word=>word)
        val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)
    }
  1. 使用groupByKey:

    // groupByKey
    def wordcount2(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
        val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)
    }
  1. 使用reduceByKey:

    // reduceByKey
    def wordcount3(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)
    }
  1. 使用aggregateByKey

    // aggregateByKey
    def wordcount4(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)
    }
  1. 使用foldByKey:

    // foldByKey
    def wordcount5(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_+_)
    }
  1. 使用combineByKey:

    // combineByKey
    def wordcount6(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
            v=>v,
            (x:Int, y) => x + y,
            (x:Int, y:Int) => x + y
        )
    }
  1. 使用countByKey:

    // countByKey
    def wordcount7(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: collection.Map[String, Long] = wordOne.countByKey()
    }
  1. 使用countByValue:

    // countByValue
    def wordcount8(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordCount: collection.Map[String, Long] = words.countByValue()
    }
  1. 使用reduce:

    def wordcount91011(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))

        // 【(word, count),(word, count)】
        // word => Map[(word,1)]
        val mapWord = words.map(
            word => {
                mutable.Map[String, Long]((word,1))
            }
        )

       val wordCount = mapWord.reduce(
            (map1, map2) => {
                map2.foreach{
                    case (word, count) => {
                        val newCount = map1.getOrElse(word, 0L) + count
                        map1.update(word, newCount)
                    }
                }
                map1
            }
        )

        println(wordCount)
    }

2. 序列化

算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。

  • RDD序列化

案例:

object Spark01_RDD_Serial {

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)

        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))

        val search = new Search("h")

        //search.getMatch1(rdd).collect().foreach(println)
        search.getMatch2(rdd).collect().foreach(println)

        sc.stop()
    }
    // 查询对象
    // 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测
    class Search(query:String){

        def isMatch(s: String): Boolean = {
            s.contains(this.query)
        }

        // 函数序列化案例
        def getMatch1 (rdd: RDD[String]): RDD[String] = {
            rdd.filter(isMatch)
        }

        // 属性序列化案例
        def getMatch2(rdd: RDD[String]): RDD[String] = {
            val s = query
            rdd.filter(x => x.contains(s))
        }
    }
}

输出结果:

image-20240605133427336

  • Kryo序列化框架

Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

了解一下就行

案例:

 def main(args: Array[String]): Unit = {
 val conf: SparkConf = new SparkConf()
 .setAppName("SerDemo")
 .setMaster("local[*]")
 // 替换默认的序列化机制
 .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
 // 注册需要使用 kryo 序列化的自定义类
 .registerKryoClasses(Array(classOf[Searcher]))
 val sc = new SparkContext(conf)
 val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", 
"atguigu", "hahah"), 2)
 val searcher = new Searcher("hello")
 val result: RDD[String] = searcher.getMatchedRDD1(rdd)
 result.collect.foreach(println)
 }
}
case class Searcher(val query: String) {
 def isMatch(s: String) = {
 s.contains(query)
 }
 def getMatchedRDD1(rdd: RDD[String]) = {
 rdd.filter(isMatch) 
 }
 def getMatchedRDD2(rdd: RDD[String]) = {
 val q = query
 rdd.filter(_.contains(q))
 }

Kryo绕过了Java的序列化机制,Kryo比Java序列化小,适合大数据传输、存储

  • RDD 血缘关系

toDebugString查看血缘关系

image-20240605135251669

多个连续的RDD的依赖关系,称之为血缘关系

演示:

image-20240605135543501

关于如何将RDD间的关系保存下来:

image-20240605135759577

血缘关系演示:

image-20240605140042850

image-20240605140110991

image-20240605140124464

  • RDD的依赖关系

dependencies查看依赖关系

image-20240605140238899

OneToOne依赖(窄依赖)

image-20240605140706460

窄依赖我们形象的比喻为独生子女。

image-20240605141335525

Shuffle依赖(宽依赖):

image-20240605140820212

宽依赖我们形象的比喻为多生。

image-20240605141533442

  • RDD 阶级划分

image-20240605141721424

image-20240605142320693

  • RDD 任务划分

image-20240605142405972

源码演示:

image-20240605142747592

  • RDD 的持久化

这样的复用在底层不是很好用:

image-20240605143051395

image-20240605143137900

应该这样:

image-20240605143221932

image-20240605143241039

image-20240605143253432

放在内存中 mapRDD.cache()

放在磁盘中 mapRDD.persist()

Cache缓存:

image-20240605143410471

  • RDD CheckPoint 检查点

image-20240605143502870

image-20240605143516625

checkpoint 需要落盘,需要指定检查点保存路径

检查点路径保存的文件,当作业执行完毕后,不会被删除

一般保存路径都是在分布式存储系统: HDFS

  • checkpoint、Cache、Persist的区别:

以上三个都可以存储,关于他们的区别:

cache : 将数据临时存储在内存中进行数据重用

会在血缘关系中添加新的依赖。一旦出现问题,可以重新读取数据

persist : 将数据临时存储在硬盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

如果作业执行完毕,临时保存的数据文件就会丢失

checkpoint : 将数据长久地保存在磁盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

为了保证数据安全,所以一般情况下,会独立执行作业

为了能够提高效率,一般情况下,是需要和cache联合使用

执行过程中,会切断血缘关系,重新建立新的血缘关系

checkpoint等同于改变数据源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/686105.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL第二种实现方式:现在有一个生产计划,甲乙丙3个品类共16个产品,生产时间6天,每天甲品类可以生产1张单,乙3张,丙1张,请用MySQL写出H列的效果

接上篇:链接: 现在有一个生产计划,甲乙丙3个品类共16个产品,生产时间6天,每天甲品类可以生产1张单,乙3张,丙1张,请用MySQL写出H列的效果 第二种写法: -- 使用WITH子句创建CTE WITH…

【Python报错】已解决ImportError: cannot import name ‘mapping‘ from ‘collections‘

成功解决“ImportError: cannot import name ‘mapping’ from ‘collections’”错误的全面指南 成功解决“ImportError: cannot import name ‘mapping’ from ‘collections’”错误的全面指南 一、引言 在Python编程中,当我们尝试从某个模块中导入某个名称时&…

QT学习过程中遇到的问题自记

文章目录 前言问题1问题2问题3 前言 学习QT嵌入式实战开发(从串口通信到JSON通信微课视频版)的过程中遇到的几个小问题 问题1 1.将书中的示例代码导入自己的电脑,然后点击工程进去,不能运行,报错 no kits are enabled for this project… 我…

python pip 安装

如果您不确定pip的安装路径,可以通过以下命令来查询: pip show pip 这个命令会显示pip的详细信息,其中包括pip安装的路径。如果您想修改pip的默认安装路径,可以使用pip的"--target"参数指定目标路径,例如&a…

解决跨域的几种方法

解决跨域的方法主要有以下几种: 1.CORS(跨域资源共享) CORS是一种W3C规范,它定义了一种浏览器和服务器交互的方式来确定是否允许跨源请求。 服务器通过设置响应头Access-Control-Allow-Origin来允许或拒绝跨域请求。例如&#xf…

tsconfig.json和tsconfig.app.json文件解析(vue3+ts+vite)

tsconfig.json {"files": [],"references": [{"path": "./tsconfig.node.json"},{"path": "./tsconfig.app.json"}] }https://www.typescriptlang.org/tsconfig/#files files: 在这个例子中,files 数…

Java数据结构- Map和Set

目录 1. Map和Set2. Map的使用3. Set的使用 1. Map和Set Java中,Map和Set是两个接口,TreeSet、HashSet这两个类实现了Set接口,TreeMap、HashMap这两个类实现了Map接口。 带Tree的这两个类(TreeSet、TreeMap)底层的数…

uniapp封装picker选择器组件,支持关键字查询

CommonPicker.vue组件 路径在 components\CommonPicker.vue <template><view><uni-easyinput v-model"searchQuery" :placeholder"placeholder" /><picker :range"filteredOptions" :range-key"text" v-model&…

六位一线AI工程师总结Agent构建经验,天工SkyAgents的Agent构建实战。

原文链接&#xff1a;&#xff08;更好排版、视频播放、社群交流、最新AI开源项目、AI工具分享都在这个公众号&#xff01;&#xff09; 六位一线AI工程师总结Agent构建经验&#xff0c;天工SkyAgents的Agent构建实战。 &#x1f31f;我们给人类新手明确的目标和具体的计划&am…

浪潮电脑文件消失怎么恢复?原来有这五种方法

无论是工作、学习还是娱乐&#xff0c;电脑都扮演着举足轻重的角色。然而&#xff0c;在使用电脑的过程中&#xff0c;我们有时会遇到一些令人头疼的问题&#xff0c;比如文件突然消失。对于使用浪潮电脑的用户来说&#xff0c;文件消失可能是一个令人焦虑的问题。本文将为您详…

用负载绿原酸的纳米复合水凝胶调节巨噬细胞表型以加速伤口愈合

引用信息 文 章&#xff1a;Modulating macrophage phenotype for accelerated wound healing with chlorogenic acid-loaded nanocomposite hydrogel. 期 刊&#xff1a;Journal of Controlled Release&#xff08;影响因子&#xff1a;10.8&#xff09; 发表时间&a…

ubuntu系统(香蕉派)设置开机自启动

基本使用参考我之前发的&#xff1a;OrangePi AIpro从上手到网站部署使用-CSDN博客 以下介绍两种设置开机自启动的方法&#xff0c;分别对应界面中配置和在命令中配置 方法1&#xff1a; 编辑开启自启动命令 sudo nano /etc/rc.local # 在该文件中添加启动执行命令&#xff…

外企跨国大数据迁移的注意事项

跨国数据迁移&#xff0c;对汽车行业来说&#xff0c;是一桩大事。跨国公司在进行这一操作时&#xff0c;会遇到不少挑战&#xff0c;比如网络延迟、数据安全、成本控制等等。今天&#xff0c;咱们就聊聊跨国大数据迁移中&#xff0c;跨国车企需要留意的几个关键点。 跨国大数据…

主流的单片机语言是 C 吗?是的话为啥不是 C++?

是c&#xff0c;而且可以预见在很长很长一段时间&#xff0c;没有巨大变革的情况下都会是c 商业项目开发光讨论语言特性优劣问题&#xff0c;是非常片面的&#xff0c;所以要看待为什么是c&#xff0c;最主要仍然是从收益和成本上来看。 刚好我有一些资料&#xff0c;是我根据…

通过 SFP 接口实现千兆光纤以太网通信4

Tri Mode Ethernet MAC 与 1G/2.5G Ethernet PCS/PMA or SGMII 的连接 在设计中&#xff0c;需要将 Tri Mode Ethernet MAC 与 1G/2.5G Ethernet PCS/PMA or SGMII 之间通过 GMII 接口互联。Tri Mode Ethernet MAC IP 核的工作时钟源为 1G/2.5G Ethernet PCS/PMA or SGMII …

心理咨询系统|心理咨询系统成品开发功能

心理咨询系统开发后端设计是一个复杂且精细的过程&#xff0c;涉及多个关键领域的专业知识和技术。本文将详细探讨心理咨询系统开发后端设计的各个方面&#xff0c;包括系统架构、数据库设计、接口开发、安全性保障以及性能优化等。 首先&#xff0c;我们来谈谈系统架构。在心理…

指针还是学不会?跟着小代老师学,进入深入理解指针(4)

指针还是学不会&#xff1f;跟着小代老师学&#xff0c;进入深入理解指针&#xff08;4&#xff09; 1回调函数2qsort使用举例2.1使用qsort函数排序整行数据2.2使用qsort排序结构体数据 3qsort函数的模拟实现 1回调函数 回调函数就是一个通过函数指针调用的函数。 如果你把函数…

【窗口函数的详细使用】

前言&#xff1a; &#x1f49e;&#x1f49e;大家好&#xff0c;我是书生♡&#xff0c;今天主要和大家分享一下可MySQL中的窗口函数的概念&#xff0c;语法以及常用的窗口函数,希望对大家有所帮助。感谢大家关注点赞。 &#x1f49e;&#x1f49e;前路漫漫&#xff0c;希望大…

【TS】进阶

一、类型别名 类型别名用来给一个类型起个新名字。 type s string; let str: s "123";type NameResolver () > string;: // 定义了一个类型别名NameResolver&#xff0c;它是一个函数类型。这个函数没有参数&#xff0c;返回值类型为string。这意味着任何被…

【轻量化】YOLOv10: Real-Time End-to-End Object Detection

论文题目&#xff1a;YOLOv10: Real-Time End-to-End Object Detection 研究单位&#xff1a;清华大学 论文链接&#xff1a;http://arxiv.org/abs/2405.14458 代码链接&#xff1a;https://github.com/THU-MIG/yolov10 推荐测试博客&#xff1a;YOLOv10最全使用教程&#xff0…