Hadoop-MapReduce-MRAppMaster启动篇

 一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、上下文

在上一篇<Hadoop-MapReduce-源码跟读-客户端篇>中已经将到:作业提交到ResourceManager,那么对于该Job第一个容器(MRAppMaster)是怎么启动的呢?接下来我们一起来看看

三、结论

MRJobConfig是一个MRJob的配置,里面包含了Map、Reduce、Combine类以及Job名称、用户名称、队列名称、MapTask数量、ReduceTask数量、工作目录,jar在本地的路径、任务超时时间、任务id、输入输出目录,每个任务的内存大小和cpu核数等等。

此外它里面还有一个属性,如下:

package org.apache.hadoop.mapreduce;
public interface MRJobConfig {
    //......省略......

    public static final String APPLICATION_MASTER_CLASS =
          "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";

    public static final String MAPREDUCE_V2_CHILD_CLASS = 
          "org.apache.hadoop.mapred.YarnChild";
    //......省略......
}

MRAppMaster是MapReduce的ApplicationMaster实现,负责整个MapReduce作业的过程调度和状态协调

YarnChid是运行在每个容器中的进程,负责运行某一个MapTask或者ReduceTask,

有兴趣的同学可以看一个任务的Yarn日志,也可以看我的<Hadoop-MapReduce-跟着日志理解整体流程>一篇中的日志,就可以发现ApplicationMaster容器和MapTask、ReduceTask所在容器的的日志开头分别就是MRAppMaster和YarnChid

MRAppMaster的启动参数是在YARNRunner中配置的:

public class YARNRunner implements ClientProtocol {
    private List<String> setupAMCommand(Configuration jobConf) {
    List<String> vargs = new ArrayList<>(8);
    vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
        + "/bin/java");

    //......省略......

    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);

    //......省略......

    return vargs;
  }
}

YarnChid的启动参数是在MapReduceChildJVM中配置的:

public class MapReduceChildJVM {
  public static List<String> getVMCommand(
      InetSocketAddress taskAttemptListenerAddr, Task task, 
      JVMId jvmID) {

    TaskAttemptID attemptID = task.getTaskID();
    JobConf conf = task.conf;

    Vector<String> vargs = new Vector<String>(8);

    vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
        + "/bin/java");

    //......省略......

    vargs.add(YarnChild.class.getName());  // main of Child
    
    //......省略......

    return vargsFinal;
  }
}

YarnChid启动后会启动MapTask或者ReduceTask

四、调用细节(源码跟读)

我们接着上一篇<Hadoop-MapReduce-源码跟读-客户端篇>的源码开始分析,即:YARNRunner.submitJob()中的ApplicationSubmissionContext构建

1、YARNRunner

1.1、createApplicationSubmissionContext

