二刷大数据(二)- Spark

目录

  • Spark
    • Hadoop区别
    • 核心组件
    • 运行架构
      • Master&Worker
      • Application (Driver)
      • Executor
    • RDD
      • 概念
      • yarn下工作原理
      • 算子
      • 依赖血缘关系
      • 阶段划分
      • 广播变量
    • shuffle流程
    • SparkSQL
      • DataSet、DataFrame、RDD
      • 相互转换
    • SparkStreaming

Spark

Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎

Hadoop区别

Spark和Hadoop是两种广泛应用于大数据处理领域的框架,它们各有特点和适用场景,主要区别可以从以下几个方面进行概述:

  1. 数据处理模型

    • Hadoop:采用批处理模型,通过其核心组件MapReduce实现数据处理。MapReduce将数据分解成多个数据块,分别在集群节点上进行并行处理,然后将结果合并。这一过程涉及磁盘I/O,包括读取输入数据、中间结果写入磁盘、最终结果写回磁盘,因此对于需要大量磁盘读写交互的复杂迭代任务,效率相对较低。

    • Spark:基于内存计算模型,利用RDD(Resilient Distributed Datasets)作为核心数据结构,能够在内存中存储和计算数据,显著减少了对磁盘的依赖。Spark支持微批处理和流处理(通过Spark Streaming),能够对数据进行快速迭代和实时分析。内存中的数据分析速度相比Hadoop的磁盘计算有数量级的提升。

  2. 执行速度与性能

    • Hadoop:由于依赖磁盘I/O,Hadoop在处理大规模数据时虽然具备良好的可扩展性和容错性,但整体执行速度相对较慢,尤其不适合需要多次迭代或实时响应的应用。

    • Spark:由于在内存中进行计算,并且优化了数据管道和任务调度,Spark在处理相同任务时通常比Hadoop快几个数量级,特别是在处理需要多次访问同一数据集的迭代算法或交互式查询时。对于需要低延迟响应的流处理应用,Spark Streaming也提供了更高的性能。

  3. 编程模型与易用性

    • Hadoop:MapReduce编程模型相对较为繁琐,需要定义map()reduce()函数,且抽象层次较低,对于复杂数据处理逻辑可能需要多步MapReduce作业链式执行。

    • Spark:提供了更高级的API和丰富的库,如Spark SQL(支持SQL查询和DataFrame API)、Spark Streaming(流处理)、MLlib(机器学习)、GraphX(图计算)。这些库提供了更直观、面向对象的编程接口,使得开发者可以使用更简洁、高层的代码来表达复杂的计算逻辑,提高了开发效率和代码可读性。

  4. 资源管理与部署

    • Hadoop:包含YARN(Yet Another Resource Negotiator)作为资源管理系统,负责集群中任务的调度与监控。Hadoop生态系统还包括HDFS(Hadoop Distributed File System)作为分布式存储系统,以及HBaseHive等其他数据存储与查询组件。

    • Spark:虽然可以独立部署,但通常与Hadoop YARN或其它资源管理系统(如Mesos、Kubernetes)配合使用,充分利用已有的集群资源。Spark可以无缝读写HDFS上的数据,并且在YARN上进行任务调度。这种兼容性使得Spark可以作为Hadoop生态的一部分,补充其在实时计算和内存处理方面的不足。

  5. 应用场景

    • Hadoop:更适合处理海量静态数据的离线批处理任务,如日志分析、历史数据挖掘、大规模ETL(Extract, Transform, Load)等,以及需要高容错性和低成本存储的场景。

    • Spark:适用于需要快速响应、多次迭代、交互式查询和实时流处理的场景,如实时推荐系统、复杂事件处理、大规模机器学习训练与预测、即席查询等。

总结来说,Hadoop和Spark在大数据处理中扮演着互补的角色。Hadoop作为分布式存储和批处理的基础架构,擅长处理大规模静态数据和离线分析任务;而Spark凭借其内存计算、高效的数据处理管道和丰富的库支持,特别适合于需要高性能、低延迟和复杂分析的应用场景。在实际项目中,两者常被结合使用,利用Hadoop进行数据存储和初步处理,再通过Spark进行深度分析和实时计算。

核心组件

Apache Spark 是一个统一的大数据处理框架,以其高效、易用和灵活的特点而广受欢迎。Spark 的组成主要包括以下几个关键模块:

  1. Spark Core

    • 核心引擎:Spark Core 提供了基本的分布式任务调度、内存管理、错误恢复、I/O 接口以及度量收集等功能,构成了整个 Spark 框架的基础。它实现了 Spark 的基本分布式计算模型,包括任务调度、数据分片(partitioning)、任务执行和结果聚合等。

    • RDD(Resilient Distributed Datasets):Spark Core 中最重要的抽象是 RDD,这是一种容错的、可以并行操作的元素集合。RDD 支持两种类型的操作:转换(transformations)和行动(actions)。转换操作创建新的 RDD,而行动操作触发实际的计算并返回结果。RDD 具有容错性,如果数据丢失,可以通过记录的 lineage(血统)信息重新计算。

  2. Spark SQL

    • 结构化数据处理:Spark SQL 扩展了 Spark Core,提供了对结构化数据(如关系型数据表)的高效处理能力。它引入了 DataFrame 和 Dataset API,允许用户使用 SQL 查询语句或者面向对象的方式操作数据。Spark SQL 还包含了 Catalyst 查询优化器,能对 SQL 查询进行高级优化,并与 Hive Metastore 兼容,支持 HiveQL 查询和元数据管理。

    • Structured Streaming:基于 Spark SQL,Structured Streaming 提供了对无界和有界数据流的统一处理接口,允许用户以类似处理静态数据的方式来编写流处理程序。Structured Streaming 强调端到端的精确一次处理语义,以及与批处理一致的 API,简化了流批一体的应用开发。

  3. Spark Streaming

    • 实时流处理:Spark Streaming 是 Spark 对实时数据处理的支持,它将实时数据流划分为一系列小的批次(微批处理),并在 Spark 的批处理引擎上进行处理。Spark Streaming 提供了高吞吐、容错机制以及与 Spark Core、Spark SQL 和 MLlib 等其他组件的无缝集成,便于构建复杂的实时分析应用。
  4. MLlib

    • 机器学习库:MLlib 是 Spark 的机器学习库,包含了大量常用的机器学习算法、实用工具和数据类型,支持分类、回归、聚类、协同过滤、降维等任务。MLlib 支持在分布式环境中高效地训练模型,并且与 Spark 的分布式数据集紧密集成,便于在大规模数据集上进行机器学习。
  5. GraphX

    • 图计算:GraphX 是 Spark 中用于图处理和图并行计算的库,提供了图数据模型、图操作(如顶点和边的变换)、图算法(如PageRank、连通分量、三角计数等)以及图形化的可视化工具。GraphX 将图抽象为顶点(Vertex)和边(Edge)的集合,并支持属性图模型,允许在顶点和边上附加任意属性。

