Spark 部署与应用程序交互简单使用说明


文章目录

    • 前言
    • 步骤一:下载安装包
      • Spark的目录和文件
    • 步骤二:使用Scala或PySpark Shell
      • 本地 shell 运行
    • 步骤3:理解Spark应用中的概念
      • Spark Application and SparkSession
      • Spark Jobs
      • Spark Stages
      • Spark Tasks
    • 转换、立即执行操作和延迟求值
      • 窄变换和宽变换
    • Spark UI
    • 单机的应用程序
      • 计算巧克力豆的数量
      • 单机编译 Scala 程序
    • 总结

前言

本文将讲解 Spark 的部署,并通过三个简单的步骤来编写一个独立应用程序。
我们将使用本地模式,其中所有的处理都是在Spark shell中的一台机器上完成的——这是学习框架的一种简单方法,迭代执行的方式可以及时反馈直接结果。使用Spark shell,可以在编写复杂的Spark应用程序之前使用小数据集对Spark进行操作验证,但是对于想要获得分布式执行好处的大数据集或生产环境,建议使用YARN或Kubernetes部署模式。
虽然Spark shell只支持Scala、Python和R,但你可以用任何支持的语言(包括Java)编写Spark应用程序,并使用Spark SQL发出查询。

步骤一:下载安装包

进入Spark下载页面,在步骤2的下拉菜单中选择“Pre-built for Apache Hadoop 3.3”,然后点击步骤3中的“download Spark”链接(如图2-1所示)。
image.png
图2 - 1. Apache Spark下载页面
这将下载压缩包spark-3.5.1-bin-hadoop3.tgz。它包含在笔记本电脑上以本地模式运行Spark所需的所有与hadoop相关的二进制文件。或者,如果要将它安装在现有的HDFS或Hadoop安装上,可以从下拉菜单中选择匹配的Hadoop版本。如果想要以源码编译的方式部署,可以在官方文档中相关内容。
自Apache Spark 2.2发布以来,只关心在Python中学习Spark的开发人员可以选择从PyPI存储库安装PySpark。如果你只用Python编程,你不需要安装运行Scala、Java或R所需的所有其他库; 要从PyPI安装PySpark,只需运行pip install PySpark
可以通过pip install pyspark[SQL, ML, MLlib]安装SQL, ML和MLlib的一些额外依赖项(如果只想要SQL依赖项,也可以通过pip install pyspark[SQL])。

NOTE
需要在机器上安装Java 8或更高版本,并设置JAVA_HOME环境变量。有关如何下载和安装Java的说明,请参阅文档。

如果想以解释性shell模式运行R,则必须先安装R,然后再运行sparkR。要使用R进行分布式计算,还可以使用R社区创建的开源项目sparklyr

Spark的目录和文件

本文中的所有命令和指令都是在 Unix 系统上运行的。下载完tarball后,cd到下载目录,使用tar -xf spark-3.5.1-bin-hadoop3.tgz解压tarball内容,其中内容如下:

$ cd spark-3.0.0-preview2-bin-hadoop2.7
$ ls
LICENSE   R          RELEASE   conf    examples   kubernetes  python   yarn
NOTICE    README.md  bin       data    jars       licenses    sbin

README.md

  • 这个文件包含了关于如何使用Spark shell、如何从源代码构建Spark、如何运行独立的Spark示例、如何阅读Spark文档和配置指南的链接,以及如何为Spark做出贡献的新的详细说明.

bin

  • 顾名思义,该目录包含用于与Spark交互的大多数脚本,包括Spark shell (Spark -sql、pyspark、Spark -shell和sparkR)。我们将在后面使用这个目录中的shell和可执行文件,使用Spark -submit提交一个独立的Spark应用程序,并编写一个脚本,在Kubernetes支持下运行Spark时构建和推送Docker 镜像。

sbin

  • 该目录中的大多数脚本都是用于管理的,用于在集群的各种部署模式下启动和停止Spark组件。

kubernetes

  • 自从Spark 2.4发布以来,这个目录包含了用于在Kubernetes集群上为Spark发行版创建Docker镜像的Dockerfiles。它还包含一个文件,提供如何在构建Docker映像之前构建Spark发行版的说明。

data

  • 该目录中填充了*.txt文件,这些文件作为Spark组件的输入:MLlib、Structured Streaming和GraphX。

examples

  • Spark提供了Java、Python、R和Scala的示例,可以在学习该框架时用到它们。