//构建启动MR AM所需的所有信息
public ApplicationSubmissionContext createApplicationSubmissionContext(
      Configuration jobConf, String jobSubmitDir, Credentials ts)
      throws IOException {
    //获取applicationId (resMgrDelegate 是 YarnClient 的子类)
    //applicationId是应用程序的全局唯一标识符,通过使用集群时间戳(即ResourceManager的开始时间)以及应用程序的单调递增计数器来实现的。
    ApplicationId applicationId = resMgrDelegate.getApplicationId();

    //设置本地资源
    //LocalResource表示运行容器所需的本地资源
    //NodeManager负责在启动容器之前本地化资源
    //应用程序可以指定LocalResourceType和LocalResourceVisibility
    //LocalResourceType:
    //    1、FILE    : 常规文件,即不间断的字节
    //    2、ARCHIVE : 归档,由NodeManager自动取消归档
    //    3、PATTERN : 1和2的混合
    //LocalResourceVisibility:
    //    1、PUBLIC  :     由节点上的所有用户共享
    //    2、PRIVATE :     在节点上同一用户的所有应用程序之间共享
    //    3、APPLICATION : 仅在节点上的同一应用程序的容器之间共享。
    //该方法会设置job配置文件、job jar包的HDFS路径等,最后得到这样一个Map
    //    <"job.xml" , LocalResource>
    //    <"job.jar" , LocalResource>
    //    <"jobSubmitDir/job.split" , LocalResource>
    //    <"jobSubmitDir/job.splitmetainfo , LocalResource>
    Map<String, LocalResource> localResources =
        setupLocalResources(jobConf, jobSubmitDir);

    //设置安全令牌
    DataOutputBuffer dob = new DataOutputBuffer();
    ts.writeTokenStorageToStream(dob);
    ByteBuffer securityTokens =
        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    //为AM容器设置ContainerLaunchContext
    //ContainerLaunchContext表示NodeManager启动容器所需的所有信息,包括:
    //    1、ContainerId
    //    2、分配给容器的资源
    //    3、容器分配给的用户
    //    4、如果启用了安全性,还需要安全令牌
    //    5、我们上面设置的本地资源
    //    6、可选的、特定于应用程序的二进制服务数据
    //    7、已启动进程的环境变量
    //    8、启动容器的命令(里面包含了AM和Task所在容器的启动类,即结论中的MRAppMaster和YarnChild)
    //    9、容器失败退出时的重试策略
    //***********************************************
    //setupAMCommand方法会设置AM所在容器的启动命令参数,下面我们会展开看看命令是什么样的
    List<String> vargs = setupAMCommand(jobConf);
    ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
        jobConf, localResources, securityTokens, vargs);

    //设置RM用于续订令牌的配置
    String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
    if (regex != null && !regex.isEmpty()) {
      setTokenRenewerConf(amContainer, conf, regex);
    }


    Collection<String> tagsFromConf =
        jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

    //设置ApplicationSubmissionContext
    //ApplicationSubmissionContext表示ResourceManager为应用程序启动ApplicationMaster所需的所有信息,包括:
    //    1、ApplicationId
    //    2、Application用户
    //    3、Application名称
    //    4、Application属性
    //    5、执行ApplicationMaster的容器的ContainerLaunchContext,上面我们已经构建了
    //    6、maxAppAttempts。应用程序尝试的最大次数。它应该不大于YARN配置中最大尝试的全局次数。
    //    7、尝试失败有效性间隔。默认值为-1。当以毫秒为单位的attemptFailuresValidationInterval设置为>0时,故障数将不会将发生在validityInterval之外的故障计入故障计数。如果失败计数达到maxAppAttempts,则应用程序将失败。
    //    8、可选,特定于应用程序的LogAggregationContext(LogAggregationContext表示NodeManager处理应用程序日志所需的所有信息。)
    ApplicationSubmissionContext appContext =
        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
    appContext.setApplicationId(applicationId);                // ApplicationId
    appContext.setQueue(                                       // Queue name
        jobConf.get(JobContext.QUEUE_NAME,
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    // add reservationID if present
    ReservationId reservationID = null;
    try {
      reservationID =
          ReservationId.parseReservationId(jobConf
              .get(JobContext.RESERVATION_ID));
    } catch (NumberFormatException e) {
      // throw exception as reservationid as is invalid
      String errMsg =
          "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
              + " specified for the app: " + applicationId;
      LOG.warn(errMsg);
      throw new IOException(errMsg);
    }
    if (reservationID != null) {
      appContext.setReservationID(reservationID);
      LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
          + " to queue:" + appContext.getQueue() + " with reservationId:"
          + appContext.getReservationID());
    }
    appContext.setApplicationName(                             // Job name
        jobConf.get(JobContext.JOB_NAME,
        YarnConfiguration.DEFAULT_APPLICATION_NAME));
    appContext.setCancelTokensWhenComplete(
        conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
    appContext.setAMContainerSpec(amContainer);         // AM Container
    appContext.setMaxAppAttempts(
        conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
            MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));

    // Setup the AM ResourceRequests
    List<ResourceRequest> amResourceRequests = generateResourceRequests();
    appContext.setAMContainerResourceRequests(amResourceRequests);

    // set labels for the AM container requests if present
    String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
    if (null != amNodelabelExpression
        && amNodelabelExpression.trim().length() != 0) {
      for (ResourceRequest amResourceRequest : amResourceRequests) {
        amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
      }
    }
    // set labels for the Job containers
    appContext.setNodeLabelExpression(jobConf
        .get(JobContext.JOB_NODE_LABEL_EXP));

    appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
    if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
      appContext.setApplicationTags(new HashSet<>(tagsFromConf));
    }

    String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
    if (jobPriority != null) {
      int iPriority;
      try {
        iPriority = TypeConverter.toYarnApplicationPriority(jobPriority);
      } catch (IllegalArgumentException e) {
        iPriority = Integer.parseInt(jobPriority);
      }
      appContext.setPriority(Priority.newInstance(iPriority));
    }

    return appContext;
  }

1.2、setupAMCommand

private List<String> setupAMCommand(Configuration jobConf) {
    //命令参数长度为固定的8个
    List<String> vargs = new ArrayList<>(8);
    //$JAVA_HOME/bin/java 即java在本地的路径
    vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
        + "/bin/java");

    Path amTmpDir =
        new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
            YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    //-Djava.io.tmpdir=容器的临时目录
    vargs.add("-Djava.io.tmpdir=" + amTmpDir);
    MRApps.addLog4jSystemProperties(null, vargs, conf);

    //检查MAP和REDUCE配置中的Java Lib路径使用情况
    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
        "map",
        MRJobConfig.MAP_JAVA_OPTS,
        MRJobConfig.MAP_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
        "map",
        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
        MRJobConfig.MAPRED_ADMIN_USER_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
        "reduce",
        MRJobConfig.REDUCE_JAVA_OPTS,
        MRJobConfig.REDUCE_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
        "reduce",
        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
        MRJobConfig.MAPRED_ADMIN_USER_ENV);

    //在用户命令选择之前添加AM管理命令选项,以便用户可以覆盖它
    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
    //默认是-Xmx1024m ,用户可以通过yarn.app.mapreduce.am.admin-command-opts设置
    vargs.add(mrAppMasterAdminOptions);

    //添加AM用户命令选项
    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
    默认是-Xmx1024m ,用户可以通过yarn.app.mapreduce.am.command-opts设置
    vargs.add(mrAppMasterUserOptions);

    //默认false,可以通过yarn.app.mapreduce.am.profile设置
    if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
        MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
      final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
          MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
      if (profileParams != null) {
        //默认是-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=<LOG_DIR>/profile.out
        vargs.add(String.format(profileParams,
            ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
                + TaskLog.LogName.PROFILE));
      }
    }

    //这里就是设置的启动类org.apache.hadoop.mapreduce.v2.app.MRAppMaster
    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
    //1><LOG_DIR>/stdout
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDOUT);
    //2><LOG_DIR>/stderr
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDERR);
    return vargs;
  }

