文章目录
- 前言
- Spark 的起源
- Spark 是什么
- 速度
- 易用性
- 模块化
- 可扩展性
- 分析方法的统一
- Spark SQL
- Spark MLlib
- Spark Structured Streaming
- GraphX
- Spark的分布式执行
- Spark driver
- SparkSession
- Cluster manager
- Spark executor
- 部署模式
- 分布式数据和分区
- 开发的经验
- Spark 的使用人群与使用目的
前言
本文主要讲解 Spark 的起源及其基本理念,Spark 项目的主要组件及其分布式体系结构。
Spark 的起源
谷歌的大数据和分布式计算
当提到互联网的数据规模,我们不禁会想到谷歌,谷歌的搜索引擎可以以闪电般的速度搜索全球互联网数据。如今,谷歌已经成为了规模的代名词和先进技术的引路者。
像关系数据库管理系统(RDBMSs)这样的传统存储系统和命令式编程方式都无法处理Google想要构建和搜索互联网索引文档的规模。对新方法的需求导致了Google文件系统(GFS)、 MapReduce (MR) 和 Bigtable的创建。
GFS在集群群中的许多商用硬件服务器上提供了容错和分布式的文件系统,而Bigtable提供了跨GFS的结构化数据的可伸缩存储。MR引入了一种新的基于函数式编程的并行编程范式,用于大规模处理分布在GFS和Bigtable上的数据。
本质上,我们写的MR应用程序与MapReduce系统交互的方式是MapReduce系统将计算代码(map和reduce函数)发送到数据所在的位置,这利用了数据的局域性和机架的相关性,而不是将数据发送到应用程序所在的代码位置那里。
集群中的 worker 会聚合并减少中间计算,并从reduce函数生成追加的最终输出,然后将其写入应用程序可访问的分布式存储中。这种方法显著减少了网络流量,并将大部分输入/输出(I/O)保持在磁盘本地,而不是分布在网络上。
Hadoop at Yahoo!
Google在GFS论文中表达的计算挑战和解决方案为Hadoop文件系统(HDFS)提供了一个蓝图,包括作为分布式计算框架的MapReduce实现。它于2006年4月被捐赠给Apache软件基金会(ASF),成为Apache Hadoop框架相关模块的一部分:Hadoop Common、MapReduce、HDFS和Apache Hadoop YARN。
尽管 Hadoop 已经在雅虎之外获得了广泛的采用,但HDFS上的MapReduce框架有一些缺点。
首先,它难以管理,操作复杂。
其次,它的通用批处理MapReduce API过于冗长,需要大量样板设置代码,容错能力很弱。
第三,对于大数据量的作业和许多对MR任务,每对中间计算结果都要被写入本地磁盘,以供其后续阶段的操作(见图1-1)。这种重复的磁盘I/O对性能造成了损失:大型MR作业可以连续运行几个小时,甚至几天。
图1 - 1,map和reduce计算之间的读写交替迭代
最后,尽管Hadoop MR有利于一般批处理的大规模作业,但它在结合其他数据工作内容(如机器学习、流或交互式sql类查询)时却有所不足。
为了处理这些新的工作内容,工程师们又开发了定制的系统(Apache Hive, Apache Storm, Apache Impala, Apache Giraph, Apache Drill, Apache Mahout等),每个系统都有自己的api和集群配置,进一步增加了Hadoop的操作复杂性和开发人员的学习曲线。
然后问题变成了没有一种方法可以让Hadoop和MR更简单、更快?
Spark早年在AMPLab的经历
加州大学伯克利分校(UC Berkeley)的研究人员此前曾从事Hadoop MapReduce的研究,他们发起了一个名为Spark的项目,迎接了这一挑战。他们承认MR对于交互或迭代计算工作是低效的(或难以处理的),并且是一个复杂的学习框架,所以从一开始他们就接受了让Spark更简单、更快和更容易的想法。这项努力始于2009年的RAD实验室,后来成为AMPLab(现在被称为RISELab)。
早期发表在Spark上的论文表明,在某些工作上,它比Hadoop MapReduce快10到20倍。今天,它的速度快了许多个数量级。Spark项目的核心是引入借鉴于Hadoop MapReduce的思想,使新系统变得更强: 具有高度容错性和并行性,支持在内存中交互迭代映射结果,并减少计算,提供多种语言的简单易用的API,并以统一的方式支持其他工作方式。
到2013年,Spark已经得到了广泛的应用,它的一些原始创造者和研究人员——matei Zaharia、Ali Ghodsi、reynolds Xin、Patrick Wendell、Ion Stoica和Andy konwinski——将Spark项目捐赠给了ASF,并成立了一家名为Databricks的公司。
Databricks和开源开发者社区在ASF的管理下,于2014年5月发布了Apache Spark 1.0。这是第一个主要版本,为Databricks和100多家商业供应商为Apache Spark频繁发布和贡献重要特性奠定了基础。
Spark 是什么
Apache Spark是一个统一的引擎,专为大规模分布式数据处理而设计,可以在数据中心或云中部署。
Spark为中间计算提供内存存储,使其比Hadoop MapReduce快得多。它包含了用于机器学习的可组合api (MLlib)、用于交互式查询的SQL (Spark SQL)、用于与实时数据交互的流处理(结构化流)和图形处理(GraphX)的库。
Spark的设计理念围绕着四个关键特征:
- 速度
- 易用性
- 模块化
- 可扩展性
速度
Spark以多种方式来达到追求速度的目标。首先,它的内部实现极大地受益于硬件行业最近在提高cpu和内存的价格和性能方面所取得的巨大进步。今天的商品服务器价格便宜,具有数百gb的内存、多个内核,底层基于unix的操作系统利用了高效的多线程和并行处理。该框架经过优化,充分利用了所有这些因素。
其次,Spark将查询计算构建为有向无环图(DAG);它的DAG调度器和查询优化器构建了一个高效的计算图,通常可以将其分解为在集群上的worker之间并行执行的任务。第三,它的物理执行引擎Tungsten使用全阶段代码生成来生成紧凑的执行代码。
由于所有中间结果都保留在内存和有限的磁盘I/O中,这给了它巨大的性能提升。
易用性
Spark通过提供简单逻辑数据结构的基本抽象(称为弹性分布式数据集(RDD))来实现简单性,所有其他高级结构化数据抽象(如dataframe和Datasets)都是在此基础上构建的。通过提供一组转换和操作作为算子,Spark提供了一个简单的编程模型,可以使用熟悉的语言构建大数据应用程序。
模块化
Spark操作可以应用于多种类型的工作负载,并以任何支持的编程语言表示:Scala、Java、Python、SQL和r。Spark提供了统一的库,其中包含文档完备的api,其中包括以下模块作为核心组件:Spark SQL、Spark结构化流、Spark MLlib和GraphX,将所有工作负载组合在一个引擎下运行。
我们可以编写一个可以完成所有工作的Spark应用程序——不需要为不同的工作内容使用不同的引擎,也不需要学习单独的api。使用Spark,我们可以为我们的工作内容获得一个统一的处理引擎。
可扩展性
Spark专注于其快速、并行的计算引擎,而不是存储。与同时包含存储和计算的Apache Hadoop不同,Spark将两者解耦。这意味着可以使用Spark读取存储在无数数据源中的数据——Apache Hadoop、Apache Cassandra、Apache HBase、MongoDB、Apache Hive、rdbms等等——并在内存中处理这些数据。Spark的DataFrameReaders
和dataframerwriters
也可以扩展到从其他来源(如Apache Kafka、Kinesis、Azure Storage和Amazon S3)读取数据到它的逻辑数据抽象中,并在其上进行操作。
Spark开发人员社区维护着一个第三方Spark包列表,作为不断增长的生态系统的一部分(见图1-2)。这个丰富的软件包生态系统包括用于各种外部数据源、性能监视器等的Spark连接器。
图1 - 2. Apache Spark的连接器生态系统
分析方法的统一
虽然统一的概念不是Spark独有的,但它是其设计理念和演变的核心组成部分。2016年11月,美国计算机协会(ACM)认可了Apache Spark,并授予其原始创建者享有盛誉的ACM奖,以表彰他们将Apache Spark描述为“大数据处理的统一引擎”。这篇获奖论文指出,Spark用一个统一的组件堆栈取代了所有独立的批处理、图形、流和查询引擎,如Storm、Impala、Dremel、Pregel等,可以在一个分布式快速引擎下处理不同的工作负载。
统一技术栈的Apache Spark组件
如图1-3所示,Spark为不同的工作负载提供了四个不同的组件作为库:Spark SQL、Spark MLlib、Spark Structured Streaming和GraphX。这些组件中的每一个都与Spark的核心容错引擎分开,因为要使用api编写Spark应用程序,然后Spark将其转换为由核心引擎执行的DAG。因此,无论你是使用Java、R、Scala、SQL还是Python提供的结构化api编写Spark代码,底层代码都会被分解成高度紧凑的字节码,并在集群中的worker jvm中执行。
图1 - 3. Apache Spark组件和API栈
Spark SQL
这个模块可以很好地处理结构化数据。您可以读取存储在RDBMS表中的数据或从具有结构化数据(CSV, text, JSON, Avro, ORC, Parquet等)的文件格式中读取数据,然后在Spark中构建永久或临时表。此外,当在Java、Python、Scala或R中使用Spark的结构化api时,你可以组合类似sql的查询来查询刚刚读入Spark DataFrame的数据。到目前为止,Spark SQL是ANSI SQL:2003兼容的,它也可以作为一个纯SQL引擎。
例如,在这个Scala代码片段中,你可以从存储在Amazon S3上的JSON文件中读取,创建一个临时表,并对作为Spark DataFrame读入内存的结果发出类似sql的查询:
// In Scala
// Read data off Amazon S3 bucket into a Spark DataFrame
spark.read.json("s3://apache_spark/data/committers.json")
.createOrReplaceTempView("committers")
// Issue a SQL query and return the result as a Spark DataFrame
val results = spark.sql("""SELECT name, org, module, release, num_commits
FROM committers WHERE module = 'mllib' AND num_commits > 10
ORDER BY num_commits DESC""")
你可以用Python、R或Java编写类似的代码片段,生成的字节码将是相同的,从而获得相同的性能。
Spark MLlib
Spark附带了一个库,其中包含称为MLlib的常见机器学习(ML)算法。自从Spark第一次发布以来,由于Spark 2的出现,这个库组件的性能得到了显著提高。X的底层引擎增强。MLlib提供了许多流行的机器学习算法,这些算法构建在基于数据框架的高级api之上,用于构建模型。
NOTE
从Apache Spark 1.6开始,MLlib项目被分成两个包:Spark;Mllib和spark.ml。基于数据框架的API是后者,而前者包含基于rdd的API,后者现在处于维护模式。所有的新特性都在spark.ml中。
这些api允许你在部署期间提取或转换特性,构建管道(用于训练和评估),并保持模型(用于保存和重新加载它们)。其他实用程序包括使用常见的线性代数运算和统计。MLlib包括其他低级的ML原语,包括通用的梯度下降优化。以下Python代码片段封装了数据科学家在构建模型时可能执行的基本操作:
# In Python
from pyspark.ml.classification import LogisticRegression
...
training = spark.read.csv("s3://...")
test = spark.read.csv("s3://...")
# Load training data
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Predict
lrModel.transform(test)
...
Spark Structured Streaming
Apache Spark 2.0引入了一个实验性的连续流模型和结构化流api,构建在Spark SQL引擎和基于数据框架的api之上。到Spark 2.2,结构化流媒体已经普遍可用,这意味着开发人员可以在他们的生产环境中使用它。
大数据开发人员需要对来自Apache Kafka等引擎和其他流数据源的静态数据和流数据进行实时组合和反应,新模型将流视为一个不断增长的表,最后添加新的数据行。开发人员可以将其视为结构化表,并像处理静态表一样对其进行查询。
在结构化流模型之下,Spark SQL核心引擎处理所有方面的容错和后期数据语义,允许开发人员相对轻松地专注于编写流应用程序。这个新模型消除了Spark 1中的旧DStreams模型。此外,Spark 2.x和Spark 3.0扩展了流数据源的范围,包括Apache Kafka、Kinesis和基于hdfs或云存储。
下面的代码片段展示了结构化流应用程序的典型剖析。它从localhost套接字读取并将单词计数结果写入Apache Kafka主题:
# In Python
# Read a stream from a local host
from pyspark.sql.functions import explode, split
lines = (spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())
# Perform transformation
# Split the lines into words
words = lines.select(explode(split(lines.value, " ")).alias("word"))
# Generate running word count
word_counts = words.groupBy("word").count()
# Write out to the stream to Kafka
query = (word_counts
.writeStream
.format("kafka")
.option("topic", "output"))
GraphX
顾名思义,GraphX是一个用于操作图(例如,社交网络图,路由和连接点,或网络拓扑图)和执行图并行计算的库。它提供了用于分析、连接和遍历的标准图形算法,这些算法由社区中的用户贡献:可用的算法包括PageRank、Connected Components和Triangle Counting。
下面的代码片段展示了一个如何使用GraphX api连接两个图的简单示例:
// In Scala
val graph = Graph(vertices, edges)
messages = spark.textFile("hdfs://...")
val graph2 = graph.joinVertices(messages) {
(id, vertex, msg) => ...
}
Spark的分布式执行
Spark是一个分布式数据处理引擎,它的组件在一个机器集群上协同工作。
让我们从图1-4所示的每个单独组件以及它们如何适应体系结构开始。在Spark架构的高层,Spark应用程序由一个驱动程序组成,该程序负责编排Spark集群上的并行操作。驱动程序通过SparkSession
访问集群中的分布式组件——Spark执行器和集群管理器。
图1 - 4. Apache Spark组件和架构
Spark driver
作为Spark应用程序中负责实例化SparkSession的部分,Spark driver 有多个角色:它与集群管理器通信;它向集群管理器请求资源(CPU,内存等),用于Spark的执行器(jvm);它将所有Spark操作转换为DAG计算,对其进行调度,并将其作为任务分发给Spark executors。一旦分配了资源,它就直接与 executors 通信。
SparkSession
在Spark 2.0中,SparkSession成为所有Spark操作和数据的统一管道。它不仅包含了Spark之前的入口点,如SparkContext、SQLContext、HiveContext、SparkConf和StreamingContext,而且还使Spark的工作更简单、更容易。
在Spark 2中, 虽然SparkSession包含了所有其他上下文,但仍然可以访问各个上下文及其各自的方法。通过这种方式,社区保持了向后兼容性。那就是旧1.x代码, 使用SparkContext或SQLContext的仍然可以工作。
通过这个管道,可以创建JVM运行时参数、定义dataframe和Datasets、从数据源读取数据、访问目录元数据以及发出Spark SQL查询。SparkSession为Spark的所有功能提供了一个统一的入口点。
在独立的Spark应用程序中,可以使用选择的编程语言中的高级api之一创建SparkSession。在Spark shell中创建SparkSession
,可以通过一个名为Spark
或sc
的全局变量访问它.
而在Spark 1中, 将不得不创建单独的上下文(用于流,SQL等),引入额外的样板代码。在Spark 2中,可以在每个JVM上创建一个SparkSession
,并使用它来执行一些Spark操作。
让我们来看一个例子:
// In Scala
import org.apache.spark.sql.SparkSession
// Build SparkSession
val spark = SparkSession
.builder
.appName("LearnSpark")
.config("spark.sql.shuffle.partitions", 6)
.getOrCreate()
...
// Use the session to read JSON
val people = spark.read.json("...")
...
// Use the session to issue a SQL query
val resultsDF = spark.sql("SELECT city, pop, state, zip FROM table_name")
Cluster manager
集群管理器负责为运行Spark应用程序的节点集群管理和分配资源。目前,Spark支持四种集群管理器:内置的独立集群管理器、Apache Hadoop YARN、Apache Mesos和Kubernetes。
Spark executor
集群中的每个工作节点上都运行一个Spark执行器。执行程序与驱动程序通信,并负责在工作程序上执行任务。在大多数部署模式中,每个节点只运行一个执行器。
部署模式
Spark的一个吸引人的特性是它支持多种部署模式,使Spark能够在不同的配置和环境中运行。因为集群管理器不知道它在哪里运行(只要它能管理Spark的执行器并满足资源请求),所以Spark可以部署在一些最流行的环境中——比如Apache Hadoop YARN和kubernetes——并且可以在不同的模式下运行。部署方式如表1-1所示。
表1 - 1. Spark部署模式说明
Mode | Spark driver | Spark executor | Cluster manager |
---|---|---|---|
Local | 运行在单个JVM上,如笔记本电脑或单个节点 | 运行在与driver相同的JVM上 | 在同一主机上运行 |
Standalone | 可以在集群中的任何节点上运行 | 集群中的每个节点将启动自己的执行器JVM | 可以任意分配给集群中的任意主机 |
YARN (client) | 在客户机上运行,而不是集群的一部分 | YARN的NodeManager的容器 | YARN的资源管理器与YARN的资源管理器一起工作 |
YARN (cluster) | 与YARN应用程序主一起运行 | 与YARN客户端模式相同 | 与YARN客户端模式相同 |
Kubernetes | 在Kubernetes pod中运行 | 每个 worker 都在自己的 po d 中运行 | Kubernetes Master |
分布式数据和分区
实际的物理数据以分区的形式分布在HDFS或云存储中(见图1-5)。当数据作为分区分布在整个物理集群中时,Spark将每个分区视为一个高级逻辑数据抽象——作为内存中的一个DataFrame。最好为每个Spark executor 分配一个任务,该任务需要它观察数据位置读取网络中离它最近的分区,虽然这并不是一定的.
图1 - 5. 数据分布在物理机器上
分区允许有效的并行性。将数据分解成块或分区的分布式方案允许Spark执行器只处理与它们接近的数据,从而最大限度地减少网络带宽。也就是说,每个执行器的核心都被分配了自己的数据分区(参见图1-6)。
图1 - 6. 每个执行器的核心都有一个数据分区来处理
例如,下面的代码片段将把存储在集群上的物理数据分解成八个分区,每个执行器将获得一个或多个分区来读入其内存:
# In Python
log_df = spark.read.text("path_to_large_text_file").repartition(8)
print(log_df.rdd.getNumPartitions())
这段代码将创建一个包含10000个整数的DataFrame,分布在内存的8个分区上:
# In Python
df = spark.range(0, 10000, 1, 8)
print(df.rdd.getNumPartitions())
这两个代码片段都将输出8。
开发的经验
在所有开发人员的乐趣中,没有什么比一组可组合的api更吸引人了,这些api可以提高生产力,并且易于使用、直观和表达。Apache Spark对开发人员的主要吸引力之一是其易于使用的api,可以跨语言(Scala、Java、Python、SQL和R)操作小到大的数据集。
Spark 2 的一个主要目的是通过限制概念的数量来统一和简化框架,引入了高级抽象api作为特定于领域的语言结构,这使得Spark编程具有高度的表现力和愉快的开发体验。只需要表达你希望任务或操作计算什么,而不是如何计算它,并让Spark确定如何最好地为你完成它。
Spark 的使用人群与使用目的
毫不奇怪,大多数处理大数据的开发人员都是数据工程师、数据科学家或机器学习工程师。他们之所以被Spark所吸引,是因为它允许他们使用一个引擎和熟悉的编程语言构建一系列应用程序。
当然,开发人员可能身兼数职,有时会同时从事数据科学和数据工程任务,尤其是在初创公司或较小的工程团队中。然而,在所有这些任务中,数据——海量的数据——是基础。
数据科学任务
作为一门在大数据时代崭露头角的学科,数据科学就是用数据讲故事。但在他们讲述故事之前,数据科学家必须清理数据,探索数据以发现模式,并建立模型来预测或建议结果。其中一些任务需要统计学、数学、计算机科学和编程方面的知识。
大多数数据科学家都精通使用SQL等分析工具,熟悉NumPy和pandas等库,熟悉R和Python等编程语言。但是他们还必须知道如何处理或转换数据,以及如何使用已建立的分类、回归或聚类算法来构建模型。他们的任务通常是迭代的、互动的或特别的,或者是实验性的,以证明他们的假设。
幸运的是,Spark支持这些不同的工具。Spark的MLlib提供了一套通用的机器学习算法来构建模型管道,使用高级估计器、变压器和数据特性。Spark SQL和Spark shell促进了对数据的交互式和临时探索。
此外,Spark使数据科学家能够处理大型数据集并扩展他们的模型训练和评估。Apache Spark 2.4引入了一个新的组调度程序,作为Project Hydrogen的一部分,以适应以分布式方式训练和调度深度学习模型的容错需求,Spark 3.0引入了在单机、YARN和Kubernetes部署模式下支持GPU资源收集的能力。这意味着需要深度学习技术的开发人员可以使用Spark。
数据工程任务
在构建模型之后,数据科学家通常需要与其他团队成员合作,他们可能负责部署模型。或者,他们可能需要与其他人密切合作,将原始的脏数据构建并转换为其他数据科学家易于消费或使用的干净数据。例如,分类或聚类模型不是孤立存在的;它可以与其他组件一起工作,比如web应用程序或流引擎(如Apache Kafka),或者作为更大的数据管道的一部分。这个管道通常由数据工程师构建。
数据工程师对软件工程原理和方法有深刻的理解,并拥有为既定业务用例构建可伸缩数据管道的技能。数据管道支持对来自无数源的原始数据进行端到端转换——对数据进行清理,以便开发人员可以在下游使用它,将其存储在云中、NoSQL或rdbms中以生成报告,或者通过商业智能工具使数据分析师可以访问它。
Spark 2.x 引入了一种演进式的流模型,称为结构化流的连续应用程序。使用结构化流api,数据工程师可以构建复杂的数据管道,使他们能够从实时和静态数据源中获取ETL数据。
数据工程师之所以使用Spark,是因为它提供了一种简单的方法来并行计算,并隐藏了分布和容错的所有复杂性。这使得他们可以专注于使用基于数据框架的高级api和领域特定语言(DSL)查询来执行ETL,读取和组合来自多个数据源的数据。
由于SQL的Catalyst优化器和用于紧凑代码生成的Tungsten,Spark 2.x 和 Spark 3.0 中的性能得以改进,使数据工程师的工作变得更加轻松。他们可以选择使用适合手头任务的三种Spark api (rdd、dataframe或dataset)中的任何一种,并获得Spark的好处。
无论你是数据工程师、数据科学家还是机器学习工程师,你都会发现Spark对以下用例很有用:
- 并行处理分布在集群中的大型数据集
- 执行特别或交互式查询以探索和可视化数据集
- 使用MLlib构建、训练和评估机器学习模型
- 从海量数据流中实现端到端数据管道
- 分析图形数据集和社交网络