【spark operator】spark operator动态分配executor

背景:

之前在使用spark operator的时候必须指定executor的个数,在将任务发布到spark operator后,k8s会根据指定的个数启动executor,但是对于某些spark sql可能并不需要用到那么多executor,在此时executor的数量就不好控制了。而executor的多少代表了集群资源的多少,如果不提前指定,executor能够动态扩展那将是最好的策略。在查询了资料后,得知spark3.0已经支持了executor的动态分配。而且用法也很简单,所以在之前的spark operator发布k8s的基础上,又做了动态生成executor的功能。

本文参考之前一篇的文章,做了部分修改以支持该功能。

【Java Kubernates】Java调用kubernates提交Yaml到SparkOperator-CSDN博客文章浏览阅读1.1k次,点赞18次,收藏18次。最终我选择了fabric8io,因为我们需要使用k8s的自定义资源sparkApplication,对于自定义资源,kubernetes-client/java需要创建各个k8s对象的pojo,比较麻烦。这里提一下,我在重新使用spark operator的时候,发现原来官方的google的spark operator镜像已经不能拉取了,貌似是google发现它的两个镜像存在漏洞,所以关闭了开源镜像。目前查询框架使用的是trino,但是trino也有其局限性,需要准备一个备用的查询框架。https://blog.csdn.net/w8998036/article/details/135821058?spm=1001.2014.3001.5501

一 删除yaml中executor指定个数的配置

//测试spark 3.0的动态分配instance
	private static String buildSparkApplicationYAMLDynamic(String taskName, String sparkImage, String sparkJarFile, String mainClass, String instance,
			String driverCpu, String driverMemory, String executorCpu, String executorMemory, String dynamicSQLQuery) {
		
        return String.format(
                "apiVersion: \"sparkoperator.k8s.io/v1beta2\"\n" +
                "kind: SparkApplication\n" +
                "metadata:\n" +
                "  name: %s\n" +
                "  namespace: spark-app\n" +
                "spec:\n" +
                "  type: Scala\n" +
                "  mode: cluster\n" +
                "  image: \"%s\"\n" +
                "  imagePullPolicy: Always\n" +
                "  imagePullSecrets: [\"harbor\"]\n" +
                "  mainClass: \"%s\"\n" +
                "  mainApplicationFile: \"%s\"\n" +
                "  sparkVersion: \"3.3.1\"\n" +
                "  restartPolicy:\n" +
                "    type: Never\n" +
				"  volumes:\n" +
				"    - name: nfs-spark-volume\n" +
				"      persistentVolumeClaim:\n" +
				"        claimName: sparkcode\n" +
                
                "  driver:\n" +
                "    cores: %s\n" +
                "    coreLimit: \"1200m\"\n" +
                "    memory: \"%s\"\n" +
                "    labels:\n" +
                "      version: 3.3.1\n" +
                "    serviceAccount: spark-svc-account\n" +
                "    volumeMounts:\n" +
                "      - name: nfs-spark-volume\n" +
                "        mountPath: \"/app/sparkcode\"\n" +
                "    env:\n" +
                "      - name: AWS_REGION\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: AWS_REGION\n" +
                "      - name: AWS_ACCESS_KEY_ID\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: AWS_ACCESS_KEY_ID\n" +
                "      - name: AWS_SECRET_ACCESS_KEY\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: AWS_SECRET_ACCESS_KEY\n" +
                "      - name: MINIO_ENDPOINT\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: MINIO_ENDPOINT\n" +
                "      - name: MINIO_HOST\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: MINIO_HOST\n" +
                "      - name: BUCKET_NAME\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: BUCKET_NAME\n" +
                "  executor:\n" +
                "    cores: %s\n" +
############去除该配置#############################################################
                //"    instances: %s\n" +
##################################################################################
                "    memory: \"%s\"\n" +
                "    labels:\n" +
                "      version: 3.3.1\n" +
                "    volumeMounts:\n" +
                "      - name: nfs-spark-volume\n" +
                "        mountPath: \"/app/sparkcode\"\n" +
                "    env:\n" +
                "      - name: AWS_REGION\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: AWS_REGION\n" +
                "      - name: AWS_ACCESS_KEY_ID\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: AWS_ACCESS_KEY_ID\n" +
                "      - name: AWS_SECRET_ACCESS_KEY\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: AWS_SECRET_ACCESS_KEY\n" +
                "      - name: MINIO_ENDPOINT\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: MINIO_ENDPOINT\n" +
                "      - name: MINIO_HOST\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: MINIO_HOST\n" +
                "      - name: BUCKET_NAME\n" +
                "        valueFrom:\n" +
                "          secretKeyRef:\n" +
                "            name: minio-secret\n" +
                "            key: BUCKET_NAME\n" +
                "  sparkConf:\n" +
                "    spark.query.sql: \"%s\"",
                taskName,sparkImage,mainClass,sparkJarFile,driverCpu,driverMemory,executorCpu,executorMemory,dynamicSQLQuery
        );
    }

