光速入门spark(待续)

目录

  • Spark概述
    • Spark 是什么
    • Spark VS Hadoop (MapReduce)
    • Spark or Hadoop
    • Spark四大特点
      • 速度快
      • 易于使用
      • 通用性强
      • 运行方式
    • Spark 框架模块(架构)
    • Spark的运行模式
    • Spark的架构角色
  • Spark环境搭建
    • Local
    • Standalone
      • Spark程序运行层次结构
    • Spark on YARN
      • 部署模式DeployMode
      • 扩展阅读:两种模式详细流程
      • 扩展阅读:YARN
    • spark on k8s
      • 向 Kubernetes 提交应用程序
        • Cluster Mode
        • Client Mode
        • Secret资源 Management
        • 命名空间
        • RBAC
        • 其他spark on k8s配置
      • 举例

Spark概述

Spark 是什么

在这里插入图片描述

官网的定义是:Apache Spark™是一个用于在单节点机器或集群上执行数据工程、数据科学和机器学习的多语言引擎。

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

Spark 最早源于一篇论文Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing,该论文是由加州大学柏克莱分校的Matei Zaharia 等人发表的。论文中提出了一种弹性分布式数据集(即RDD)的概念:

在这里插入图片描述
翻译过来就是:RDD 是一种分布式内存抽象,其使得程序员能够在大规模集群中做内存运算,并且有一定的容错方式。而这也是整个Spark 的核心数据结构,Spark 整个平台都围绕着RDD进行。

Spark可以计算:结构化、半结构化、非结构化等各种类型的数据结构,同时也支持使用Python、Java、Scala、R以及SQL语言去开发应用程序计算数据。(主流的还是使用scala和python开发)

在这里插入图片描述

Spark VS Hadoop (MapReduce)

Spark和Hadoop技术栈有何区别呢?

Hadoop:

  • Hadoop是由java语言编写的,在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架
  • 作为Hadoop 分布式文件系统,HDFS处于Hadoop生态圈的最下层,存储着所有的数据,支持着 Hadoop 的所有服务。它的理论基础源于 Google 的TheGoogleFileSystem 这篇论文,它是GFS的开源实现。
  • MapReduce 是一种编程模型,Hadoop根据Google的MapReduce 论文将其实现,作为Hadoop 的分布式计算模型,是Hadoop的核心。基于这个框架,分布式并行程序的编写变得异常简单。综合了HDFS的分布式存储和MapReduce的分布式计算,Hadoop在处理海量数据时,性能横向扩展变得非常容易。
  • HBase是对Google 的Bigtable 的开源实现,但又和Bigtable 存在许多不同之处。HBase 是一个基于HDFS的分布式数据库,擅长实时地随机读/写超大规模数据集。它也是Hadoop非常重要的组件。

Spark:

  • Spark是一种由Scala语言开发的快速、通用、可扩展的大数据分析引擎
  • Spark Core 中提供了Spark最基础与最核心的功能
  • Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL 或者Apache Hive 版本的SQL方言(HQL)来查询数据。
  • Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。

总而言之,Spark出现的时间相对较晚(年轻),并且主要功能主要是用于数据计算。

下面给出一个更直观的对比表格:

在这里插入图片描述
尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop:

  1. 在计算层面,Spark相比较MR(MapReduce)有巨大的性能优势,但至今仍有许多计算工具基于MR构架,比如非常成熟的Hive

  2. Spark仅做计算,而Hadoop生态圈不仅有计算(MR)也有存储(HDFS)和资源管理调度(YARN),HDFS和YARN仍是许多大数据体系的核心架构。

Spark or Hadoop

Hadoop 的MR框架和Spark框架都是数据处理框架,那么我们在使用时如何选择呢?

  1. Hadoop MapReduce 由于其设计初衷并不是为了满足循环迭代式数据流处理,因此在多并行运行的数据可复用场景(如:机器学习、图挖掘算法、交互式数据挖掘算法)中存在诸多计算效率等问题。所以Spark应运而生,Spark就是在传统的MapReduce 计算框架的基础上,利用其计算过程的优化,从而大大加快了数据分析、挖掘的运行和读写速度,并将计算单元缩小到更适合并行计算和重复使用的RDD计算模型。
  2. 机器学习中 ALS、凸优化梯度下降等。这些都需要基于数据集或者数据集的衍生数据反复查询反复操作。MR这种模式不太合适,即使多MR串行处理,性能和时间也是一个问题。数据的共享依赖于磁盘。另外一种是交互式数据挖掘,MR 显然不擅长。而Spark 所基于的scala语言恰恰擅长函数的处理。
  3. Spark 是一个分布式数据快速分析项目。它的核心技术是弹性分布式数据集(Resilient Distributed Datasets),提供了比 MapReduce 丰富的模型,可以快速在内存中对数据集进行多次迭代,来支持复杂的数据挖掘算法和图形计算算法。
  4. Spark和Hadoop的根本差异是多个作业之间的数据通信问题 : Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘。
  5. Spark Task 的启动时间快。Spark采用fork线程的方式,而Hadoop采用创建新的进程的方式。
  6. Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个MR作业之间的数据交互都要依赖于磁盘交互
  7. Spark的缓存机制比HDFS的缓存机制高效。

