DolphinScheduler + Amazon EMR Serverless 的集成实践

0a0d770ddf11e58ffd2cabfee44402c3.gif

01

背景

Apache DolphinScheduler 是一个分布式的可视化 DAG 工作流任务调度开源系统,具有简单易用、高可靠、高扩展性、⽀持丰富的使用场景、提供多租户模式等特性。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。

随着企业规模的扩大,业务数据的激增,以及 Apache DolphinScheduler 产品的完善、社区的日益火爆,越来越多的 EMR 客户,使用其进行集群任务的日常调度。相关安装、集成实践,本文不做详述,可以参考博客《使用 DolphinScheduler 进行 EMR 任务调度》。

  • 使用 DolphinScheduler 进行 EMR 任务调度

    https://aws.amazon.com/cn/blogs/china/emr-task-scheduling-with-dolphinscheduler/

Amazon EMR Serverless 是 EMR 中的无服务器选项,数据分析师和工程师可借助其轻松运行开源大数据分析框架(例如 Apache Spark 和 Apache Hive ),而无需配置、管理和扩展集群或服务器,使得数据工程师和分析师能够进一步聚焦业务价值的创造,最终实现降本增效。因此,越来越多的客户,开始尝试从 EMR on EC2 切换到 EMR Serverless,或者说从 DolphinScheduler + EMR 切换到 DolphinScheduler + EMR Serverless。

但在实践过程中,如下问题往往成为了拦路虎:

  • 异步执行:在使用 EMR on Amazon EC2 + DolphinScheduler 时,很多客户选择 beeline、PyHive 或者 Spark-Submit 的方式,让任务提交后同步执行,以便调度引擎的正常工作与进度的监控。但 EMR Serverless 仅支持任务提交后的异步执行,这对于使用 DolphinScheduler 的客户来讲是很难接受的。

  • 日志获取:切换到 EMR Serverless 后,获取任务日志的方式也发生了变化。由于任务的异步执行,导致在 DolphinScheduler 提交任务后,往往需要到 EMR Serverless 的 Job 列表页面查看日志,影响工作效率。

  • 混合调度:很多客户经过实际评估后,往往需要将一部分任务放到 EMR on EC2 上运行,将另一部分任务放到 EMR Serverless,以达到最佳的性价比。但两类群集的任务执行与监控方式区别较大,将两种任务放到调度系统中混合调度的维护成本比较高。

  • 任务形态:客户实际案例中,有的任务是执行一段 SQL 语句,有的任务是执行一个 Spark 脚本文件。但在 EMR Serverless 中默认仅支持提交脚本文件,无形中又给客户多设置了一道使用障碍。

02

解决方案

整体介绍与示例

本文将以 Python 语言提交 Spark 任务为例,探索针对上述问题的解决方案。如下图所示,通过封装一个 Python 类库,将 EMR On EC2 与 EMR Serverless 两种形态下的 Spark 任务提交、执行与监控细节进行抽象,面向 DolphinScheduler 提供统一的接口来进行调用,简化用户使用 EMR Serverless 的门槛。

804448a65cb117fc1dccb8263bf832e9.png

我们先通过代码演示如何使用封装的 Python 类库提交 Spark 任务,代码示例如下。其中 emr_common.Session 是抽象出来的 Python 类。