步骤二:使用Scala或PySpark Shell

如前所述,Spark附带了四个广泛使用的解释器,它们就像交互式“shell”一样,支持临时数据分析:pyspark、Spark -shell、Spark-sql和sparkR。
这些shell已经支持连接到集群,并允许你将分布式数据加载到Spark worker的内存中。无论你是在处理千兆字节的数据还是小数据集,Spark shell都有助于快速学习Spark。
要启动PySpark, cd到 bin 目录并输入PySpark启动shell。如果你已经从PyPI安装了PySpark,那么只需输入PySpark就足够了:

$ pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
20/02/16 19:28:48 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> spark.version
'3.0.0-preview2'
>>>

要用Scala启动一个类似的Spark shell, cd到bin目录并输入Spark -shell:

$ spark-shell
20/05/07 19:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.0.1.7:4040
Spark context available as 'sc' (master = local[*], app id = local-1581910231902)
Spark session available as 'spark'.
Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.version
res0: String = 3.0.0-preview2
scala>

本地 shell 运行

Spark计算被表示为算子。然后,这些算子被转换成低级的基于rdd的字节码作为任务,分发给Spark的执行器执行。
让我们看一个简短的示例,其中我们以DataFrame的形式读取文本文件,显示读取的字符串示例,并计算文件中的总行数。这个简单的例子说明了高级结构化api的使用。DataFrame上的show(10, false)操作只显示前10行,不截断。
默认情况下,截断布尔标志为true。下面是它在Scala shell中的样子:

scala> val strings = spark.read.text("../README.md")
strings: org.apache.spark.sql.DataFrame = [value: string]

scala> strings.show(10, false)
+------------------------------------------------------------------------------+
|value                                                                         |
+------------------------------------------------------------------------------+
|# Apache Spark                                                                |
|                                                                              |
|Spark is a unified analytics engine for large-scale data processing. It       |
|provides high-level APIs in Scala, Java, Python, and R, and an optimized      |
|engine that supports general computation graphs for data analysis. It also    |
|supports a rich set of higher-level tools including Spark SQL for SQL and     |
|DataFrames, MLlib for machine learning, GraphX for graph processing,          |
| and Structured Streaming for stream processing.                              |
|                                                                              |
|<https://spark.apache.org/>                                                   |
+------------------------------------------------------------------------------+
only showing top 10 rows

scala> strings.count()
res2: Long = 109
scala>

让我们看一个使用Python解释性shell pyspark的类似示例:

$ pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform 
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/01/10 11:28:29 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> strings = spark.read.text("../README.md")
>>> strings.show(10, truncate=False)
+------------------------------------------------------------------------------+
|value                                                                         |
+------------------------------------------------------------------------------+
|# Apache Spark                                                                |
|                                                                              |
|Spark is a unified analytics engine for large-scale data processing. It       |
|provides high-level APIs in Scala, Java, Python, and R, and an optimized      |
|engine that supports general computation graphs for data analysis. It also    |
|supports a rich set of higher-level tools including Spark SQL for SQL and     |
|DataFrames, MLlib for machine learning, GraphX for graph processing,          |
|and Structured Streaming for stream processing.                               |
|                                                                              |
|<https://spark.apache.org/>                                                   |
+------------------------------------------------------------------------------+
only showing top 10 rows

>>> strings.count()
109
>>>

要退出任何Spark shell,按Ctrl-D。这种与Spark shell的快速交互不仅有利于快速学习,也有利于快速验证实验。
我们使用高级结构化api将文本文件读入Spark DataFrame而不是RDD,目前基本上已经很少直接使用 RDD 去操作数据,而是使用 API。

NOTE
在高级结构化api中表达的每一个计算都被分解为低级的RDD操作,然后转换为Scala字节码,供执行器的jvm使用。这个生成的RDD操作代码对用户来说是不可访问的,也与面向用户的RDD api不一样。

步骤3:理解Spark应用中的概念

要理解我们的示例代码在底层发生了什么,需要熟悉Spark应用程序的一些关键概念,以及代码如何作为任务在Spark执行器之间转换和执行:
Application

  • 使用Spark的 APIs 构建在Spark上的用户程序, 它由集群上的 driver 和 executors 组成。

SparkSession

  • 它提供了与底层Spark功能交互的入口点的一个实例化对象,并允许使用Spark的api对Spark进行编程。在交互式Spark shell中,Spark driver 会自动实例化一个SparkSession,而在Spark应用程序中,我们自己可以创建一个SparkSession对象。