最终命令参数如下:

1、$JAVA_HOME/bin/java 
2、org.apache.hadoop.mapreduce.v2.app.MRAppMaster 
3、-Djava.io.tmpdir=容器的临时目录 
4、-Xmx1024m 
5、-Xmx1024m 
-6、agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=<LOG_DIR>/profile.out 
7、1><LOG_DIR>/stdout 
8、2><LOG_DIR>/stderr

1.3、submitJob

上面已经构建好了ApplicationSubmissionContext,下面可以提交了

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
   
    //......省略......

    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    //向ResourceManager提交ApplicationSubmissionContext 
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

    //......省略......

  }

2、YarnClient

resMgrDelegate是YarnClient的子类,最终也是通过调用YarnClient.submitApplication()提交到Yarn

该方法注释如下:

向YARN提交新申请,这是一个阻塞调用-在提交的应用程序成功提交并被ResourceManager接受之前,它不会返回ApplicationId。

用户在提交新应用程序时应提供ApplicationId作为参数ApplicationSubmissionContext的一部分,否则将引发ApplicationIdNotProvideredException。

这在内部调用ApplicationClientProtocol.submitApplication(SubmitApplicationRequest),之后,它在内部调用ApplicationClientProtocol.getApplicationReport(GetApplicationReportRequest)并等待,直到它可以确保应用程序正确提交为止。如果在ResourceManager保存应用程序的状态之前RM发生故障转移或RM重新启动,ApplicationClientProtocol.getApplicationReport(GetApplicationReportRequest)将抛出ApplicationNotFoundException。当此API捕获到ApplicationNotFoundException时,它会自动重新提交具有相同ApplicationSubmissionContext的应用程序。

YarnClient.submitApplication() 由子类实现,既:YarnClientImpl.submitApplication()

YarnClientImpl

public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    //构建SubmitApplicationRequest
    //客户端发送的请求向ResourceManager提交应用程序
    //该请求通过ApplicationSubmissionContext包含队列、运行ApplicationMaster所需的资源等详细信息,相当于启动ApplicationMaster的ContainerLaunchContext等
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    //自动将时间线DT添加到CLC中仅当安全和时间线服务都启用时
    if (isSecurityEnabled() && timelineV1ServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }

    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    //在submitApplication调用期间处理RM故障切换。
    //这里会调用ApplicationClientProtocol.submitApplication()
    rmClient.submitApplication(request);

    int pollCount = 0;
    long startTime = System.currentTimeMillis();
    EnumSet<YarnApplicationState> waitingStates = 
                                 EnumSet.of(YarnApplicationState.NEW,
                                 YarnApplicationState.NEW_SAVING,
                                 YarnApplicationState.SUBMITTED);
    EnumSet<YarnApplicationState> failToSubmitStates = 
                                  EnumSet.of(YarnApplicationState.FAILED,
                                  YarnApplicationState.KILLED);		
    while (true) {
      try {
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        //一直监控应用状态,直到状态不再是NEW、NEW_SAVING、SUBMITTED中的一个
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            throw new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;
        }

        //......省略......
    }

    return applicationId;
  }

3、ApplicationClientProtocol

ApplicationClientProtocol是客户端和ResourceManager之间的协议,用于提交/中止作业以及获取有关应用程序、集群指标、节点、队列和ACL的信息。

submitApplication()注释如下:

客户端用于向ResourceManager提交新应用程序的接口

客户端需要通过SubmitApplicationRequest提供运行ApplicationMaster所需的队列、资源等详细信息,相当于ContainerLaunchContext,用于启动ApplicationMaster等

当前,ResourceManager在接受提交时立即(空)发送SubmitApplicationResponse,如果拒绝提交则抛出异常。但是,此调用之后需要执行getApplicationReport(GetApplicationReportRequest)以确保正确提交应用程序-从ResourceManager获得SubmitApplicationResponse并不保证RM在故障转移或重新启动之后“记住”此应用程序。如果在ResourceManager成功保存应用程序的状态之前发生RM故障切换或RM重新启动,则随后的getApplicationReport(GetApplicationReportReques)将抛出ApplicationNotFoundException。当客户端在(getApplicationReport(GetApplicationReportRequest)调用中遇到ApplicationNotFoundException时,客户端需要使用相同的ApplicationSubmissionContext重新提交应用程序。

在提交过程中,它会检查应用程序是否已经存在。如果应用程序存在,它将简单地返回SubmitApplicationResponse

在安全模式下,ResourceManager在接受应用程序提交之前验证对队列等的访问权限。

该方法最终会调用其子类ClientRMService.submitApplication()

4、ClientRMService

该类是资源管理器的客户端接口。该模块处理从客户端到资源管理器的所有rpc接口。

public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {
    
    //......省略......
    
    //检查应用程序是否已经放入rmContext,如果是,只需返回响应
    if (rmContext.getRMApps().get(applicationId) != null) {
      LOG.info("This is an earlier submitted application: " + applicationId);
      return SubmitApplicationResponse.newInstance();
    }

    //......省略......

      //调用RMAppManager直接提交申请,我们接着看rmAppManager中的实现
      rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), userUgi);

    //......省略......

    return recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
  }

5、RMAppManager

该类管理资源管理器的应用程序列表。

protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      UserGroupInformation userUgi) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    //将开始时间传递为-1。它最终将在RMAppImpl构造函数中设置
    RMAppImpl application = createAndPopulateNewRMApp(
        submissionContext, submitTime, userUgi, false, -1, null);
    try {
      //确定UserGroupInformation是使用Kerberos来确定用户身份,还是依赖于简单身份验证
      if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer()
            .addApplicationAsync(applicationId,
                BuilderUtils.parseCredentials(submissionContext),
                submissionContext.getCancelTokensWhenComplete(),
                application.getUser(),
                BuilderUtils.parseTokensConf(submissionContext));
      } else {
        //调度程序此时尚未启动,因此应确保在调度程序启动时首先处理这些排队的START事件。
        //下面我们看下这个事件(RMAppEventType.START)是怎么处理的
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
      }
    } catch (Exception e) {
      LOG.warn("Unable to parse credentials for " + applicationId, e);
      // 发送APP_REJECTED是可以的,因为我们假设RMApp处于NEW状态,因此我们还没有通知调度器应用程序的存在
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId,
              RMAppEventType.APP_REJECTED, e.getMessage()));
      throw RPCUtil.getRemoteException(e);
    }
  }

RMAppEventType.START 是由RMAppImpl处理

6、RMAppImpl

private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    
    //......以上事件省略......

    //我们重点看这个事件的处理,既:RMAppNewlySavingTransition
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    

    //......以下事件省略......

    // Transitions from NEW_SAVING state
    
    //第9步会用到
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
    

     // Transitions from SUBMITTED state
    
    //第10步会用到
    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())

     // Transitions from ACCEPTED state
    

     // Transitions from RUNNING state
    

     // Transitions from FINAL_SAVING state
    

     // Transitions from FINISHING state
    

     // Transitions from KILLING state
    

     // Transitions from FINISHED state
     // ignorable transitions
   

     // Transitions from FAILED state
     // ignorable transitions
    

     // Transitions from KILLED state
     // ignorable transitions
    

     .installTopology();

7、RMAppNewlySavingTransition

private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      long applicationLifetime =
          app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
      applicationLifetime = app.scheduler
          .checkAndGetApplicationLifetime(app.queue, applicationLifetime);
      //根据配置的队列生存期,验证提交的应用程序生存期是否有效。
      if (applicationLifetime > 0) {
        // calculate next timeout value
        Long newTimeout =
            Long.valueOf(app.submitTime + (applicationLifetime * 1000));
        app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
            ApplicationTimeoutType.LIFETIME, newTimeout);

        //使用新的绝对值更新applicationTimeouts
        app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME,
            newTimeout);

        LOG.info("Application " + app.applicationId
            + " is registered for timeout monitor, type="
            + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime
            + " seconds");
      }

      //如果启用了恢复,则将应用程序信息存储在非阻塞调用中,以确保RM已存储了在RM重新启动后重新启动AM所需的信息,而无需进一步的客户端通信
      //这是一个新任务,我们看条线
      //非阻塞API资源管理器服务使用此来存储应用程序的状态此不阻塞调度程序线程RMAppStoredEvent将在完成时发送以通知RMApp
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

8、RMStateStore

public void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationStateData appState =
        ApplicationStateData.newInstance(app.getSubmitTime(),
            app.getStartTime(), context, app.getUser(), app.getRealUser(),
            app.getCallerContext());
    appState.setApplicationTimeouts(app.getApplicationTimeouts());
    //这里会设置一个状态:RMStateStoreEventType.STORE_APP,接下来我们看看这个事件怎么处理
    getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
  }
private static final StateMachineFactory<RMStateStore,
                                           RMStateStoreState,
                                           RMStateStoreEventType, 
                                           RMStateStoreEvent>
      stateMachineFactory = new StateMachineFactory<RMStateStore,
                                                    RMStateStoreState,
                                                    RMStateStoreEventType,
                                                    RMStateStoreEvent>(
      RMStateStoreState.ACTIVE)
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
      
      //......省略......

    );
private static class StoreAppTransition
      implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {

      //......省略......
        store.storeApplicationStateInternal(appId, appState);
        //接下来我们看看这个事件的处理RMAppEventType.APP_NEW_SAVED)
        store.notifyApplication(
            new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
      //......省略......

      return finalState(isFenced);
    };

  }

9、再回RMAppImpl

第6步已经有关相关代码,我们就不贴了,处理RMAppEventType.APP_NEW_SAVED事件调用的是new AddApplicationToSchedulerTransition()

private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      //里面new了一个新事件(SchedulerEventType.APP_ADDED)
      app.handler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
              app.applicationPriority, app.placementContext));
      // 发送ATS创建事件
      app.sendATSCreateEvent();
    }
  }

SchedulerEventType.APP_ADDED事件会被调度器处理(FifoScheduler、CapacityScheduler、FairScheduler)

我们先看FairScheduler中的实现

10、FairScheduler