除了以上主要模块,Spark 还包括以下组成部分:

  • 运行模式:Spark 支持多种运行模式,适应不同的部署环境,包括:

    • 本地模式:在单机上运行,用于开发和测试。
    • Standalone 模式:使用 Spark 自身提供的内置资源管理框架,在独立集群中部署。
    • Hadoop YARN:在 Hadoop 集群中运行,利用 YARN 进行资源管理和调度。
    • Apache Mesos:在 Mesos 集群中运行,实现资源共享。
    • Kubernetes:在 Kubernetes 容器平台上部署 Spark 应用。
  • 集群管理器与守护进程:Spark 集群中包含主守护程序(Driver)和辅助进程(Executor)。主守护程序负责应用程序的初始化、任务调度、结果收集等,而辅助进程负责在工作节点上执行具体任务。

  • 连接器与数据源:Spark 提供了与多种数据源(如 HDFS、Cassandra、HBase、Kafka 等)的连接器,使得 Spark 应用能够轻松访问和处理这些数据源中的数据。

综上所述,Spark 由 Spark Core、Spark SQL、Spark Streaming、MLlib 和 GraphX 等核心模块组成,支持结构化数据处理、实时流处理、机器学习和图计算等多种大数据处理场景,并能够灵活部署在不同类型的集群环境中。

运行架构

Spark运行架构中,各个组件如Master、Application、Executor、Driver等各司其职,共同协作完成大数据处理任务。以下是它们各自的分工和作用:

Master&Worker

Master是Spark集群的主控节点,类似YARN的RM,负责整个集群的资源管理和作业调度。具体职责包括:

  1. 资源管理:Master监控集群中所有可用的工作节点(Worker Nodes),了解它们的内存、CPU等资源情况,并负责这些资源的分配。

  2. 作业调度:当接收到客户端提交的Spark应用程序(Application)时,Master根据应用程序的需求和当前集群资源状况,决定在哪些Worker Node上启动Executor,并为每个Executor分配合适的资源。

  3. 监控与协调:Master持续监控作业执行状态,包括Executor的生命周期管理、任务失败后的重试等。它还接收来自Worker和Driver的定期心跳,以确保集群的稳定运行和故障检测。

  4. 服务发现:Master提供服务注册和发现功能,使得Driver能够找到并与其管理的Executors通信。

Mater和Worker都是进程,类似于ResourceManager和NodeManager,运行在某个节点上

Application (Driver)

Application是指用户提交的Spark作业,由一个Driver程序代表。Driver是应用程序的主控进程,承担以下职责:

  1. 初始化:Driver启动时,会创建一个SparkContext对象,这是与Spark集群交互的核心接口。SparkContext负责与Master节点建立连接,提交应用程序及其资源需求。

  2. 任务解析与调度:Driver解析用户编写的Spark程序(使用RDD、DataFrame、Dataset等API编写),将其转化为一系列Jobs,每个Job又进一步细分为多个Stages(基于 Shuffle 操作划分)。Driver中的DAGScheduler负责将Job分解为Stage,并提交给TaskScheduler,后者负责将Stages转化为具体的Tasks并发送到各个Executor执行。

  3. 数据分发与结果收集:Driver负责将计算所需的数据分布到各个Executor上,并在Executor执行完Tasks后收集结果。对于Shuffle操作,Driver还会协调跨Executor之间的数据交换。

  4. 故障恢复:在出现Executor故障或其他异常时,Driver负责重新调度受影响的任务,以保证作业的容错性。

Executor

Executor是Worker Node上为特定Application启动的进程,负责实际的数据处理工作。每个Executor维护一个或多个线程池,每个线程可以执行一个Task。其主要职责包括:

  1. Task执行:Executor接收来自Driver的任务指令,将其分配给线程池中的线程执行。每个Task对应Spark程序中的一个计算单元。

  2. 数据缓存与计算:Executor根据Driver的指示,可以在内存或磁盘上缓存数据,以加速后续计算。它执行诸如map、filter、join等操作,并在必要时与本地或其他Executor交换数据。

  3. 数据本地性优化:Executor尽可能地在存储数据的同节点上执行相关任务,利用数据本地性原则减少网络传输,提高性能。

  4. 结果反馈:Executor将任务执行的结果返回给Driver,或者根据Driver的指令更新缓存或持久化数据。

在Spark运行架构中,Master作为集群的管理者,负责全局资源调度和作业监控。Application(由Driver代表)提交作业请求、解析作业逻辑、生成执行计划,并与Master交互获取资源。Executor作为Application在Worker节点上的代理,实际执行任务并处理数据,同时与Driver保持通信以报告进度和结果。各组件间通过网络通信协作,共同实现高效、可靠的大数据处理。

RDD

概念