Job

  • 由多个任务组成的并行计算,这些任务在响应Spark操作(例如,save() collect())时产生。

Stage

  • 每个任务被分成更小的任务集,称为阶段,这些阶段相互依赖。

Task

  • 将被发送到Spark executor 的操作命令或单个执行单元。

Spark Application and SparkSession

每个Spark application 的核心是Spark driver 程序,它创建一个SparkSession对象。当你使用Spark shell时,driver 是shell的一部分,并且创建了SparkSession对象(可通过变量Spark访问),正如在启动shell时在前面的示例中看到的那样。
在这些示例中,因为在笔记本电脑上本地启动了Spark shell,所以所有操作都在本地运行,在单个JVM中运行。但是,你可以像在本地模式下一样轻松地启动Spark shell来在集群上并行分析数据。命令Spark -shell——help或pyspark——help将向您展示如何连接到Spark集群管理器。图2-2显示了Spark在集群上的执行情况。
image.png
图2 - 2. 在Spark的分布式架构中,Spark组件通过Spark driver 进行通信
一旦有了SparkSession,就可以使用api对Spark进行编程来执行Spark操作。

Spark Jobs

在与Spark shell的交互会话中,Driver 将我们的Spark应用程序转换为一个或多个Spark作业(图2-3)。然后将每个作业转换为DAG。本质上,这就是Spark的执行计划,其中DAG中的每个节点可以是单个或多个Spark阶段。
image.png
图2 - 3. 创建一个或多个Spark作业的Spark Driver

Spark Stages

作为DAG节点的一部分,阶段是根据可以串行或并行执行的算子创建的(图2-4)。并非所有的Spark 算子都可以在一个阶段中发生,因此它们可以被划分为多个阶段。通常阶段是在运算符的计算边界上划分的,在那里它们规定了Spark executor 之间的数据传输。
image.png
图2 - 4. 创建一个或多个阶段的Spark job

Spark Tasks

每个阶段都由Spark任务(一个执行单元)组成,然后在每个Spark executor 上执行.
每个任务映射到一个核,在一个数据分区上工作(图2-5)。因此,一个16核的执行器可以在16个或更多的分区上并行运行16个或更多的任务,这使得Spark的任务执行并行度很高:
image.png
图2 - 5. 创建一个或多个任务以分发给 executor 的 Spark stage

转换、立即执行操作和延迟求值

分布式数据上的Spark操作可以分为两种类型: 转换执行操作。顾名思义,转换在不改变原始数据的情况下将Spark DataFrame转换为新的DataFrame,从而使其具有不可变性。
换句话说,像select()filter()这样的操作不会改变原始DataFrame;相反,它将返回转换后的操作结果作为一个新的DataFrame。
所有的转换都是延迟执行的,它们的结果不是立即计算出来的,而是作为一个转换关系被记录。这些记录允许Spark在稍后的执行计划中重新安排某些转换,合并它们,或者将转换优化到更有效的执行阶段。延迟计算是Spark延迟执行的策略,直到一个执行操作被调用或数据被“使用”(从磁盘读取或写入磁盘)。
执行操作触发所有转换记录的延迟计算。在图2-6中,所有的转换T都被记录下来,直到动作A被调用。每个转换T产生一个新的DataFrame。
image.png
图2 - 6. 延迟转换和立即执行求值的操作
延迟求值通过转换血缘关系和数据不变性提供了容错性,允许Spark通过链式调用转换来优化查询。由于Spark在转换血缘关系中记录了每个转换,并且dataframe在转换之间是不可变的,因此它可以通过简单地重新执行血缘关系的记录来重现其原始状态,从而在发生故障时提供弹性。
下边列出了一些转换和操作的示例:
image.png
这些动作和转换构成了一个Spark查询计划,在调用操作之前,查询计划中不会执行任何内容。下面的示例用Python和Scala显示,有两个转换——read()filter()——和一个立即执行操作 count()。该操作触发了作为查询执行计划的一部分记录的所有转换的执行。在这个例子中,在shell中执行filter .count()之前什么都不会发生:

# In Python 
>>> strings = spark.read.text("../README.md")
>>> filtered = strings.filter(strings.value.contains("Spark"))
>>> filtered.count()
20
// In Scala
scala> import org.apache.spark.sql.functions._
scala> val strings = spark.read.text("../README.md")
scala> val filtered = strings.filter(col("value").contains("Spark"))
scala> filtered.count()
res5: Long = 20s