public void handle(SchedulerEvent event) {
    switch (event.getType()) {
    case NODE_ADDED:
      
    case NODE_REMOVED:
      
    case NODE_UPDATE:
      
    //......省略以上......

    case APP_ADDED:
      if (!(event instanceof AppAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      String queueName =
          resolveReservationQueueName(appAddedEvent.getQueue(),
              appAddedEvent.getApplicationId(),
              appAddedEvent.getReservationID(),
              appAddedEvent.getIsAppRecovering());
      if (queueName != null) {
        //向调度程序中添加一个新的应用程序,该应用程序具有给定的id、队列名称和用户。
        //即使用户或队列超过配置的限制,这也会接受新的应用程序,但该应用程序不会被标记为可运行。会生成一个RMAppEventType.APP_ACCEPTED事件,接下来我们看看这个事件的处理
        addApplication(appAddedEvent.getApplicationId(),
            queueName, appAddedEvent.getUser(),
            appAddedEvent.getIsAppRecovering());
      }
      break;

    //......省略以下......

    case APP_REMOVED:
      
    case NODE_RESOURCE_UPDATE:
      
    case APP_ATTEMPT_ADDED:
    //第12步会用到
    if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
        appAttemptAddedEvent.getIsAttemptRecovering());
      break;
      
    case APP_ATTEMPT_REMOVED:
      
    case RELEASE_CONTAINER:
      
    case CONTAINER_EXPIRED:
      
    default:
      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
    }
  }

RMAppEventType.APP_ACCEPTE 的处理还是在RMAppImpl

11、再回RMAppImpl

第6步已经有关相关代码,我们就不贴了,处理RMAppEventType.APP_ACCEPTE 事件调用的是new StartAppAttemptTransition()

private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }

    |
    |
   \ /

private void
      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    //创建一个新的Attempt
    createNewAttempt();
    //这里会生成一个新事件RMAppAttemptEventType.START
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
  }

RMAppAttemptEventType.START事件由RMAppAttemptImpl进行处理

12、RMAppAttemptImpl

private static final StateMachineFactory<RMAppAttemptImpl,
                                           RMAppAttemptState,
                                           RMAppAttemptEventType,
                                           RMAppAttemptEvent>
       stateMachineFactory  = new StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
                                            RMAppAttemptEventType,
                                     RMAppAttemptEvent>(RMAppAttemptState.NEW)

       // Transitions from NEW State
      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())
      
      //......省略以下......
          
      // Transitions from SUBMITTED state
      
          
       // Transitions from SCHEDULED State
       //第13步会用到
       .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())
      
       // Transitions from SUBMITTED state
       //第16步会用到
       .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())

       // Transitions from ALLOCATED_SAVING State
       //第16步会用到
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
          RMAppAttemptState.ALLOCATED,
          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
      

       // Transitions from LAUNCHED_UNMANAGED_SAVING State
     

       // Transitions from ALLOCATED State
     

       // Transitions from LAUNCHED State
      

       // Transitions from RUNNING State
      

       // Transitions from FINAL_SAVING State
      

      // Transitions from FAILED State
      

      // Transitions from FINISHING State
      

      // Transitions from FINISHED State
      

      // Transitions from KILLED State
      
    .installTopology();

RMAppAttemptEventType.START事件由AttemptStartedTransition()处理

private static final class AttemptStartedTransition extends BaseTransition {
	@Override
    public void transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      
      //......省略......

      //将应用程序尝试添加到计划程序,并通知计划程序是否从上次尝试转移状态。
      //生成新的事件SchedulerEventType.APP_ATTEMPT_ADDED
      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
    }
  }

13、再回FairScheduler

第10步FairScheduler中有对SchedulerEventType.APP_ATTEMPT_ADDED事件的处理,这里就不贴代码了

protected void addApplicationAttempt(
      ApplicationAttemptId applicationAttemptId,
      boolean transferStateFromPreviousAttempt,
      boolean isAttemptRecovering) {
    writeLock.lock();
    
        //生成一个新的事件RMAppAttemptEventType.ATTEMPT_ADDED
        rmContext.getDispatcher().getEventHandler().handle(
            new RMAppAttemptEvent(applicationAttemptId,
                RMAppAttemptEventType.ATTEMPT_ADDED));
        
  }

14、再回RMAppAttemptImpl

第12步已经有关相关代码,我们就不贴了,处理RMAppAttemptEventType.ATTEMPT_ADDED

事件调用的是new ScheduleTransition()

 public static final class ScheduleTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {

       //......省略......

        //提交时已检查AM资源
        //这里会通过YarnScheduler申请容器
        Allocation amContainerAllocation =
            appAttempt.scheduler.allocate(
                appAttempt.applicationAttemptId,
                appAttempt.amReqs, null, EMPTY_CONTAINER_RELEASE_LIST,
                amBlacklist.getBlacklistAdditions(),
                amBlacklist.getBlacklistRemovals(),
                new ContainerUpdates());
        if (amContainerAllocation != null
            && amContainerAllocation.getContainers() != null) {
          assert (amContainerAllocation.getContainers().size() == 0);
        }
        appAttempt.scheduledTime = System.currentTimeMillis();
        //现在状态是RMAppAttemptState.SCHEDULED
        return RMAppAttemptState.SCHEDULED;

      //......省略......

    }
  }

YarnScheduler.allocate(),最终会调用三种调度器中的方法进行实现,我们还是先只看FairScheduler

15、再回FairScheduler