RDD(Resilient Distributed Datasets,弹性
分布式数据集)是Apache Spark的核心数据抽象,它以一种容错、可并行操作的元素集合形式表示数据。以下是RDD的工作方式概述:

  1. 创建
    RDD可以通过以下方式创建:
  • 从外部数据源:读取本地文件系统、HDFS、Cassandra、HBase等存储系统中的数据,通过SparkContext提供的API(如textFile()parquetFile()等)创建RDD。
  • 从现有RDD:通过对现有RDD应用转换操作(如map()filter()flatMap()等)生成新的RDD。
  • 从并行集合:直接从Spark上下文(SparkContext)中创建,如通过parallelize()方法将本地集合转化为分布式RDD。
  • 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
  1. 分区与分布
  • 分区:RDD被划分为多个逻辑分区(partitions),每个分区包含一部分数据。分区决定了数据如何在集群中分布和并行处理。分区的数量和数据分布策略对Spark的并行性和性能有直接影响。执行并行计算

  • 分布:Spark根据数据分区将RDD分布在集群的不同工作节点(Worker Nodes)上。每个分区通常对应一个Executor上的一个任务(Task)。这样设计有助于实现数据本地性,即任务尽量在数据所在节点上执行,减少网络通信开销。

  • 移动数据❌移动计算✅

  1. 转换与行动
  • 转换操作(Transformations):对RDD进行诸如map()filter()join()等操作,生成新的RDD。转换操作不会立即执行,仅记录操作逻辑和依赖关系,形成一个DAG(Directed Acyclic Graph,有向无环图),描述了从原始数据到最终结果的计算步骤。

  • 行动操作(Actions):如count()collect()saveAsTextFile()等,触发实际的计算。行动操作会触发DAG的执行计划生成,Spark根据DAG确定任务的调度和执行顺序,将任务提交给Executor执行,并将结果返回给Driver或写入外部存储。

  1. 计算执行
  • 任务调度:Spark的DAGScheduler将DAG分解为一系列阶段(Stage),每个阶段内部的任务可以并行执行,阶段之间可能存在依赖关系。TaskScheduler将这些任务分配到各个Executor上执行。

  • 任务执行:Executor接收到来自Driver的任务后,在本地线程池中执行任务。任务通常在数据本地的节点上运行,以利用数据本地性。任务执行过程中,可能会涉及到内存与磁盘之间的数据交换,以及跨节点的数据传输(如Shuffle操作)。

  1. 缓存与容错
  • 缓存:通过cache()persist()方法,可以将RDD的分区数据保存在内存、硬盘或二者结合的存储介质中,以加快后续计算。Spark会自动管理缓存数据的生命周期和空间,当内存不足时,可能将部分数据溢写到硬盘。

  • 容错:RDD通过**Lineage(血统)**机制实现容错。每个RDD记录了其由哪些父RDD经过何种转换操作生成的完整历史。当某个分区数据丢失时,Spark可以根据Lineage信息重新计算该分区数据,无需重算整个RDD。这种机制使得Spark能在发生故障时仍能保证数据计算的正确性和完整性。

综上所述,RDD工作方式的核心在于通过转换操作构建计算逻辑的DAG,仅在行动操作时触发实际计算,并利用分区、分布、缓存和容错机制实现高效、弹性的分布式数据处理。 RDD的设计旨在简化并行编程模型,同时利用内存计算和高效的调度策略,显著提升大数据处理性能。

yarn下工作原理

Spark在YARN(Yet Another Resource Negotiator,另一种资源协调者)模式下工作时,Spark应用程序借助YARN作为资源管理和调度平台来运行。以下是在YARN模式下Spark工作原理的详细步骤:

  1. 提交应用程序

用户通过spark-submit脚本提交Spark应用程序,指定关键参数,如--master yarn(表明使用YARN作为集群管理器),--deploy-mode(可以选择clientcluster模式),以及Spark应用程序的jar包、主类、配置参数(如spark.executor.memoryspark.executor.coresnum-executors等)。

  1. 启动客户端/Driver进程
  • Client模式:在client模式下,spark-submit脚本所在的客户端机器上启动Driver进程。Driver负责与YARN交互,提交应用程序并监控其执行。

  • Cluster模式:在cluster模式下,spark-submit向YARN提交请求后即可退出。YARN在集群中启动一个专用的ApplicationMaster(AM)进程作为Driver,负责后续的资源请求和任务调度。

  1. YARN资源申请

    无论是哪种部署模式,接下来的过程相同:

  • Application Submission:Spark Driver(或AM)向YARN的ResourceManager(RM)提交应用程序,其中包括:

    • 应用程序元数据(如名称、队列等)
    • 应用程序主类(在Cluster模式下,即ApplicationMaster的类)
    • 任何必需的依赖(如Spark相关jar包)
    • 配置信息(如executor资源需求)
  • ApplicationMaster启动:RM接收到请求后,会在集群中选择一个NodeManager(NM),为该应用程序分配第一个Container。NM在该Container中启动ApplicationMaster进程。

  1. 资源协商与Executor启动
  • 注册与资源请求:ApplicationMaster(在Cluster模式下即Driver)向RM注册,并开始周期性地请求资源(Container),指定所需内存(spark.executor.memory)和CPU核心数(spark.executor.cores)。

  • 资源分配:RM根据集群资源状况和调度策略,为ApplicationMaster分配所需资源(Container)。当Container分配后,RM通知ApplicationMaster。

  • Executor启动:ApplicationMaster收到Container分配信息后,与对应的NodeManager通信,指示NM在分配的Container中启动Executor进程。Executor启动后,会与Driver建立连接,并注册为可用执行单元。

  1. 任务调度与执行
  • 任务划分:Driver根据用户代码生成DAG(有向无环图),将其划分为多个Stage,并为每个Stage生成TaskSet(一组Task)。这些Task对应于待执行的操作,如Map、Reduce等。

  • 任务分配:Driver将TaskSet发送给注册的Executors。每个Executor接收到任务后,在本地线程中执行。

  • 数据流与通信:执行过程中,Spark利用高效的数据序列化、传输机制以及RDD的Lineage特性来处理数据流和跨节点通信(如Shuffle操作)。Spark还利用数据本地性原则,尽可能让任务在数据所在节点上执行,以减少网络传输。

  1. 监控与容错
  • 监控:Driver持续监控所有Executor的状态,包括任务进度、资源使用情况等。ApplicationMaster也定期向RM汇报应用程序的总体状态。

  • 容错:如果Executor出现故障,Driver能够通过RDD的Lineage信息重新计算丢失的数据分区。此外,YARN自身的容器管理机制也会监测Executor进程的健康状况,必要时重新启动失败的Container。

  • 应用程序结束:当所有任务完成或者用户主动终止应用程序时,Driver向RM注销ApplicationMaster,释放所有已分配的资源。YARN清理与该应用程序相关的资源,结束生命周期。

