Spark---资源、任务调度

一、Spark资源调度源码

1、Spark资源调度源码过程

Spark资源调度源码是在Driver启动之后注册Application完成后开始的。Spark资源调度主要就是Spark集群如何给当前提交的Spark application在Worker资源节点上划分资源。Spark资源调度源码在Master.scala类中的schedule()中进行的。

2、Spark资源调度源码结论

  1. Executor在集群中分散启动,有利于task计算的数据本地化。
  2. 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
  3. 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
  4. 默认情况下没有设置--total-executor-cores,一个Application会使用Spark集群中所有的cores。
  5. 启动Executor不仅和core有关还和内存有关。

3、资源调度源码结论验证

使用Spark-submit提交任务演示。也可以使用spark-shell来验证。

1、默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。

./spark-submit 
--master spark://node1:7077
 --class org.apache.spark.examples.SparkPi
 ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

2、在workr上启动多个Executor,设置--executor-cores参数指定每个executor使用的core数量。

./spark-submit
 --master  spark://node1:7077
 --executor-cores 1 
 --class org.apache.spark.examples.SparkPi 
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

3、内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。

./spark-submit 
--master  spark://node1:7077 
--executor-cores 1  
--executor-memory 3g 
--class org.apache.spark.examples.SparkPi
 ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

--total-executor-cores集群中共使用多少cores

注意:一个进程不能让集群多个节点共同启动。

./spark-submit 
--master  spark://node1:7077 
--executor-cores 1  
--executor-memory 2g 
--total-executor-cores 3
--class org.apache.spark.examples.SparkPi
 ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

二、Spark任务调度源码

Spark任务调度源码是从Spark Application的一个Action算子开始的。action算子开始执行,会调用RDD的一系列触发job的逻辑。其中也有stage的划分过程:

三、Spark二次排序和分组取topN

1、二次排序

大数据中很多排序场景是需要先根据一列进行排序,如果当前列数据相同,再对其他某列进行排序的场景,这就是二次排序场景。例如:要找出网站活跃的前10名用户,活跃用户的评测标准就是用户在当前季度中登录网站的天数最多,如果某些用户在当前季度登录网站的天数相同,那么再比较这些用户的当前登录网站的时长进行排序,找出活跃用户。这就是一个典型的二次排序场景。

解决二次排序问题可以采用封装对象的方式,对象中实现对应的比较方法。

1.SparkConf sparkConf = new SparkConf()
2..setMaster("local")
3..setAppName("SecondarySortTest");
4.final JavaSparkContext sc = new JavaSparkContext(sparkConf);
5.
6.JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");
7.
8.JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
9.
10.  /**
11.   * 
12.  */
13.  private static final long serialVersionUID = 1L;
14.
15.  @Override
16.  public Tuple2<SecondSortKey, String> call(String line) throws Exception {
17.    String[] splited = line.split(" ");
18.    int first = Integer.valueOf(splited[0]);
19.    int second = Integer.valueOf(splited[1]);
20.    SecondSortKey secondSortKey = new SecondSortKey(first,second);
21.    return new Tuple2<SecondSortKey, String>(secondSortKey,line);
22.  }
23.});
24.
25.pairSecondRDD.sortByKey(false).foreach(new 
26.VoidFunction<Tuple2<SecondSortKey,String>>() {
27.
28.  /**
29.   * 
30.   */
31.  private static final long serialVersionUID = 1L;
32.
33.    @Override
34.    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
35.      System.out.println(tuple._2);
36.  }
37.});
38.
39.
40.
41.public class SecondSortKey implements Serializable,Comparable<SecondSortKey>{
42.  /**
43.   * 
44.   */
45.  private static final long serialVersionUID = 1L;
46.  private int first;
47.  private int second;
48.  public int getFirst() {
49.    return first;
50.  }
51.  public void setFirst(int first) {
52.    this.first = first;
53.  }
54.  public int getSecond() {
55.    return second;
56.  }
57.  public void setSecond(int second) {
58.    this.second = second;
59.  }
60.  public SecondSortKey(int first, int second) {
61.    super();
62.    this.first = first;
63.    this.second = second;
64.  }
65.  @Override
66.  public int compareTo(SecondSortKey o1) {
67.    if(getFirst() - o1.getFirst() ==0 ){
68.      return getSecond() - o1.getSecond();
69.    }else{
70.      return getFirst() - o1.getFirst();
71.    }
72.  }
73.}

2、分组取topN

大数据中按照某个Key进行分组,找出每个组内数据的topN时,这种情况就是分组取topN问题。

解决分组取TopN问题有两种方式,第一种就是直接分组,对分组内的数据进行排序处理。第二种方式就是直接使用定长数组的方式解决分组取topN问题。