二 配置动态参数

//测试spark3.0 动态分配executor的instance
	public static void sparkQueryForFhcS3DynamicExecutor() throws Exception{
        System.out.println("=========================1");
        String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
        System.out.println("===========================2");
        String metastoreUri = "thrift://wuxdihadl09b.seagate.com:9083";
        
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        sparkConf.set("fs.s3a.access.key", "apPeWWr5KpXkzEW2jNKW");
        sparkConf.set("spark.hadoop.fs.s3a.path.style.access", "true");
        sparkConf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "true");
        sparkConf.set("fs.s3a.secret.key", "cRt3inWAhDYtuzsDnKGLGg9EJSbJ083ekuW7PejM");
        sparkConf.set("fs.s3a.endpoint", "wuxdimiov001.seagate.com:9000"); // 替换为实际的 S3 存储的地址
        sparkConf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        sparkConf.set("spark.sql.metastore.uris", metastoreUri);
        
        sparkConf.set("spark.sql.warehouse.dir", warehouseLocation);
        sparkConf.set("spark.sql.catalogImplementation", "hive");
        sparkConf.set("hive.metastore.uris", metastoreUri);


#####################################################添加动态参数#######################        
        
        //#总开关,是否开启动态资源配置,根据工作负载来衡量是否应该增加或减少executor,默认false
        sparkConf.set("spark.dynamicAllocation.enabled", "true");
        //#spark3新增,之前没有官方支持的on k8s的Dynamic Resouce Allocation。启用shuffle文件跟踪,此配置不会回收保存了shuffle数据的executor
        sparkConf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true");
        //#启用shuffleTracking时控制保存shuffle数据的executor超时时间,默认使用GC垃圾回收控制释放。如果有时候GC不及时,配置此参数后,即使executor上存在shuffle数据,也会被回收。
        sparkConf.set("spark.dynamicAllocation.shuffleTracking.timeout", "60s");
        //#动态分配最小executor个数,在启动时就申请好的,默认0
        sparkConf.set("spark.dynamicAllocation.minExecutors", "1");
        //#动态分配最大executor个数,默认infinity
        sparkConf.set("spark.dynamicAllocation.maxExecutors", "10");
        //#动态分配初始executor个数默认值=spark.dynamicAllocation.minExecutors
        sparkConf.set("spark.dynamicAllocation.initialExecutors", "2");
        //#当某个executor空闲超过这个设定值,就会被kill,默认60s
        sparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "60s");
        //#当某个缓存数据的executor空闲时间超过这个设定值,就会被kill,默认infinity
        sparkConf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "240s");
        //#任务队列非空,资源不够,申请executor的时间间隔,默认1s(第一次申请)
        sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "3s");
        //#同schedulerBacklogTimeout,是申请了新executor之后继续申请的间隔,默认=schedulerBacklogTimeout(第二次及之后)
        sparkConf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "30s");
        //#开启推测执行,对长尾task,会在其他executor上启动相同task,先运行结束的作为结果
        sparkConf.set("spark.specution", "true");
 #######################################################################################       
        

        //Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
        
        
        long zhenyang2 =  System.currentTimeMillis();
        SparkSession sparkSession = SparkSession.builder()
					        		.appName("Fhc Spark Query")
									.config(sparkConf)
									.enableHiveSupport()
					        		.getOrCreate();
        
        System.out.println("sparkSession create cost:"+(System.currentTimeMillis()-zhenyang2));
        System.out.println("==============================3");
        
        // 获取 SparkConf 对象
        
        
        String exesql = sparkSession.sparkContext().getConf().get("spark.query.sql");
        
        
        System.out.println("==============================3.1:"+exesql);
        
        System.out.println("Hive Metastore URI: " + sparkConf.get("spark.sql.metastore.uris"));
        System.out.println("Hive Warehouse Directory: " + sparkConf.get("spark.sql.warehouse.dir"));
        
        System.out.println("SHOW DATABASES==============================3.1:"+exesql);
        sparkSession.sql("SHOW DATABASES").show();
        
        long zhenyang3 =  System.currentTimeMillis();
        Dataset<Row> sqlDF = sparkSession.sql(exesql);
        System.out.println("sparkSession sql:"+(System.currentTimeMillis()-zhenyang3));
        
        System.out.println("======================4");
        //System.out.println("===========sqlDF count===========:"+sqlDF.count());
        
        //sqlDF.show();
        
        long zhenyang5 =  System.currentTimeMillis();
        List<Row> jaList= sqlDF.javaRDD().collect();
        System.out.println("rdd collect cost:"+(System.currentTimeMillis()-zhenyang5));
        System.out.println("jaList list:"+jaList.size());
        
        List<TaskListModel> list = new ArrayList<TaskListModel>();
        long zhenyang4 =  System.currentTimeMillis();
        AtomicInteger i = new AtomicInteger(0);
        jaList.stream().forEachOrdered(result -> {
        	i.incrementAndGet();
        	//System.out.println("serial_num is :"+result.getString(1));
        });
        System.out.println("for each times:"+i.get());
        
        System.out.println("SparkDemo foreach cost:"+(System.currentTimeMillis()-zhenyang4));
        
        System.out.println("=========================5");

        sparkSession.close();
        
        sparkSession.stop();
	}