总结来说,Spark在YARN模式下工作时,Spark应用程序以YARN应用程序的形式运行,利用YARN进行资源管理和调度,而Spark自身专注于任务划分、数据处理和容错机制。这种模式允许Spark无缝地融入Hadoop生态系统,利用YARN的资源隔离和多租户能力,实现高效的分布式计算。

算子

RDD算子主要分为两类:转换(Transformation)算子行动(Action)算子

  • 转换算子
    转换算子不会立即触发计算,而是定义了从一个RDD到另一个RDD的映射关系。它们是惰性求值的,也就是说,当调用转换算子时,Spark不会立即执行计算,而是记录下这个操作及其依赖关系,形成一个计算逻辑图(DAG)。实际计算发生在行动算子触发时。

  • 行动算子
    行动算子触发实际的计算,并将结果返回给Driver程序或写入外部存储系统。它们会触发整个DAG图的执行,直到得到最终结果。行动算子的结果通常是一个非RDD值(如单个数值、统计结果、文件路径等),或者将数据写入外部存储。

特点与特性

  1. 并行计算:算子操作在RDD的各个分区上并行执行,充分利用集群资源。
  2. 延迟计算(Lazy Evaluation):转换算子仅构建计算逻辑图,不立即执行,直到遇到行动算子时才触发实际计算。
  3. 容错性:基于RDD的Lineage(血统)信息,当数据分区丢失时,Spark可以根据算子依赖关系自动重新计算丢失部分。
  4. 可伸缩性:算子设计旨在处理大规模数据集,能够随着集群规模的增加而扩展计算能力。
  5. 优化潜力:Spark通过Catalyst优化器对包含算子的DAG进行优化,如合并连续的map操作、推断常量表达式、优化shuffle阶段等,提高执行效率。
rdd.转换算子.转换算子.转换算子.转换算子.行动算子

依赖血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

每一个rdd都保存了一系列依赖关系,当其中一个rdd出错时,可以恢复,但是rdd不能存储数据,rdd实际上就是一串代码

宽窄依赖

  • 窄依赖(1对1依赖)表示每一个父(上游)RDD的Partition最多被子(下游)RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。
  • 宽依赖(shuffle依赖)表示每一个父(上游)RDD的Partition被子(下游)RDD的多个Partition使用,宽依赖我们形象的比喻为多生子女。

阶段划分

RDD(Resilient Distributed Dataset)的阶段划分是Spark为了有效地调度和执行分布式计算任务而进行的一种逻辑切分。阶段划分基于RDD间的依赖关系和计算模式,旨在最小化数据在不同阶段间的shuffle(重新分区和排序)成本。以下是RDD阶段划分的详细说明:

阶段划分依据

  1. 依赖类型:RDD之间的依赖关系有两种主要类型:
    • 窄依赖(Narrow Dependencies):父RDD的每个分区最多被子RDD的一个分区所依赖。如mapfilterunion等算子产生的依赖属于窄依赖。窄依赖的调度和执行非常高效,因为不需要全局的数据重新分发和排序。

    • 宽依赖(Wide Dependencies,又称Shuffle Dependencies):父RDD的每个分区可能被子RDD的多个分区所依赖,需要进行跨分区的数据重排和重新分区。如reduceByKeygroupByKeyjoin等算子产生的依赖属于宽依赖。宽依赖会导致数据的全局shuffle,通常会触发阶段边界

阶段划分过程

  1. 构建DAG(有向无环图):Spark首先将用户程序中的RDD转换操作(Transformation)串联起来,形成一个表示计算流程的DAG。在这个DAG中,每个节点代表一个RDD,边表示它们之间的依赖关系。

  2. 识别宽依赖:沿着DAG遍历,每当遇到宽依赖时,就将其作为阶段划分的边界。这意味着一个阶段内的所有操作都是窄依赖,而宽依赖标志着阶段之间的界限。

  3. 阶段生成:每个阶段(Stage)包含一个或多个连续的窄依赖操作,以及一个终结于宽依赖的shuffle操作。阶段内的任务可以并行执行,且通常在同一台机器上处理同一分区的数据,有利于数据本地性。

阶段特性

  • Stage ID:每个阶段都有一个唯一的ID,用于标识和跟踪其在整个应用程序中的位置。

  • 任务(Task):每个阶段进一步细分为多个任务,每个任务对应一个父RDD分区的计算。任务是Spark调度的最小单位,由Executor在工作节点上执行。

  • Shuffle Map Stage:包含宽依赖的阶段称为Shuffle Map Stage。这些阶段的最后一步会产生shuffle输出文件,这些文件将作为下一个阶段(通常是Reduce Stage)的输入。

  • Reduce Stage:紧随Shuffle Map Stage之后的阶段通常被称为Reduce Stage,其任务会读取前一阶段产生的shuffle文件,进行聚合或连接等操作。

  • Pipeline执行:对于一个阶段内的多个连续窄依赖操作,Spark会尝试将其合并为一个任务,避免不必要的中间结果写入磁盘,从而形成计算管道,提高执行效率。

阶段划分的意义

  • 高效调度:阶段划分使得Spark能够清晰地识别出哪些操作可以并行执行,哪些操作需要等待shuffle数据准备完成。这种划分有助于优化任务调度,减少不必要的等待时间。

  • 数据流控制:通过阶段划分,Spark可以明确知道哪些数据需要在不同节点间进行shuffle,从而合理安排网络通信和数据分发,避免数据瓶颈。

  • 容错恢复:基于阶段划分,Spark可以利用RDD的Lineage(血统)信息快速重建丢失的分区数据。当某个阶段的任务失败时,只需重新计算该阶段及其依赖的上游阶段,而不是整个DAG。

  • 优化机会:阶段划分有助于Catalyst优化器识别和实施跨阶段的优化策略,如合并连续的map操作、推断常量表达式、优化shuffle阶段等,进一步提升执行效率。

综上所述,RDD的阶段划分是Spark实现高效分布式计算的关键机制之一,它通过识别并分离出具有宽依赖的计算阶段,为任务调度、数据流控制、容错恢复和优化提供了基础。阶段划分使得Spark能够将复杂的计算任务拆解为一系列相互独立、易于管理和优化的执行单元。