经过上面的比较,可以看出在绝大多数的数据计算场景中,Spark确实会比MapReduce更有优势。但是Spark是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致Job执行失败,此时,MapReduce其实是一个更好的选择,所以Spark并不能完全替代MR。

Hadoop中的MR中每个map/reduce task都是一个java进程方式运行,好处在于进程之间是互相独立的,每个task独享进程资源,没
有互相干扰,监控方便,但是问题在于task之间不方便共享数据,执行效率比较低。比如多个map task读取不同数据源文件需要将数据源加
载到每个map task中,造成重复加载和浪费内存。而基于线程的方式计算是为了数据共享和提高执行效率,Spark采用了线程的最小的执行
单位,但缺点是线程之间会有资源竞争。

Spark四大特点

速度快

由于Apache Spark支持内存计算,并且通过DAG(有向无环图)执行引擎支持无环数据流,所以官方宣称其在内存中的运算速度要比Hadoop的MapReduce快100倍,在硬盘中要快10倍。

Spark处理数据与MapReduce处理数据相比,有如下两个不同点:

  1. Spark处理数据时,可以将中间处理结果数据存储到内存中;
  2. Spark提供了非常丰富的算子(API), 可以做到复杂任务在一个Spark 程序中完成.

易于使用

Spark 的版本已经更新到Spark 3.2.0(截止日期2021.10.13),支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言。为了兼容Spark2.x企业级应用场景,Spark仍然持续更新Spark2版本。
在这里插入图片描述

通用性强

在Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库。

在这里插入图片描述

运行方式

Spark 支持多种运行方式,包括在Hadoop 和Mesos 上,也支持Standalone的独立运行模式,同时也可以运行在云Kubernetes(Spark 2.3开始支持)上。

在这里插入图片描述
对于数据源而言,Spark 支持从HDFS、HBase、Cassandra 及 Kafka 等多种途径获取数据。

Spark 框架模块(架构)

在这里插入图片描述

  • Spark Core:Spark的核心,Spark核心功能均由Spark Core模块提供,是Spark运行的基础。Spark Core以RDD为数据抽象,提供Python、Java、Scala、R语言的API,可以编程进行海量离线数据批处理计算。

  • SparkSQL:基于SparkCore之上,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,SparkSQL本身针对离线计算场景。同时基于SparkSQL,Spark提供了StructuredStreaming模块,可以以SparkSQL为基础,进行数据的流式计算。

  • SparkStreaming:以SparkCore为基础,提供数据的流式计算功能。

  • MLlib:以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法等。方便用户以分布式计算的模式进行机器学习计算。

  • GraphX:以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用于以分布式计算模式进行图计算。

Spark的运行模式

Spark提供多种运行模式,包括:

  1. 本地模式(单机):本地模式就是以一个独立的进程,通过其内部的多个线程来模拟整个Spark运行时环境
  2. Standalone模式(集群):Spark中的各个角色以独立进程的形式存在,并组成Spark集群环境
  3. Hadoop YARN模式(集群):Spark中的各个角色运行在YARN的容器内部,并组成Spark集群环境
  4. Kubernetes模式(容器集群):Spark中的各个角色运行在Kubernetes的容器内部,并组成Spark集群环境

在这里插入图片描述

Spark的架构角色

在这里插入图片描述
注:正常情况下Executor是干活的角色,不过在特殊场景下(Local模式)Driver可以即管理又干活

Spark环境搭建

Local

本质:启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task,Local模式可以限制模拟Spark集群环境的线程数量, 即Local[N] 或Local[*],其中N代表可以使用N个线程,每个线程拥有一个cpu core。

如果不指定N,则默认是1个线程(该线程有1个core)。通常Cpu有几个Core,就指定几个线程,最大化利用计算能力.如果是local[*],则代表Run Spark locally with as many worker threads as logical cores on your machine.按照Cpu最多的Cores设置线程数。

在这里插入图片描述

Local 下的角色分布:

  1. 资源管理:

    • Master:Local进程本身
    • Worker:Local进程本身
  2. 任务执行:

    • Driver:Local进程本身
    • Executor:不存在,没有独立的Executor角色, 由Local进程(也就是Driver)内的线程提供计算能力

Driver也算一种特殊的Executor, 只不过多数时候, 我们将Executor当做纯Worker对待, 这样和Driver好区分(一类是管理 一类是工人)。

在官网下载安装包,并解压,之后将bin目录添加到环境目录中:
在这里插入图片描述
然后执行spark-shell即可:
在这里插入图片描述
默认的spark-shell是基于scala进行交互式开发的,如果需要使用python进行开发,也可以执行pyspark命令打开python交互页面。

