Background:
Previously, when using the Spark Operator, the number of executors had to be specified up front. Once a job was submitted, Kubernetes would start exactly that many executors, but some Spark SQL queries simply do not need that many, and at that point the executor count is hard to get right. The executor count also determines how much cluster resource the job occupies, so the ideal strategy is not to fix it in advance but to let executors scale dynamically. After some research I found that Spark 3.0 already supports dynamic executor allocation, and that it is easy to use, so on top of the earlier Spark Operator on Kubernetes work I added dynamic executor scaling.
This post builds on the previous article below, with a few changes to support this feature.
【Java Kubernates】Java调用kubernates提交Yaml到SparkOperator - CSDN博客
https://blog.csdn.net/w8998036/article/details/135821058?spm=1001.2014.3001.5501
1. Remove the fixed executor instance count from the YAML
//Build the SparkApplication YAML for Spark 3.x dynamic executor allocation
//(the instance parameter is kept for signature compatibility but is no longer used below)
private static String buildSparkApplicationYAMLDynamic(String taskName, String sparkImage, String sparkJarFile, String mainClass, String instance,
String driverCpu, String driverMemory, String executorCpu, String executorMemory, String dynamicSQLQuery) {
return String.format(
"apiVersion: \"sparkoperator.k8s.io/v1beta2\"\n" +
"kind: SparkApplication\n" +
"metadata:\n" +
" name: %s\n" +
" namespace: spark-app\n" +
"spec:\n" +
" type: Scala\n" +
" mode: cluster\n" +
" image: \"%s\"\n" +
" imagePullPolicy: Always\n" +
" imagePullSecrets: [\"harbor\"]\n" +
" mainClass: \"%s\"\n" +
" mainApplicationFile: \"%s\"\n" +
" sparkVersion: \"3.3.1\"\n" +
" restartPolicy:\n" +
" type: Never\n" +
" volumes:\n" +
" - name: nfs-spark-volume\n" +
" persistentVolumeClaim:\n" +
" claimName: sparkcode\n" +
" driver:\n" +
" cores: %s\n" +
" coreLimit: \"1200m\"\n" +
" memory: \"%s\"\n" +
" labels:\n" +
" version: 3.3.1\n" +
" serviceAccount: spark-svc-account\n" +
" volumeMounts:\n" +
" - name: nfs-spark-volume\n" +
" mountPath: \"/app/sparkcode\"\n" +
" env:\n" +
" - name: AWS_REGION\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: AWS_REGION\n" +
" - name: AWS_ACCESS_KEY_ID\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: AWS_ACCESS_KEY_ID\n" +
" - name: AWS_SECRET_ACCESS_KEY\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: AWS_SECRET_ACCESS_KEY\n" +
" - name: MINIO_ENDPOINT\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: MINIO_ENDPOINT\n" +
" - name: MINIO_HOST\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: MINIO_HOST\n" +
" - name: BUCKET_NAME\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: BUCKET_NAME\n" +
" executor:\n" +
" cores: %s\n" +
############去除该配置#############################################################
//" instances: %s\n" +
##################################################################################
" memory: \"%s\"\n" +
" labels:\n" +
" version: 3.3.1\n" +
" volumeMounts:\n" +
" - name: nfs-spark-volume\n" +
" mountPath: \"/app/sparkcode\"\n" +
" env:\n" +
" - name: AWS_REGION\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: AWS_REGION\n" +
" - name: AWS_ACCESS_KEY_ID\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: AWS_ACCESS_KEY_ID\n" +
" - name: AWS_SECRET_ACCESS_KEY\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: AWS_SECRET_ACCESS_KEY\n" +
" - name: MINIO_ENDPOINT\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: MINIO_ENDPOINT\n" +
" - name: MINIO_HOST\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: MINIO_HOST\n" +
" - name: BUCKET_NAME\n" +
" valueFrom:\n" +
" secretKeyRef:\n" +
" name: minio-secret\n" +
" key: BUCKET_NAME\n" +
" sparkConf:\n" +
" spark.query.sql: \"%s\"",
taskName,sparkImage,mainClass,sparkJarFile,driverCpu,driverMemory,executorCpu,executorMemory,dynamicSQLQuery
);
}
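For completeness, here is a minimal sketch of how the generated YAML could then be submitted as a SparkApplication custom resource with the fabric8 kubernetes-client used in the previous post. This is an illustration only, assuming fabric8 6.x and a method living in the same class as buildSparkApplicationYAMLDynamic; all argument values are placeholders, and the exact client DSL may differ by version.
// Sketch (assumption: fabric8 kubernetes-client 6.x on the classpath).
// needs: import io.fabric8.kubernetes.client.KubernetesClient;
// needs: import io.fabric8.kubernetes.client.KubernetesClientBuilder;
public static void submitDynamicSparkApplication() {
    String yaml = buildSparkApplicationYAMLDynamic(
            "spark-dynamic-demo",                    // taskName (placeholder)
            "harbor.example.com/spark/spark:3.3.1",  // sparkImage (placeholder)
            "local:///app/sparkcode/spark-demo.jar", // sparkJarFile (placeholder)
            "com.example.SparkQueryMain",            // mainClass (placeholder)
            null,                                    // instance: no longer used
            "1",      // driverCpu
            "2048m",  // driverMemory
            "1",      // executorCpu
            "4096m",  // executorMemory
            "select * from cimarronbp_n.p_vbar_metric_summary limit 10");
    // The client picks up the kubeconfig / in-cluster config automatically.
    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
        // Creating the custom resource is what makes the Spark Operator launch
        // the driver pod, which then requests executors dynamically.
        client.resource(yaml).inNamespace("spark-app").create();
    }
}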
2. Configure the dynamic allocation parameters
//Test Spark 3.x dynamic allocation of executor instances
public static void sparkQueryForFhcS3DynamicExecutor() throws Exception{
System.out.println("=========================1");
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
System.out.println("===========================2");
String metastoreUri = "thrift://wuxdihadl09b.seagate.com:9083";
SparkConf sparkConf = new SparkConf();
sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sparkConf.set("fs.s3a.access.key", "apPeWWr5KpXkzEW2jNKW");
sparkConf.set("spark.hadoop.fs.s3a.path.style.access", "true");
sparkConf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "true");
sparkConf.set("fs.s3a.secret.key", "cRt3inWAhDYtuzsDnKGLGg9EJSbJ083ekuW7PejM");
sparkConf.set("fs.s3a.endpoint", "wuxdimiov001.seagate.com:9000"); // 替换为实际的 S3 存储的地址
sparkConf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sparkConf.set("spark.sql.metastore.uris", metastoreUri);
sparkConf.set("spark.sql.warehouse.dir", warehouseLocation);
sparkConf.set("spark.sql.catalogImplementation", "hive");
sparkConf.set("hive.metastore.uris", metastoreUri);
// ========================== dynamic allocation settings ==========================
// Master switch: enable dynamic resource allocation, which scales the number of
// executors up and down based on the workload. Default: false
sparkConf.set("spark.dynamicAllocation.enabled", "true");
// New in Spark 3 (there was no officially supported Dynamic Resource Allocation on
// k8s before): enable shuffle file tracking, so executors that still hold shuffle
// data are not reclaimed
sparkConf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true");
// With shuffleTracking enabled, timeout for executors holding shuffle data. By
// default release is driven by GC; if GC is slow, this forces the executor to be
// reclaimed even though it still holds shuffle data
sparkConf.set("spark.dynamicAllocation.shuffleTracking.timeout", "60s");
// Lower bound on the number of executors, kept from startup. Default: 0
sparkConf.set("spark.dynamicAllocation.minExecutors", "1");
// Upper bound on the number of executors. Default: infinity
sparkConf.set("spark.dynamicAllocation.maxExecutors", "10");
// Initial number of executors. Default: spark.dynamicAllocation.minExecutors
sparkConf.set("spark.dynamicAllocation.initialExecutors", "2");
// Kill an executor that has been idle longer than this. Default: 60s
sparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "60s");
// Kill an executor holding cached data once it has been idle longer than this.
// Default: infinity
sparkConf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "240s");
// How long tasks may sit backlogged before new executors are requested.
// Default: 1s (first request)
sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "3s");
// Like schedulerBacklogTimeout, but the interval between subsequent requests after
// the first one. Default: schedulerBacklogTimeout
sparkConf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "30s");
// Enable speculative execution: long-tail tasks are re-launched on other executors
// and the first copy to finish wins
sparkConf.set("spark.speculation", "true");
// ==================================================================================
//Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
long zhenyang2 = System.currentTimeMillis();
SparkSession sparkSession = SparkSession.builder()
.appName("Fhc Spark Query")
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();
System.out.println("sparkSession create cost:"+(System.currentTimeMillis()-zhenyang2));
System.out.println("==============================3");
// read the SQL to execute (spark.query.sql) from the SparkConf
String exesql = sparkSession.sparkContext().getConf().get("spark.query.sql");
System.out.println("==============================3.1:"+exesql);
System.out.println("Hive Metastore URI: " + sparkConf.get("spark.sql.metastore.uris"));
System.out.println("Hive Warehouse Directory: " + sparkConf.get("spark.sql.warehouse.dir"));
System.out.println("SHOW DATABASES==============================3.1:"+exesql);
sparkSession.sql("SHOW DATABASES").show();
long zhenyang3 = System.currentTimeMillis();
Dataset<Row> sqlDF = sparkSession.sql(exesql);
System.out.println("sparkSession sql:"+(System.currentTimeMillis()-zhenyang3));
System.out.println("======================4");
//System.out.println("===========sqlDF count===========:"+sqlDF.count());
//sqlDF.show();
long zhenyang5 = System.currentTimeMillis();
List<Row> jaList= sqlDF.javaRDD().collect();
System.out.println("rdd collect cost:"+(System.currentTimeMillis()-zhenyang5));
System.out.println("jaList list:"+jaList.size());
List<TaskListModel> list = new ArrayList<TaskListModel>();
long zhenyang4 = System.currentTimeMillis();
AtomicInteger i = new AtomicInteger(0);
jaList.stream().forEachOrdered(result -> {
i.incrementAndGet();
//System.out.println("serial_num is :"+result.getString(1));
});
System.out.println("for each times:"+i.get());
System.out.println("SparkDemo foreach cost:"+(System.currentTimeMillis()-zhenyang4));
System.out.println("=========================5");
sparkSession.close();
sparkSession.stop();
}
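Since these are ordinary Spark properties, they could equally well be declared in the SparkApplication YAML's sparkConf: section built in section 1 instead of being hard-coded in the job, so they can be tuned per submission without rebuilding the jar. A minimal sketch of that alternative (an assumption for illustration, not what this post deploys; the helper name is hypothetical):
// Sketch: build the sparkConf: section of the SparkApplication YAML with the
// dynamic allocation settings declared there instead of in the job code.
private static String buildDynamicAllocationSparkConf(String dynamicSQLQuery) {
    return String.format(
            "  sparkConf:\n" +
            "    spark.dynamicAllocation.enabled: \"true\"\n" +
            "    spark.dynamicAllocation.shuffleTracking.enabled: \"true\"\n" +
            "    spark.dynamicAllocation.minExecutors: \"1\"\n" +
            "    spark.dynamicAllocation.maxExecutors: \"10\"\n" +
            "    spark.dynamicAllocation.initialExecutors: \"2\"\n" +
            "    spark.dynamicAllocation.executorIdleTimeout: \"60s\"\n" +
            "    spark.query.sql: \"%s\"",
            dynamicSQLQuery);
}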
3. Deploy the programs from sections 1 and 2 (the submission logic is covered in the earlier blog post)
4. Testing
First, submit a simple SQL query: select * from cimarronbp_n.p_vbar_metric_summary limit 10
Check the pods created by the Spark Operator on k8s (for example with kubectl get pods -n spark-app -w):
From the pod start times you can see that 2 executors came up first (matching spark.dynamicAllocation.initialExecutors=2), and about 16 seconds later a third one was added before the job finished, so executors really are created on demand as the job runs. In the earlier article, by contrast, all 20 executors were created at the same time.
Next, test a SQL query with a join:
select distinct t1.serial_num,t1.trans_seq,t2.state_name,t2.p_vbar_metric_summary,t1.event_date from cimarronbp_n.p_vbar_metric_summary t1 left join cimarronbp_n.p_vbar_metric_summary t2 on t1.serial_num = t2.serial_num AND t1.trans_seq = t2.trans_seq where t1.event_date='20231204' and t1.family='2TJ' and t1.operation='CAL2'
Again check the pods created by the Spark Operator on k8s:
The executor count grew from 2 to 3, then from 3 to 4, so the scaling is indeed dynamic!
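For the tests above I simply watched the pods with kubectl, but the same check can be scripted with the fabric8 client used earlier in this series. A minimal sketch, assuming the executor pods carry the spark-role=executor label that Spark on Kubernetes applies (the namespace is an assumption to adapt):
// Sketch: list the executor pods of the running Spark application via fabric8.
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class ExecutorPodCheck {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Spark on Kubernetes labels executor pods with spark-role=executor.
            for (Pod pod : client.pods()
                    .inNamespace("spark-app")
                    .withLabel("spark-role", "executor")
                    .list()
                    .getItems()) {
                System.out.println(pod.getMetadata().getName()
                        + " started at " + pod.getStatus().getStartTime());
            }
        }
    }
}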
5. References
「Spark从精通到重新入门(二)」Spark中不可不知的动态资源分配 - 阿里云开发者社区
https://developer.aliyun.com/article/832482