窄变换和宽变换

如前所述,转换是Spark 延迟计算的操作。延迟求值方案的一个巨大优势是,Spark可以检查你的计算性查询,并确定如何优化它。这种优化可以通过连接或管道化一些操作并将它们分配到一个阶段来完成,或者通过确定哪些操作需要跨集群的shuffle或数据交换来将它们分解为阶段来完成。
转换可以分为窄依赖关系宽依赖关系。任何可以从单个输入分区计算单个输出分区的转换都是窄转换。例如,在前面的代码片段中,filter()contains()表示狭窄的转换,因为它们可以在单个分区上操作并生成结果输出分区,而无需交换任何数据
但是,groupBy()orderBy()之类的转换会指示Spark执行宽转换,其中来自其他分区的数据被读入、合并并写入磁盘。如果我们要通过调用.orderby()对前面示例中filtered后的DataFrame进行排序,那么每个分区都将在本地排序,但是我们需要强制对集群中每个执行器分区中的数据进行过滤,以便对所有记录进行排序。与窄转换相比,宽转换需要其他分区的输出来计算最终的聚合
下图说明了两种类型的依赖关系:
image.png

Spark UI

Spark包含一个图形用户界面,可以使用它来检查或监视Spark应用程序的各个分解阶段(即job、state 和 tasks)。根据Spark的部署方式,驱动程序启动一个web UI,默认在端口4040上运行,可以在其中查看指标和详细信息,例如:

  • 调度 stages 和 tasks 的列表
  • RDD大小和内存使用的概要描述
  • 运行环境相关信息
  • 正在运行的 executors 信息
  • 所有的Spark SQL 查询

在本地模式下,可以通过浏览器http://:4040访问该接口。

NOTE
当启动spark-shell时,输出日志部分会显示要在端口4040上访问的本地主机URL。

让我们看一下前边的Python示例是如何转换为job、stage 和 tasks的。要查看DAG的外观,单击web UI中的“DAG可视化”。如下图所示,Driver 创建了一个 job 和一个 stage:
image.png
注意,这里不需要Exchange(执行器之间交换数据的地方),因为只有一个阶段。每个单独的操作用蓝框表示。
stage 0由一个task 组成。如果你有多个任务,它们将并行执行。在“stages”页中可以查看各个stage的详细信息,如下图所示:
image.png
在后续的文章系列中,我会详细介绍 UI 界面的使用,这里先只做个简单的介绍。

单机的应用程序

Spark发行版为每个Spark组件提供了一组示例应用程序。
从本地机器上的安装目录中,可以运行使用该命令提供的几个Java或Scala示例程序之一:
bin/run-example _<class> [params]_

$ ./bin/run-example JavaWordCount README.md

这将在控制台上INFO 信息中输出 README.md 文件中每个单词的列表及其计数(计数单词是分布式计算的“Hello, World”)。

计算巧克力豆的数量

在前面的例子中,我们统计了文件中的单词。如果文件很大,它将分布在一个被划分为小数据块的集群上,我们的Spark程序将分配计算每个分区中每个单词的任务,并返回最终的聚合计数,但这个例子已经有点过时了。
让我们来解决一个类似的问题,但是使用更大的数据集,并使用更多Spark的分发功能和DataFrame api。
如下图有很多巧克力豆的饼干,我们需要将这些不同颜色的巧克力豆分配给不同的人。
image.png
让我们编写一个Spark程序,读取一个包含超过100,000个条目的文件(其中每行或每行都有一个<state, mnm_color, count>),并计算和汇总每种颜色和状态的计数。这些汇总的计数告诉我们每个人喜欢的m&m巧克力豆的颜色。下边给出了完整的 Python 代码:

