Spark 核心编程之 RDD 介绍

一、Spark 分布式计算模拟

Driver 端将数据拆分成 n 个 Task 发送给 Executor,n 为 Executor 个数,Task 包含数据和计算逻辑,Executor 接收到 Task 后进行计算并将计算后的结果返回给 Driver

在这里插入图片描述

  • 定义封装整体数据和逻辑的资源类

    class Resource extends Serializable {
        // 数据集
        val datas: List[Int] = List(1, 2, 3, 4)
        
        // 计算逻辑
        val logic: Int => Int = _ * 2
    }
    
  • 定义 Task 类

    class Task extends Serializable {
        // 持有的数据
        var data: List[Int] = _
        
        // 持有的逻辑
        var logic: Int => Int = _
        
        // 计算方法
        def compute() = {
            data.map(logic)
        }
    }
    
  • 定义 Driver 类

    /*
    	负责与 Executor 通信并将准备好的 Task 发送给 Executor
    */
    object Driver {
        def main(args: Array[String]): Unit = {
            // 1.建立与 Executor 的连接
            val client1 = new Socket("localhost", 8888)
            val client2 = new Socket("localhost", 9999)
            
            // 2.封装 Task
            val resource = new Resource()
            val task1 = new Task()
            task1.data = resource.datas.take(2)
            task1.logic = resource.logic
            
            val task2 = new Task()
            task2.data = resource.datas.takeRight(2)
            task2.logic = resource.logic
            
            // 3.发送 Task
            val objOut1 = new ObjectOutputStream(client1.getOutputStream)
            objOut1.writeObject(task1)
            objOut1.close()
            objOut1.flush()
            client1.close()
            
            val objOut2 = new ObjectOutputStream(client2.getOutputStream)
            objOut2.writeObject(task2)
            objOut2.close()
            objOut2.flush()
            client2.close()
            
            println("客户端数据发送完毕")
            
        }
    }
    
  • 定义两个 Executor 类

    /*
    	负责接收 Driver 发送过来的 Task 并计算出结果
    */
    object Executor1 {
        def main(args: Array[String]): Unit = {
            // 1.启动服务并等待客户端连接
            val server = new ServerSocket(8888)
            println("服务端[8888]启动,等待客户端连接...")
            val client = server.accept()
            
            // 2.接收 Task
            val objIn = new ObjectInputStream(client.getInputStream)
            val task = objIn.readObject()
            
            // 3.执行计算并输出结果
            val result = task.compute()
            println("Executor[8888]计算结果为:" + result)
            
            objIn.close()
            client.close()
            server.close()
            
        }
    }
    
    /*
    	负责接收 Driver 发送过来的 Task 并计算出结果
    */
    object Executor2 {
        def main(args: Array[String]): Unit = {
            // 1.启动服务并等待客户端连接
            val server = new ServerSocket(9999)
            println("服务端[9999]启动,等待客户端连接...")
            val client = server.accept()
            
            // 2.接收 Task
            val objIn = new ObjectInputStream(client.getInputStream)
            val task = objIn.readObject()
            
            // 3.执行计算并输出结果
            val result = task.compute()
            println("Executor[9999]计算结果为:" + result)
            
            objIn.close()
            client.close()
            server.close()
            
        }
    }
    

二、RDD 介绍

Resilient Distributed Dataset,简称 RDD,弹性分布式数据集

1. 概念

  • RDD 是 Spark 中最基本的数据处理模型,在代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合
  • 弹性的特点:
    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可根据需要重新分片
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD 封装了计算逻辑,并不保存数据
  • 数据抽象:RDD 是一个抽象类,需要子类具体实现
  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
  • 可分区、并行计算

2. 实现原理

2.1 IO 的基本实现原理

IO 的实现体现了装饰者设计模式思想,实现了对类的功能的增强

  • 字节流

    InputStream in = new FileInputStream("filePath");
    int i = -1;
    while((i = in.read()) != -1) { // 每读取一个字节就打印输出一个字节
        System.out.println(i);
    }
    

    在这里插入图片描述

  • 缓冲字节流

    InputStream in = new BufferedInputStream(new FileInputStream("filePath"));
    int i = -1;
    while((i = in.read()) != -1) { // 每读取一个字节就放进缓存中,当超过缓存阈值就全部输出
        System.out.println(i);
    }
    

    在这里插入图片描述

  • 字符流

    Reader in = new BufferedReader(new InputStreamReader(new FileInputStream("filePath"), "UTF-8"));
    
    String s = null;
    while((s = in.readLine()) != null) { // 每读取一个字节就放入转换区,满足大小就转换成一个字符放入缓存区,超过缓存区阈值就将全部字符输出
        System.out.println(s);
    }
    

    在这里插入图片描述