from emr_common import Session
#jobtype=0时,表示 EMR On EC2。可以手动设置集群 ID, 若不设置则默认会获取活动集群中的第 1 个。
session_emr=Session(jobtype=0)
#提交 SQL 语句,执行过程中,会持续打印状态并在任务完成时,打印日志
session_emr.submit_sql("sql-task","SELECT * FROM xxtable LIMIT 10"
#提交脚本文件,spark-test.py 是一个 pysark 或者 pyspark.sql 的程序脚本,执行过程中,会持续打印状态并在任务完成时,打印日志
session_emr.submit_file("script-task","spark-test.py")




#jobtype=1 时,表示 EMR Serverless。可以手动设置应用 ID,若不设置则默认会获取 spark 应用程序中的第 1 个。
session_emrserverless=Session(jobtype=1,logs_s3_path='s3://xxx/xx')
#提交 SQL 语句,执行过程中,会持续打印状态并在任务完成时,打印日志
session_emrserverless.submit_sql("sql-task","SELECT * FROM xxtable LIMIT 10")
#提交脚本文件,spark-test.py 是一个 pysark 或者 pyspark.sql 的程序脚本,执行过程中,会持续打印状态并在任务完成时,打印日志
session_emrserverless.submit_file("script-task","spark-test.py")

原理 & 细节阐述

整体的类结构设计,采用的是面向对象的代理模式。面向客户使用的类是 Session 类,在 Session 类的构造函数中,会根据传入 jobtype 字段值来进一步构建内部类:EMRSession 或者 EMRServerlessSession。而真正的 Spark 任务提交、监控、日志查询逻辑则是封装在 EMRSession 或者 EMRServerlessSession 的对应方法中。

9300b896afbd905336bba34e16e8e1a5.png

EMRSession 的实现逻辑

  • 当调用 submit_sql(jobname,sql) 方法来提交任务,则会先读取 sql_template.py 文件,使用参数 sql 来替换文件中的${query}占位符,并生成一个临时文件上传至 Amazon S3;若是通过 submit_file(jobname,file) 方法来提交任务,则需要提前将脚本文件通过 DolphinScheduler 的资源中心进行上传,DolphinScheduler 后台会将文件上传至 S3 的指定目录。

  • 当脚本文件上传至 S3 后,再通过 EMR Steps 中的 add_job_flow_steps 命令来远程提交 Spark 任务。这里有两点需要指出:若设置了 Python 虚拟环境,则在提交 Spark 任务时,会在 dd_job_flow_steps 命令的 spark-submit 配置部分设置相关参数来使用这个虚拟环境;同时也会使用默认的或者用户自定义的 spark_conf 参数来设置 spark 的 driver、executor 配置参数。

  • 在任务执行过程中,会每隔 10 秒获取一次任务状态,并打印至控制台。在失败状态时失败时,会到约定的 S3 路径上获取 Driver 的 stderr 与 stdout 日志文件。

EMRServerless 的实现逻辑

原理与 EMRSession 大同小异,只是各步骤具体的接口调用不同。

  • 若调用 submit_sql(jobname,sql) 方法来提交任务,则会先读取 sql_template.py 文件,使用参数 sql 来替换文件中的${query}占位符,并生成一个临时文件上传至 S3;若是通过 submit_file(jobname,file) 方法来提交任务,则需要提前将脚本文件通过 DolphinScheduler 的资源中心进行上传,DolphinScheduler 后台会将文件上传至 S3 的指定目录。

  • 当脚本文件上传至 S3 后,再通过 start_job_run 命令来远程提交 Spark 任务。这里有两点需要指出:若设置了 Python 虚拟环境,则在提交 Spark 任务时,会在 start_job_run 中 spark-submit 配置中设置相关参数来使用这个虚拟环境;同时也会使用默认的或者用户自定义的 spark_conf 参数来设置 Spark 的 driver、executor 配置参数。

  • 在任务执行过程中,会每隔 10 秒获取一次任务状态,并打印至控制台。在失败状态时失败时,会到约定的 S3 路径上获取 Driver 的 stderr 与 stdout 日志文件。

接下来,我们通过时序图来表示 submit_sql(jobname,sql) 的调用逻辑,如下图所示:

022ff562180d2619d4b623a51083840e.png

完整代码

下面将展示完整的代码。其中,Session 类构造函数的参数,大多设置了默认值,以减少调用时的反复设置。在实际使用时,需根据真实场景来替换这些参数的默认值。接下来,将逐一解释 Session 类构造函数的每个参数。

  • application_id:若是 serverless,则设置应用程序的 ID; 若是 emr on ec2,则设置集群 ID;若不设置,则自动其第一个 active 的 app 或者 cluster 的 ID

  • jobtype:0: EMR on EC2;1: serverless;默认值为 0

  • job_role:EMR On EC2 的集群角色或者 EMRServerless 的 Job 角色。考虑到两者都需要 S3、Glue 等服务的访问权限,可以统一使用一个角色

  • dolphin_s3_path:DolphinScheduler 中配置的用于存储文件的 S3 路径。在 DolphinScheduler 中调度的 Python 任务代码中,可以直接通过相对路径引用其它 python 文件

  • logs_s3_path:对于 EMR on EC2 来说,就是集群级别的保存日志的 S3 路径;对于 EMR Serverless 来讲是 Job 级别的保存日志的 S3 路径,但通常可以统一使用一个路径

  • tempfile_s3_path:类库中会创建一些临时文件并保存在 S3 上

  • python_venv_s3_path:有的客户在编写 pyspark 时,还会引用一些其它的 Python 库。这时就需要准备一个 Python 虚拟环境,提前预置各类所需要的 Python 第三方库,并将虚拟环境打包并上传至 S3

  • spark_conf:这将会是一个常用的参数,用于设置 spark 的 driver 与 executor 的相关参数

import gzip
import os
from string import Template
import time
import boto3
from datetime import datetime
class EMRResult:
    def __init__(self,job_run_id,status):
        self.job_run_id=job_run_id
        self.status=status
class Session:
    def __init__(self,
                 application_id='', #若是 serverless,则设置 应用的 ID; 若是emr on ec2,则设置集群 ID;若不设置,则自动其第一个active的 app 或者cluster
                 jobtype=0, #0:EMR on EC2; 1: serverless  
                 job_role='arn:aws:iam::******:role/AmazonEMR-ExecutionRole-1694412227712',
                 dolphin_s3_path='s3://*****/dolphinscheduler/ec2-user/resources/',
                 logs_s3_path='s3://aws-logs-****-ap-southeast-1/elasticmapreduce/',
                 tempfile_s3_path='s3://****/tmp/',
                 python_venv_s3_path='s3://****/python/pyspark_venv.tar.gz',
                 spark_conf='--conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.driver.cores=4 --conf spark.driver.memory=16g'
                 ):


        self.jobtype=jobtype
        self.application_id = application_id


        self.region='ap-southeast-1'
        self.job_role = job_role
        self.dolphin_s3_path = dolphin_s3_path
        self.logs_s3_path=logs_s3_path
        self.tempfile_s3_path=tempfile_s3_path
        self.spark_conf=spark_conf
        self.python_venv_s3_path=python_venv_s3_path


        self.client = boto3.client('emr', region_name=self.region)
        self.client_serverless = boto3.client('emr-serverless', region_name=self.region)


        #如果未设置application_id,则查询当前第一个 active 的 EMR 集群/或者 EMR Serverless 应用的 ID
        if self.application_id == '':
            self.application_id=self.getDefaultApplicaitonId()


        if jobtype == 0 :  #EMR on EC2
            self.session=EmrSession(
                region=self.region,
                application_id=self.application_id,
                job_role=self.job_role,
                dolphin_s3_path=self.dolphin_s3_path,
                logs_s3_path=self.logs_s3_path,
                tempfile_s3_path=self.tempfile_s3_path,
                python_venv_s3_path=self.python_venv_s3_path,
                spark_conf=self.spark_conf
            )
        elif jobtype ==1 : #EMR Serverless
            self.session=EmrServerlessSession(
                region=self.region,
                application_id=self.application_id,
                job_role=self.job_role,
                dolphin_s3_path=self.dolphin_s3_path,
                logs_s3_path=self.logs_s3_path,
                tempfile_s3_path=self.tempfile_s3_path,
                python_venv_s3_path=self.python_venv_s3_path,
                spark_conf=self.spark_conf
            )
        else: #Pyhive ,used on-premise
            self.session=PyHiveSession(
                host_ip="172.31.25.171",
                port=10000
            )


        self.initTemplateSQLFile()


    def submit_sql(self,jobname, sql):
        result= self.session.submit_sql(jobname,sql)
        if result.status == "FAILED" :
            raise Exception("ERROR:任务失败")


    def submit_file(self,jobname, filename):
        result=  self.session.submit_file(jobname,filename)
        if result.status == "FAILED":
            raise Exception("ERROR:任务失败")


    def getDefaultApplicaitonId(self):
        if self.jobtype == 0: #EMR on EC2
            emr_clusters = self.client.list_clusters(ClusterStates=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING'])
            if emr_clusters['Clusters']:
                app_id= emr_clusters['Clusters'][0]['Id']
                print(f"选择默认的集群(或EMR Serverless 的应用程序)ID:{app_id}")
                return app_id
            else:
                raise Exception("没有找到活跃的EMR集群")
        elif self.jobtype == 1: #EMR Serverless
            emr_applications = self.client_serverless.list_applications()
            spark_applications = [app for app in emr_applications['applications'] if app['type'] == 'Spark']
            if spark_applications:
                app_id = spark_applications[0]['id']
                print(f"选择默认的应用ID:{app_id}")
                return app_id
            else:
                raise Exception("没有找到活跃的 EMR Serverless 应用")


    def initTemplateSQLFile(self):
        with open('sql_template.py', 'w') as f:
            f.write('''
from pyspark.sql import SparkSession


spark = (
    SparkSession.builder.enableHiveSupport()
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)


df = spark.sql("$query")
df.show()
        ''')
class EmrSession:
    def __init__(self,
                 region,
                 application_id,  #若是EMR on EC2,则设置集群 ID;若不设置,则自动其第一个active的 app 或者cluster
                 job_role,
                 dolphin_s3_path,
                 logs_s3_path,
                 tempfile_s3_path,
                 python_venv_s3_path,
                 spark_conf
                 ):
        self.s3_client = boto3.client("s3")
        self.region=region
        self.client = boto3.client('emr', region_name=self.region)
        self.application_id = application_id


        self.job_role = job_role
        self.dolphin_s3_path = dolphin_s3_path
        self.logs_s3_path=logs_s3_path
        self.tempfile_s3_path=tempfile_s3_path
        self.python_venv_s3_path=python_venv_s3_path
        self.spark_conf=spark_conf


        self.client.modify_cluster(
            ClusterId=self.application_id,
            StepConcurrencyLevel=256
        )
    def submit_sql(self,jobname, sql):
        # temporary file for the sql parameter
        print(f"RUN SQL:{sql}")
        self.python_venv_conf=''
        with open(
                os.path.join(os.path.dirname(os.path.abspath(__file__)), "sql_template.py")
        ) as f:
            query_file = Template(f.read()).substitute(query=sql.replace('"', '\\"'))


            script_bucket = self.tempfile_s3_path.split('/')[2]
            script_key = '/'.join(self.tempfile_s3_path.split('/')[3:])


            current_time = datetime.now().strftime("%Y%m%d%H%M%S")
            script_key = script_key+"sql_template_"+current_time+".py"
            self.s3_client.put_object(
                Body=query_file, Bucket=script_bucket, Key=script_key
            )


            script_file=f"s3://{script_bucket}/{script_key}"
            result= self._submit_job_emr(jobname, script_file)
            self.s3_client.delete_object(
                Bucket=script_bucket, Key=script_key
            )
            return result
    def submit_file(self,jobname, filename):
        # temporary file for the sql parameter
        print(f"Run File :{filename}")
        self.python_venv_conf=''
        if self.python_venv_s3_path and self.python_venv_s3_path != '':
            self.python_venv_conf = f"--conf spark.yarn.dist.archives={self.python_venv_s3_path}#environment --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python"




        script_file=f"{self.dolphin_s3_path}{filename}"
        result= self._submit_job_emr(jobname, script_file)


        return result




    def _submit_job_emr(self, jobname, script_file):
        spark_conf_args = self.spark_conf.split()


        #设置虚拟环境的地址,用于支持 pyspark 以外的库
        python_venv_args=[]
        if self.python_venv_conf and self.python_venv_conf != '':
            python_venv_args=self.python_venv_conf.split()


        jobconfig=[
            {
                'Name': f"{jobname}",
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': [
                                'spark-submit',
                                '--deploy-mode',
                                'cluster',
                                '--master',
                                'yarn',
                                '--conf',
                                'spark.yarn.submit.waitAppCompletion=true'


                            ] + spark_conf_args + python_venv_args + [script_file]
                }
            }
        ]
        response = self.client.add_job_flow_steps(
            JobFlowId=self.application_id,
            Steps=jobconfig
        )
        print(jobconfig)


        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            print('task failed:')
            print(response)


        job_run_id = response['StepIds'][0]
        print(f"Submit job on EMR ,job id: {job_run_id}")
        job_done = False
        status='PENDING'
        while not job_done:
            status = self.get_job_run(job_run_id)
            print(f"current status:{status}")
            job_done = status in [
                "SUCCESS",
                "FAILED",
                "CANCELLING",
                "CANCELLED",
                "COMPLETED"
            ]
            time.sleep(10)


        if status == "FAILED":
            self.print_driver_log(job_run_id,log_type="stderr")
            self.print_driver_log(job_run_id,log_type="stdout")
        return EMRResult(job_run_id,status)




    def get_job_run(self, job_run_id: str) -> dict:
        step_status = self.client.describe_step(
            ClusterId=self.application_id,
            StepId=job_run_id
        )['Step']['Status']['State']
        return step_status.upper()


    def print_driver_log(self, job_run_id: str, log_type: str = "stderr") -> str:


        print("starting download the driver logs")


        s3_client = boto3.client("s3")
        logs_location = f"{self.logs_s3_path}{self.application_id}/steps/{job_run_id}/{log_type}.gz"
        logs_bucket = logs_location.split('/')[2]
        logs_key = '/'.join(logs_location.split('/')[3:])
        print(f"Fetching {log_type} from {logs_location}")
        try:
            #日志生成需要一段时间,最长 100 秒
            for _ in range(10):
                try:
                    s3_client.head_object(Bucket=logs_bucket, Key=logs_key)
                    break
                except Exception:
                    print("等待日志生成中...")
                    time.sleep(10)
            response = s3_client.get_object(Bucket=logs_bucket, Key=logs_key)
            file_content = gzip.decompress(response["Body"].read()).decode("utf-8")
        except s3_client.exceptions.NoSuchKey:
            file_content = ""
            print( f"等待超时,请稍后到 EMR 集群的步骤中查看错误日志或者手动前往: {logs_location} 下载")
        print(file_content)