# Import the necessary libraries.
# Since we are using Python, import the SparkSession and related functions
# from the PySpark module.
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: mnmcount <file>", file=sys.stderr)
        sys.exit(-1)

    # Build a SparkSession using the SparkSession APIs.
    # If one does not exist, then create an instance. There
    # can only be one SparkSession per JVM.
    spark = (SparkSession
             .builder
             .appName("PythonMnMCount")
             .getOrCreate())
    # Get the M&M data set filename from the command-line arguments
    mnm_file = sys.argv[1]
    # Read the file into a Spark DataFrame using the CSV
    # format by inferring the schema and specifying that the
    # file contains a header, which provides column names for comma-
    # separated fields.
    mnm_df = (spark.read.format("csv") 
              .option("header", "true") 
              .option("inferSchema", "true") 
              .load(mnm_file))

    # We use the DataFrame high-level APIs. Note
    # that we don't use RDDs at all. Because some of Spark's 
    # functions return the same object, we can chain function calls.
    # 1. Select from the DataFrame the fields "State", "Color", and "Count"
    # 2. Since we want to group each state and its M&M color count,
    #    we use groupBy()
    # 3. Aggregate counts of all colors and groupBy() State and Color
    # 4  orderBy() in descending order
    count_mnm_df = (mnm_df
                    .select("State", "Color", "Count")
                    .groupBy("State", "Color")
                    .sum("Count")
                    .orderBy("sum(Count)", ascending=False))
    # Show the resulting aggregations for all the states and colors;
    # a total count of each color per state.
    # Note show() is an action, which will trigger the above
    # query to be executed.
    count_mnm_df.show(n=60, truncate=False)
    print("Total Rows = %d" % (count_mnm_df.count()))
    # While the above code aggregated and counted for all 
    # the states, what if we just want to see the data for 
    # a single state, e.g., CA? 
    # 1. Select from all rows in the DataFrame
    # 2. Filter only CA state
    # 3. groupBy() State and Color as we did above
    # 4. Aggregate the counts for each color
    # 5. orderBy() in descending order  
    # Find the aggregate count for California by filtering
    ca_count_mnm_df = (mnm_df
                       .select("State", "Color", "Count")
                       .where(mnm_df.State == "CA")
                       .groupBy("State", "Color")
                       .sum("Count")
                       .orderBy("sum(Count)", ascending=False))
    # Show the resulting aggregation for California.
    # As above, show() is an action that will trigger the execution of the
    # entire computation. 
    ca_count_mnm_df.show(n=10, truncate=False)
    # Stop the SparkSession
    spark.stop()

创建 mnmcount.py 文件,mnn_datasets .csv 文件数据集下载地址,并使用安装的bin目录中的submit- Spark脚本将其作为Spark作业提交。将SPARK_HOME环境变量设置为在本地机器上安装Spark的根目录。

NOTE
前面的代码使用DataFrame API,读起来像高级DSL查询。我将在后续文章中介绍这个和其他api。与RDD API不同,你可以使用它来指示Spark做什么,而不是如何做,这是清晰和简单的!

为了避免将详细的INFO消息打印到控制台中,请复制 log4j.properties.template 模板文件到 log4j.properties。并设置log4j.conf/log4j.conf文件中的rootCategory=WARN
执行提交命令,提交上边的 Pyhton 代码至 Spark 集群:

$SPARK_HOME/bin/spark-submit mnmcount.py data/mnm_dataset.csv

-----+------+----------+
|State| Color|sum(Count)|
+-----+------+----------+
|   CA|Yellow|    100956|
|   WA| Green|     96486|
|   CA| Brown|     95762|
|   TX| Green|     95753|
|   TX|   Red|     95404|
|   CO|Yellow|     95038|
|   NM|   Red|     94699|
|   OR|Orange|     94514|
|   WY| Green|     94339|
|   NV|Orange|     93929|
|   TX|Yellow|     93819|
|   CO| Green|     93724|
|   CO| Brown|     93692|
|   CA| Green|     93505|
|   NM| Brown|     93447|
|   CO|  Blue|     93412|
|   WA|   Red|     93332|
|   WA| Brown|     93082|
|   WA|Yellow|     92920|
|   NM|Yellow|     92747|
|   NV| Brown|     92478|
|   TX|Orange|     92315|
|   AZ| Brown|     92287|
|   AZ| Green|     91882|
|   WY|   Red|     91768|
|   AZ|Orange|     91684|
|   CA|   Red|     91527|
|   WA|Orange|     91521|
|   NV|Yellow|     91390|
|   UT|Orange|     91341|
|   NV| Green|     91331|
|   NM|Orange|     91251|
|   NM| Green|     91160|
|   WY|  Blue|     91002|
|   UT|   Red|     90995|
|   CO|Orange|     90971|
|   AZ|Yellow|     90946|
|   TX| Brown|     90736|
|   OR|  Blue|     90526|
|   CA|Orange|     90311|
|   OR|   Red|     90286|
|   NM|  Blue|     90150|
|   AZ|   Red|     90042|
|   NV|  Blue|     90003|
|   UT|  Blue|     89977|
|   AZ|  Blue|     89971|
|   WA|  Blue|     89886|
|   OR| Green|     89578|
|   CO|   Red|     89465|
|   NV|   Red|     89346|
|   UT|Yellow|     89264|
|   OR| Brown|     89136|
|   CA|  Blue|     89123|
|   UT| Brown|     88973|
|   TX|  Blue|     88466|
|   UT| Green|     88392|
|   OR|Yellow|     88129|
|   WY|Orange|     87956|
|   WY|Yellow|     87800|
|   WY| Brown|     86110|
+-----+------+----------+