1.SparkConf conf = new SparkConf()
2..setMaster("local")
3..setAppName("TopOps");
4.JavaSparkContext sc = new JavaSparkContext(conf);
5.JavaRDD<String> linesRDD = sc.textFile("scores.txt");
6.
7.JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
8.
9.  /**
10.   * 
11.  */
12.  private static final long serialVersionUID = 1L;
13.
14.  @Override
15.  public Tuple2<String, Integer> call(String str) throws Exception {
16.    String[] splited = str.split("\t");
17.    String clazzName = splited[0];
18.    Integer score = Integer.valueOf(splited[1]);
19.    return new Tuple2<String, Integer> (clazzName,score);
20.  }
21.});
22.
23.pairRDD.groupByKey().foreach(new 
24.VoidFunction<Tuple2<String,Iterable<Integer>>>() {
25.
26.  /**
27.   * 
28.   */
29.  private static final long serialVersionUID = 1L;
30.
31.  @Override
32.  public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
33.    String clazzName = tuple._1;
34.    Iterator<Integer> iterator = tuple._2.iterator();
35.
36.    Integer[] top3 = new Integer[3];
37.
38.    while (iterator.hasNext()) {
39.      Integer score = iterator.next();
40.
41.      for (int i = 0; i < top3.length; i++) {
42.        if(top3[i] == null){
43.          top3[i] = score;
44.          break;
45.        }else if(score > top3[i]){
46.        for (int j = 2; j > i; j--) {
47.          top3[j] = top3[j-1];
48.        }
49.        top3[i] = score;
50.        break;
51.      }
52.    }
53.  }
54.  System.out.println("class Name:"+clazzName);
55.  for(Integer sscore : top3){
56.    System.out.println(sscore);
57.  }
58.}
59.});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/198744.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于SSM的云鑫曦科技办公自动化管理系统设计与实现

基于SSM的云鑫曦科技办公自动化管理系统设计与实现 摘 要: 随着时代的发展&#xff0c;单位办公方式逐渐从传统的线下纸张办公转向了使用个人pc的线上办公&#xff0c;办公效率低下的传统纸质化办公时代的淘汰&#xff0c;转型到信息化办公时代&#xff0c;面对当今数据逐渐膨…

奇数求和(C++)