广播变量

Spark的广播变量是一种优化机制,用于在分布式计算环境中高效地共享只读数据集。以下是对Spark广播变量的详细解释:

基本概念

广播变量允许将一个在Driver程序中创建的变量(可能是简单数据类型,也可能是较大的数据结构,如数组、列表或映射)广播到Executor节点上。每个Executor只需存储该变量的一份本地副本,而不是在每次任务执行时都由Driver传送给Executor。这样做的主要目的是:

  • 减少网络传输:避免在每个任务启动时重复发送相同的变量副本到各个Executor,尤其是当这个变量较大时,能显著降低网络开销。

  • 提升计算效率:Executor上的所有任务都能快速访问到本地存储的广播变量,无需网络延迟,提高了数据访问速度。

  • 节省内存:每个Executor只需要存储一个副本,相比每个任务都保留一个副本,更有效地利用Executor的内存资源。

使用与工作原理

定义与使用

  1. 在Driver端创建:广播变量在Driver程序中通过SparkContext的broadcast方法创建,将一个普通变量封装为Broadcast对象。

    val sc: SparkContext = ...
    val largeData: Array[Int] = ... // 大型只读数据集
    val broadcastVar = sc.broadcast(largeData)
    
  2. 在Executor端访问:在RDD的操作(如mapfilter等)中,可以通过value属性来访问广播变量的值。

    val processedRDD = someRDD.map { data =>
      val localValue = broadcastVar.value
      // 使用localValue进行计算
      ...
    }
    

工作原理

  • 分发:Driver将广播变量序列化后,将其拆分为多个小块(chunks),并通过高效的TorrentBroadcast算法(类似于P2P下载)分发到各个Executor。Executor可以从Driver或其它Executor节点并行地接收这些块。

  • 存储与缓存:每个Executor接收到广播变量的所有块后,将其反序列化并在本地内存中存储。Spark默认将广播变量的存储级别设置为MEMORY_AND_DISK,意味着如果内存不足,部分或全部数据会被存储到本地磁盘上。Executor的BlockManager负责管理这些块。

  • 访问:Executor上的任务在执行时可以直接从本地内存(或磁盘)中读取广播变量的值,无需通过网络请求Driver或其它Executor。

  • 只读性:广播变量一旦创建并分发,即变为只读状态。任何试图修改广播变量值的操作都会抛出异常。这是为了确保所有Executor上的副本保持一致,并防止潜在的并发问题。

注意事项

  • 数据大小:广播变量适合用于中到大型(几十MB以上)的只读数据集。对于非常小的变量,直接通过闭包传递可能更为高效;而对于特别大的数据,可能需要考虑更高级的数据分片或外部存储方案。

  • 数据不变性:广播变量要求其内容在整个生命周期内保持不可变。这是因为一旦广播出去,所有Executor上的副本即被视为静态数据,任何对原始变量的更改都不会传播到已存在的副本。

  • 序列化与反序列化:广播变量的内容必须是可以序列化和反序列化的。如果包含复杂数据结构,确保其成员变量及嵌套对象也支持序列化。

  • 生命周期管理:Spark会自动管理广播变量的生命周期。当一个广播变量不再被任何任务引用且内存压力需要释放空间时,Spark会将其从Executor的内存中清除。然而,由于广播变量通常存储在磁盘上,即使从内存中删除,仍可重新加载到内存。

应用示例

广播变量常用于以下场景:

  • 全局配置参数:在大规模数据分析中,可能需要将一些全局的配置信息(如阈值、规则列表等)广播到所有Executor,供所有任务统一使用。

  • 查找表:当有大量重复查询(如字典、映射表等)发生时,将这些查找表广播到每个Executor可以避免重复网络传输。

  • 统计摘要:预先计算好的统计数据(如平均值、最大值、直方图等)可以作为广播变量分发,用于后续计算的基准或过滤条件。

总之,Spark的广播变量是一种有效的优化手段,通过在Executor上本地缓存一份只读数据的副本,减少了网络传输,提升了计算性能,尤其适用于需要在分布式计算任务中频繁、全局访问的大型只读数据集。

shuffle流程

Shuffle是Spark中进行数据重新分区和排序的关键过程,它发生在宽依赖(Wide Dependency)的两个阶段之间。Shuffle的主要目的是将前一阶段产生的数据按照特定的键进行重新分布,使得相同键的数据聚集到同一个分区,以便后续阶段进行聚合、join、reduceByKey等操作。以下是Shuffle的工作过程详细说明:

1. 数据产生与分区

**前一阶段(Shuffle Map Stage)**的每个任务(Task)对输入数据进行处理后,生成中间结果,通常以(key, value)对的形式存在。这些对按照预先设定的分区器(Partitioner)被分配到不同的输出分区(Output Partition)。输出分区的个数通常与下一阶段的并行度相匹配。

2. 数据排序与溢写

每个任务在本地内存中为每个输出分区维护一个缓冲区(Buffer)。当缓冲区数据达到一定阈值(可通过配置调整)时,触发以下操作:

  • 排序:对缓冲区内(key, value)对按key进行排序。排序是Shuffle过程中的重要环节,确保相同key的数据在写入磁盘前已局部有序,有利于后续的聚合操作。

  • 溢写(Spill):将排序后的数据写入本地磁盘上的临时文件(Spill File)。溢写过程中,数据会进一步压缩以减少磁盘占用和后续网络传输的开销。每个任务可能产生多个溢写文件。

3. 合并与索引构建

当所有数据处理完毕,任务会对之前生成的所有溢写文件进行合并(Merge),生成一个或多个最终的Shuffle文件。合并过程中,Spark会保留排序属性,并可能继续进行压缩。同时,任务还会创建一个索引文件(Index File),记录每个Shuffle文件中不同key范围的信息,以便后续阶段快速定位所需数据。

4. 数据传输

**后一阶段(Reduce Stage)**开始时,各个Executor会根据任务调度情况,通过网络从产生Shuffle数据的Executor那里拉取所需的Shuffle文件。拉取过程遵循数据本地性原则,优先从本地Executor获取,否则通过网络从其他节点获取。拉取过程中,Executor会利用索引文件快速定位所需数据。