class EmrServerlessSession:
    def __init__(self,
                 region,
                 application_id, #若是 serverless, 则设置 应用的 ID;若不设置,则自动其第一个active的 app 
                 job_role,
                 dolphin_s3_path,
                 logs_s3_path,
                 tempfile_s3_path,
                 python_venv_s3_path,
                 spark_conf
                 ):
        self.s3_client = boto3.client("s3")
        self.region=region
        self.client = boto3.client('emr-serverless', region_name=self.region)
        self.application_id = application_id


        self.job_role = job_role
        self.dolphin_s3_path = dolphin_s3_path
        self.logs_s3_path=logs_s3_path
        self.tempfile_s3_path=tempfile_s3_path
        self.python_venv_s3_path=python_venv_s3_path
        self.spark_conf=spark_conf






    def submit_sql(self,jobname, sql): #serverless
        # temporary file for the sql parameter
        print(f"RUN SQL:{sql}")
        self.python_venv_conf=''
        with open(
                os.path.join(os.path.dirname(os.path.abspath(__file__)), "sql_template.py")
        ) as f:
            query_file = Template(f.read()).substitute(query=sql.replace('"', '\\"'))


            script_bucket = self.tempfile_s3_path.split('/')[2]
            script_key = '/'.join(self.tempfile_s3_path.split('/')[3:])


            current_time = datetime.now().strftime("%Y%m%d%H%M%S")
            script_key = script_key+"sql_template_"+current_time+".py"
            self.s3_client.put_object(
                Body=query_file, Bucket=script_bucket, Key=script_key
            )


            script_file=f"s3://{script_bucket}/{script_key}"
            result= self._submit_job_emr(jobname, script_file)


            #delete the temp file
            self.s3_client.delete_object(
                Bucket=script_bucket, Key=script_key
            )
            return result
    def submit_file(self,jobname, filename):  #serverless
        # temporary file for the sql parameter
        print(f"RUN Script :{filename}")


        self.python_venv_conf=''
        if self.python_venv_s3_path and self.python_venv_s3_path != '':
            self.python_venv_conf = f"--conf spark.archives={self.python_venv_s3_path}#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python"




        script_file=f"{self.dolphin_s3_path}{filename}"
        result= self._submit_job_emr(jobname, script_file)


        return result




    def _submit_job_emr(self, name, script_file):#serverless
        job_driver = {
            "sparkSubmit": {
                "entryPoint": f"{script_file}",
                "sparkSubmitParameters": f"{self.spark_conf} --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory {self.python_venv_conf}",
            }
        }
        print(f"job_driver:{job_driver}")
        response = self.client.start_job_run(
            applicationId=self.application_id,
            executionRoleArn=self.job_role,
            name=name,
            jobDriver=job_driver,
            configurationOverrides={
                "monitoringConfiguration": {
                    "s3MonitoringConfiguration": {
                        "logUri": self.logs_s3_path,
                    }
                }
            },
        )


        job_run_id = response.get("jobRunId")
        print(f"Emr Serverless Job submitted, job id: {job_run_id}")


        job_done = False
        status="PENDING"
        while not job_done:
            status = self.get_job_run(job_run_id).get("state")
            print(f"current status:{status}")
            job_done = status in [
                "SUCCESS",
                "FAILED",
                "CANCELLING",
                "CANCELLED",
            ]


            time.sleep(10)


        if status == "FAILED":
            self.print_driver_log(job_run_id,log_type="stderr")
            self.print_driver_log(job_run_id,log_type="stdout")
            raise Exception(f"EMR Serverless job failed:{job_run_id}")
        return EMRResult(job_run_id,status)




    def get_job_run(self, job_run_id: str) -> dict:
        response = self.client.get_job_run(
            applicationId=self.application_id, jobRunId=job_run_id
        )
        return response.get("jobRun")


    def print_driver_log(self, job_run_id: str, log_type: str = "stderr") -> str:




        s3_client = boto3.client("s3")
        logs_location = f"{self.logs_s3_path}applications/{self.application_id}/jobs/{job_run_id}/SPARK_DRIVER/{log_type}.gz"
        logs_bucket = logs_location.split('/')[2]
        logs_key = '/'.join(logs_location.split('/')[3:])
        print(f"Fetching {log_type} from {logs_location}")
        try:
            response = s3_client.get_object(Bucket=logs_bucket, Key=logs_key)
            file_content = gzip.decompress(response["Body"].read()).decode("utf-8")
        except Exception:
            file_content = ""
        print(file_content)