在这里插入图片描述
实际开发中都不会使用交互式开发,比如python开发要下载pip install pyspark,然后使用spark-submit提交计算任务,spark-shell也只是spark-submit加了参数后提交的一个任务:

在这里插入图片描述
所以我们学习的就是,1.如何编写任务 2. 如何提交任务。

pyspark/spark-shell/spark-submit 对比:

在这里插入图片描述

Standalone

Standalone模式是Spark自带的一种集群模式,不同于前面本地模式启动多个进程来模拟集群的环境,Standalone模式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。

StandAlone 是完整的Spark运行环境,其中:

  1. Master角色以Master进程存在, Worker角色以Worker进程存在
  2. Driver和Executor运行于Worker进程内, 由Worker提供资源供给它们运行

在这里插入图片描述
StandAlone集群在进程上主要有3类进程:

  1. 主节点Master进程:Master角色, 管理整个集群资源,并托管运行各个任务的Driver
  2. 从节点Workers:Worker角色, 管理每个机器的资源,分配对应的资源来运行Executor(Task);每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
  3. 历史服务器HistoryServer(可选):Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。
    在这里插入图片描述
    在这里插入图片描述
    集群模式下程序是在集群上运行的,不要直接读取本地文件,应该读取外部数据源上的,因为程序运行在集群上,具体在哪个节点上我们运行并不知道,其他节点可能并没有那个数据文件。

在这里插入图片描述
在这里插入图片描述

Spark程序运行层次结构

用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段:

  1. 用户程序创建SparkContext 时,新创建的SparkContext 实例会连接到ClusterManager。 Cluster Manager 会根据用户提交时设置的CPU 和内存等信息为本次提交分配计算资源,启动Executor。

  2. Driver会将用户程序划分为不同的执行阶段Stage,每个执行阶段Stage由一组完全相同Task组成,这些Task分别作用于待处理数据的不同分区。在阶段划分完成和Task创建后,Driver会向Executor发送Task;

  3. Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态汇报给Driver;

  4. Driver会根据收到的Task的运行状态来处理不同的状态更新。Task分为两种:一种是Shuffle Map Task,它实现数据的重新洗牌,洗牌的结果保存到Executor 所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据;

  5. Driver 会不断地调用Task,将Task发送到Executor执行,在所有的Task 都正确执行或者超过执行次数的限制仍然没有执行成功时停止;

可以发现在一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行按照DAG图进行的。

在这里插入图片描述
其中每个Stage中包含多个Task任务,每个Task以线程Thread方式执行,需要1Core CPU

Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:

  1. Job:由多个Task 的并行计算部分,一般Spark 中的action 操作(如 save、collect,后面进一步说明),会
    生成一个Job。
  2. Stage:Job 的组成单位,一个Job 会切分成多个Stage,Stage 彼此之间相互依赖顺序执行,而每个Stage 是多个Task 的集合,类似map 和reduce stage。
  3. Task:被分配到各个Executor 的单位工作内容,它是Spark 中的最小执行单位,一般来说有多少个Paritition
    (物理层面的概念,即分支可以理解为将数据划分成不同部分并行处理),就会有多少个Task,每个Task 只会处理单一分支上的数据。

在这里插入图片描述

Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF)的问题。

Spark on YARN

YARN本身是一个资源调度框架, 负责对运行在内部的计算框架进行资源调度管理。作为典型的计算框架, Spark本身也是直接运行在YARN中, 并接受YARN的调度的。

Spark On Yarn的本质:

  1. Master角色由YARN的ResourceManager担任.
  2. Worker角色由YARN的NodeManager担任.
  3. Driver角色运行在YARN容器内或提交任务的客户端进程中
  4. 真正干活的Executor运行在YARN提供的容器内

Spark On Yarn需要啥?

  1. 需要Yarn集群;
  2. 需要Spark客户端工具, 比如spark-submit, 可以将Spark程序提交到YARN中
  3. 需要被提交的代码程序:,如spark/examples/src/main/python/pi.py此示例程序,或我们后续自己开发的Spark任务

在这里插入图片描述

部署模式DeployMode

Spark On YARN是有两种运行模式的,一种是Cluster模式一种是Client模式,这两种模式的区别就是Driver运行的位置.

  1. Cluster模式即:Driver运行在YARN容器内部, 和ApplicationMaster在同一个容器内
  2. Client模式即:Driver运行在客户端进程中, 比如Driver运行在spark-submit程序的进程中

如图, 此为Cluster模式,Driver运行在容器内部:

在这里插入图片描述
如图, 此为Client模式,Driver运行在客户端程序进程中(以spark-submit为例):

在这里插入图片描述
两种模式的区别:

在这里插入图片描述
假设运行圆周率PI程序,采用client模式,命令如下:

SPARK_HOME=/export/server/spark
 ${SPARK_HOME}/bin/spark-submit \--master yarn \--deploy-mode client \--driver-memory 512m \--executor-memory 512m \--num-executors 1 \--total-executor-cores 2 \
 ${SPARK_HOME}/examples/src/main/python/pi.py \
 10