2.2 RDD 与 IO 的关系
// 以 wordCount 案例
val lines: RDD[String] = sc.textFile("datas")
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne: RDD[(String, Int)] = words.map((_, 1))
val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
val wordArray: Array[(String, Int)] = wordToCount.collect()
wordArray.foreach(println)

在这里插入图片描述

  • RDD 的数据处理方式类似于 IO 流,也包含了装饰者设计模式思想
  • RDD 的数据只有在调用 collect 方法时才会真正地执行业务逻辑操作,之前的操作都是对 RDD 功能的扩展
  • RDD 是不保存数据的,但是 IO 流的缓存区可以临时保存一部分数据

3. 核心属性

  • 分区列表:用于执行任务时并行计算,是实现分布式计算的重要属性

    protected def getPartitions: Array[Partition]
    
  • 分区计算函数:Spark 在计算时,是使用分区函数对每一个分区进行计算

    def compute(split: Partition, context: TaskContext): Iterator[T]
    
  • RDD 之间的依赖关系:RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

    protected def getDependencies: Seq[Dependency[_]] = deps
    
  • 分区器:可选,当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

    @transient val partitioner: Option[Partitioner] = None
    
  • 首选位置:可选,计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算,可以判断发送到哪个节点计算效率最优

    protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    
    

4. 执行原理

在这里插入图片描述

  • 启动 Yarn 集群环境(ResourceManager 和 NodeManager)
  • Spark 通过申请资源创建调度节点(Driver)和计算节点(Executor)
  • Spark 框架根据需求将计算逻辑根据分区划分成不同的任务(task)
  • 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
  • 总结:RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算

5. RDD 创建

object TestRDDCreate {
    def main(args: Array[String]): Unit = {
        // 1.创建 spark 连接对象
        val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(sparkConf)
        
        // 2.创建 RDD
        // 2.1 从集合(内存)中创建 RDD:parallelize() 和 makeRDD()
        val seq: Seq[Int] = Seq[Int](1,2,3,4)
        val rdd1: RDD[Int] = sc.parallelize(seq)
        val rdd2: RDD[Int] = sc.makeRDD(seq) // 推荐,makeRDD底层实现是调用parallelize方法
        
        rdd1.collect.foreach(println)
        rdd2.collect.foreach(println)
        
        println("=================")
        
        // 2.2 从外部存储(文件)创建 RDD:本地的文件系统、Hadoop支持的数据集(如HDFS、HBase等)
        // 2.2.1 使用本地文件的绝对路径或相对路径(相对于项目的根目录)创建
        val rdd3: RDD[String] = sc.textFile("D:\\data\\1.txt")
        // val rdd3: RDD[String] = sc.textFile("input/1.txt")
        
        // 2.2.2 使用目录创建(读取目录下所有文件)
        val rdd4 = sc.textFile("D:\\data") // 以行为单位读取,结果是内容字符串
        val rdd41 = sc.wholeTextFiles("D:\\data") // 以文件为单位读取,结果是二元组,第一个元素为文件路径,第二个元素为文件内容
        
        // 2.2.3 使用路径通配符创建
        val rdd5 = sc.textFile("input/1*.txt")
        
        // 2.2.4 使用分布式文件系统路径
        val rdd6 = sc.textFile("hdfs://hadoop102:8020/data/word.txt")
        
        println("=================")
        
        // 2.3 从其他 RDD 创建:通过一个 RDD 运算完后,再产生新的 RDD
        // 2.4 使用 new 直接创建 RDD:一般由 Spark 框架自身使用
        
        // 3.关闭连接
        sc.stop()
    }
}

6. RDD 并行度与分区

