1 spark 介绍
1.1 spark概念
Apache Spark是专为大规模数据处理而设计的快速通用的分布式计算引擎,是开源的类Hadoop MapReduce的通用分布式计算框架。和MapReduce一样,都是完成大规模数据的计算处理。
简而言之,Spark 借鉴了 MapReduce思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提高了运行速度、并提供丰富的操作数据的API提高了开发速度。
- spark是基于内存的分布式计算引擎框架
- 处理海量的数据,提高计算速度
- spark只是用于数据计算,不用于数据存储
1.2 Spark和Hadoop对比
Hadoop Spark
类型 基础平台,包含计算、存储、调度 分布式计算工具
场景 大规模数据的批处理 迭代计算、交互式计算、流计算
价格 对机器要求低,便宜 对内存有要求,相对较贵
编程范式 Map+Reduce,API 较为底层,算法适应性差 API 较为顶层,方便使用
数据存储结构 MapReduce中间计算结果在HDFS磁盘上,延迟大 RDD中间运算结果在内存中,延迟小
运行方式 Task以进程方式维护,任务启动慢 Task以线程方式维护,任务启动快
尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop
- Spark主要用于替代Hadoop中的MapReduce计算模型。存储依然可以使用HDFS,但是中间结果可以存放在内存中,内存数据的读写速度要比磁盘快的多,所以Spark的计算速度要比MapReduce快
- Spark已经很好地融入了Hadoop生态圈,并成为其中的重要一员,它可以借助于YARN实现资源调度管理,借助于HDFS实现分布式存储
- Presto也是基于内存计算的,Presto不适合海量数据处理,而且不能创建库表。Spark对海量数据在内存上的计算做了优化,内存不足是会将结果存在磁盘上,适合海量数据处理,并且可以进行库表创建
进程和线程回顾
- 进程是操作系统资源分配的基本单位,分配资源需要花费时间
- 线程是处理器任务调度和执行的基本单位,使用进程创建的资源执行任务
- 一个进程一般包含多个线程, 一个进程下的多个线程共享进程的资源
- 进程之间不共享资源
- 不同进程之间的线程相互不可见
- 线程不能独立执行,必须依附在进程中执行
1.3 Spark特性
- 高效性
计算速度快,由于Apache Spark支持内存计算,并且是通过线程执行计算任务,所以官方宣称其在内存中的运算速度要比Hadoop的MapReduce快100倍,在硬盘中要快10倍。
值得是计算效率高
基于内存计算
task任务是以线程方式执行 - 易用性
支持多种编程语言开发 (Python,Java,Scala,SQL,R等),降低了学习难度 - 通用性
- 支持多种计算方式
- RDD计算 -> Spark core
- DataFrame计算(sql计算)->spark sql
- 实时计算(流计算)->sqark/Structured streaming
- 图计算 -> Spark GraphX
- 机器学习计算 -> Spark MLlib
- 支持多种开发方式
- 交互式开发 -> 在终端
- 脚本式开发 -> 通过编写代码文件完成程序运行
- 支持多种计算方式
- 兼容性
- 支持三方工具接入
- 数据存储工具
- hdfs
- kafka
- hbase
- es
- mysql
- 资源调度工具
- yarn
- standalone(spark自带)
- mesos
- 高可用工具
- zookeeper
- 数据存储工具
- 支持多种操作系统
- Linux
- Windows
- Mac
- 支持三方工具接入
1.4计算架构
将RDD任务(使用Spark Sql时,也是转换成RDD任务)提交给yarn服务管理
Yarn中RM随机找到NM创建container(容器),在container中创建applicationMaster
applicationMaster向RM保持通讯,申请计算资源
applicationMaster找到其他的NM创建container,container中创建map task和reduce task,来执行计算任务
1.5组成架构(五大组件)
Spark Core:最基本核心的组件,处RDD数据结构,其它组件都是基于RDD的
Spark SQL:处理DateFrame/DataSet数据结构(结构化数据),类似于HiveSQL,SparkSQL底层也是转换成RDD任务
Spark/Structured streaming:处理流数据(Spark SQL),实时计算
Spark ML/MLlib:机器学习计算,分类算法,回归算法
Graphx:图计算算法,DAG有向无环图,有响有环图
2 Spark部署方式
2.1 Local模式 需要运维人员部署
本地模式部署,使用一台服务器进行部署,一般用于测试代码,在本地能运行成功的代码在集群下也能运行成功
- 集群模式 需要运维人员部署
2.2 Standalone模式
Standalone模式被称为集群单机模式。Spark框架自带了完整的资源调度管理服务,可以独立部署到一个集群中,无需依赖任何其他的资源管理系统。在该模式下,Spark集群架构为主从模式,即一台Master节点与多台Slave节点,Slave节点启动的进程名称为Worker。此时集群会存在单点故障问题,利用Zookeeper搭建Spark HA集群解决单点问题。
- Cluster Manager:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器
Worker:从节点,负责控制计算节点,启动Executor或者Driver。
Driver:进程程序,当spark的计算代码程序运行时就会产生一个driver,执行计算任务。运行Application的main()函数。负责管理计算任务。
Executor:进程程序、执行器,是为某个Application运行在Worker Node上的一个进程。负责执行计算任务 - Yarn模式 常用集群模式
Yarn模式被称为Spark on Yarn模式,即把Spark作为一个客户端,将作业提交给Yarn服务,由于在生产环境中,很多时候都要与Hadoop使用同一个集群,因此采用Yarn来管理资源调度,可以有效提高资源利用率,Yarn模式又分为Yarn Cluster模式和Yarn Client模式,具体介绍如下:- Yarn Cluster:用于生产环境,所有的资源调度和计算都在集群上运行。
- Yarn Client:用于交互、调试环境。
Yarn模式需要安装hadoop,搭建hadoop的yarn集群,使用spark替换mapreduce
Mesos模式 了解
Mesos模式被称为Spark on Mesos模式,Mesos与Yarn同样是一款资源调度管理系统,可以为Spark提供服务,由于Spark与Mesos存在密切的关系,因此在设计Spark框架时充分考虑到了对Mesos的集成,但如果你同时运行Hadoop和Spark,从兼容性的角度来看,Spark on Yarn是更好的选择。
2.3简述四种部署模式
- local(本地单机)模式
使用一台服务器资源执行spark计算任务
测试环境中使用 - 集群模式
- standalone(集群单机模式)
standalone是spark自带的服务
一台主节点服务器,容易倒是单点故障问题,通过zk工具搭建Spark HA模式(Standalone高可用模式)。有一台备用主节点服务器 - Yarn模式spark on yarn
使用hadoop中的yarn工具管理调度spark集群资源
yarn cluster模式:生产环境中使用
yarn client 模式:测试环境,交互环境中使用 - mesos模式 spark on mesos
使用mesos工具管理调度spark集群资源
考虑兼容性问题的话,首选yarn模式
3 开发方式
Spark交互式开发步骤
注意点spark需要连接HDFS读取文件,如果hdfs没有启动会出现连接失败错误
需要先启动Hadoop服务
命令:start_all.sh
启动python终端
命令:pyspark
退出应用程序
命令exit()或Ctrl + d
scala交互式开发
启动终端命令:
spark-shell
退出交互界面:quit或者ctrl+d
pyspark脚本式开发步骤
将开发的代码写入文件中,通过运行代码文件进而运行计算程序
python开发的脚本文件后缀为.py
常用的脚本开发方式步骤为:
①编写XX.py 文件
②进入base虚拟机环境(默认为base环境不用切换了)
命令:conda activate base
③执行XX.py脚本程序
python3 XX.py
4 不同部署模式的Spark使用操作
4.1 Local本地模式
默认情况下不需要开启任何服务,Spark需要连接hdfs读取数据文件,所以使用前需要开启Hadoop 集群
命令为:start-all.sh
Spark中可以查看历史服务,查看Spark的计算历史信息
命令:/export/server/spark/sbin/start-history-server.sh
开启后可以在浏览器端输入网址查看
http://192.168.88.100:18080/
4.1.1本地两种计算方式–交互式
- 交互式
# 进入base虚拟环境
[root@node1 ~]# conda activate base
# 启动hadoop集群
(base) [root@node1 ~]# start-all.sh
# 启动历史服务
(base) [root@node1 ~]# /export/server/spark/sbin/start-history-server.sh
# 启动spark本地模式
# 没有任何指定,采用是local模式,调用的是本机资源无法使用集群资源,相当于是单机计算
(base) [root@node1 ~]# pyspark
- 脚本式
# 导入模块
from pyspark import SparkContext
# 创建SparkContext对象
# 没有指定任何参数,使用本地local模式
# master='local[*]'
sc = SparkContext()
# 创建python列表数据
a = [1, 2, 3, 4]
# 转换成RDD
rdd = sc.parallelize(a)
# 对rdd数据进行计算
res = rdd.reduce(lambda a, b: a + b)
print(res)
4.2 Yarn集群模式
需要启动yarn集群服务,包括ResourceManager和NodeManager
启动命令start-all.sh
启动完以后可以在浏览器查看网页
命令:http://192.168.88.100:8088/
建议:两个资源调度服务在使用时,只需要选择一个服务即可,实际开发更多采用yarn进行资源调度
4.2.1交互式
# 启动yarn集群服务
(base) [root@node1 ~]# start-all.sh
# 启动pyspark, yarn资源调度
(base) [root@node1 ~]# pyspark --master yarn
4.2.2脚本式
# 导入模块
from pyspark import SparkContext
# 创建SparkContext对象
# master参数可以指定调用的资源服务
# 使用yarn资源调度
sc = SparkContext(master='yarn')
# 创建python列表数据
a = [1, 2, 3, 4]
# 转换成RDD
rdd = sc.parallelize(a)
# 对rdd数据进行计算
res = rdd.reduce(lambda a, b: a + b)
print(res)
4.3 Standalone集群模式
standalone是自带的资源调度管理服务
master类似yarn中的ResourceManger负责管理找资源服务
worker 类似于yarn中的NodeManager负责将每台机器上的资源给到计算任务
node1上的启动指令
/export/server/spark/sbin/start-all.sh
查看相关网页指令
http://192.168.88.100:8080/
- 交互式
因为配置了高可用模式, 三台虚拟机要先启动ZooKeeper服务
(base) [root@node1 ~]# zkServer.sh start
(base) [root@node2 ~]# zkServer.sh start
(base) [root@node3 ~]# zkServer.sh start
# 在node1虚拟机上启动standalone服务
(base) [root@node1 ~]# /export/server/spark/sbin/start-all.sh
# 启动pyspark, 使用standalone资源调度
(base) [root@node1 ~]# pyspark --master spark://node1:7077
- 脚本式
导入模块
from pyspark import SparkContext
# 创建SparkContext对象
# master参数可以指定调用的资源服务
# 使用standalone资源调度
sc = SparkContext(master='spark://node1:7077')
# 创建python列表数据
a = [1, 2, 3, 4]
# 转换成RDD
rdd = sc.parallelize(a)
# 对rdd数据进行计算
res = rdd.reduce(lambda a, b: a + b)
print(res)
Standalone 高可用集群模式
- 交互式
因为配置了高可用模式, 三台虚拟机要先启动ZooKeeper服务
(base) [root@node1 ~]# zkServer.sh start
(base) [root@node2 ~]# zkServer.sh start
(base) [root@node3 ~]# zkServer.sh start
# 在node1虚拟机上启动standalone服务
(base) [root@node1 ~]# /export/server/spark/sbin/start-all.sh
# 在node2虚拟机上启动standalone服务
(base) [root@node2 ~]# /export/server/spark/sbin/start-master.sh
# 启动pyspark, 使用standalone高可用资源调度
(base) [root@node1 ~]# pyspark --master spark://node1:7077,node2:7077
- 脚本式
# 导入模块
from pyspark import SparkContext
# 创建SparkContext对象
# master参数可以指定调用的资源服务
# 使用standalone高可用资源调度
sc = SparkContext(master='spark://node1:7077,node2:7077')
# 创建python列表数据
a = [1, 2, 3, 4]
# 转换成RDD
rdd = sc.parallelize(a)
# 对rdd数据进行计算
res = rdd.reduce(lambda a, b: a + b)
print(res)