Total Rows = 60

+-----+------+----------+
|State| Color|sum(Count)|
+-----+------+----------+
|   CA|Yellow|    100956|
|   CA| Brown|     95762|
|   CA| Green|     93505|
|   CA|   Red|     91527|
|   CA|Orange|     90311|
|   CA|  Blue|     89123|
+-----+------+----------+

首先我们看到每个地区的人喜欢的颜色的聚合数据,下边是单个地区的。
下边是 Scala 版本代码运行相同的应用程序:

package main.scala.chapter2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Usage: MnMcount <mnm_file_dataset>
 */
object MnMcount {
  def main(args: Array[String]) {
    val spark = SparkSession
    .builder
    .appName("MnMCount")
    .getOrCreate()

    if (args.length < 1) {
      print("Usage: MnMcount <mnm_file_dataset>")
      sys.exit(1)
    }
    // Get the M&M data set filename
    val mnmFile = args(0)
    // Read the file into a Spark DataFrame
    val mnmDF = spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(mnmFile)
    // Aggregate counts of all colors and groupBy() State and Color
    // orderBy() in descending order
    val countMnMDF = mnmDF
    .select("State", "Color", "Count")
    .groupBy("State", "Color")
    .sum("Count")
    .orderBy(desc("sum(Count)"))
    // Show the resulting aggregations for all the states and colors
    countMnMDF.show(60)
    println(s"Total Rows = ${countMnMDF.count()}")
    println()
    // Find the aggregate counts for California by filtering
    val caCountMnNDF = mnmDF
    .select("State", "Color", "Count")
    .where(col("State") === "CA")
    .groupBy("State", "Color")
    .sum("Count")
    .orderBy(desc("sum(Count)"))
    // Show the resulting aggregations for California
    caCountMnMDF.show(10)
    // Stop the SparkSession
    spark.stop()
  }
}

单机编译 Scala 程序

下边将说明如何使用Scala构建工具(sbt)构建一个Scala Spark程序。
build.sbt 是规范文件,与makefile类似,它描述并指示Scala编译器构建与Scala相关的任务,例如 jar 包、packages、要解析的依赖项以及在哪里查找它们。下边是一个简单构建的例子:

// Name of the package
name := "main/scala/chapter2"
// Version of our package
version := "1.0"
// Version of Scala
scalaVersion := "2.12.10"
// Spark library dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0-preview2",
  "org.apache.spark" %% "spark-sql"  % "3.0.0-preview2"
)

确保已经安装了Java开发工具包(JDK)和sbt,并设置了JAVA_HOME和SPARK_HOME,用一个命令,就可以构建Spark应用程序:

$ sbt clean package
[info] Updated file /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/
project/build.properties: set sbt.version to 1.2.8
[info] Loading project definition from /Users/julesdamji/gits/LearningSparkV2/
chapter2/scala/project
[info] Updating 
[info] Done updating.
...
[info] Compiling 1 Scala source to /Users/julesdamji/gits/LearningSparkV2/
chapter2/scala/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/target/
scala-2.12/main-scala-chapter2_2.12-1.0.jar ...
[info] Done packaging.
[success] Total time: 6 s, completed Jan 11, 2020, 4:11:02 PM

成功构建后,您可以运行Scala版本的计数示例,如下所示:

$SPARK_HOME/bin/spark-submit --class main.scala.chapter2.MnMcount \ 
jars/main-scala-chapter2_2.12-1.0.jar data/mnm_dataset.csv
...
...
20/01/11 16:00:48 INFO TaskSchedulerImpl: Killing all running tasks in stage 4: 
Stage finished
20/01/11 16:00:48 INFO DAGScheduler: Job 4 finished: show at MnMcount.scala:49, 
took 0.264579 s
+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
|   CA|Yellow| 1807|
|   CA| Green| 1723|
|   CA| Brown| 1718|
|   CA|Orange| 1657|
|   CA|   Red| 1656|
|   CA|  Blue| 1603|
+-----+------+-----+