在 DolphinScheduler 上的应用

经过以上类库抽象与封装后,在 DolphinScheduler 上使用该类库,可以简单且灵活的向 EMR on EC2 和 EMR Serverless 提交 Spark 任务。

首先,将上述代码上传至 DolphinScheduler 的资源中心,文件名为 “emr_common.py”,如下图所示。

4e9bec40029badb6fe42fbe23a6e6203.png

然后在工作流程中插入 Python节点,按照 Demo 代码示例,提交 Spark 任务。通过 Session 的构造函数参数 jobtype 来控制,是向 EMR on EC2 提交 Spark 任务,还是向 EMR Serverless 提交 Spark 任务。需要注意的是,填写完 Python 代码后,为了让节点中的 Python 代码能正确地引用类库 “emr_common.py”,一定要在节点的资源设置中添加 “emr_common.py”,如下两图所示(注:需要提前在 DolphinScheduler 的节点上安装 emr_common.py 所引用的第三方 Python 库)。

a30b31dc8f3bf7f940f19a1b91de4197.png

当任务执行结束后,如果出现错误,就可以在 DolphinScheduler 中直接查看日志,无需到 Yarn、Spark UI 或者 EMRServerless 的 Job 页面去下载与查看日志了,如下图所示。

b2ab36aa65222798cb24d2cb54e5ce88.png