5. 数据解压与合并

Executor接收到Shuffle文件后,进行解压并读取数据。对于一个任务,它可能需要合并来自多个源的同一分区数据。合并时,Spark会维持排序属性,并可能对数据进行进一步的聚合或操作,生成供当前任务使用的中间结果。

6. 资源回收

完成Shuffle数据处理后,Spark会自动清理不再需要的临时文件和索引文件,释放磁盘空间。

Shuffle优化

Spark对Shuffle过程进行了诸多优化,包括:

  • Shuffle Service:提供集中式的Shuffle数据管理,减轻Executor的存储压力,提高容错性。

  • Sort-Based Shuffle(默认):优化了数据排序和合并过程,减少了磁盘写入和读取次数。

  • Shuffle Partition Tuning:适当调整Shuffle分区数,平衡并行度与数据倾斜问题。

  • Compress/Shuffle Spilling:使用压缩减少磁盘占用和网络传输,合理设置内存缓冲区大小避免频繁溢写。

  • 推测执行(Speculative Execution):对可能运行较慢的任务启动备份任务,减少整体延迟。

Shuffle的重要性

Shuffle是Spark中非常关键且代价高昂的操作,它直接影响了Spark应用程序的性能和资源利用率。优化Shuffle过程可以显著提升Spark作业的执行效率,降低数据处理的延迟,并且对于处理大规模数据集和复杂计算任务至关重要。正确理解和配置Shuffle相关参数,以及利用Spark提供的优化机制,是编写高效Spark应用程序的重要环节。

SparkSQL

DataSet、DataFrame、RDD

在Apache Spark中,DatasetDataFrameRDD(Resilient Distributed Dataset)是三种不同的数据抽象,它们都代表了分布式、可并行操作的数据集,但在功能、特性和使用场景上有所区别。以下是它们之间的关系与对比:

1. RDD(Resilient Distributed Dataset)

  • 基本概念:RDD是最基础的数据抽象,代表了一个弹性、容错的分布式数据集,由一系列不可变的、分区的记录组成。RDD提供了丰富的转换(transformation)和行动(action)操作,允许用户以函数式编程风格处理数据。

  • 特性

    • 容错性:基于Lineage(血统)记录,能够自动重建失败任务的数据。
    • 分布式:数据分布在集群中的多个节点上,支持并行计算。
    • 惰性计算:只有在行动操作触发时才执行计算。
  • 编程模型:使用Scala、Java或Python的低级API进行编程,提供了map、filter、reduce等函数式操作。

2. DataFrame

  • 基本概念:DataFrame是Spark SQL引入的一种带有Schema(结构信息)的RDD,可以看作是带有列名、数据类型和约束的二维表格数据。DataFrame提供了SQL查询和DataFrame API两种操作方式。

  • 特性

    • 结构化:具有明确的列名和数据类型,便于进行类型安全的查询和操作。
    • 优化执行:利用Catalyst优化器进行查询优化,包括列裁剪、谓词下推等,提高执行效率。
    • 多语言支持:除了Scala、Java之外,还支持SQL查询语句和Python、R等语言的DataFrame API。
  • 编程模型:既可以使用SQL查询,也可以使用面向对象的DataFrame API进行操作。

3. Dataset

  • 基本概念:Dataset是DataFrame的扩展,结合了RDD和DataFrame的优点,是Spark SQL中最通用的数据抽象。Dataset本质上是带有类型信息的DataFrame,提供了类型安全和强类型的编程体验。

  • 特性

    • 类型安全:在Scala和Java中,Dataset提供了类型安全的API,可以捕获编译期错误,避免运行时类型转换问题。
    • 性能:与DataFrame共享优化执行引擎,同样受益于Catalyst优化器。
    • 编码灵活性:支持自定义对象(case class或POJO),可以更好地与应用程序的领域模型集成。
  • 编程模型:主要针对Scala和Java用户,提供了强类型的API,支持隐式转换和lambda表达式。

关系与转换

  • 转换关系:RDD是最低层的数据抽象,DataFrame和Dataset是在其基础上的扩展。可以将RDD转换为DataFrame或Dataset,反之亦然:

    RDD -> DataFrame/Dataset
    DataFrame -> Dataset (仅限Scala/Java)
    Dataset -> RDD
    
  • 转换方法

    • RDD.toDF()SparkSession.createDataFrame(RDD, schema):将RDD转换为DataFrame。
    • DataFrame.as[CaseClass](Scala)或 DataFrame.as(Encoders.bean(Class))(Java):将DataFrame转换为Dataset。
    • Dataset.rdd:将Dataset转换回RDD。

总结

  • RDD:最基础、灵活但编程模型相对低级的数据抽象,适合需要精细控制执行细节或处理非结构化数据的情况。
  • DataFrame:带有Schema的RDD,提供SQL查询和类型安全的DataFrame API,适用于结构化数据处理和分析。
  • Dataset:在DataFrame基础上增加了类型安全的特性,特别适用于Scala和Java开发,提供了与应用程序领域模型更紧密的集成。

在实际使用中,用户可根据任务需求、数据结构和编程语言偏好选择合适的数据抽象。对于结构化数据分析和SQL查询,通常首选DataFrame或Dataset;对于需要更多控制权或处理非结构化数据的任务,可能更倾向于使用RDD。在Scala或Java项目中,Dataset由于其类型安全和编码便利性,常常成为首选。

相互转换

以下是一些示例,展示了如何在Spark中将RDD、DataFrame和Dataset之间进行相互转换:

RDD → DataFrame

假设有一个包含字符串元素的RDD,我们希望将其转换为具有两列(nameage)的DataFrame。

// 假设已有SparkSession实例名为spark

// 创建一个简单的RDD
val rdd: RDD[String] = spark.sparkContext.parallelize(Seq("John,30", "Alice,25"))

// 定义Schema
import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

// 将RDD转换为DataFrame,需指定Schema
val df: DataFrame = spark.createDataFrame(rdd.map(_.split(",")), schema)

// 查看DataFrame内容
df.show()

DataFrame → Dataset