总结

在本章中,我们介绍了开始使用Apache Spark所需的三个简单步骤:下载框架,熟悉Scala或PySpark交互shell,掌握高级Spark应用程序概念和术语。我们快速概述了使用转换和操作来编写Spark应用程序的过程,并简要介绍了使用Spark UI来检查所创建的job、stage和task。
最后,通过一个简短的示例,展示了如何使用高级结构化api来告诉Spark要做什么——在下一篇文章我将更详细地介绍这些api。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/511777.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Redis高可用与持久化

一、Redis高可用 在web服务器中&#xff0c;高可用是指服务器可以正常访问的时间&#xff0c;衡量的标准是在多长时间内可以提供正常服务&#xff08;99.9%、99.99%、99.999%等等&#xff09;。 但是在Redis语境中&#xff0c;高可用的含义似乎要宽泛一些&#xff0c;除了保证…

智能视频翻译和配音处理工具:Pyvideotrans

pyVideoTrans&#xff1a;一键字幕识别翻译配音带新语言字幕和配音的视频 - 精选真开源&#xff0c;释放新价值。 概览 Pyvideotrans是一款卓著的智能化视频处理系统&#xff0c;专精于视频翻译与配音艺术&#xff0c;以其卓越的技术实力实现对原始视频中音频信息的精准捕捉、…

笔记本电脑win7 Wireless-AC 7265连不上wifi6

1.背景介绍 旧路由器连接人数有限&#xff0c;老旧&#xff0c;信号不稳定更换了新路由器&#xff0c;如 TL-XDR5430易展版用户电脑连不上新的WIFI网络了&#xff0c;比较着急 核心问题&#xff1a;有效解决笔记本连接wifi上网问题&#xff0c;方法不限 2.环境信息 Windows…

4.2总结(快速幂 || 抽象方法,抽象类,接口)

JAVA学习小结 一.抽象方法和抽象类 抽象类不一定有抽象方法&#xff0c;但有抽象方法的一定是抽象类 格式&#xff1a;public abstract 返回值类型 方法名&#xff08;参数列表&#xff09; public abstract class 类名 {} 抽象类和抽象方法的意义&#xff1a;统一子类具有相…

Android 的网络加载

发起网络请求的过程 当用户在应用程序中输入网址或关键字时&#xff0c;应用程序会发起网络请求。这个过程大致如下&#xff1a; 应用程序将请求发送到服务器&#xff0c;服务器返回响应数据。应用程序接收到响应数据后&#xff0c;将其转换为应用程序可识别的数据格式。应用…

单片机中的RAM vs ROM

其实&#xff0c;单片机就是个小计算机。大计算机少不了的数据存储系统&#xff0c;单片机一样有&#xff0c;而且往往和CPU集成在一起&#xff0c;显得更加小巧灵活。 直到90年代初&#xff0c;国内容易得到的单片机是8031&#xff1a;不带存储器的芯片&#xff0c;要想工作&a…

Spark 的结构化 APIs——RDD,DataFrame, Dataset, SparkSQL 使用和原理总结

前言 在本文中&#xff0c;我们将探索 Spark 的结构化 APIs&#xff08;DataFrames and Datasets)。我们还将看下 Spark SQL 引擎是如何支撑高级的结构化 APIs 的。当Spark SQL在早期的Spark 1.x 中首次引入时, 随后是DataFrames 继承了Spark 1.3中 SchemaRDDs &#xff0c;此…

就业班 第二阶段 2401--4.1 day10 shell之“三剑客”+Expect

十一、shell 编程-grep egrep 支持正则表达式的拓展元字符 &#xff08;或grep -E&#xff09; #egrep [0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} file1.txt [rootnewrain ~]# num11 1、运用正则&#xff0c;判断需要[[ ]] [rootnewrain ~]# [[ $num1 ~ ^[0-9]$ ]] &a…

STM32 | 通用同步/异步串行接收/发送器USART带蓝牙(第六天原理解析)