03

总结

本文通过对 EMR on EC2 与 EMRServerless 中 Spark 任务的提交、监控、下载日志过程进行抽象并封装成 Python 类库,极大的简化了使用 Spark 的门槛,以及从 EMR on EC2 切换至 EMRServerless 的改造成本,优化了 EMRServerless+DolphinScheduler 的集成实践,消除了客户对于使用 EMRServerless 的一些疑惑以及担忧。最终帮助客户逐渐从集群运维的工作负担中解脱出来,更加专注于应用逻辑的开发与业务价值的创造。

本篇作者

5e3e2168ee44ed8740a12ab8caf5e1da.jpeg

张盼富

亚马逊云科技解决方案架构师,从业十三年,先后经过历云计算、供应链金融、电商等多个行业,担任过高级开发、架构师、产品经理、开发总监等多种角色,有丰富的大数据应用与数据治理经验。加入亚马逊云科技后,致力于通过大数据+AI 技术,帮助企业加速数字化转型。

ca4ee3aa1fc8a2dd75cd8ce0970424b6.jpeg

刘元元

亚马逊云科技解决方案架构师,负责基于亚马逊云科技的云计算方案架构设计、咨询、实施等工作。曾担任研发经理、架构师的岗位并拥有多年的互联网系统的架构设计、系统开发的经验,覆盖金融、文旅、交通等行业,在 SaaS 系统和 Serverless 领域有着丰富的经验。