若已有一个DataFrame,且希望将其转换为具有类型信息的Dataset,可以使用Scala的case class或Java的POJO(Plain Old Java Object)来定义数据类型。

// 假设已有DataFrame df

// 定义case class对应DataFrame的Schema
case class Person(name: String, age: Int)

// 将DataFrame转换为Dataset[Person]
val ds: Dataset[Person] = df.as[Person]

// 或者在Java中,假设已有对应的Person类
Dataset<Row> df = ...;
Dataset<Person> ds = df.as(Encoders.bean(Person.class));

Dataset → RDD

有时需要将具有类型信息的Dataset转换回RDD以进行更底层的操作。以下是如何将Dataset转换为RDD的示例:

// 假设已有Dataset[Person] ds

// 将Dataset转换为RDD[Person]
val rdd: RDD[Person] = ds.rdd

// 或者在Java中
Dataset<Person> ds = ...;
JavaRDD<Person> rdd = ds.toJavaRDD();

这些例子展示了如何在Spark中根据需要在RDD、DataFrame和Dataset之间进行转换,以便利用不同数据抽象的优势。在实际应用中,应根据任务需求、数据结构和编程语言特性选择最适合的数据抽象类型。

SparkStreaming

准实时,微批次
Apache Spark 实现流式数据实时处理主要通过其流处理模块 Spark Streaming(旧版)或 Structured Streaming(新版推荐)来实现。这里主要介绍 Structured Streaming 的实现原理,因为它提供了更为统一、易用且功能丰富的流处理模型。

Structured Streaming 实现流式数据实时处理的关键要素包括:

1. 微批处理(Micro-batching)

Structured Streaming 采用了微批处理(micro-batch processing)模型,即将实时数据流划分为一系列小的时间间隔(如秒级或毫秒级),每个时间间隔内的数据作为一个小的“批”来处理。尽管这种方式不是严格意义上的事件驱动(event-driven)处理,但它在保证近实时处理的同时,能够利用 Spark 的批处理优化技术和容错机制,实现高效、可靠的流处理。

2. 无界查询模型

Structured Streaming 提供了一种无界查询(unbounded query)模型,用户可以像编写处理静态数据集的 SQL 查询或 DataFrame/Dataset 操作那样,编写处理无限数据流的查询语句。这种查询语句定义了数据流的输入源、转换操作以及输出 sink。Spark 会持续不断地执行这些查询,处理源源不断流入的数据。

3. 结构化 API 与统一接口

Structured Streaming 采用了与批处理相同的 DataFrame/Dataset API,使得批处理和流处理可以共享相同的代码逻辑,实现了批流一体(batch-stream unification)。用户无需学习新的 API,就可以使用熟悉的 SQL 查询或 DataFrame 操作来处理流数据,大大降低了开发复杂度。

4. 端到端 exactly-once 语义

Structured Streaming 通过 checkpointing、write-ahead logs 和 transactional output sinks 等机制,确保了即使在出现故障的情况下,也能提供 exactly-once 的数据处理语义,即每个数据项仅被准确地处理一次,不会丢失也不会重复处理。

5. 可伸缩的事件时间处理

Structured Streaming 支持事件时间(event time)处理,允许用户按照数据项生成时的真实时间(而非处理时间)进行窗口聚合、水印(watermarking)和迟到数据处理。这使得系统能够正确处理乱序事件和处理大规模流数据时的时间窗口计算。

6. 动态调整与容错

Structured Streaming 允许在运行时动态调整处理逻辑,比如添加、删除或修改查询,而无需重启整个流处理应用。此外,Spark 的容错机制能够自动从故障中恢复,确保流处理任务的持续稳定运行。

处理流程概览

  1. 数据摄入:从各种数据源(如 Kafka、Kinesis、HDFS 等)摄取数据流。

  2. 查询编译:用户编写的无界查询被编译成一个执行计划(logical plan),该计划描述了数据流的处理逻辑。

  3. 微批生成:按照预设的时间间隔(微批间隔),系统生成一个个微批数据集。

  4. 查询执行:对每个微批数据集执行编译好的执行计划,进行相应的转换和聚合操作。

  5. 结果更新:将处理结果累积到状态存储(如 RocksDB、HDFS 文件等),形成一张不断增长的“结果表”。

  6. 输出写入:将结果表中满足输出条件的部分(如滑动窗口的结果)写入指定的外部系统或存储。

  7. 容错与checkpointing:定期将查询的执行状态和必要的元数据checkpoint到可靠存储,以备故障恢复时使用。

综上所述,Spark通过Structured Streaming模块,采用微批处理模型、无界查询接口、端到端exactly-once语义以及灵活的事件时间处理能力,实现了对流式数据的实时高效处理。这种处理方式兼顾了实时性、准确性、容错性和易用性,使得Spark成为处理大规模实时数据流的理想选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/553011.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# Solidworks二次开发:比较两个solidworks文档属性相关API详解

大家好&#xff0c;今天要讲的文章是关于如何比较两个solidworks文档。 下面是API的介绍&#xff1a; &#xff08;1&#xff09;第一个为Close&#xff0c;这个API的含义为在比较solidworks文档以后执行必要的清理。下面是官方的具体解释&#xff1a; 其没有输入参数&#x…

MySQL Workbench下载安装、 MySQL Workbench使用

官方下载链接;MySQL :: Download MySQL Workbench 下载好懒人安装&#xff0c;也可自己选择目录 下面是使用&#xff1a; 连接数据库&#xff1a; 填写数据库连接信息&#xff1a; 基本操作部分&#xff1a; 数据导入导出&#xff1a; 导出/备份 导入&#xff1a; 生产er图…

【机器学习】科学库使用第5篇:Matplotlib,学习目标【附代码文档】

机器学习&#xff08;科学计算库&#xff09;完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;机器学习&#xff08;常用科学计算库的使用&#xff09;基础定位、目标&#xff0c;机器学习概述定位,目标,学习目标,学习目标,1 人工智能应用场景,2 人工智能小…

react中关于类式组件和函数组件对props、state、ref的使用

文章中有很多蓝色字体为扩展链接&#xff0c;可以补充查看。 常用命令使用规则 组件编写方式: 1.函数式 function MyButton() { //直接return 标签体return (<>……</>); }2.类 class MyButton extends React.Component { //在render方法中&#xff0c;return…