采用cluster模式,命令如下:

SPARK_HOME=/export/server/spark
 ${SPARK_HOME}/bin/spark-submit \--master yarn \--deploy-mode cluster \--driver-memory 512m \--executor-memory 512m \--num-executors 1 \--total-executor-cores 2 \--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
 ${SPARK_HOME}/examples/src/main/python/pi.py \
 10

总之,Client模式和Cluster模式最最本质的区别是:Driver程序运行在哪里。

  • Client模式:学习测试时使用,生产不推荐(要用也可以,性能略低,稳定性略低)
    1. Driver运行在Client上,和集群的通信成本高
    2. Driver输出结果会在客户端显示
  • Cluster模式:生产环境中使用该模式
    1. Driver程序在YARN集群中,和集群的通信成本低
    2. Driver输出结果不能在客户端显示
    3. 该模式下Driver运行ApplicattionMaster这个节点上,由Yarn管理,如果出现问题,yarn会重启
      ApplicattionMaster(Driver)

扩展阅读:两种模式详细流程

在YARN Client模式下,Driver在任务提交的本地机器上运行,示意图如下:
在这里插入图片描述
具体流程步骤如下:

  1. Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster;
  2. 随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的
    ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存;
  3. ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后ApplicationMaster在资源分
    配指定的NodeManager上启动Executor进程;
  4. Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数;
  5. 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行。

在YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如:

在这里插入图片描述
具体流程步骤如下:

  1. 任务提交后会和ResourceManager通讯申请启动ApplicationMaster;
  2. 随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的
    ApplicationMaster就是Driver;
  3. Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请
    后会分配Container,然后在合适的NodeManager上启动Executor进程;
  4. Executor进程启动后会向Driver反向注册;
  5. Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行

扩展阅读:YARN

Yarn 是 hadoop 集群的资源管理层。它允许不同的数据处理引擎(如图形处理、交互式 SQL、流处理、批处理)运行在 hadoop 集群中并处理 HDFS 中的数据(移动计算而非数据),除了资源管理外,Yarn 还用于作业调用。

详细参考:Apache Hadoop YARN

在这里插入图片描述
Yarn 采用传统的 master-slave 架构模式,其主要由 4 种组件组成,它们的主要功能如下:

  • ResourceManager(RM):全局资源管理器,负责整个系统的资源管理和分配;
    • 处理客户端请求
    • 启动/监控ApplicationMaster
    • 监控NodeManager
    • 资源分配与调度
  • ApplicationMaster(AM):负责应用程序(Application)的管理;
    • 为应用程序申请资源,并分配给内部任务
    • 任务调度、监控与容错
  • NodeManager(NM):负责 slave 节点的资源管理和使用;
    • 单个节点上的资源管理
    • 处理来自ResourceManger的命令
    • 处理来自ApplicationMaster的命令
  • Container(容器):对任务运行环境的一个抽象。

外部博客参考链接:

  1. https://www.cnblogs.com/gentlescholar/p/15048301.html
  2. http://kentt.top/2018/09/16/Yarn-Architecture/

spark on k8s

Spark在Kubernetes上是一个令人兴奋的组合,它使您可以利用Apache Spark进行分布式数据处理,并使用Kubernetes进行容器编排。通过在Kubernetes上运行Spark应用程序,您可以利用Kubernetes平台提供的可扩展性、弹性和灵活性。Kubernetes为运行和管理应用程序提供了一个容器化基础设施,而Spark则实现了大规模数据处理和分析。这种组合使您能够轻松扩展Spark应用程序,高效利用资源,并将其与其他Kubernetes原生工具和服务无缝集成。在本文中,我们将带您逐步了解在Kubernetes集群上设置和运行Spark应用程序的过程。我们将涵盖集群配置、应用程序部署、监控、调试和扩展等主题。

首先假设已经有了一个完备的k8s集群,并且配置了对应ServiceAccount和PVC…

官方文档:https://spark.apache.org/docs/latest/running-on-kubernetes.html

在这里插入图片描述
spark-submit可以直接用于向Kubernetes集群提交Spark应用程序。提交机制的工作原理如下:

  1. Spark 创建一个在Kubernetes pod中运行的 Spark driver。
  2. driver创建也在 Kubernetes Pod 中运行的 executors 并连接到它们,然后执行应用程序代码。
  3. 当应用程序完成时,executors Pod 终止并被清理,但driver Pod 会保留日志并在 Kubernetes API 中保持“已完成”状态,直到最终被垃圾收集或手动清理。(在完成状态下,驱动程序 Pod 不使用任何计算或内存资源。)

向 Kubernetes 提交应用程序

Cluster Mode

要在集群模式下启动 Spark Pi,

$ ./bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

Spark master(通过将–master命令行参数传递给应用程序的配置spark-submit或通过 spark.master参数在应用程序的配置中进行设置来指定)的格式必须为:k8s://<api_server_host>:<k8s-apiserver-port>。必须始终指定端口,即使它是 HTTPS 端口 443。