6438213e5e9cdfe0ea6512ec85415cd1.jpeg

庄颖勤

亚马逊云科技解决方案架构师,负责基于亚马逊云科技的云计算方案架构设计、咨询、实施等工作。在 DevOps、CI/CD 和容器等领域拥有丰富的技术和支持经验,致力于帮助客户实现技术创新和业务发展。

f994fc86384780598536bea44b84696a.gif

星标不迷路,开发更极速!

关注后记得星标「亚马逊云开发者」

cfcd233b92ff3850f99394baf51d6f9e.gif

听说,点完下面4个按钮

就不会碰到bug了!

811391b96f8448ef743a97b8a7dfaedd.gif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/355323.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024.1.24 C++QT 作业

思维导图 练习题 1.提示并输入一个字符串&#xff0c;统计该字符中大写、小写字母个数、数字个数、空格个数以及其他字符个数 #include <iostream> #include <string.h> #include <array> using namespace std;int main() {string str;cout << "…

《微信小程序开发从入门到实战》学习九十六

7.2 基础内容组件 7.2.4 progress组件 progress组件的示例代码如下&#xff1a; <progress percent"20" show-info /> 7.3 表单组件 表单组件是用于收集信息的组件。第三章介绍了许多表单组件&#xff0c;包括form、input、textarea、picker、switch、butt…