6.1 分区规则
  • RDD 有分区列表属性,可以将一个作业切分成多个任务后,发送给不同的 Executor 节点并行计算,而能够同时计算的任务数量称之为并行度

  • RDD 在创建时可以指定分区个数

    // makeRDD 方法的第二个参数表示分区的数量,其默认值为 defaultParallelism 方法的返回值;
    // defaultParallelism 方法底层最终执行返回的是 scheduler.conf.getInt("spark.default.parallelism", totalCores) 的返回值;
    // 首先会在 SparkConf 中获取 key="spark.default.parallelism" 的值,获取到则返回;如果获取不到则返回 totalCores 的值,即当前运行环境(机器)的最大可用 CPU 核数
    
    val rdd1 = sc.makeRDD(List(1,2,3,4), 2)
    
    // 将数据保存成对应个数的分区文件
    rdd1.saveAsTextFile("output1") // 目录下有 2 个分区文件
    
    
    // textFile 方法的第二个参数表示最小的分区数量;其默认值为 defaultMinPartitions 方法的返回值;
    // defaultMinPartitions 方法实现为:math.min(defaultParallelism, 2),如果没指定 "spark.default.parallelism" 的值,则最小分区数为 2
    // 可以通过第二个参数指定最小分区数
    // 由于 spark 读取文件底层使用的是 Hadoop 的 TextInputFormat,所以其分区计算使用的是 TextInputFormat 的 getSplits 方法:
    // 首先获取文件的总字节数:totalSize (如 7 字节);再根据指定的最小分区数计算出每个分区存储的字节大小:goalSize = totalSize/(numSplits == 0 ? 1 : numSplits) = 7/2 = 3;再根据 1.1 倍原则判断剩余的大小是否需要创建新分区:7 - 7/3 = 1,由于 1/3 + 1 > 1.1,所以需要创建新分区,即 7/3 + 1 = 3 个分区 
    val rdd2 = sc.textFile("input/1.txt", 2)
    
    rdd2.saveAsTextFile("output2") // 目录下有 3 个分区文件
    
  • 建立 Spark 连接时可以指定并行度配置

    // local 表示单核运行;local[n] 表示指定核数运行;local[*] 表示最大核数运行
    val sparkConf = new SparkConf.setMaster("local[*]").setAppName("spark")
    sparkConf.set("spark.default.parallelism", "5")
    
    val sc = new SparkContext(sparkConf)
    
6.2 分区数据分配规则
  • makeRDD 创建:集合(内存)数据

    // 以 5 个元素的集合和 3 个分区创建 RDD
    val list = List(1,2,3,4,5)
    val rdd = sc.makeRDD(list, 3)
    
    // makeRDD 底层调用 parallelize
    parallelize(list, 3)
    
    // parallelize 中创建 ParallelCollectionRDD
    new ParallelCollectionRDD(.., list, 3, ..)
    
    // ParallelCollectionRDD 中有核心属性分区列表
    def getPartitions: Array[Partition] = {
        // 调用 slice 方法
        slice(list, 3)
    }
    
    // slice 方法中有分配的核心方法 positions
    def positions(length: Int, numSlice: Int): Iterator[(Int,Int)] = { // 5, 3
        (0 until numSlices).iterator.map { // (0, 1, 2)
            i => { // 0 -- 1 -- 2
                val start = ((i * length) / numSlices).toInt // 0 -- 1 -- 3
                val end = (((i + 1) * length) / numSlices).toInt // 1 -- 3 -- 5
                (start, end) // (0,1) -- (1,3) -- (3,5)
            }
        }
    }
    
    // slice 中会对数据集进行类型模式匹配判断
    case _ => { 
        // 调用 position 方法再映射
        positions(list.toArray.length, 3).map { // 5, 3
            case (start,end) => list.toArray.slice(start, end) // slice(from,until)
            // (0,1) -> (1,2,3,4,5) -> 1
            // (1,3) -> (1,2,3,4,5) -> 2,3
            // (3,5) -> (1,2,3,4,5) -> 4,5
        }
    }
    
    // 结论:3 个分区文件中的数据分配:【1】,【2,3】,【4,5】
    
  • textFile 创建:文件数据

    /**
    	读取的文件内容:1.txt,@表示换行符的位置
    	1@@
    	2@@
    	3
    */
    // 创建文件读取的 RDD
    val rdd = sc.textFile("input/1.txt", 2) // 由于文件为 7 字节,所以分区数为 3,每个分区存储 3 字节
    
    // 1.Spark文件读取是使用 hadoop 的方式以行为单位读取,与字节数无关
    // 2.Spark是以偏移量的形式来读取一行数据,且偏移量不会被重复读取
    /*
    	1@@ -> 偏移量:012
    	2@@ -> 偏移量:345
    	3 -> 偏移量:6
    */
    // 3.每个分区所包含的偏移量范围:(起始偏移量 ~ 起始偏移量 + 分区字节数)
    /*
    	分区0:[0 - 0 + 3 = 3]  -> 1@@2
    	分区1:[3 - 3 + 3 = 6]    -> 3
    	分区2:[6 - 6 + 3 = 9]    -> 
    */
    
    // 结论:3 个分区文件中的数据分配:【1, 2】,【3】,【】
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/664873.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高性能服务器网络模型详解