public Allocation allocate(ApplicationAttemptId appAttemptId,
      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
      List<ContainerId> release, List<String> blacklistAdditions,
      List<String> blacklistRemovals, ContainerUpdates updateRequests) {
    //请确保此应用程序存在
    FSAppAttempt application = getSchedulerApp(appAttemptId);
    if (application == null) {
      LOG.error("Calling allocate on removed or non existent application " +
          appAttemptId.getApplicationId());
      return EMPTY_ALLOCATION;
    }

    //分配可能是上一次尝试的剩余部分,它将影响当前尝试,例如混淆当前尝试的AM容器的请求和分配。
    //请注意,尝试id的外部先决条件检查在这里可能已经过时,因此有必要在这里进行双重检查。
    if (!application.getApplicationAttemptId().equals(appAttemptId)) {
      LOG.error("Calling allocate on previous or removed " +
          "or non existent application attempt " + appAttemptId);
      return EMPTY_ALLOCATION;
    }

    ApplicationId applicationId = application.getApplicationId();
    FSLeafQueue queue = application.getQueue();
    List<MaxResourceValidationResult> invalidAsks =
            validateResourceRequests(ask, queue);

    //如果检测到任何无效的请求,我们需要在这里快速失败。如果我们稍后抛出异常,这可能会有问题,        
    //因为令牌和升级/降级的容器会丢失,因为调度程序会立即清除它们,AM不会获得这些信息。
    if (!invalidAsks.isEmpty()) {
      throw new SchedulerInvalidResoureRequestException(String.format(
              "Resource request is invalid for application %s because queue %s "
                      + "has 0 amount of resource for a resource type! "
                      + "Validation result: %s",
              applicationId, queue.getName(), invalidAsks));
    }

    //处理晋升和降级
    handleContainerUpdates(application, updateRequests);

    //健全性检查
    normalizeResourceRequests(ask, queue.getName());

    // TODO, 正常计划请求

    //记录容器分配开始时间
    application.recordContainerRequestTime(getClock().getTime());

    //释放容器
    releaseContainers(release, application);

    ReentrantReadWriteLock.WriteLock lock = application.getWriteLock();
    lock.lock();
    try {
      if (!ask.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "allocate: pre-update" + " applicationAttemptId=" + appAttemptId
                  + " application=" + application.getApplicationId());
        }
        application.showRequests();

        //更新应用程序请求
        application.updateResourceRequests(ask);

        // TODO, 处理SchedulingRequest
        application.showRequests();
      }
    } finally {
      lock.unlock();
    }

    Set<ContainerId> preemptionContainerIds =
        application.getPreemptionContainerIds();
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "allocate: post-update" + " applicationAttemptId=" + appAttemptId
              + " #ask=" + ask.size() + " reservation= " + application
              .getCurrentReservation());

      LOG.debug("Preempting " + preemptionContainerIds.size()
          + " container(s)");
    }

    application.updateBlacklist(blacklistAdditions, blacklistRemovals);

    List<Container> newlyAllocatedContainers =
        application.pullNewlyAllocatedContainers();
    //记录容器分配时间
    if (!(newlyAllocatedContainers.isEmpty())) {
      application.recordContainerAllocationTime(getClock().getTime());
    }

    //净空取决于集群中的资源、队列的当前使用情况、队列的公平共享和队列的最大资源。
    Resource headroom = application.getHeadroom();
    application.setApplicationHeadroomForMetrics(headroom);

    //当AM心跳时调用。AM注册后,RM回收了这些集装箱
    //它们在AllocateResponse.containersFromPreviousAttempts()中报告给AM
    List<Container> previousAttemptContainers = application
        .pullPreviousAttemptContainers();
    //NMToken用于验证与NodeManager的通信
    //ApplicationMaster与ResourceManager协商资源时,ResourceMananger发出,NodeManager端验证
    List<NMToken> updatedNMTokens = application.pullUpdatedNMTokens();
    //制作分配单元
    return new Allocation(newlyAllocatedContainers, headroom,
        preemptionContainerIds, null, null,
        updatedNMTokens, null, null,
        application.pullNewlyPromotedContainers(),
        application.pullNewlyDemotedContainers(),
        previousAttemptContainers);
  }

分配单元构建成功后,我们看下RMAppAttemptState.SCHEDULED事件的处理,

16、再回RMAppAttemptImpl

第12步已经有关相关代码,我们就不贴了,处理RMAppAttemptState.SCHEDULED

事件调用的是new AMContainerAllocatedTransition()

private static final class AMContainerAllocatedTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      //从调度程序获取AM容器。
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, null, EMPTY_CONTAINER_RELEASE_LIST, null,
            null, new ContainerUpdates());
      // 必须至少分配一个容器,因为container_allocated是在构造RMContainer并将其放入SchedulerApplication.newlyAllocatedContainers()之后发出的。

      //请注意,YarnScheduler.allocate()不能保证能够提取它,因为容器可能由于某些原因而无法提取,例如DNS不可用导致无法生成容器令牌。因此,我们返回到以前的状态并保持重试,直到提取到容器为止
      if (amContainerAllocation.getContainers().size() == 0) {
        appAttempt.retryFetchingAMContainer(appAttempt);
        return RMAppAttemptState.SCHEDULED;
      }

      //设置主容器
      Container amContainer = amContainerAllocation.getContainers().get(0);
      RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
          .getRMContainer(amContainer.getId());
      //当删除一个NM时,调度器将清理容器,下面的container_FINISHED事件将处理已清理的容器。所以只需返回RMAppAttemptState。已排定
      if (rmMasterContainer == null) {
        return RMAppAttemptState.SCHEDULED;
      }
      //为尝试应用分配AM容器
      appAttempt.setMasterContainer(amContainer);
      rmMasterContainer.setAMContainer(true);
      //NMTokenSecrentManager中设置的节点用于标记此节点的NMToken是否已颁发给AM。
      //当AM容器分配给RM本身时,分配此AM容器的节点被标记为已发送的NMToken。
      //因此,清除此节点集,以便来自AM的以下分配请求能够检索相应的NMToken。
      appAttempt.rmContext.getNMTokenSecretManager()
        .clearNodeSetForAttempt(appAttempt.applicationAttemptId);
      appAttempt.getSubmissionContext().setResource(
        appAttempt.getMasterContainer().getResource());
      appAttempt.containerAllocatedTime = System.currentTimeMillis();
      long allocationDelay =
          appAttempt.containerAllocatedTime - appAttempt.scheduledTime;
      ClusterMetrics.getMetrics().addAMContainerAllocationDelay(
          allocationDelay);
      appAttempt.storeAttempt();
      //此时处理这个事件
      return RMAppAttemptState.ALLOCATED_SAVING;
    }
  }