在主字符串前添加 k8s:// 将导致 Spark 应用程序在 Kubernetes 集群上启动。如果 URL 中未指定 HTTP 协议,则默认为https. 例如,将 master 设置为k8s://example.com:443相当于将其设置为k8s://https://example.com:443,但要在不同端口上并且不使用 TLS 进行连接,则 master 必须设置为k8s://http://example.com:8080

Client Mode

Spark executors必须能够通过可路由的主机名和端口连接到Spark driver。

在客户端模式下,Spark正常运行所需的特定网络配置将因设置而异。可以使用无头服务来使driver pod 能够通过稳定的主机名从 executors 路由。

在部署无头服务时,确保服务的标签选择器只匹配 driver pod 而不匹配其他pod。(为 driver pod分配一个足够唯一的标签,并在无头服务的标签选择器中使用该标签)通过spark.driver.host配置指定Spark driver的主机名,通过spark.driver.port配置指定Spark driver 的端口。

Secret资源 Management

Kubernetes Secrets可用于为 Spark 应用程序提供访问安全服务的凭据。

--conf spark.kubernetes.driver.secrets.spark-secret=/etc/secrets
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets

要通过环境变量使用机密,使用以下命令选项spark-submit:

--conf spark.kubernetes.driver.secretKeyRef.ENV_NAME=name:key
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
命名空间

Spark on Kubernetes 可以使用命名空间来启动 Spark 应用程序。可以通过配置spark.kubernetes.namespace来使用它。

RBAC

在启用RBAC的 Kubernetes 集群中,用户可以配置 Kubernetes RBAC 角色和各种 Spark on Kubernetes 组件使用的服务帐户来访问 Kubernetes API 服务器。

Spark 驱动程序 pod 使用 Kubernetes 服务帐户访问 Kubernetes API 服务器来创建和监视执行程序 pod。至少必须向服务帐户授予 Role或ClusterRole权限,以允许驱动程序 Pod 创建 Pod 和服务。

--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
其他spark on k8s配置

有关 Spark 配置的信息,请参阅配置页面。以下配置特定于 Kubernetes 上的 Spark。

Spark Properties:

Property NameDefaultMeaning
spark.kubernetes.namespacedefault 指定driver和executor的命名空间
spark.kubernetes.container.image(none) 用于指定 Spark 应用程序的容器镜像。必填,除非为每种不同的容器类型提供了显式图像。(见下方两个位置)
spark.kubernetes.driver.container.image(value of spark.kubernetes.container.image) driver镜像
spark.kubernetes.executor.container.image(value of spark.kubernetes.container.image) executor镜像
spark.kubernetes.container.image.pullPolicyIfNotPresent 在 Kubernetes 中拉取镜像时使用的容器镜像拉取策略。
spark.kubernetes.container.image.pullSecrets 用于从私有映像仓库中提取镜像的 Kubernetes 密钥。
spark.kubernetes.allocation.batch.size5 每轮执行程序 Pod 分配中一次启动的 Pod 数量。
spark.kubernetes.allocation.batch.delay1s 每轮执行程序 Pod 分配之间等待的时间。指定小于 1 秒的值可能会导致 Spark driver的 CPU 使用率过高。
spark.kubernetes.authenticate.submission.caCertFile(none) Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead.
spark.kubernetes.authenticate.submission.clientKeyFile(none) Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead.
spark.kubernetes.authenticate.submission.clientCertFile(none) Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead.
spark.kubernetes.authenticate.submission.oauthToken(none) OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note that unlike the other authentication options, this is expected to be the exact string value of the token to use for the authentication. In client mode, use spark.kubernetes.authenticate.oauthToken instead.
spark.kubernetes.authenticate.submission.oauthTokenFile(none) Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead.
spark.kubernetes.authenticate.driver.caCertFile(none) Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead.
spark.kubernetes.authenticate.driver.clientKeyFile(none) Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead.
spark.kubernetes.authenticate.driver.clientCertFile(none) Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead.
spark.kubernetes.authenticate.driver.oauthToken(none) OAuth token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this must be the exact string value of the token to use for the authentication. This token value is uploaded to the driver pod as a Kubernetes secret. In client mode, use spark.kubernetes.authenticate.oauthToken instead.
spark.kubernetes.authenticate.driver.oauthTokenFile(none) Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication. This token value is uploaded to the driver pod as a secret. In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead.
spark.kubernetes.authenticate.driver.mounted.caCertFile(none) Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead.
spark.kubernetes.authenticate.driver.mounted.clientKeyFile(none) Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead.
spark.kubernetes.authenticate.driver.mounted.clientCertFile(none) Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead.
spark.kubernetes.authenticate.driver.mounted.oauthTokenFile(none) Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication. In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead.
spark.kubernetes.authenticate.driver.serviceAccountNamedefault Service account that is used when running the driver pod. The driver pod uses this service account when requesting executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file, client cert file, and/or OAuth token. In client mode, use spark.kubernetes.authenticate.serviceAccountName instead.
spark.kubernetes.authenticate.caCertFile(none) In client mode, path to the CA cert file for connecting to the Kubernetes API server over TLS when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
spark.kubernetes.authenticate.clientKeyFile(none) In client mode, path to the client key file for authenticating against the Kubernetes API server when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
spark.kubernetes.authenticate.clientCertFile(none) In client mode, path to the client cert file for authenticating against the Kubernetes API server when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
spark.kubernetes.authenticate.oauthToken(none) In client mode, the OAuth token to use when authenticating against the Kubernetes API server when requesting executors. Note that unlike the other authentication options, this must be the exact string value of the token to use for the authentication.
spark.kubernetes.authenticate.oauthTokenFile(none) In client mode, path to the file containing the OAuth token to use when authenticating against the Kubernetes API server when requesting executors.
spark.kubernetes.driver.label.[LabelName](none) Add the label specified by LabelName to the driver pod. For example, spark.kubernetes.driver.label.something=true. Note that Spark also adds its own labels to the driver pod for bookkeeping purposes.
spark.kubernetes.driver.annotation.[AnnotationName](none) Add the annotation specified by AnnotationName to the driver pod. For example, spark.kubernetes.driver.annotation.something=true.
spark.kubernetes.executor.label.[LabelName](none) Add the label specified by LabelName to the executor pods. For example, spark.kubernetes.executor.label.something=true. Note that Spark also adds its own labels to the driver pod for bookkeeping purposes.
spark.kubernetes.executor.annotation.[AnnotationName](none) Add the annotation specified by AnnotationName to the executor pods. For example, spark.kubernetes.executor.annotation.something=true.
spark.kubernetes.driver.pod.name(none) 驱动程序 Pod 的名称。在集群模式下,如果未设置此选项,则驱动程序 pod 名称将设置为“spark.app.name”并添加当前时间戳后缀,以避免名称冲突。在客户端模式下,如果您的应用程序在 pod 内运行,强烈建议将其设置为您的驱动程序运行所在的 pod 的名称。在客户端模式下设置此值允许驱动程序成为其执行程序 pod 的所有者,这反过来又允许集群对执行器 Pod 进行垃圾收集。
spark.kubernetes.executor.lostCheck.maxAttempts10 Number of times that the driver will try to ascertain the loss reason for a specific executor. The loss reason is used to ascertain whether the executor failure is due to a framework or an application error which in turn decides whether the executor is removed and replaced, or placed into a failed state for debugging.
spark.kubernetes.submission.waitAppCompletiontrue 在集群模式下,是否等待应用程序完成后再退出启动器进程。When changed to false, the launcher has a "fire-and-forget" behavior when launching the Spark job.
spark.kubernetes.report.interval1s Interval between reports of the current Spark job status in cluster mode.
spark.kubernetes.driver.limit.cores(none) 指定driver pod 的CPU限制。
spark.kubernetes.executor.request.cores(none) 指定每个执行器pod的CPU请求。其值符合Kubernetes的规范。示例值包括0.1、500m、1.5、5等,CPU单位的定义详见CPU单位文档。这与spark.executor.cores不同:它仅在设置了执行器pod的CPU请求时使用,并优先于spark.executor.cores。这不影响任务并行性,例如,执行器可以同时运行的任务数量不受此影响。
spark.kubernetes.executor.limit.cores(none) 为executor pod 指定CPU限制。
spark.kubernetes.node.selector.[labelKey](none) Adds to the node selector of the driver pod and executor pods, with key labelKey and the value as the configuration's value. For example, setting spark.kubernetes.node.selector.identifier to myIdentifier will result in the driver pod and executors having a node selector with key identifier and value myIdentifier. Multiple node selector keys can be added by setting multiple configurations with this prefix.
spark.kubernetes.driverEnv.[EnvironmentVariableName](none) Add the environment variable specified by EnvironmentVariableName to the Driver process. The user can specify multiple of these to set multiple environment variables.
spark.kubernetes.driver.secrets.[SecretName](none) Add the Kubernetes Secret named SecretName to the driver pod on the path specified in the value. For example, spark.kubernetes.driver.secrets.spark-secret=/etc/secrets.
spark.kubernetes.executor.secrets.[SecretName](none) Add the Kubernetes Secret named SecretName to the executor pod on the path specified in the value. For example, spark.kubernetes.executor.secrets.spark-secret=/etc/secrets.
spark.kubernetes.driver.secretKeyRef.[EnvName](none) Add as an environment variable to the driver container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key.
spark.kubernetes.executor.secretKeyRef.[EnvName](none) Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key.
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path(none) Add the Kubernetes Volume named VolumeName of the VolumeType type to the driver pod on the path specified in the value. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint.
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly(none) Specify if the mounted volume is read only or not. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false.
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName](none) Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value, must conform with Kubernetes option format. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim.
spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path(none) Add the Kubernetes Volume named VolumeName of the VolumeType type to the executor pod on the path specified in the value. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint.
spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnlyfalse Specify if the mounted volume is read only or not. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false.
spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].options.[OptionName](none) Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim.
spark.kubernetes.memoryOverheadFactor0.1 This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, and various systems processes. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs. This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This prempts this error with a higher default.
spark.kubernetes.pyspark.pythonVersion"2" Python版本,如果使用python的话