三 发布一,二中的程序(逻辑见前面的博客文章)

四 测试

首先提交一个简单sql: select * from cimarronbp_n.p_vbar_metric_summary limit 10

查看k8s spark operator生成的pod

根据pod启动的时间可以看出,先生成了2个executor,在16s后又生成了1个,最后完成,可以看出executor确实根据任务的执行情况动态生成了。而之前文章中的executor 20个是同一时间生成的

再测试一个join的sql

select distinct t1.serial_num,t1.trans_seq,t2.state_name,t2.p_vbar_metric_summary,t1.event_date from cimarronbp_n.p_vbar_metric_summary t1 left join cimarronbp_n.p_vbar_metric_summary t2 on t1.serial_num = t2.serial_num AND t1.trans_seq = t2.trans_seq where t1.event_date='20231204' and t1.family='2TJ' and t1.operation='CAL2'

查看k8s spark operator生成的pod

executor从2个到3个,3个到4个,是动态的!

五 本文参考链接

「Spark从精通到重新入门(二)」Spark中不可不知的动态资源分配-阿里云开发者社区资源是影响 Spark 应用执行效率的一个重要因素。Spark 应用中真正执行 task 的组件是 Executor,可以通过spark.executor.instances 指定 Spark 应用的 Executor 的数量。在运行过程中,无论 Executor上是否有 task 在执行,都会被一直占有直到此 Spark 应用结束。icon-default.png?t=N7T8https://developer.aliyun.com/article/832482

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/448007.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

py脚本模拟json数据,StructuredStreaming接收数据存储HDFS一些小细节 ERROR:‘path‘ is not specified