系列文章目录 进阶的卡莎C++_睡觉觉觉得的博客-CSDN博客数1的个数_睡觉觉觉得的博客-CSDN博客双精度浮点数的输入输出_睡觉觉觉得的博客-CSDN博客足球联赛积分_睡觉觉觉得的博客-CSDN博客大减价(一级)_睡觉觉觉得的博客-CSDN博客小写字母的判断_睡觉觉觉得的博客-CSDN博客纸币(…

Keil5 debug

目录 debug调试功能 基本功能&#xff1a; 程序复位&#xff1a;Reset 运行&#xff1a;Run 停止&#xff1a;Stop 断点调试&#xff08;Breakpoint Debugging&#xff09; 单步调试&#xff1a; 单步调试:Step 单步跳过调试&#xff1a;Step Over&#xff1a; 单步返…

vue项目中使用jsonp跨域请求百度联想接口

一. 内容简介 vue项目中使用jsonp跨域请求百度联想接口 二. 软件环境 2.1 Visual Studio Code 1.75.0 2.2 chrome浏览器 2.3 node v18.14.0 三.主要流程 3.1 代码 核心代码 // 这个是请求函数doLeno() {// 挂载回调函数&#xff0c;不挂载&#xff0c;会报不存在window…

算法中的时间复杂度,空间复杂度

一、前言 算法&#xff08;Algorithm&#xff09;是指用来操作数据、解决程序问题的一组方法。对于同一个问题&#xff0c;使用不同的算法&#xff0c;也许最终得到的结果是一样的&#xff0c;但在过程中消耗的资源和时间却会有很大的区别 衡量不同算法之间的优劣主要是通过时…

【动态规划】求最长递增子序列问题

目录 问题描述递推关系建立递推关系的思路约束条件:以 s [ k ] s[k] s[k] 结尾约束条件:以 s [ k ] s[k] s[k] 开头约束条件:增加子问题参数&#xff08;前缀&#xff09;约束条件:增加子问题参数&#xff08;后缀&#xff09;约束条件:LIS长度为k且末尾元素最小 运行实例 问…

MySQL -DDL 及表类型

DDL 创建数据库 CREATE DATABASE [IF NOT EXISTS] db_name [create_specification [, create_specification] ...] create_specification:[DEFAULT] CHARACTER SET charset_name [DEFAULT] COLLATE collation_name 1.CHARACTER SET&#xff1a…

Java后端开发——MVC商品管理程序

Java后端开发——MVC商品管理程序 本篇文章内容主要有下面几个部分&#xff1a; MVC架构介绍项目环境搭建商品管理模块Servlet代码重构BaseServlet文件上传 MVC 是模型-视图-控制器&#xff08;Model-View-Controller&#xff09;&#xff0c;它是一种设计模式&#xff0c;也…

.net7.0中把exe和dll分开打包

之前写过 C#把dll分别放在指定的文件夹_wpf core dll 放文件夹-CSDN博客 C#把dll打包到exe_c# 打包exe_故里2130的博客-CSDN博客 这都是老技术了&#xff0c;可以进行参考。 现在的.netcore系列有单独支持把exe和dll分开打包的功能了&#xff0c;当然也支持.net7.0和.net8.…

开源堡垒机Jumpserver

文章目录 开源堡垒机JumpserverJumpserver介绍安装环境部署安装jumpserver访问jumpserver的web界面 开源堡垒机Jumpserver Jumpserver介绍 Jumpserver 是全球首款完全开源的堡垒机&#xff0c;使用 GNU GPL v2.0 开源协议&#xff0c;是符合 4A 的运维安全审计系统。 Jumpse…

AIGC-文生视频

stable diffusion&#xff1a; stable diffusion原理解读通俗易懂&#xff0c;史诗级万字爆肝长文&#xff0c;喂到你嘴里 - 知乎个人网站一、前言&#xff08;可跳过&#xff09;hello&#xff0c;大家好我是 Tian-Feng&#xff0c;今天介绍一些stable diffusion的原理&#…

js小技巧|如何提取经过Function函数混淆了的代码

关注它&#xff0c;不迷路。 本文章中所有内容仅供学习交流&#xff0c;不可用于任何商业用途和非法用途&#xff0c;否则后果自负&#xff0c;如有侵权&#xff0c;请联系作者立即删除&#xff01; 1.需求 星友发过来一个混淆代码&#xff0c;打开一看&#xff0c;长这…

(三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言Q1&#xff1a;卷积网络和传统网络的区别Q2:卷积神经网络的架构Q3:卷积神经网络中的参数共享&#xff0c;也是比传统网络的优势所在4、 具体的实现代码网络搭建…

C++二分查找、离线算法:最近的房间

作者推荐 利用广度优先或模拟解决米诺骨牌 本文涉及的基础知识点 二分查找算法合集 题目 一个酒店里有 n 个房间&#xff0c;这些房间用二维整数数组 rooms 表示&#xff0c;其中 rooms[i] [roomIdi, sizei] 表示有一个房间号为 roomIdi 的房间且它的面积为 sizei 。每一…

linux设置主机名

查看主机名&#xff1a;hostname 临时修改主机名&#xff1a;hostname 新主机名 [rootlocalhost ~]#hostname centos [rootlocalhost ~]#hostname centos 永久修改主机名&#xff1a; [rootlocalhost ~]#cat /etc/hostname localhost.localdomain

ArrayList 和 HashMap 源码解析

1、ArrayList 1.1、ArrayList 构造方法 无参创建一个 ArrayList 数组默认为空数组 transient Object[] elementData; private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA {}; private int size; // 数组容量大小public ArrayList() {this.elementData DEFA…

基于springboot校园车辆管理系统

背景 伴随着社会经济的快速发展&#xff0c;机动车保有量不断增加。不断提高的大众生活水平以及人们不断增长的自主出行需求&#xff0c;人们对汽车的 依赖性在不断增强。汽车已经发展成为公众日常出行的一种重要的交通工具。在如此形势下&#xff0c;高校校园内的机动车数量也…

java设计模式学习之【原型模式】

文章目录 引言原型模式简介定义与用途实现方式UML 使用场景优势与劣势原型模式在spring中的应用员工记录示例代码地址 引言 原型模式是一种创建型设计模式&#xff0c;它允许对象能够复制自身&#xff0c;以此来创建一个新的对象。这种模式在需要重复地创建相似对象时非常有用…

近五年—中国十大科技进展(2018年—2022年)

近五年—中国十大科技进展&#xff08;2018-2022&#xff09; 2022年中国十大科技进展1. 中国天眼FAST取得系列重要进展2. 中国空间站完成在轨建造并取得一系列重大进展3. 我国科学家发现玉米和水稻增产关键基因4. 科学家首次发现并证实玻色子奇异金属5. 我国科学家将二氧化碳人…

Vue 定义只读数据 readonly 与 shallowReadonly

readonly 让一个响应式数据变为 **深层次的只读数据**。 shallowReadonly 让一个响应式数据变为 **浅层次的只读数据**&#xff0c;只读第一层。 isReadonly 判断一个数据是不是只读数据。 应用场景&#xff1a;不希望数据被修改时使用。 readonly深层次只读&#xff1a; …