1999年Dan Kegel在发表的论文中提出了The C10K problem,这篇论文对传统服务器架构处理大规模并发连接时的挑战进行了详细描述,并提出了一些解决方案和优化技术。这里的C指的是Concurrent(并发)的缩写,C10K问题是指怎么在单台服务器上并发一万…

buidldroot musl uclib库 编译

buildroot 修改 编译工具链 原本编译器相关信息: Incorrect selection of the C library buidroot编译 注意相关选项,后续使用CUSTOM TOOLCHAIN 时对应 UCLIB 能将生成IMAGE 从2.9K变为2.3K MUSL 能将生成IMAGE 从2.9K变为2.7K 变大了 arm-linux-…

【运维项目经历|025】企业高效邮件系统部署与运维项目

目录 项目名称 项目背景 项目目标 项目成果 我的角色与职责 我主要完成的工作内容 本次项目涉及的技术 本次项目遇到的问题与解决方法 本次项目中可能被面试官问到的问题 问题1:项目周期为多长时间? 问题2:服务器部署架构方式及数量…

ubuntu 18.04 ros1学习

总结了一下,学习内容主要有: 1.ubuntu的基础命令 pwd: 获得当前路径 cd: 进入或者退出一个目录 ls:列举该文件夹下的所有文件名称 mv 移动一个文件到另一个目录中 cp 拷贝一个文件到另一个目录中 rm -r 删除文件 gedit sudo 给予管理员权限 sudo apt-…

uniapp实现图片上传——支持APP、微信小程序

uniapp实现图片、视频上传 文章目录 uniapp实现图片、视频上传效果图组件templatejs 使用 相关文档: 结合 uView 插件 uni.uploadFile 实现 u-upload uploadfile 效果图 组件 简单封装,还有很多属性…,自定义样式等…根据个人所需调整 te…

DNF手游攻略:勇士进阶指南!

在即将到来的6月5日,《DNF手游》将迎来一场盛大的更新,此次更新带来了大量新内容和玩法,极大丰富了游戏的体验。本文将为广大玩家详细解析此次更新的亮点,包括新增的组队挑战玩法“罗特斯入门团本”、新星使宠物的推出、宠物进化功…

ADB日常使用命令

【ADB全称 Android Debug Bridge】 是Android SDK中的一个命令行工具adb命令可以直接操作管理Android模拟器或真实的Android设备(手机) 建立PC和模拟器连接 # 建立连接 adb connect 127.0.1: 模拟器端口号〈逍遥模拟器21503〉 # 验证是否连接成功 adb d…

NFS p.1 服务器的部署以及客户端与服务端的远程挂载

目录 介绍 应用 NFS的工作原理 NFS的使用 步骤 1、两台机子 2、安装 3、配置文件 4、实验 服务端 准备 启动服务: 客户端 准备 步骤 介绍 NFS(Network File System,网络文件系统)是一种古老的用于在UNIX/Linux主…

使用 Apache Commons Exec 管理外部进程

😄 19年之后由于某些原因断更了三年,23年重新扬帆起航,推出更多优质博文,希望大家多多支持~ 🌷 古之立大事者,不惟有超世之才,亦必有坚忍不拔之志 🎐 个人CSND主页——Mi…

基于 Apache Doris 的实时/离线一体化架构,赋能中国联通 5G 全连接工厂解决方案