很多初次接触到StructuredStreaming 应该会写一个这样的案例 - py脚本不断产生数据写入linux本地&#xff0c; 通过hdfs dfs 建目录文件来实时存储到HDFS中 1. 指定数据schema&#xff1a; 实时json数据 2. 数据源地址&#xff1a;HDFS 3. 结果落地位置&#xff1a; HDFS …

淘宝电商产品价格官方防爬取采集设计机制,如何破?|淘宝电商API数据采集看完你也会!

在当今数字化时代&#xff0c;电商平台如淘宝已经成为人们购物的主要渠道之一。然而&#xff0c;随着电子商务的蓬勃发展&#xff0c;涌现出大量的第三方工具和应用&#xff0c;试图通过采集淘宝电商产品价格等信息来进行数据分析和竞争优势的获取。为了维护市场秩序和保护商家…

java中几种对象存储(文件存储)中间件的介绍

一、前言 在博主得到系统中使用的对象存储主要有OSS&#xff08;阿里云的对象存储&#xff09; COS&#xff08;腾讯云的对象存储&#xff09;OBS&#xff08;华为云的对象存储&#xff09;还有就是MinIO 这些玩意。其实这种东西大差不差&#xff0c;几乎实现方式都是一样&…

马斯克希望OpenAI与特斯拉合并或“完全控制”?

推荐阅读&#xff1a; AI大战升温&#xff1a;Claude 3号宣称具有“近乎人类”的能力-CSDN博客 【新手向】ChatGPT入门指南 - 订阅GPT4之前必须了解的十件事情-CSDN博客 Claude3“闪击”GPT&#xff0c;OpenAI半天就更新了这&#xff1f;-CSDN博客 【亲测】注册Claude3教程…

BLDC 驱动架构介绍

BLDC无刷电机&#xff0c;顾名思义就是没有电刷的电机&#xff0c;因为没有电刷&#xff0c;无刷电机在运行过程中噪音小&#xff0c;也不存在电刷损坏的情况。 BLDC 由于其高效率、长寿命、低噪音、易于维护等特点&#xff0c;正在逐渐替代有刷电机&#xff0c;今天就给大家介…

MessAuto-让验证码提取更加丝滑

专注于web漏洞挖掘、内网渗透、免杀和代码审计&#xff0c;感谢各位师傅的关注&#xff01;网安之路漫长&#xff0c;与君共勉&#xff01; MessAuto MessAuto 是一款 macOS 平台自动提取短信和邮箱验证码到粘贴板的软件&#xff0c;由Rust开发&#xff0c;适用于任何APP 下面展…

【竞技宝】LOL:knight阿狸伤害爆炸 BLG2-0轻取RA

北京时间2024年3月11日,英雄联盟LPL2024春季常规赛继续进行,昨日共进行三场比赛,首场比赛由BLG对阵RA。本场比赛BLG选手个人实力碾压RA2-0轻松击败对手。以下是本场比赛的详细战报。 第一局: BLG:剑魔、千珏、妮蔻、卡牌、洛 RA:乌迪尔、蔚、阿卡丽、斯莫德、芮尔 首局比赛,B…

智能测径仪在胶管行业的应用

关键字&#xff1a;胶管外径尺寸测量&#xff0c;胶管检测仪器&#xff0c;胶管外径检测&#xff0c;高温胶管外径检测&#xff0c;软硬胶管检测&#xff0c; 智能测径仪在家胶管行业中的应用主要体现在对胶管外径的精确测量和控制上。在胶管生产过程中&#xff0c;外径的大小直…

高级语言讲义2023软专(仅高级语言部分)

1.辗转相除求最大公约数过程如下: U/V...余 V/...余 /...余 当为0时&#xff0c;即为U、V最大公约数&#xff0c;编写函数int g< d(intU,intV)求最大公约数。 #include <stdio.h>int gcd(int a,int b) {if(b0)return a;elsereturn gcd(b,a%b); }int gcd2(int a,i…

python推导式