其余配置参考:https://spark.apache.org/docs/2.4.8/configuration.html。

举例

可以这样来提交一个任务,同时设置 driver 和 executor 的 CPU、内存的资源 request 和 limit 值(driver 的内存 limit 值为 request 值的 110%)。

./spark-submit \
  // 设置cluster模式启动
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  // 指定k8s apiserver的地址
  --master k8s://https://172.20.0.113:6443 \
  --kubernetes-namespace spark-cluster \
  // 指定k8s的serviceAccount
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  // k8s资源限额
  --conf spark.driver.memory=100G \
  --conf spark.executor.memory=10G \
  --conf spark.driver.cores=30 \
  --conf spark.executor.cores=2 \
  --conf spark.driver.maxResultSize=10240m \
  --conf spark.kubernetes.driver.limit.cores=32 \
  --conf spark.kubernetes.executor.limit.cores=3 \
  --conf spark.kubernetes.executor.memoryOverhead=2g \
  --conf spark.executor.instances=5 \
  --conf spark.app.name=spark-pi \
  // spark创建Pod模板
  --conf spark.kubernetes.driver.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-driver:v2.1.0-kubernetes-0.3.1-1 \
  --conf spark.kubernetes.executor.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-executor:v2.1.0-kubernetes-0.3.1-1 \
  --conf spark.kubernetes.initcontainer.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-init:v2.1.0-kubernetes-0.3.1-1 \
  // 提交真正的计算任务
local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.4.0-SNAPSHOT.jar 10000000

这将启动一个包含一千万个 task 的计算 pi 的 spark 任务,任务运行过程中,drvier 的 CPU 实际消耗大约为 3 核,内存 40G,每个 executor 的 CPU 实际消耗大约不到 1 核,内存不到 4G,我们可以根据实际资源消耗不断优化资源的 request 值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/413467.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

有适合短视频剪辑软件的吗?分享4款热门软件!

在数字时代&#xff0c;短视频已成为人们获取信息、娱乐消遣的重要形式。随着短视频行业的蓬勃发展&#xff0c;市场上涌现出众多短视频剪辑软件&#xff0c;它们功能各异&#xff0c;各具特色。本文将为您详细介绍几款热门短视频剪辑软件&#xff0c;助您轻松掌握短视频剪辑技…

Linux拉取SVN服务器代码

1. window10系统上安装了Ubuntu&#xff0c;然后在Ubuntu上拉去SVN服务器的代码&#xff0c;我这是用VScode连接的ubuntu 终端Terminal&#xff0c;我这里相当于有三台电脑了&#xff0c;公司的服务器上windows的&#xff0c;svn代码就是在这台服务器里面&#xff0c;然后我又在…

idea集成git(实用篇)

0.Git常用命令 Git常用命令-CSDN博客 1.下载git Git - Downloads 一路傻瓜式安装即可&#xff08;NEXT&#xff09; 2.软件测试 在Windows桌面空白处&#xff0c;点击鼠标右键&#xff0c;弹出右键菜单 Git软件安装后&#xff0c;会在右键菜单中增加两个菜单 Git GUI He…

ClickHouse 指南(三)最佳实践 -- 跳数索引

Data Skipping Indexes Data Skipping Indexes 2 1、简介 影响ClickHouse查询性能的因素很多。在大多数情况下&#xff0c;关键因素是ClickHouse在计算查询WHERE子句条件时是否可以使用主键。因此&#xff0c;选择适用于最常见查询模式的主键对于有效的表设计至关重要。 然…

华为OD机试真题-靠谱的车-2023年OD统一考试(C卷)---Python3-开源

题目&#xff1a; 考察内容&#xff1a; 思维转化&#xff0c;进制转化&#xff0c;9进制转为10进制&#xff0c;在4的位置1&#xff0c;需要判断是否大于4 代码&#xff1a; """ 题目分析&#xff1a; 9进制转化为10进制23-25 39-50 399-500输入&#xff1a…

系统性能提升70%!华润万家某核心系统数据库升级实践

华润万家是华润集团旗下优秀零售连锁企业&#xff0c;业务覆盖中国内地及香港市场&#xff0c;面对万家众多业务需求和互相关联的业务环境&#xff0c;亟需加强各业务耦合性&#xff0c;以适应线上、线下、物流、财务等各个业务环境的快速发展。 随着信息技术的快速发展和数字化…

blender bvh显示关节名称

导入bvh&#xff0c;菜单选择布局&#xff0c;右边出现属性窗口&#xff0c; 在下图红色框依次点击选中&#xff0c;就可以查看bvh关节名称了。

ReentrantLock详解-可重入锁-默认非公平