在WebSocket中使用Redis出现空指针异常解决方案

文章目录 在WebSocket中使用Redis1.问题描述2.原因3.解决步骤1.新建一个SpringUtil.java类&#xff0c;通过getBean的方法主动获取实例2.在WebSocketSingleServer.java中导入 在WebSocket中使用Redis 1.问题描述 在controller 和 service中都可以正常使用Redis&#xff0c;在…

【Javaweb程序设计】【C00161】基于SSM电子产品交易管理系统(论文+PPT)

基于SSM电子产品交易管理系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm的电子产品交易系统 本系统分为前台用户和后台管理员2个功能模块. 前台用户模块&#xff1a;当游客打开系统的网址后&#xff0c;首先看到的就…

qemu + vscode图形化调试linux kernel

一、背景 使用命令行连接gdb 在调试时&#xff0c;虽然可以通过tui enable 显示源码&#xff0c;但还是存在设置断点麻烦&#xff08;需要对着源码设置&#xff09;&#xff0c;terminal显示代码不方便&#xff0c;不利于我们学习&#xff1b;另外在gdb 下p命令显示结构体内容…

猫用空气净化器哪款牌子好?好用能吸毛的宠物空气净化器推荐

作为一个养猫多年的铲屎官&#xff0c;我真的无法抗拒猫星人的可爱魅力&#xff01;以前&#xff0c;每当我路过宠物店&#xff0c;我总会忍不住停下来&#xff0c;在玻璃窗前停留半个小时以上。但是后来&#xff0c;我终于有了自己的猫咪。每天都能享受到给它摸小肚子的乐趣&a…

腾讯云幻兽帕鲁服务器创建教程,附4核16G服务器价格表

腾讯云0基础搭建帕鲁服务器4C16G14M服务器稳定无卡顿&#xff0c;先下载SteamCMD&#xff0c;并运行&#xff1b;然后下载Palserver&#xff0c;修改服务ini配置&#xff0c;启动PalServer&#xff0c;进入游戏服务器。腾讯云百科txybk.com分享腾讯云创建幻兽帕鲁服务器教程&am…

gdb 调试 - 在vscode图形化展示在远程的gdb debug过程

前言 本地机器的操作系统是windows&#xff0c;远程机器的操作系统是linux&#xff0c;开发在远程机器完成&#xff0c;本地只能通过ssh登录到远程。现在目的是要在本地进行图形化展示在远程的gdb debug过程。&#xff08;注意这并不是gdb remote &#xff01;&#xff01;&am…

产业需求大数据平台

产业需求大数据平台&#xff0c;依托大数据、NLP等技术&#xff0c;全面采集区域产业人才需求数据&#xff0c;从宏观、中观、微观三个层面对产业需求进行分析&#xff0c;并匹配学校自身的办学定位、专业布局、人才培养目标、培养规格和课程设置&#xff0c;进行专业设置匹配度…

【JavaScript基础入门】03 JavaScript 基础语法(一)

JavaScript 基础语法&#xff08;一&#xff09; 目录 JavaScript 基础语法&#xff08;一&#xff09;1. JS 初体验2. JavaScript注释2.1 单行注释2.2 多行注释 3. JavaScript结束符4. JavaScript输入输出语句 1. JS 初体验 JS 有3种书写位置&#xff0c;分别为内联、内部和外…