UE5 C++ 射线检测

一.声明四个变量 FVector StartLocation;FVector ForwardVector;FVector EndLocation;FHitResult HitResult;二.起点从摄像机&#xff0c;重点为摄像机前9999m。射线检测 使用LineTraceSingleByChannel 射线直线通道检测&#xff0c;所以 void AMyCharacter::Tick(float Delt…

GPT国内能用吗

2022年11月&#xff0c;Open AI发布ChatGPT&#xff0c;ChatGPT展现了大型语模型在自然语言处理方面的惊人进步&#xff0c;其生成文本的流畅度和连贯性令人印象深刻&#xff0c;为AI应用打开了新的可能性。 ChatGPT的出现推动了AI技术在各个领域的应用&#xff0c;例如&#x…

Python学习教程(Python学习路线+Python学习视频):Python数据结构

数据结构引言&#xff1a; 数据结构是组织数据的方式&#xff0c;以便能够更好的存储和获取数据。数据结构定义数据之间的关系和对这些数据的操作方式。数据结构屏蔽了数据存储和操作的细节&#xff0c;让程序员能更好的处理业务逻辑&#xff0c;同时拥有快速的数据存储和获取方…

.net9 AOT编绎生成标准DLL,输出API函数教程-中国首创

1&#xff0c;安装VS2022预览版&#xff08;Visual Studio Preview&#xff09; https://visualstudio.microsoft.com/zh-hans/vs/preview/#download-preview 2&#xff0c;选择安装组件&#xff1a;使用C的桌面开发 和 .NET桌面开发 ------------------------------------- …

java八股文知识点讲解(个人认为讲的比较好的)

1、解决哈希冲突——链地址法&#xff1a;【第7章查找】19哈希表的查找_链地址法解决哈希冲突_哔哩哔哩_bilibili 2、解决哈希冲突——开放地址法 &#xff1a; 【第7章查找】18哈希表的查找_开放定址法解决哈希冲突_哔哩哔哩_bilibili 3、小根堆大根堆的创建&#xff1a;选择…

【每日刷题】Day17

【每日刷题】Day17 &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;每日刷题&#x1f34d; &#x1f33c;文章目录&#x1f33c; 1. 19. 删除链表的倒数第 N 个结点 - 力扣&#xff08;LeetCode&#xff09; 2. 162. 寻找峰值 - 力扣…

1 回归:锂电池温度预测top2 代码部分(一) Tabnet

2024 iFLYTEK A.I.开发者大赛-讯飞开放平台 TabNet&#xff1a; 模型也是我在这个比赛一个意外收获&#xff0c;这个模型在比赛之中可用。但是需要GPU资源&#xff0c;否则运行真的是太慢了。后面针对这个模型我会写出如何使用的方法策略。 比赛结束后有与其他两位选手聊天&am…

《ElementPlus 与 ElementUI 差异集合》el-popconfirm 气泡确认框之插槽写法有差异

ElementUI 直接在 el-button 上配置属性 slot&#xff1b; <el-popconfirm title"确定删除吗&#xff1f;请谨慎操作&#xff01;" confirm"delete"><el-button slot"reference" size"small" type"danger">删…

Word学习笔记之奇偶页的页眉与页码设置

1. 常用格式 在毕业论文中&#xff0c;往往有一下要求&#xff1a; 奇数页右下角显示、偶数页左下角显示奇数页眉为每章标题、偶数页眉为论文标题 2. 问题解决 2.1 前期准备 首先&#xff0c;不论时要求 1、还是要求 2&#xff0c;这里我们都要做一下设置&#xff1a; 鼠…

Adobe Firefly是否将重新定义AI视频编辑领域?|TodayAI

Adobe最近发布了一段令人瞩目的视频&#xff0c;详细展示了其最新推出的Adobe Firefly视频模型。这一模型集成了尖端的生成式人工智能技术&#xff0c;带来了一系列颠覆性的视频编辑功能&#xff0c;引发了业界的广泛关注和讨论。 视频中的旁白充满热情地宣布&#xff1a;“Ad…

【超级简单】vscode进入服务器的docker容器

前提 1、已经运行docker容器 2、已经用vscode链接服务器 在vscode中安装的插件 Dev Containers docker 在容器中安装的依赖 yum install openssh-server yum install openssh-clientsvscode进入服务器的docker容器 找到自己的容器&#xff0c;右键点击&#xff0c;找到…

Jmeter BeanShell调用Java方法加密

1、添加BeanShell前置处理器 由于请求接口时&#xff0c;会传加密参数。加密过程会在请求之前完成&#xff0c;所以需要使用前置处理器中beanshell preprocessor 2、编写BeanShell脚本 ①定义一个beashell变量&#xff1a;phoneNum&#xff0c;在Beanshell中可以直接调用Jmete…

idea运行报错:启动命令过长

JAVA项目&#xff0c;运行的时候报错 Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun老问题了&#xff0c;记录一下 解决办法&#xff1a; 1、Edit Configurations 2、点击Modify options设置&#xff0c;勾选S…

janus架构学习

基础介绍 Janus 是由Meetecho设计和开发的开源、通用的基于SFU架构的WebRTC流媒体服务器&#xff0c;它支持在Linux的服务器或MacOS上的机器进行编译和安装。Janus 是使用C语言进行编写的&#xff0c;它的性能十分优秀。 架构 janus为sfu架构 模块结构图 模块说明 core模…

elementui 弹窗展示自动校验表单项bug

表单校验失败一次之后&#xff0c;再次弹出表单&#xff0c;触发自动校验 解决方案&#xff1a; clearValidate() 方法清空表单校验项 this.$nextTick(() > {this.$refs[checkForm].clearValidate() }) 使用nextTick规避报错

chroot -- 限制其他用户liunx空间

目录- 限制其他用户liunx空间 前言开始进入监狱总结 前言 前提 ecs 服务器&#xff0c;centos系统&#xff0c;乌班图系统需要root榕湖 开始 首先&#xff0c;登录到您的ECS机器。创建一个新用户&#xff1a; 使用 adduser 命令创建一个新用户。例如&#xff0c;要创建一个名…