第12步已经有关相关代码,处理RMAppAttemptState.ALLOCATED_SAVING

事件调用的是new AttemptStoredTransition()

private static final class AttemptStoredTransition extends BaseTransition {
    @Override
    public void transition(RMAppAttemptImpl appAttempt,
                                                    RMAppAttemptEvent event) {
      //将ClientTokenMasterKey保存到存储中后进行注册,否则RM重新启动后客户端可能会持有无效的ClientToken。
      appAttempt.registerClientToken();
      
      appAttempt.launchAttempt();
    }
  }

private void launchAttempt(){
    launchAMStartTime = System.currentTimeMillis();
    //发送事件以启动AM容器AMLauncherEventType.LAUNCH
    eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
  }

五、流程总结

1、构建启动MR AM所需的所有信息(LocalResource、ContainerLaunchContext、启动命令
、ApplicationSubmissionContext等)

2、构建SubmitApplicationRequest(客户端需要通过SubmitApplicationRequest提供运行ApplicationMaster所需的队列、资源等详细信息,相当于ContainerLaunchContext,用于启动ApplicationMaster等)

3、YarnClient通过ApplicationClientProtocol提交SubmitApplicationRequest (ApplicationClientProtocol是客户端和ResourceManager之间的协议,用于提交/中止作业以及获取有关应用程序、集群指标、节点、队列和ACL的信息。)

4、转到ClientRMService提交

5、转到RMAppManager提交

6、创建RMAppEventType.START事件并处理

7、创建RMStateStoreEventType.STORE_APP事件并处理

8、创建RMAppEventType.APP_NEW_SAVED事件并被调度器处理

9、创建RMAppEventType.APP_ACCEPTE事件并处理

10、创建RMAppAttemptEventType.START事件并处理

11、创建SchedulerEventType.APP_ATTEMPT_ADDED事件并被调度器处理

12、创建RMAppAttemptState.SCHEDULED事件并处理

13、创建RMAppAttemptState.ALLOCATED_SAVING事件并处理

14、创建AMLauncherEventType.LAUNCH事件启动ApplicationMaster容器

15、使用命令启动MRAppMaster

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/351019.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

首发:2024全球DAO组织发展研究

作者&#xff0c;张群&#xff08;专注DAO及区块链应用研究&#xff0c;赛联区块链教育首席讲师&#xff0c;工信部赛迪特邀资深专家&#xff0c;CSDN认证业界专家&#xff0c;微软认证专家&#xff0c;多家企业区块链产品顾问&#xff09; DAO&#xff08;去中心化自治组织&am…

adb测试冷启动和热启动 Permission Denial解决

先清理日志 adb shell logcat -c 打开手机模拟器中的去哪儿网&#xff0c;然后日志找到包名和MainActivity adb shell logcat |grep Main com.Qunar/com.mqunar.atom.alexhome.ui.activity.MainActivity 把手机模拟器的去哪儿的进程给杀掉 执行 命令 adb shell am start -W…

TensorFlow2实战-系列教程1:回归问题预测

&#x1f9e1;&#x1f49b;&#x1f49a;TensorFlow2实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Jupyter Notebook中进行 本篇文章配套的代码资源已经上传 1、环境测试 import tensorflow as tf import numpy as np tf.__version__打印结果 ‘…

深入理解Redis:如何设置缓存数据的过期时间及其背后的机制

目录 Redis 给缓存数据设置过期时间 Redis是如何判断数据是否过期的呢&#xff1f; 过期的数据的删除策略 Redis 内存淘汰机制 Redis 给缓存数据设置过期时间 一般情况下&#xff0c;我们设置保存的缓存数据的时候都会设置一个过期时间。为什么呢&#xff1f; 因为内存是有…

4小时精通MyBatisPlus框架

目录 1.介绍 2.快速入门 2.1.环境准备 2.2.快速开始 2.2.1引入依赖 2.2.2.定义Mapper ​编辑 2.2.3.测试 2.3.常见注解 ​编辑 2.3.1.TableName 2.3.2.TableId 2.3.3.TableField 2.4.常见配置 3.核心功能 3.1.条件构造器 3.1.1.QueryWrapper 3.1.2.UpdateWra…

Redis(八)哨兵机制(sentinel)

文章目录 哨兵机制案例认识异常 哨兵运行流程及选举原理主观下线(Subjectively Down)ODown客观下线(Objectively Down)选举出领导者哨兵选出新master过程 哨兵使用建议 哨兵机制 吹哨人巡查监控后台master主机是否故障&#xff0c;如果故障了根据投票数自动将某一个从库转换为新…

Java 基础知识-File类