2024.1.28 GNSS 学习笔记

1.基于 地球自转改正卫地距 以及 伪距码偏差 重构定位方程&#xff1a; 先验残差计算公式如下所示&#xff1a; 2.观测值如何定权&#xff1f;权重如何确定&#xff1f; 每个卫星的轨钟精度以及电离层模型修正后的误差都有差异&#xff0c;所以我们不能简单的将各个观测值等权…

Hotspot源码解析-第25章-类的初始化

第25章-类的初始化 这一章主要是讲类的初始化操作&#xff0c;后续类加载章节中也会用到这一章的知识&#xff0c;只不过&#xff0c;这里就讲&#xff0c;是因为虚拟在初始化过程中&#xff0c;需要对基础类&#xff0c;比如System/Thread等类进行初始化操作&#xff0c;所以…

DL/T645、IEC104转OPC UA网关BE112

随着电力系统信息化建设和数字化转型的进程不断加速&#xff0c;对电力能源的智能化需求也日趋增强。健全稳定的智慧电力系统能够为工业生产、基础设施建设以及国防建设提供稳定的能源支持。在此背景下&#xff0c;高性能的工业电力数据传输解决方案——协议转换网关应运而生&a…

贪吃蛇项目(基于C语言和数据结构中的链表)

建立文件 首先先建立3个文件。 Snake.h 函数的声明 Snake.c 函数的定义 Test.c 贪吃蛇的测试 分析项目 我们分析这整个项目 建立节点 首先在我们实现游戏开始的部分之前&#xff0c;我们要先创建贪吃蛇的节点&#xff0c;再由此创建整个贪吃蛇所包含的一些信息&#…

Docker 基础篇

目录 一、Docker 简介 1. Docker 2. Linux 容器 3. 传统虚拟机和容器的对比 4. Docker 的作用 5. Docker 的基本组成&#xff08;Docker 三要素&#xff09; 6. Docker 工作原理 7. Docker 架构 8. Docker 下载 二、Docker 安装 1. CentOS Docker 安装 2. CentOS8 …

内网安全:NTLM-Relay

目录 NTLM认证过程以及攻击面 NTLM Relay攻击 NTLM攻击总结 实验环境说明 域横向移动&#xff1a;NTLM中继攻击 攻击条件 实战一&#xff1a;NTLM中继攻击-CS转发上线MSF 原理示意图 一. CS代理转发 二. MSF架设路由 三. 适用smb_relay模块进行中继攻击 域横向移动…

《HTML 简易速速上手小册》第2章:HTML 的标签和元素(2024 最新版)

文章目录 2.1 文本格式化标签&#xff08;&#x1f3a9;✨&#x1f4dc; 网页的“时尚搭配师”&#xff09;2.1.1 基础示例&#xff1a;一篇博客的格式化2.1.2 案例扩展一&#xff1a;产品介绍页面2.1.3 案例扩展二&#xff1a;个人简历 2.2 链接和锚点&#xff08;&#x1f6a…

案例分析技巧-软件工程

一、考试情况 需求分析&#xff08;※※※※&#xff09;面向对象设计&#xff08;※※&#xff09; 二、结构化需求分析 数据流图 数据流图的平衡原则 数据流图的答题技巧 利用数据平衡原则&#xff0c;比如顶层图的输入输出应与0层图一致补充实体 人物角色&#xff1a;客户、…

初识K8S(Kubernetes )

一、概述 Kubernetes 是一个可移植、可扩展的开源平台&#xff0c;用于管理容器化的工作负载和服务&#xff0c;可促进声明式配置和自动化。 Kubernetes 拥有一个庞大且快速增长的生态&#xff0c;其服务、支持和工具的使用范围相当广泛。&#xff08;官网&#xff09; Kuberne…

Navicat连接Oracle时报错ORA-28547:完美解决

先用你的IDEA或者别人的连接到oracle数据库&#xff08;为了查询版本&#xff09; 查询版本SQL&#xff1a;select * from v$version; 引入对应的oci.dll文件 链接&#xff1a;https://pan.baidu.com/s/1volkj328Ttm-Mt0Grt1X4g 提取码&#xff1a;3d5f 进行下载 在Navicat配…