ReentrantLock是Java中的一个可重入锁&#xff0c;也被称为“独占锁”。它基于AQS&#xff08;AbstractQueuedSynchronizer&#xff09;框架实现&#xff0c;是JDK中提供的一种线程并发访问的同步手段&#xff0c;与synchronized类似&#xff0c;但具有更多特性。 ReentrantLo…

【Linux】进程优先级和Linux内核进程调度队列的简要介绍

进程优先级 基本概念查看系统进程修改进程的优先级Linux2.6内核进程调度队列的简要介绍和进程优先级有关的概念进程切换 基本概念 为什么会存在进程优先级&#xff1f;   进程优先级用于确定在资源竞争的情况下&#xff0c;哪个进程将被操作系统调度为下一个运行的进程。进程…

【java】15:抽象类

当父类的一些方法不能确定时,可以用abstract关键字来修饰该方法&#xff0c;这个方法就是抽象方法&#xff0c;用abstract来修饰该类就是抽象类。 //我们看看如何把Animal做成抽象类&#xff0c;并让子类Cat类实现。 abstract class Animal{ String name; int age; abstract p…

【C++精简版回顾】12.友元函数

1.友元函数 1.class class MM { public:MM(int age,string name):age(age),name(name){}friend void print(MM mm); private:int age;string name;void print() {cout << age << "岁的" << name << "喜欢你" << endl;} }; f…

k8s 进阶实战笔记 | NFS 动态存储类的部署与使用

文章目录 NFS 动态存储类的部署与使用演示环境说明NFS subdir external provisioner准备 NFS 服务器手动部署 NFS Subdir External Provisioner部署 StorageClass验证使用更多信息 NFS 动态存储类的部署与使用 演示环境说明 演示环境信息&#xff1a;单机K3s 1.28.2 操作系统…

Ansible 简介安装

1、概念介绍 Ansible 是一款为类 Unix 系统开发的自由开源的配置和自动化工具。由 Red Hat 公司使用 python 研发&#xff0c;类似于 saltstack 和 Puppet&#xff0c;但是有一个不同和优点是我们不需要在节点中安装任何客户端。它使用 SSH 来和节点进行通信。Ansible 基于 Py…

信号系统之FFT卷积

1 Overlap-Add 方法 在许多 DSP 应用中&#xff0c;长信号必须分段过滤。例如&#xff0c;高保真数字音频需要大约 5 MB/min 的数据速率&#xff0c;而数字视频需要大约 500 MB/min 的数据速率。在数据速率如此之高的情况下&#xff0c;计算机通常没有足够的内存来同时保存要处…

【程序员英语】【美语从头学】初级篇(入门)(笔记)Lesson 16 At the Shoe Store 在鞋店

《美语从头学初级入门篇》 注意&#xff1a;被 删除线 划掉的不一定不正确&#xff0c;只是不是标准答案。 文章目录 Lesson 16 At the Shoe Store 在鞋店对话A对话B笔记会话A会话B替换 Lesson 16 At the Shoe Store 在鞋店 对话A A: Do you have these shoes in size 8? B:…

SQLlabs46关

看看源码 最终我们的id是放到order by后面了 如果我们直接用列去排序 ?sortusername/password username&#xff1a; passward 可以看到顺序是不同的&#xff0c;当然第一列第二列第三列也可以&#xff0c;基本上都是这个原理&#xff0c;那怎么去实现注入呢&#xff0c;我…

Qt程序设计-钟表自定义控件实例

本文讲解Qt钟表自定义控件实例。 效果如下: 创建钟表类 #ifndef TIMEPIECE_H #define TIMEPIECE_H#include <QWidget> #include <QPropertyAnimation> #include <QDebug> #include <QPainter> #include <QtMath>#include <QTimer>#incl…

leetcode hot100 买卖股票的最佳时机1

本题之前采用贪心算法来解决&#xff0c;现在可以采用动态规划来解决&#xff0c;通过dp数组记录每次的状态从而获取到最大的利润。 这里dp数组定义为二维数组 dp[price.length][2]&#xff0c;其中price.length表示第i天&#xff0c;[2]其中有0/1两种状态&#xff0c;[0]表示…

设计模式(五)-观察者模式

前言 实际业务开发过程中&#xff0c;业务逻辑可能非常复杂&#xff0c;核心业务 N 个子业务。如果都放到一块儿去做&#xff0c;代码可能会很长&#xff0c;耦合度不断攀升&#xff0c;维护起来也麻烦&#xff0c;甚至头疼。还有一些业务场景不需要在一次请求中同步完成&…

【LeetCode】【滑动窗口长度不固定】978 最长湍流子数组

1794.【软件认证】最长的指定瑕疵度的元音子串 这个例题&#xff0c;是滑动窗口中长度不定求最大的题目&#xff0c;在看题之前可以先看一下【leetcode每日一题】【滑动窗口长度不固定】案例。 题目描述 定义&#xff1a;开头和结尾都是元音字母&#xff08;aeiouAEIOU&…