大家好我是苏麟 , 今天聊聊File . 资料来自黑马程序员 File类 java.io.File 类是文件和目录路径名的抽象表示&#xff0c;主要用于文件和目录的创建、查找和删除等操作。 构造方法 public File(String pathname) &#xff1a;通过将给定的路径名字符串转换为抽象路径名来创建…

盘古信息IMS OS 数垒制造操作系统+ 产品及生态部正式营运

启新址吉祥如意&#xff0c;登高楼再谱新篇。2024年1月22日&#xff0c;广东盘古信息科技股份有限公司新办公楼层正式投入使用并举行了揭牌仪式&#xff0c;以崭新的面貌、奋进的姿态开启全新篇章。 盘古信息总部位于东莞市南信产业园&#xff0c;现根据公司战略发展需求、赋能…

【双目】基于findChessboardCorners的双目精度评估,可以直接使用

1. 基于findChessboardCorners的双目精度评估 原理&#xff1a; 代码&#xff1a; #include <iostream> #include <opencv2/opencv.hpp>using namespace std; using namespace cv;int main() {// 加载图像auto srcimage imread("/home/oem/data/steroe_p…

Hadoop增加新节点环境配置(自用)

完成Hadoop集群增添一个新的节点配置&#xff08;文中命名为&#xff09;Hadoop106&#xff0c;没有进行继续为该节点分配身份职能的步骤 1.在VMware中安装CentOS 7 新建虚拟机 1.⾸先我们创建⼀个新的虚拟机&#xff0c;也可以点⽂件-新建虚拟机。 2.选择⾃定义&#xff0c…

网页元素圈选

从前面我们已验证配置自动化是可行的&#xff0c;接下来就实现元素选择&#xff0c;当然有了配置化&#xff0c;我们也是可以通过浏览器F12的调试工具去把元素xpath复制出来&#xff08;ps:反正又不是不能用&#xff09;&#xff0c;但是这不是我们最终目的。 其实圈选效果如下…

CSS3如何实现从右往左布局的按钮组(固定间距)

可以通过下方CSS实现&#xff0c;下面的CSS表示按钮从右往左布局&#xff0c;且间距为10px: .right-btn {position: relative;float: right;margin-right: 10px; }类似这种&#xff1a; 这种&#xff1a; 注意&#xff1a; 不能使用right:10px代替margin-right:10px&#x…

聚醚醚酮(Polyether Ether Ketone)PEEK主要应用于哪些行业领域?

聚醚醚酮&#xff08;Polyether Ether Ketone&#xff0c;PEEK&#xff09;广泛应用于以下行业&#xff1a; 1.航空航天业&#xff1a; PEEK常被用于制造航空航天组件&#xff0c;如飞机零部件、航天器构件&#xff0c;因其轻量化、高强度和耐高温性能。 2.汽车工业&#xff1…

滑木块H5小游戏

欢迎来到程序小院 滑木块 玩法&#xff1a;点击木块横着的只能左右移动&#xff0c;竖着的只能上下移动&#xff0c; 移动到箭头的位置即过关&#xff0c;不同关卡不同的木块摆放&#xff0c;快去滑木块吧^^。开始游戏https://www.ormcc.com/play/gameStart/260 html <can…

Django项目搭建

一、创建项目 在命令行中执行代码 $ django-admin startproject mysitedjango-admin 为内部命令startproject 为参数mysite 项目名 备注 避免使用 Python 或 Django 的内部保留字来命名项目。比如说&#xff0c;避免使用像 django (会和 Django 自己产生冲突)或 test (会和 P…

深入浅出 diffusion(3):pytorch 实现 diffusion 中的 U-Net

导入python包 import mathimport torch import torch.nn as nn import torch.nn.functional as F silu激活函数 class SiLU(nn.Module): # SiLU激活函数staticmethoddef forward(x):return x * torch.sigmoid(x) 归一化设置 def get_norm(norm, num_channels, num_groups)…

Java集合-Map接口(key-value)

Map接口的特点&#xff1a;①KV键值对方式存储②Key键唯一&#xff0c;Value允许重复③无序。 Map有四个实现类&#xff1a;1.HashMap类2.LinkedHashMap类3.TreeMap类4.Hashtable类 1.HashMap类&#xff1a; 存储结构&#xff1a;哈希表 数组Node[ ] 链表&#xff08;红黑…

暴力破解

暴力破解工具使用汇总 1.查看密码加密方式 在线网站&#xff1a;https://cmd5.com/ http://www.158566.com/ https://encode.chahuo.com/kali&#xff1a;hash-identifier2.hydra 用于各种服务的账号密码爆破&#xff1a;FTP/Mysql/SSH/RDP...常用参数 -l name 指定破解登录…

BUUCTF--xor1

这题考察的是亦或。查壳&#xff1a; 无壳。看下IDA的流程&#xff1a; 我们看到将用户输入做一个异或操作&#xff0c;然后和一个变量做比较。如果相同则输出Success。这里的知识点就是两次异或会输出原文。因此我们只需要把global再做一次异或就能解出flag。在IDA中按住shift…

Excel 2019 for Mac/Win:商务数据分析与处理的终极工具

在当今快节奏的商业环境中&#xff0c;数据分析已经成为一项至关重要的技能。从市场趋势预测到财务报告&#xff0c;再到项目管理&#xff0c;数据无处不在。而作为数据分析的基石&#xff0c;Microsoft Excel 2019 for Mac/Win正是一个强大的工具&#xff0c;帮助用户高效地处…