作者:田向阳,联通西部创新研究院 大数据专家 共创:SelectDB 技术团队 导读: 数据是 5G 全连接工厂的核心要素,为支持全方位的数据收集、存储、分析等工作的高效进行,联通 5G 全连接工厂从典型的 Lambda 架…

使用PNP管控制MCU是否需要复位

这两台用到一款芯片带电池,希望电池还有电芯片在工作的时候插入电源不要给芯片复位,当电池没电,芯片不在工作的时候,插入电源给芯片复位所以使用一个PNP三极管,通过芯片IO控制是否打开复位,当芯片正常工作的…

在长窗口时代,RAG技术是否仍然必要?

自从谷歌推出 Gemini 1.5 Pro,行业内部对于 RAG 的讨论就不绝于耳。 Gemini 1.5 Pro 的性能确实令人瞩目。根据谷歌公布的技术文档,该系统能够稳定处理长达 100 token 的内容,相当于一小时的视频、十一小时的音频、超过三万行的代码或七十万…

Spring Cloud Alibaba-09-Seata分布式事务

Lison <dreamlison163.com>, v1.0.0, 2024.5.03 Spring Cloud Alibaba-09-Seata分布式事务 文章目录 Spring Cloud Alibaba-09-Seata分布式事务分布式事务基础事务本地事务分布式事务分布式事务的场景 分布式事务的解决方案全局事务可靠消息服务最大努力通知TCC事务 Se…

Java实现数据结构---数组

文章目录 概念存储原理数组的操作完整代码 概念 数组是&#xff08;Array&#xff09;是有限个相同类型的变量所组成的有序集合&#xff0c;数组中的每一个变量为称为元素。数组是最简单、最常用的数据结构。 数组下标从零开始。 存储原理 数组用一组连续的内存空间来存储一…

蓝桥杯第17135题 不完整的算式 C++ Java Python

目录 题目 思路和解题方法 步骤 1&#xff1a;识别缺失的部分 步骤 2&#xff1a;根据已知条件计算或推断 步骤 3&#xff1a;处理特殊情况和验证 c 代码 Java 版本 Python 版本&#xff08;仅供参考&#xff09; 代码和解题细节&#xff1a; 题目 题目链接&#xff…

STM32自己从零开始实操03:输出部分原理图

一、继电器电路 1.1指路 延续使用 JZC-33F-012-ZS3 继电器&#xff0c;设计出以小电流撬动大电流的继电器电路。 &#xff08;提示&#xff09;电路需要包含&#xff1a;三极管开关电路、续流二极管、滤波电容、指示灯、输出部分。 1.2数据手册重要信息提炼 联系排列&…

神经网络与深度学习——第3章 线性模型

本文讨论的内容参考自《神经网络与深度学习》https://nndl.github.io/ 第3章 线性模型 线性模型 线性模型&#xff08;Linear Model&#xff09;是机器学习中应用最广泛的模型&#xff0c;指通过样本特征的线性组合来进行预测的模型&#xff0c;给定一个 D D D维样本 x [ x …

解锁 GPT-4o 背后数据带来的情绪价值

GPT-4o 可以说已经是一个富有情感、通人性的智能语音助手&#xff0c;或者更准确地说&#xff0c;是一个越来越接近人类交互的 “新物种”。这个强大的模型同时具备文本、图片、视频和语音理解和合成方面的能力&#xff0c;甚至可以被视为 GPT-5 的一个未完成版。 01 富有情感的…

lipo制作通用版本静态库

文章目录 目的了解多架构的maclipo如何利用lipo编译通用版本静态库lipo 命令整理扩展目的 主要是使用lipo命令在macOS上创建通用版本的静态库(.a文件),来支持多种架构,如arm64,x86_64。 学习目的: 了解mac 不同架构arm64, x86_64了解lipo命令了解多架构的mac 随着appl…

Linux - 文件管理高级1

0.管道 | 将前面命令的标准输出传递给管道作为后面的标准输入 1.文件查找 find find 进行文件查找时&#xff0c;默认进行递归查找&#xff0c;会查找隐藏目录下的文件 1.1 用法 # find 查找路径 查找条件... -type // 文件类型 f 普通文件 b 设备 d …