STM32 第六天 一、 USART 1、USART概念 USART:(Universal Synchronous/Asynchronous Receiver/Transmitter)通用同步/异步串行接收/发送器 USART是一个全双工通用同步/异步串行收发模块,该接口是一个高度灵活的串行通信设备 处理器与外部设备通信的两种方式: u并行通信(…

在c# 7.3中不可用,请使用9.0或更高的语言版本

参考连接&#xff1a;在c# 7.3中不可用,请使用8.0或更高的语言版本_功能“可为 null 的引用类型”在 c# 7.3 中不可用。请使用 8.0 或更高的语言版本-CSDN博客https://blog.csdn.net/liangyely/article/details/106163660 [踩坑记录] 某功能在C#7.3中不可用,请使用 8.0 或更高的…

python file怎么打开

Python open() 方法用于打开一个文件&#xff0c;并返回文件对象&#xff0c;在对文件进行处理过程都需要使用到这个函数&#xff0c;如果该文件无法被打开&#xff0c;会抛出 OSError。 注意&#xff1a;使用 open() 方法一定要保证关闭文件对象&#xff0c;即调用 close() 方…

纯CSS实现未读消息显示99+

在大佬那看到这个小技巧&#xff0c;我觉得这个功能点还挺常用&#xff0c;所以给大家分享下具体的实现。当未读消息数小于100的时候显示准确数值&#xff0c;大于99的时候显示99。 1. 实现效果 2. 组件封装 <template><span class"col"><sup :styl…

LabVIEW专栏二、调用子VI

该节目标是创建带子vi&#xff0c;修改vi属性&#xff0c;测试可重入和不可重入的区别 一 、设置子VI 把VI封装成为子VI&#xff0c;可以帮助模块化程序&#xff0c;简化代码结构。 任何VI本身都可以成为别的VI的子VI。 1.1、设置输入输出端子 1、在前面板空白处&#xff0…

Springboot工程依赖包与执行包分离打包与构建docker 镜像

文章目录 一、概述二、工程概况1. 代码原始结构2. 运行界面 三、常规打包1. 打包命令2. jar包结构 四、分离依赖包、执行包步骤1. 引入依赖包管理插件2. 打包验证 五、分离后构建docker 镜像1. 借助Dockerfile2. docker-maven-plugin实现 六、版本升级 一、概述 某大数据项目&…

从零开始学RSA:低加密指数分解攻击

RSA是一种非对称加密算法&#xff0c;它由 公钥&#xff08;n/e&#xff09;&#xff0c;私钥(n/d)&#xff0c;明文M和密文C组成。我们做CTF题目时&#xff0c;一般题目中会给出公钥和密文让我们推出对应的私钥或者明文。RSA的相关公式都写在上面脑图中&#xff0c;在正式讲解…

史上最强 PyTorch 2.2 GPU 版最新安装教程

一 深度学习主机 1.1 配置 先附上电脑配置图&#xff0c;如下&#xff1a; 利用公司的办公电脑对配置进行升级改造完成。除了显卡和电源&#xff0c;其他硬件都是公司电脑原装。 1.2 显卡 有钱直接上 RTX4090&#xff0c;也不能复用公司的电脑&#xff0c;其他配置跟不上。…

[Linux]基础IO(中)---理解重定向与系统调用dup2的使用、缓冲区的意义

重定向理解 在Linux下&#xff0c;当打开一个文件时&#xff0c;进程会遍历文件描述符表&#xff0c;找到当前没有被使用的 最小的一个下标&#xff0c;作为新的文件描述符。 代码验证&#xff1a; ①&#xff1a;先关闭下标为0的文件&#xff0c;在打开一个文件&#xff0c;…

鸿蒙原生应用开发-网络管理Socket连接(一)

一、简介 Socket连接主要是通过Socket进行数据传输&#xff0c;支持TCP/UDP/TLS协议。 二、基本概念 Socket&#xff1a;套接字&#xff0c;就是对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。 TCP&#xff1a;传输控制协议(Transmission Control Protocol)。是一…

最优算法100例之24-打印1到最大的n位数

专栏主页&#xff1a;计算机专业基础知识总结&#xff08;适用于期末复习考研刷题求职面试&#xff09;系列文章https://blog.csdn.net/seeker1994/category_12585732.html 题目描述 输入数字 n&#xff0c;按顺序打印出从 1 到最大的 n 位十进制数。比如输入 3&#xff0c;则…

探索数据库-------MYSQL故障排除与优化

目录 mysql逻辑架构图 一、MySQL 数据库故障 1.1 MySQL 单实例故障排查 1.1.1故障现象 1 1.1.2故障现象 2 1.1.3故障现象 3 1.1.4故障现象 4 1.1.5故障现象 5 1.1.6故障现象 6 1.1.7故障现象 7 1.1.8故障现象 8 1.2MySQL 主从故障排查 1.2.1故障现象 1 1.2.2故障…