Spark核心名词解释与编程

Spark核心概念

名词解释

1)ClusterManager:在Standalone(上述安装的模式,也就是依托于spark集群本身)模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器ResourceManager(国内spark主要基于yarn集群运行,欧美主要基于mesos来运行)。

2)Application:Spark的应用程序,包含一个Driver program和若干Executor。

3)SparkConf:负责存储配置信息。作用相当于hadoop中的Configuration。

4)SparkContext:Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor。

5)Worker:从节点,负责控制计算节点,启动Executor。在YARN模式中为NodeManager,负责计算节点的控制,启动的进程叫Container。

6)Driver:运行Application的main()函数并创建SparkContext(是spark中最重要的一个概念,是spark编程的入口,作用相当于mr中的Job)。

7)Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。

8)RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。

9)RDD是弹性式分布式数据集,理解从3个方面去说:弹性、数据集、分布式。是Spark的第一代的编程模型。

10)DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Taskset放到TaskScheduler中。DAGScheduler就是Spark的大脑,中枢神经。

11)TaskScheduler:将任务(Task)分发给Executor执行。

12)Stage:一个Spark作业一般包含一到多个Stage。

13)Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。task的个数由rdd的partition分区决定,spark是一个分布式计算程序,所以一个大的计算任务,就会被拆分成多个小的部分,同时进行计算。一个partition对应一个task任务。

14)Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

15)Actions:操作/行动(Actions)算子 (如:count, collect, foreach等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

Spark官网组件说明

官网组件说明如图-18所示:

图-18 Spark组件通信架构图

Spark应用程序作为集群上的独立进程集运行,由主程序(称为驱动程序)中的SparkContext对象协调。

具体来说,要在集群上运行,SparkContext可以连接到几种类型的集群管理器(Spark自己的独立集群管理器、Mesos或YARN),这些管理器可以跨应用程序分配资源。一旦连接,Spark将获取集群中节点上的执行器,这些执行器是为应用程序运行计算和存储数据的进程。接下来,它将应用程序代码(由传递给SparkContext的JAR或Python文件定义)发送给执行器。最后,SparkContext将任务发送给执行器以运行。

Spark编程体验

项目依赖管理

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.10</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
</dependencies>

<build>
    <finalName>chapter1.WordCount</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

项目编码

spark入门程序wordcount:

package com.fesco.bigdata.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * scala版本的wordcount
 */
object ScalaWordCountApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${ScalaWordCountApp.getClass.getSimpleName}")
.setMaster("local[*]")
val sc = new SparkContext(conf)
//加载数据
val file: RDD[String] = sc.textFile("file:/E:/data/spark/hello.txt")

   //按照分隔符进行切分
val words:RDD[String] = lines.flatMap(line => line.split("\\s+"))

//每个单词记为1次
val pairs:RDD[(String, Int)] = words.map(word => (word, 1))

//聚合数据
val ret:RDD[(String, Int)] = pairs.reduceByKey(myReduceFunc)
//export data to external system
ret.foreach(println)

}
sc.stop()
}
def myReduceFunc(v1: Int, v2: Int): Int = {
v1 + v2
}
}

Master URL说明

首先在编程过程中,至少需要给spark程序传递一个参数master-url,通过sparkConf.setMaster来完成。改参数,代表的是spark作业的执行方式,或者指定的spark程序的cluster-manager的类型。

表-1 模式选择

master

含义

local

程序在本地运行,同时为本地程序提供一个线程来处理

local[M]

程序在本地运行,同时为本地程序分配M个工作线程

来处理

local[*]

程序在本地运行,同时为本地程序分配机器可用的CPU core的个数工作线程来处理

local[M, N]

程序在本地运行,同时为本地程序分配M个工作线程来处理,如果提交程序失败,会进行最多N次的重试

spark://ip:port

基于standalone的模式运行,提交撑到ip对应的master上运行

spark://ip1:port1,ip2:port2

基于standalone的ha模式运行,提交撑到ip对应的master上运行

yarn/启动脚本中的deploy-mode配置为cluster

基于yarn模式的cluster方式运行,SparkContext的创建在NodeManager上面,在yarn集群中

yarn/启动脚本中的deploy-mode配置为client

基于yarn模式的client方式运行,SparkContext的创建在提交程序的那台机器上面,不在yarn集群中

spark程序的其他提交方式

加载hdfs中的文件:

object RemoteSparkWordCountOps {

def main(args: Array[String]): Unit = {
    //创建程序入口
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //设置日志级别
    sc.setLogLevel("WARN")
    //加载数据
    val file = sc.textFile("hdfs://hadoop101:8020//wordcount//words.txt")
    //切分
    val spliFile: RDD[String] = file.flatMap(_.split(" "))
    //每个单词记为1次
    val wordAndOne: RDD[(String, Int)] = spliFile.map((_, 1))
    //聚合
    val wordAndCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    //打印输出
    wordAndCount.foreach(println)
    //释放资源
    sc.stop()
}

}

提交spark程序到集群中

首先需要将spark-core模块进行打包,其次上传到集群中,才可以进行提交作业到spark或者yarn集群中运行。

1)Client:

bin/spark-submit \

--class chapter1.WordCount \

--master spark://hadoop101:7077 \

/root/word.jar \

hdfs://hadoop101:8020/wordcount/words.txt

2)Cluster:

bin/spark-submit \

--class chapter1.WordCount \

--master spark://hadoop101:7077 \

/root/word.jar \

hdfs://hadoop101:8020/wordcount/words.txt \

hdfs://hadoop101:8020/wordcount/output1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/582297.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

树莓集团整合行业资源 优化数字产业生态圈

树莓集团&#xff0c;作为国际数字影像产业园的运营方以及链主企业&#xff0c;自创立以来&#xff0c;一直致力于整合行业优质资源&#xff0c;为数字科技领域的优秀企业提供一片肥沃的创新土壤。随着信息技术的迅猛发展和数字经济的深入推进&#xff0c;树莓集团深知自身的责…

七彩虹(Colorful)隐星P16 2023款笔记本电脑原装出厂Win11系统镜像下载 带建Recovery一键还原功能

七彩虹原厂Windows预装OEM专用系统&#xff0c;恢复出厂开箱状态一模一样 适用型号&#xff1a;隐星P16 23 链接&#xff1a;https://pan.baidu.com/s/1Ig5MQMiC8k4VSuCOZRQHUw?pwdak5l 提取码&#xff1a;ak5l 原厂W11系统自带所有驱动、出厂时自带的主题与专用壁纸、系…

机器学习在医疗行业的应用:颠覆传统诊疗模式,开启智慧医疗新时代

文章目录 一、精准诊断的突破二、药物研发的革新三、患者管理的智能化四、智能辅助决策系统五、机器学习在医疗行业的前景 随着科技的飞速发展&#xff0c;机器学习作为人工智能的核心技术&#xff0c;正逐渐渗透到各个行业中&#xff0c;其中在医疗行业的应用尤为引人瞩目。机…

Strassen矩阵乘法——C++

【题目描述】 根据课本“Strassen矩阵乘法”的基本原理&#xff0c;设计并实现一个矩阵快速乘法的工具。并演示至少10000维的矩阵快速乘法对比样例。 【功能要求】 实现普通矩阵乘法算法和“Strassen矩阵乘法”算法对相同的矩阵&#xff0c;分别用普通矩阵乘法算法&#xff…

电机控制系列模块解析(11)—— 电流采样

一、电流采样分类 由下图可知&#xff0c;采样电阻的位置不同&#xff0c;电流采样分为输出电流采样、下桥电流采样、母线电流采样。 输出电流采样 定义&#xff1a;输出电流采样是指对电机定子绕组或转子绕组&#xff08;对于内转子永磁同步电机&#xff09;输出的电流进行测…

什么是区块链?智能合约有什么用?

一、什么是区块链&#xff1f; 区块链是一种去中心化的分布式账本技术&#xff0c;通过加密和共识机制确保数据的安全和透明。它将交易数据按照时间顺序记录在区块中&#xff0c;并通过链式链接保证了数据的不可篡改性。 二、什么是智能合约&#xff1f; 智能合约是运行在区…

如何修改php版本

我使用的Hostease的Windows虚拟主机产品,由于网站程序需要支持高版本的PHP,程序已经上传到主机&#xff0c;但是没有找到切换PHP以及查看PHP有哪些版本的位置&#xff0c;因此咨询了Hostease的技术支持&#xff0c;寻求帮助了解到可以实现在Plesk面板上找到此切换PHP版本的按钮…

linux tcpdump的交叉编译以及使用

一、源码下载 官网&#xff1a;点击跳转 二、编译 1、解压 tar -xf libpcap-1.10.4.tar.xz tar -xf tcpdump-4.99.4.tar.xz 2、配置及编译 //libpcap&#xff1a; ./configure --hostarm-linux --targetarm-linux CCarm-linux-gcc --with-pcaplinux --prefix$PWD/build//t…