python推导式是一种简洁且强大的内建语法结构&#xff0c;它允许我们以一种极其紧凑和易于理解的方式创建新的列表、字典、集合或生成器对象&#xff0c;能够更高效地操作和转换数据结构。 列表推导式基本语法如下图&#xff1a; 其他推导式的语法也基本相似&#xff0c;看着有…

最迟但到的 Star History 2023 年度开源精选!

千呼万唤始出来&#xff0c;Star History 2023 年终开源精选来啦&#xff01;&#x1f389; AI 是 2023 开源领域里最主要的关键词&#xff0c;但其实过去一年还是有很多其他值得关注的项目和发展趋势的&#xff01;Star History 小编总结了几个类别并精选了类别中最亮眼的项目…

ElasticSearchLinux安装和springboot整合的记录和遇到的问题

前面整合遇到的一些问题有的记录在下面了&#xff0c;有的当时忘了记录下来&#xff0c;希望下面的能帮到你们 1&#xff1a;Linux安装ES 下载安装&#xff1a; 参考文章&#xff1a;连接1 连接2 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch…

校园小情书微信小程序源码 | 社区小程序前后端开源 | 校园表白墙交友小程序

项目描述&#xff1a; 校园小情书微信小程序源码 | 社区小程序前后端开源 | 校园表白墙交友小程序 功能介绍&#xff1a; 表白墙 卖舍友 步数旅行 步数排行榜 情侣脸 漫画脸 个人主页 私信 站内消息 今日话题 评论点赞收藏 服务器环境要求&#xff1a;PHP7.0 MySQL5.7 效果…

【三十】springboot项目上高并发解决示例

互相交流入口地址 整体目录&#xff1a; 【一】springboot整合swagger 【二】springboot整合自定义swagger 【三】springboot整合token 【四】springboot整合mybatis-plus 【五】springboot整合mybatis-plus 【六】springboot整合redis 【七】springboot整合AOP实现日志操作 【…

c++ primer plus 笔记 第十六章 string类和标准模板库

string类 string自动调整大小的功能&#xff1a; string字符串是怎么占用内存空间的&#xff1f; 前景&#xff1a; 如果只给string字符串分配string字符串大小的空间&#xff0c;当一个string字符串附加到另一个string字符串上&#xff0c;这个string字符串是以占用…

并发容器介绍(二)

并发容器介绍&#xff08;二&#xff09; 文章目录 并发容器介绍&#xff08;二&#xff09;BlockingQueueBlockingQueue 简介ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue ConcurrentSkipListMap 文章来自Java Guide 用于学习如有侵权&#xff0c;立即删除 Bl…

大模型字典中加入特殊字符

大模型字典中加入特殊字符 在微调大模型的时候会遇到添加特殊字符&#xff0c;例如在微调多轮的数据的时候需要加入人和机器等特殊标识字符&#xff0c;如用这个特殊字符表示人&#xff0c;用这个特殊字符表示机器&#xff0c;从而实现了人机对话。一般在大模型中base字典中不…

二次供水无人值守解决方案

二次供水无人值守解决方案 二次供水系统存在一定的管理难题和技术瓶颈&#xff0c;如设备老化、维护不及时导致的水质安全隐患&#xff0c;以及如何实现高效运行和智能化管理等问题。在一些地区&#xff0c;特别是老旧小区或农村地区&#xff0c;二次供水设施建设和改造滞后&a…

【go语言开发】redis简单使用

本文主要介绍redis安装和使用。首先安装redis依赖库&#xff0c;这里是v8版本&#xff1b;然后连接redis&#xff0c;完成基本配置&#xff1b;最后测试封装的工具类 文章目录 安装redis依赖库连接redis和配置工具类封装代码测试 欢迎大家访问个人博客网址&#xff1a;https://…

初学Vue——Vue路由

0 什么是Vue路由 类似于Html中的超链接(<a>)一样&#xff0c;可以跳转页面的一种方式。 前端路由&#xff1a;URL中hash(#号之后的内容)与组件之间的对应关系&#xff0c;如下图&#xff1a; 当我们点击左侧导航栏时&#xff0c;浏览器的地址栏会发生变化&#xff0c;路…