37 线程控制

内核中没有明确的线程的概念&#xff0c;线程作为轻量级进程。所以不会提供线程的系统调用&#xff0c;只提供了轻量级进程的系统调用&#xff0c;但这个接口比较复杂&#xff0c;使用很不方便&#xff0c;我们用户&#xff0c;需要一个线程的接口。应用层对轻量级进程的接口进…

企业如何保证内部传输文件使用的工具是安全的?

企业内部文件的频繁交换成为了日常运营不可或缺的一环。然而&#xff0c;随着数据量的爆炸式增长和网络攻击手段的日益复杂&#xff0c;内网文件传输的安全隐患也日益凸显&#xff0c;成为企业信息安全的薄弱环节。本文将探讨内网文件传输的安全风险、企业常用的防护措施。 内网…

Python轻量级Web框架Flask(12)—— Flask类视图实现前后端分离

0、前言&#xff1a; 在学习类视图之前要了解前后端分离的概念&#xff0c;相对于之前的模板&#xff0c;前后端分离的模板会去除views文件&#xff0c;添加两个新python文件apis和urls&#xff0c;其中apis是用于传输数据和解析数据 的&#xff0c;urls是用于写模板路径的。 …

终于有人把无人机5G通信原理讲清楚了

在现代科技快速发展的背景下&#xff0c;无人机技术在各个领域都有了广泛应用&#xff0c;从送外卖到农业监控&#xff0c;无人机正变得越来越普遍。然而&#xff0c;无人机的效能很大程度上受到其通信系统的限制&#xff0c;尤其是在城市这种高楼林立、障碍物众多的环境中。为…

五一旅游必备物品清单 建议把这份清单记在备忘录

五一小长假就要来临&#xff0c;相信很多人已经跃跃欲试&#xff0c;准备带着家人或朋友外出旅游&#xff0c;享受这难得的休闲时光。出游总是让人兴奋不已&#xff0c;但带小孩出游&#xff0c;行李准备可是一项大工程。为了让旅程更加顺利&#xff0c;提前列一份必备物品清单…

Python绘制3D曲面图

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 探索Python中绘制3D曲面图的艺术 在数据可视化的世界中&#xff0c;3D曲面图是一种强大的工…

压铸机PQ控制阀比例放大器

压铸机PQ控制阀比例放大器是确保压铸机正常工作的重要组成部分&#xff0c;它通常由多种液压元件组成&#xff0c;负责提供动力和控制系统中各个部件的运动。液压系统通过液体&#xff08;通常是油&#xff09;传递压力能&#xff0c;以驱动机械装置工作。在压铸机中&#xff0…

ElasticSearch教程入门到精通——第五部分(基于ELK技术栈elasticsearch 7.x+8.x新特性)

ElasticSearch教程入门到精通——第五部分&#xff08;基于ELK技术栈elasticsearch 7.x8.x新特性&#xff09; 1. Elasticsearch集成1.1 框架集成-SpringData-整体介绍1.2 Spring Data Elasticsearch 介绍1.3 框架集成-SpringData-代码功能集成1.3.1 创建Maven项目1.3.2 修改po…

[C++] 类和对象 _ 剖析构造、析构与拷贝

一、构造函数 构造函数是特殊的成员函数&#xff0c;它在创建对象时自动调用。其主要作用是初始化对象的成员变量&#xff08;不是开辟空间&#xff09;。构造函数的名字必须与类名相同&#xff0c;且没有返回类型&#xff08;即使是void也不行&#xff09;。 在C中&#xff0…

Yolov5简单部署(使用自己的数据集)

一.注意事项 1.本文主要是引用大佬的文章&#xff08;侵权请联系&#xff0c;马上删除&#xff09;&#xff0c;做的工作为简单补充 二.正文 1.大体流程按照 准备&#xff1a;【简单易懂&#xff0c;一看就会】yolov5保姆级环境搭建_哔哩哔哩_bilibili 主要过程&#xff1…

Java | Leetcode Java题解之第55题跳跃游戏

题目&#xff1a; 题解&#xff1a; public class Solution {public boolean canJump(int[] nums) {int n nums.length;int rightmost 0;for (int i 0; i < n; i) {if (i < rightmost) {rightmost Math.max(rightmost, i nums[i]);if (rightmost > n - 1) {retu…