Apache SeaTunnel Zeta Engine Source Code Analysis (Part 3): The Server-Side Execution Flow of a Received Job

Author: Liu Naijie

Edited by: Zeng Hui

Introduction

This series of articles is based on Apache SeaTunnel 2.3.6 and walks through the full life cycle of a Zeta engine job, from submission to execution. I hope it offers some help to readers who are just getting started with SeaTunnel.


The series is split into three articles, covering the following topics:

  1. Server-side initialization of SeaTunnel
  2. Client-side job submission flow
  3. Server-side execution flow of a received job

Since this is a source-code walkthrough and covers a lot of ground, it is split into a series that documents the end-to-end flow of a single job.

References

  • [ST-Engine][Design] The Design of LogicalPlan to PhysicalPlan: https://github.com/apache/seatunnel/issues/2269

About the Author

Hi, I'm Liu Naijie, a big data engineer. I have been contributing to Apache SeaTunnel for over a year, submitting a number of PRs, some of which added quite interesting features: Avro file format support, nested-structure queries in the SQL Transform, and node tags for resource isolation. Feel free to reach out to me.

Previous articles in this series: Apache SeaTunnel Zeta Engine Source Code Analysis (Part 1): Server-Side Initialization; Apache SeaTunnel Zeta Engine Source Code Analysis (Part 2): Client-Side Job Submission Flow

Server-Side Job Submission

This is the last article of the series. Let's first recap the components that run once the server has started:

  • CoordinatorService: enabled only on the master/standby node; it monitors the cluster state and handles master/standby switchover
  • SlotService: enabled on worker nodes; it periodically reports the node's own information to the master
  • TaskExecutionService: enabled on worker nodes; it periodically updates the metrics of running tasks into the IMap

These components run even before the cluster receives any job. So what happens on the server once a client sends it a SeaTunnelSubmitJobCodec message?

Receiving the Message

Because the client and the server run on different machines, direct method calls are impossible, so message passing is used instead. How does the server turn a received message into the corresponding method call? From the previous article we know the client sends the server a message of type SeaTunnelSubmitJobCodec:

// Client-side code
ClientMessage request =
        SeaTunnelSubmitJobCodec.encodeRequest(
                jobImmutableInformation.getJobId(),
                seaTunnelHazelcastClient
                        .getSerializationService()
                        .toData(jobImmutableInformation),
                jobImmutableInformation.isStartWithSavePoint());
PassiveCompletableFuture<Void> submitJobFuture =
        seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);

Stepping into the SeaTunnelSubmitJobCodec class and looking at its usages, we find a class named SeaTunnelMessageTaskFactoryProvider, which maintains a mapping from message type to MessageTask, i.e. from a client message to the server-side handler class. For SeaTunnelSubmitJobCodec it returns a SubmitJobTask:

private final Int2ObjectHashMap<MessageTaskFactory> factories = new Int2ObjectHashMap<>(60);

private void initFactories() {
    factories.put(
            SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE,
            (clientMessage, connection) ->
                    new PrintMessageTask(clientMessage, node, connection));
    factories.put(
            SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE,
            (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
    .....
}

Looking at SubmitJobTask, we see that it in turn delegates to SubmitJobOperation:

@Override
protected Operation prepareOperation() {
    return new SubmitJobOperation(
            parameters.jobId,
            parameters.jobImmutableInformation,
            parameters.isStartWithSavePoint);
}

SubmitJobOperation is where the real call happens: our message is handed over to the CoordinatorService component by invoking its submitJob method:


@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
    SeaTunnelServer seaTunnelServer = getService();
    return seaTunnelServer
            .getCoordinatorService()
            .submitJob(jobId, jobImmutableInformation, isStartWithSavePoint);
}

At this point a client message has been turned into a server-side method call. Every other operation type can be traced to its handler class in the same way, so I won't repeat the exercise.
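
To make this dispatch chain concrete, here is a minimal, self-contained sketch. All names in it (DispatchSketch, Operation, SUBMIT_JOB) are illustrative stand-ins rather than the real Hazelcast/SeaTunnel classes; it only mirrors the pattern: message-type id -> factory -> task -> Operation -> target service method.

```Java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class DispatchSketch {
    interface Operation {
        Object run();
    }

    // Stands in for SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE
    static final int SUBMIT_JOB = 1;
    // Stands in for the Int2ObjectHashMap in SeaTunnelMessageTaskFactoryProvider
    static final Map<Integer, Function<byte[], Operation>> FACTORIES = new HashMap<>();

    static Object dispatch(int messageType, byte[] payload) {
        // The factory plays the role of SubmitJobTask: it builds an Operation
        // that has captured the decoded payload.
        Operation op = FACTORIES.get(messageType).apply(payload);
        // Running the Operation is the step that finally invokes the service
        // method, e.g. SubmitJobOperation#doRun -> CoordinatorService#submitJob.
        return op.run();
    }

    public static void main(String[] args) {
        FACTORIES.put(SUBMIT_JOB, payload -> () -> "submitJob(" + payload.length + " bytes)");
        System.out.println(dispatch(SUBMIT_JOB, new byte[16]));
    }
}
```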

CoordinatorService

Next, let's see how CoordinatorService handles the job submission:

public PassiveCompletableFuture<Void> submitJob(
        long jobId, Data jobImmutableInformation, boolean isStartWithSavePoint) {
    CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();

    // First check by job id: if a job with the same id is already running, return right away
    if (getJobMaster(jobId) != null) {
        logger.warning(
                String.format(
                        "The job %s is currently running; no need to submit again.", jobId));
        jobSubmitFuture.complete(null);
        return new PassiveCompletableFuture<>(jobSubmitFuture);
    }
    // Create the JobMaster instance
    JobMaster jobMaster =
            new JobMaster(
                    jobImmutableInformation,
                    this.nodeEngine,
                    executorService,
                    getResourceManager(),
                    getJobHistoryService(),
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap,
                    ownedSlotProfilesIMap,
                    runningJobInfoIMap,
                    metricsImap,
                    engineConfig,
                    seaTunnelServer);
    // Hand the remaining submission work to the executor service asynchronously
    executorService.submit(
            () -> {
                try {
                // Since 2.3.6 the job id can be supplied by the user, and SeaTunnel
                // uses the job id for some state checks, so this check guarantees that no job with the same id exists in the current state
                    if (!isStartWithSavePoint
                            && getJobHistoryService().getJobMetrics(jobId) != null) {
                        throw new JobException(
                                String.format(
                                        "The job id %s has already been submitted and is not starting with a savepoint.",
                                        jobId));
                    }
                    // Record the current job's information in the IMap
                    runningJobInfoIMap.put(
                            jobId,
                            new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
                    runningJobMasterMap.put(jobId, jobMaster);
                    // Initialize the JobMaster
                    jobMaster.init(
                            runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
                    // Once jobMaster.init completes, the job is considered successfully created
                    jobSubmitFuture.complete(null);
                } catch (Throwable e) {
                    String errorMsg = ExceptionUtils.getMessage(e);
                    logger.severe(String.format("submit job %s error %s ", jobId, errorMsg));
                    jobSubmitFuture.completeExceptionally(new JobException(errorMsg));
                }
                if (!jobSubmitFuture.isCompletedExceptionally()) {
                // After a successful submission, call jobMaster.run() to start executing the job,
                // and finally check the job state and remove this job's information from the internal state
                    try {
                        jobMaster.run();
                    } finally {
                        // voidCompletableFuture will be cancelled when zeta master node
                        // shutdown to simulate master failure,
                        // don't update runningJobMasterMap in this case.
                        if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
                            runningJobMasterMap.remove(jobId);
                        }
                    }
                } else {
                    runningJobInfoIMap.remove(jobId);
                    runningJobMasterMap.remove(jobId);
                }
            });
    return new PassiveCompletableFuture<>(jobSubmitFuture);
}

As we can see, the server creates a JobMaster object for each job, and that object manages the lifecycle of the single job.

When the JobMaster is created, getResourceManager is called to obtain the resource management object and getJobHistoryService to obtain the job history service. jobHistoryService is created at startup, while ResourceManager is lazily loaded: it is only created once the first job is submitted:

/** Lazy load for resource manager */
public ResourceManager getResourceManager() {
    if (resourceManager == null) {
        synchronized (this) {
            if (resourceManager == null) {
                ResourceManager manager =
                        new ResourceManagerFactory(nodeEngine, engineConfig)
                                .getResourceManager();
                manager.init();
                resourceManager = manager;
            }
        }
    }
    return resourceManager;
}

ResourceManager

SeaTunnel currently only supports standalone deployment. When the ResourceManager is initialized, it fetches all cluster members and sends each of them a SyncWorkerProfileOperation to obtain the node information, which is then stored in the internal registerWorker state:

@Override
public void init() {
    log.info("Init ResourceManager");
    initWorker();
}

private void initWorker() {
    log.info("initWorker... ");
    List<Address> aliveNode =
            nodeEngine.getClusterService().getMembers().stream()
                    .map(Member::getAddress)
                    .collect(Collectors.toList());
    log.info("init live nodes: {}", aliveNode);
    List<CompletableFuture<Void>> futures =
            aliveNode.stream()
                    .map(
                            node ->
                                    sendToMember(new SyncWorkerProfileOperation(), node)
                                            .thenAccept(
                                                    p -> {
                                                        if (p != null) {
                                                            registerWorker.put(
                                                                    node, (WorkerProfile) p);
                                                            log.info(
                                                                    "received new worker register: "
                                                                            + ((WorkerProfile)
                                                                                            p)
                                                                                    .getAddress());
                                                        }
                                                    }))
                    .collect(Collectors.toList());
    futures.forEach(CompletableFuture::join);
    log.info("registerWorker: {}", registerWorker);
}

Recall from SlotService that every node periodically sends heartbeat messages to the master, containing the node's current state. When the ResourceManager receives a heartbeat, it updates that node's state in its internal map:

@Override
public void heartbeat(WorkerProfile workerProfile) {
    if (!registerWorker.containsKey(workerProfile.getAddress())) {
        log.info("received new worker register: " + workerProfile.getAddress());
        sendToMember(new ResetResourceOperation(), workerProfile.getAddress()).join();
    } else {
        log.debug("received worker heartbeat from: " + workerProfile.getAddress());
    }
    registerWorker.put(workerProfile.getAddress(), workerProfile);
}

JobMaster

CoordinatorService creates the JobMaster and calls its init method; once init completes, the job is considered successfully created. The run method is then called to actually start the job.

Let's look at the constructor and the init method.

public JobMaster(
        @NonNull Data jobImmutableInformationData,
        @NonNull NodeEngine nodeEngine,
        @NonNull ExecutorService executorService,
        @NonNull ResourceManager resourceManager,
        @NonNull JobHistoryService jobHistoryService,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull IMap ownedSlotProfilesIMap,
        @NonNull IMap<Long, JobInfo> runningJobInfoIMap,
        @NonNull IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap,
        EngineConfig engineConfig,
        SeaTunnelServer seaTunnelServer) {
    this.jobImmutableInformationData = jobImmutableInformationData;
    this.nodeEngine = nodeEngine;
    this.executorService = executorService;
    flakeIdGenerator =
            this.nodeEngine
                    .getHazelcastInstance()
                    .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
    this.ownedSlotProfilesIMap = ownedSlotProfilesIMap;
    this.resourceManager = resourceManager;
    this.jobHistoryService = jobHistoryService;
    this.runningJobStateIMap = runningJobStateIMap;
    this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    this.runningJobInfoIMap = runningJobInfoIMap;
    this.engineConfig = engineConfig;
    this.metricsImap = metricsImap;
    this.seaTunnelServer = seaTunnelServer;
    this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>();
}

The constructor only performs simple field assignments and does no real work, so we should focus on the init method:

public synchronized void init(long initializationTimestamp, boolean restart) throws Exception {
     // The message the server received from the client is a binary object;
     // first deserialize it into a JobImmutableInformation object, the very object the client sent
    jobImmutableInformation =
            nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
    // Obtain the checkpoint configuration, e.g. interval and timeout
    jobCheckpointConfig =
            createJobCheckpointConfig(
                    engineConfig.getCheckpointConfig(), jobImmutableInformation.getJobConfig());

    LOGGER.info(
            String.format(
                    "Init JobMaster for Job %s (%s) ",
                    jobImmutableInformation.getJobConfig().getName(),
                    jobImmutableInformation.getJobId()));
    LOGGER.info(
            String.format(
                    "Job %s (%s) needed jar urls %s",
                    jobImmutableInformation.getJobConfig().getName(),
                    jobImmutableInformation.getJobId(),
                    jobImmutableInformation.getPluginJarsUrls()));
    ClassLoader appClassLoader = Thread.currentThread().getContextClassLoader();
    // Obtain the ClassLoader
    ClassLoader classLoader =
            seaTunnelServer
                    .getClassLoaderService()
                    .getClassLoader(
                            jobImmutableInformation.getJobId(),
                            jobImmutableInformation.getPluginJarsUrls());
    // Deserialize the client-provided information into the logical plan
    logicalDag =
            CustomClassLoadedObject.deserializeWithCustomClassLoader(
                    nodeEngine.getSerializationService(),
                    classLoader,
                    jobImmutableInformation.getLogicalDag());
    try {
        Thread.currentThread().setContextClassLoader(classLoader);
        // The savemode functionality (e.g. creating or dropping tables) can be executed on the server side
        if (!restart
                && !logicalDag.isStartWithSavePoint()
                && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
                        .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
                        .equals(SaveModeExecuteLocation.CLUSTER)) {
            logicalDag.getLogicalVertexMap().values().stream()
                    .map(LogicalVertex::getAction)
                    .filter(action -> action instanceof SinkAction)
                    .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
                    .forEach(JobMaster::handleSaveMode);
        }
        // Parse the logical plan into the physical plan
        final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
                PlanUtils.fromLogicalDAG(
                        logicalDag,
                        nodeEngine,
                        jobImmutableInformation,
                        initializationTimestamp,
                        executorService,
                        flakeIdGenerator,
                        runningJobStateIMap,
                        runningJobStateTimestampsIMap,
                        engineConfig.getQueueType(),
                        engineConfig);
        this.physicalPlan = planTuple.f0();
        this.physicalPlan.setJobMaster(this);
        this.checkpointPlanMap = planTuple.f1();
    } finally {
        // Restore the current thread's ClassLoader and release the classLoader created above
        Thread.currentThread().setContextClassLoader(appClassLoader);
        seaTunnelServer
                .getClassLoaderService()
                .releaseClassLoader(
                        jobImmutableInformation.getJobId(),
                        jobImmutableInformation.getPluginJarsUrls());
    }
    Exception initException = null;
    try {
    // Initialize the checkpointManager
        this.initCheckPointManager(restart);
    } catch (Exception e) {
        initException = e;
    }
    // Register callbacks to listen for job state changes
    this.initStateFuture();
    if (initException != null) {
        if (restart) {
            cancelJob();
        }
        throw initException;
    }
}

Finally, the run method:

public void run() {
    try {
        physicalPlan.startJob();
    } catch (Throwable e) {
        LOGGER.severe(
                String.format(
                        "Job %s (%s) run error with: %s",
                        physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
                        physicalPlan.getJobImmutableInformation().getJobId(),
                        ExceptionUtils.getMessage(e)));
    } finally {
        jobMasterCompleteFuture.join();
        if (engineConfig.getConnectorJarStorageConfig().getEnable()) {
            List<ConnectorJarIdentifier> pluginJarIdentifiers =
                    jobImmutableInformation.getPluginJarIdentifiers();
            seaTunnelServer
                    .getConnectorPackageService()
                    .cleanUpWhenJobFinished(
                            jobImmutableInformation.getJobId(), pluginJarIdentifiers);
        }
    }
}

This method is simple: it calls physicalPlan.startJob() to start the generated physical plan.

From the code above: when the server receives a job submission request from a client, it initializes a JobMaster, which generates the physical plan from the logical plan and finally executes it.

Next, let's dig into how the physical plan is generated from the logical plan.

From Logical Plan to Physical Plan

The physical plan is generated from within JobMaster:

final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
        PlanUtils.fromLogicalDAG(
                logicalDag,
                nodeEngine,
                jobImmutableInformation,
                initializationTimestamp,
                executorService,
                flakeIdGenerator,
                runningJobStateIMap,
                runningJobStateTimestampsIMap,
                engineConfig.getQueueType(),
                engineConfig);

Inside this method we can see that the logical plan is first turned into an execution plan, and the execution plan is then turned into the physical plan:

public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> fromLogicalDAG(
        @NonNull LogicalDag logicalDag,
        @NonNull NodeEngine nodeEngine,
        @NonNull JobImmutableInformation jobImmutableInformation,
        long initializationTimestamp,
        @NonNull ExecutorService executorService,
        @NonNull FlakeIdGenerator flakeIdGenerator,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull QueueType queueType,
        @NonNull EngineConfig engineConfig) {
    return new PhysicalPlanGenerator(
                    new ExecutionPlanGenerator(
                                    logicalDag, jobImmutableInformation, engineConfig)
                            .generate(),
                    nodeEngine,
                    jobImmutableInformation,
                    initializationTimestamp,
                    executorService,
                    flakeIdGenerator,
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap,
                    queueType)
            .generate();
}

Generating the Execution Plan

public ExecutionPlanGenerator(
        @NonNull LogicalDag logicalPlan,
        @NonNull JobImmutableInformation jobImmutableInformation,
        @NonNull EngineConfig engineConfig) {
    checkArgument(
            logicalPlan.getEdges().size() > 0, "ExecutionPlan Builder must have LogicalPlan.");
    this.logicalPlan = logicalPlan;
    this.jobImmutableInformation = jobImmutableInformation;
    this.engineConfig = engineConfig;
}

public ExecutionPlan generate() {
    log.debug("Generate execution plan using logical plan:");

    Set<ExecutionEdge> executionEdges = generateExecutionEdges(logicalPlan.getEdges());
    log.debug("Phase 1: generate execution edge list {}", executionEdges);

    executionEdges = generateShuffleEdges(executionEdges);
    log.debug("Phase 2: generate shuffle edge list {}", executionEdges);

    executionEdges = generateTransformChainEdges(executionEdges);
    log.debug("Phase 3: generate transform chain edge list {}", executionEdges);

    List<Pipeline> pipelines = generatePipelines(executionEdges);
    log.debug("Phase 4: generate pipeline list {}", pipelines);

    ExecutionPlan executionPlan = new ExecutionPlan(pipelines, jobImmutableInformation);
    log.debug("Phase 5: generate execution plan: {}", executionPlan);

    return executionPlan;
}

First, let's see what the execution plan class contains:

public class ExecutionPlan {
    private final List<Pipeline> pipelines;
    private final JobImmutableInformation jobImmutableInformation;
}

public class Pipeline {
    /** The ID of the pipeline. */
    private final Integer id;
    private final List<ExecutionEdge> edges;
    private final Map<Long, ExecutionVertex> vertexes;

 }

 public class ExecutionEdge {
    private ExecutionVertex leftVertex;
    private ExecutionVertex rightVertex;  
 }   

public class ExecutionVertex {
    private Long vertexId;
    private Action action;
    private int parallelism;
}        

Compare this with the logical plan:

public class LogicalDag implements IdentifiedDataSerializable {
    @Getter private JobConfig jobConfig;
    private final Set<LogicalEdge> edges = new LinkedHashSet<>();
    private final Map<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
    private IdGenerator idGenerator;
    private boolean isStartWithSavePoint = false;
  }

 public class LogicalEdge implements IdentifiedDataSerializable {
    private LogicalVertex inputVertex;
    private LogicalVertex targetVertex;
    private Long inputVertexId;
    private Long targetVertexId;
 }

public class LogicalVertex implements IdentifiedDataSerializable {
    private Long vertexId;
    private Action action;
    private int parallelism;  
 }        

Comparing the two sets of classes, each Pipeline looks just like a logical plan, so why is this conversion needed? Let's walk through the execution plan generation in detail. As shown above, it takes five steps; let's go through them one by one.

  • Step 1: convert the logical plan into execution edges

    // The input is the logical plan's edges; each edge stores its upstream and downstream vertices
    private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge> logicalEdges) {
      Set<ExecutionEdge> executionEdges = new LinkedHashSet<>();
    
      Map<Long, ExecutionVertex> logicalVertexIdToExecutionVertexMap = new HashMap();
      // Sort the edges: by input vertex id first, then by target vertex id when the input vertices are equal
      List<LogicalEdge> sortedLogicalEdges = new ArrayList<>(logicalEdges);
      Collections.sort(
              sortedLogicalEdges,
              (o1, o2) -> {
                  if (o1.getInputVertexId() != o2.getInputVertexId()) {
                      return o1.getInputVertexId() > o2.getInputVertexId() ? 1 : -1;
                  }
                  if (o1.getTargetVertexId() != o2.getTargetVertexId()) {
                      return o1.getTargetVertexId() > o2.getTargetVertexId() ? 1 : -1;
                  }
                  return 0;
              });
      // Convert every logical edge into an execution edge
      for (LogicalEdge logicalEdge : sortedLogicalEdges) {
          LogicalVertex logicalInputVertex = logicalEdge.getInputVertex();
          ExecutionVertex executionInputVertex =
                  logicalVertexIdToExecutionVertexMap.computeIfAbsent(
                          logicalInputVertex.getVertexId(),
                          vertexId -> {
                              long newId = idGenerator.getNextId();
                              // Recreate the Action for each logical vertex
                              Action newLogicalInputAction =
                                      recreateAction(
                                              logicalInputVertex.getAction(),
                                              newId,
                                              logicalInputVertex.getParallelism());
                              // Convert it into an execution vertex
                              return new ExecutionVertex(
                                      newId,
                                      newLogicalInputAction,
                                      logicalInputVertex.getParallelism());
                          });
          // Recreate the target execution vertex, similar to the input vertex
          LogicalVertex logicalTargetVertex = logicalEdge.getTargetVertex();
          ExecutionVertex executionTargetVertex =
                  logicalVertexIdToExecutionVertexMap.computeIfAbsent(
                          logicalTargetVertex.getVertexId(),
                          vertexId -> {
                              long newId = idGenerator.getNextId();
                              Action newLogicalTargetAction =
                                      recreateAction(
                                              logicalTargetVertex.getAction(),
                                              newId,
                                              logicalTargetVertex.getParallelism());
                              return new ExecutionVertex(
                                      newId,
                                      newLogicalTargetAction,
                                      logicalTargetVertex.getParallelism());
                          });
          // Create the execution edge
          ExecutionEdge executionEdge =
                  new ExecutionEdge(executionInputVertex, executionTargetVertex);
          executionEdges.add(executionEdge);
      }
      return executionEdges;
    }
  • Step 2: insert a shuffle edge when needed

    private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge> executionEdges) {
      // Key: upstream vertex id; value: list of all of its downstream vertices
      Map<Long, List<ExecutionVertex>> targetVerticesMap = new LinkedHashMap<>();
      // Holds only vertices whose action is a Source
      Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
      executionEdges.forEach(
              edge -> {
                  ExecutionVertex leftVertex = edge.getLeftVertex();
                  ExecutionVertex rightVertex = edge.getRightVertex();
                  if (leftVertex.getAction() instanceof SourceAction) {
                      sourceExecutionVertices.add(leftVertex);
                  }
                  targetVerticesMap
                          .computeIfAbsent(leftVertex.getVertexId(), id -> new ArrayList<>())
                          .add(rightVertex);
              });
      if (sourceExecutionVertices.size() != 1) {
          return executionEdges;
      }
      ExecutionVertex sourceExecutionVertex = sourceExecutionVertices.stream().findFirst().get();
      Action sourceAction = sourceExecutionVertex.getAction();
      List<CatalogTable> producedCatalogTables = new ArrayList<>();
      if (sourceAction instanceof SourceAction) {
          try {
              producedCatalogTables =
                      ((SourceAction<?, ?, ?>) sourceAction)
                              .getSource()
                              .getProducedCatalogTables();
          } catch (UnsupportedOperationException e) {
          }
      } else if (sourceAction instanceof TransformChainAction) {
          return executionEdges;
      } else {
          throw new SeaTunnelException(
                  "source action must be SourceAction or TransformChainAction");
      }
      // Return directly when the source produces at most one table, or
      // when the source has no more than one downstream output
      if (producedCatalogTables.size() <= 1
              || targetVerticesMap.get(sourceExecutionVertex.getVertexId()).size() <= 1) {
          return executionEdges;
      }
    
      List<ExecutionVertex> sinkVertices =
              targetVerticesMap.get(sourceExecutionVertex.getVertexId());
      // Check for other Action types; at this step a downstream vertex can only be a Transform or a Sink, and here we assert that only Sinks are present
      Optional<ExecutionVertex> hasOtherAction =
              sinkVertices.stream()
                      .filter(vertex -> !(vertex.getAction() instanceof SinkAction))
                      .findFirst();
    
      checkArgument(!hasOtherAction.isPresent());
      // Once all of the code above has passed, the current situation is:
      // exactly one source that produces multiple tables, with multiple sink vertices depending on those tables.
      // In other words, the job has only two kinds of vertices: a multi-table Source and a group of Sinks depending on it.
      // A new shuffle vertex is created and inserted between them,
      // rewriting the dependency into source -> shuffle -> multiple sinks
      Set<ExecutionEdge> newExecutionEdges = new LinkedHashSet<>();
      // We won't dig into the shuffle strategy here
      ShuffleStrategy shuffleStrategy =
              ShuffleMultipleRowStrategy.builder()
                      .jobId(jobImmutableInformation.getJobId())
                      .inputPartitions(sourceAction.getParallelism())
                      .catalogTables(producedCatalogTables)
                      .queueEmptyQueueTtl(
                              (int)
                                      (engineConfig.getCheckpointConfig().getCheckpointInterval()
                                              * 3))
                      .build();
      ShuffleConfig shuffleConfig =
              ShuffleConfig.builder().shuffleStrategy(shuffleStrategy).build();
    
      long shuffleVertexId = idGenerator.getNextId();
      String shuffleActionName = String.format("Shuffle [%s]", sourceAction.getName());
      ShuffleAction shuffleAction =
              new ShuffleAction(shuffleVertexId, shuffleActionName, shuffleConfig);
      shuffleAction.setParallelism(sourceAction.getParallelism());
      ExecutionVertex shuffleVertex =
              new ExecutionVertex(shuffleVertexId, shuffleAction, shuffleAction.getParallelism());
      ExecutionEdge sourceToShuffleEdge = new ExecutionEdge(sourceExecutionVertex, shuffleVertex);
      newExecutionEdges.add(sourceToShuffleEdge);
      // Set the parallelism of every sink vertex to 1
      for (ExecutionVertex sinkVertex : sinkVertices) {
          sinkVertex.setParallelism(1);
          sinkVertex.getAction().setParallelism(1);
          ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex, sinkVertex);
          newExecutionEdges.add(shuffleToSinkEdge);
      }
      return newExecutionEdges;
    }

This shuffle step targets a special scenario: when the source supports multi-table reads and multiple sink vertices depend on that source vertex, a shuffle vertex is inserted between them.
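
To visualize the rewrite, here is a toy, self-contained before/after sketch (plain strings, not SeaTunnel classes), assuming one multi-table source feeding two sinks:

```Java
import java.util.ArrayList;
import java.util.List;

public class ShuffleRewriteDemo {
    public static void main(String[] args) {
        List<String> sinks = List.of("sinkA", "sinkB");

        // Before: the multi-table source feeds every sink directly
        List<String> before = new ArrayList<>();
        sinks.forEach(s -> before.add("source(multi-table) -> " + s));

        // After: one source -> shuffle edge, then shuffle -> each sink,
        // with every sink pinned to parallelism 1
        List<String> after = new ArrayList<>();
        after.add("source(multi-table) -> Shuffle [source]");
        sinks.forEach(s -> after.add("Shuffle [source] -> " + s + " (parallelism=1)"));

        System.out.println("before: " + before);
        System.out.println("after:  " + after);
    }
}
```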

  • Step 3: chain transform vertices (see the toy demo after this step's code)
private Set<ExecutionEdge> generateTransformChainEdges(Set<ExecutionEdge> executionEdges) {
    // Three structures are used: all Source vertices, plus every vertex's inputs and outputs
    // inputVerticesMap: key = downstream vertex id, value = all of its upstream input vertices
    // targetVerticesMap: key = upstream vertex id, value = all of its downstream output vertices
    Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>();
    Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
    Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
    executionEdges.forEach(
            edge -> {
                ExecutionVertex leftVertex = edge.getLeftVertex();
                ExecutionVertex rightVertex = edge.getRightVertex();
                if (leftVertex.getAction() instanceof SourceAction) {
                    sourceExecutionVertices.add(leftVertex);
                }
                inputVerticesMap
                        .computeIfAbsent(rightVertex.getVertexId(), id -> new ArrayList<>())
                        .add(leftVertex);
                targetVerticesMap
                        .computeIfAbsent(leftVertex.getVertexId(), id -> new ArrayList<>())
                        .add(rightVertex);
            });

    Map<Long, ExecutionVertex> transformChainVertexMap = new HashMap<>();
    Map<Long, Long> chainedTransformVerticesMapping = new HashMap<>();
    // Loop over every source, i.e. traverse from every head vertex of the DAG
    for (ExecutionVertex sourceVertex : sourceExecutionVertices) {
        List<ExecutionVertex> vertices = new ArrayList<>();
        vertices.add(sourceVertex);
        for (int index = 0; index < vertices.size(); index++) {
            ExecutionVertex vertex = vertices.get(index);

            fillChainedTransformExecutionVertex(
                    vertex,
                    chainedTransformVerticesMapping,
                    transformChainVertexMap,
                    executionEdges,
                    Collections.unmodifiableMap(inputVerticesMap),
                    Collections.unmodifiableMap(targetVerticesMap));
            // If the current vertex has downstream vertices, add them all to the list; the loop then processes the newly added vertices, which may be Transform or Sink vertices
            if (targetVerticesMap.containsKey(vertex.getVertexId())) {
                vertices.addAll(targetVerticesMap.get(vertex.getVertexId()));
            }
        }
    }
    // After the loop, all chainable Transform vertices have been chained; during chaining the chained edges were removed from the execution plan,
    // so the plan no longer forms a complete graph and must be rebuilt
    Set<ExecutionEdge> transformChainEdges = new LinkedHashSet<>();
    // Loop over the remaining edges
    for (ExecutionEdge executionEdge : executionEdges) {
        ExecutionVertex leftVertex = executionEdge.getLeftVertex();
        ExecutionVertex rightVertex = executionEdge.getRightVertex();
        boolean needRebuild = false;
        // Look up the edge's input and output vertices in the chain map;
        // if present, the vertex has been chained, and we find the chained vertex through the mapping
        // to fix up the DAG
        if (chainedTransformVerticesMapping.containsKey(leftVertex.getVertexId())) {
            needRebuild = true;
            leftVertex =
                    transformChainVertexMap.get(
                            chainedTransformVerticesMapping.get(leftVertex.getVertexId()));
        }
        if (chainedTransformVerticesMapping.containsKey(rightVertex.getVertexId())) {
            needRebuild = true;
            rightVertex =
                    transformChainVertexMap.get(
                            chainedTransformVerticesMapping.get(rightVertex.getVertexId()));
        }
        if (needRebuild) {
            executionEdge = new ExecutionEdge(leftVertex, rightVertex);
        }
        transformChainEdges.add(executionEdge);
    }
    return transformChainEdges;
}

private void fillChainedTransformExecutionVertex(
        ExecutionVertex currentVertex,
        Map<Long, Long> chainedTransformVerticesMapping,
        Map<Long, ExecutionVertex> transformChainVertexMap,
        Set<ExecutionEdge> executionEdges,
        Map<Long, List<ExecutionVertex>> inputVerticesMap,
        Map<Long, List<ExecutionVertex>> targetVerticesMap) {
    // Exit if the map already contains the current vertex
    if (chainedTransformVerticesMapping.containsKey(currentVertex.getVertexId())) {
        return;
    }

    List<ExecutionVertex> transformChainedVertices = new ArrayList<>();
    collectChainedVertices(
            currentVertex,
            transformChainedVertices,
            executionEdges,
            inputVerticesMap,
            targetVerticesMap);
    // A non-empty list means its transform vertices can be merged into one
    if (transformChainedVertices.size() > 0) {
        long newVertexId = idGenerator.getNextId();
        List<SeaTunnelTransform> transforms = new ArrayList<>(transformChainedVertices.size());
        List<String> names = new ArrayList<>(transformChainedVertices.size());
        Set<URL> jars = new HashSet<>();
        Set<ConnectorJarIdentifier> identifiers = new HashSet<>();

        transformChainedVertices.stream()
                .peek(
                        // 在mapping中添加所有历史节点编号与新节点编号的映射
                        vertex ->
                                chainedTransformVerticesMapping.put(
                                        vertex.getVertexId(), newVertexId))
                .map(ExecutionVertex::getAction)
                .map(action -> (TransformAction) action)
                .forEach(
                        action -> {
                            transforms.add(action.getTransform());
                            jars.addAll(action.getJarUrls());
                            identifiers.addAll(action.getConnectorJarIdentifiers());
                            names.add(action.getName());
                        });
        String transformChainActionName =
                String.format("TransformChain[%s]", String.join("->", names));
        // Merge multiple TransformActions into a single TransformChainAction
        TransformChainAction transformChainAction =
                new TransformChainAction(
                        newVertexId, transformChainActionName, jars, identifiers, transforms);
        transformChainAction.setParallelism(currentVertex.getAction().getParallelism());

        ExecutionVertex executionVertex =
                new ExecutionVertex(
                        newVertexId, transformChainAction, currentVertex.getParallelism());
        // Put the merged vertex into the state
        transformChainVertexMap.put(newVertexId, executionVertex);
        chainedTransformVerticesMapping.put(
                currentVertex.getVertexId(), executionVertex.getVertexId());
    }
}


private void collectChainedVertices(
        ExecutionVertex currentVertex,
        List<ExecutionVertex> chainedVertices,
        Set<ExecutionEdge> executionEdges,
        Map<Long, List<ExecutionVertex>> inputVerticesMap,
        Map<Long, List<ExecutionVertex>> targetVerticesMap) {
    Action action = currentVertex.getAction();
    // Only TransformActions are merged
    if (action instanceof TransformAction) {
        if (chainedVertices.size() == 0) {
            // The chain list is empty, so add the current vertex itself.
            // We get here when the current vertex is a TransformAction and the chain list is empty;
            // e.g. the first Transform vertex always enters this branch, without restriction
            chainedVertices.add(currentVertex);
        } else if (inputVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
            // Entering this branch means:
            // the chain list already contains at least one TransformAction,
            // and the upstream Transform vertex has exactly one downstream vertex, the current one (guaranteed by the check below).
            // Chain the current TransformAction to the previous one
            // and remove this edge from the execution plan
            executionEdges.remove(
                    new ExecutionEdge(
                            chainedVertices.get(chainedVertices.size() - 1), currentVertex));
            // Add the current vertex to the chain list
            chainedVertices.add(currentVertex);
        } else {
            return;
        }
    } else {
        return;
    }

    // It cannot chain to any target vertex if it has multiple target vertices.
    if (targetVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
        // When the current vertex has exactly one downstream vertex, try to chain again.
        // A vertex with multiple downstream vertices is never chained into, which guarantees the one-to-one relationship used above.
        // This recursion happens when a Transform vertex has exactly one downstream vertex
        collectChainedVertices(
                targetVerticesMap.get(currentVertex.getVertexId()).get(0),
                chainedVertices,
                executionEdges,
                inputVerticesMap,
                targetVerticesMap);
    }
}
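
The effect of this step is easiest to see on a linear DAG. Below is a toy, self-contained model (illustrative strings, not the real Action classes) showing how consecutive one-in/one-out transforms collapse into a single chained vertex:

```Java
import java.util.ArrayList;
import java.util.List;

public class ChainDemo {
    public static void main(String[] args) {
        // Linear DAG: source -> t1 -> t2 -> sink
        List<String> vertices = List.of("source", "t1", "t2", "sink");

        List<String> chained = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String v : vertices) {
            if (v.startsWith("t")) {
                buffer.add(v); // collect consecutive transform vertices
            } else {
                if (!buffer.isEmpty()) {
                    chained.add("TransformChain[" + String.join("->", buffer) + "]");
                    buffer.clear();
                }
                chained.add(v);
            }
        }
        if (!buffer.isEmpty()) {
            chained.add("TransformChain[" + String.join("->", buffer) + "]");
        }
        // Prints: [source, TransformChain[t1->t2], sink]
        System.out.println(chained);
    }
}
```
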
  • Step 4: generate pipelines

    private List<Pipeline> generatePipelines(Set<ExecutionEdge> executionEdges) {
      // Collect every execution vertex
      Set<ExecutionVertex> executionVertices = new LinkedHashSet<>();
      for (ExecutionEdge edge : executionEdges) {
          executionVertices.add(edge.getLeftVertex());
          executionVertices.add(edge.getRightVertex());
      }
      // Use the pipeline generator to convert the execution plan into pipelines
      PipelineGenerator pipelineGenerator =
              new PipelineGenerator(executionVertices, new ArrayList<>(executionEdges));
      List<Pipeline> pipelines = pipelineGenerator.generatePipelines();
    
      Set<String> duplicatedActionNames = new HashSet<>();
      Set<String> actionNames = new HashSet<>();
      for (Pipeline pipeline : pipelines) {
          Integer pipelineId = pipeline.getId();
          for (ExecutionVertex vertex : pipeline.getVertexes().values()) {
              // For every execution vertex of the pipeline, rename its Action to include the pipeline id
              Action action = vertex.getAction();
              String actionName = String.format("pipeline-%s [%s]", pipelineId, action.getName());
              action.setName(actionName);
              if (actionNames.contains(actionName)) {
                  duplicatedActionNames.add(actionName);
              }
              actionNames.add(actionName);
          }
      }
      // Check that no Action name is duplicated
      checkArgument(
              duplicatedActionNames.isEmpty(),
              "Action name is duplicated: " + duplicatedActionNames);
    
      return pipelines;
    }
    
    

generatePipelines delegates the actual splitting to PipelineGenerator:

public PipelineGenerator(Collection<ExecutionVertex> vertices, List<ExecutionEdge> edges) {
    this.vertices = vertices;
    this.edges = edges;
}

public List<Pipeline> generatePipelines() {
    List<ExecutionEdge> executionEdges = expandEdgeByParallelism(edges);

    // Split the execution plan by connectivity
    // into several unrelated execution plans
    List<List<ExecutionEdge>> edgesList = splitUnrelatedEdges(executionEdges);

    edgesList =
            edgesList.stream()
                    .flatMap(e -> this.splitUnionEdge(e).stream())
                    .collect(Collectors.toList());

    // just convert execution plan to pipeline at now. We should split it to multi pipeline with
    // cache in the future
    IdGenerator idGenerator = new IdGenerator();
    // Convert each execution-plan graph into a Pipeline
    return edgesList.stream()
            .map(
                    e -> {
                        Map<Long, ExecutionVertex> vertexes = new HashMap<>();
                        List<ExecutionEdge> pipelineEdges =
                                e.stream()
                                        .map(
                                                edge -> {
                                                    if (!vertexes.containsKey(
                                                            edge.getLeftVertexId())) {
                                                        vertexes.put(
                                                                edge.getLeftVertexId(),
                                                                edge.getLeftVertex());
                                                    }
                                                    ExecutionVertex source =
                                                            vertexes.get(edge.getLeftVertexId());
                                                    if (!vertexes.containsKey(
                                                            edge.getRightVertexId())) {
                                                        vertexes.put(
                                                                edge.getRightVertexId(),
                                                                edge.getRightVertex());
                                                    }
                                                    ExecutionVertex destination =
                                                            vertexes.get(edge.getRightVertexId());
                                                    return new ExecutionEdge(source, destination);
                                                })
                                        .collect(Collectors.toList());
                        return new Pipeline(
                                (int) idGenerator.getNextId(), pipelineEdges, vertexes);
                    })
            .collect(Collectors.toList());
}


  • Step 5: build the ExecutionPlan

Step 5 simply creates the ExecutionPlan instance, passing in the pipelines produced in step 4.

**To summarize:**
Converting the logical plan into an execution plan does the following:
1. When the source produces multiple tables and several sink vertices depend on that source, a shuffle vertex is inserted between them
2. Transform vertices are chained where possible, merging several transform vertices into one
3. The job is split: a single config file (`LogicalDag`) is broken into several unrelated tasks, a `List<Pipeline>` (as sketched after the figure below)

![](https://img-blog.csdnimg.cn/3e3c46a29e1b412fa772e08bafb69735.png)
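
Point 3 above boils down to finding the connected components of the execution graph. Here is a toy union-find sketch of that idea (illustrative, not the actual splitUnrelatedEdges implementation):

```Java
import java.util.*;

public class SplitDemo {
    public static void main(String[] args) {
        // Two unrelated flows in one config: 1->2 and 3->4->5
        List<long[]> edges = List.of(new long[] {1, 2}, new long[] {3, 4}, new long[] {4, 5});

        Map<Long, Long> parent = new HashMap<>();
        for (long[] e : edges) {
            parent.putIfAbsent(e[0], e[0]);
            parent.putIfAbsent(e[1], e[1]);
            parent.put(find(parent, e[0]), find(parent, e[1])); // union
        }

        // Group edges by component: each component becomes one pipeline
        Map<Long, List<String>> pipelines = new LinkedHashMap<>();
        for (long[] e : edges) {
            pipelines
                    .computeIfAbsent(find(parent, e[0]), k -> new ArrayList<>())
                    .add(e[0] + "->" + e[1]);
        }
        // Prints: pipeline: [1->2] and pipeline: [3->4, 4->5]
        pipelines.values().forEach(p -> System.out.println("pipeline: " + p));
    }

    static long find(Map<Long, Long> parent, long x) {
        long root = x;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        return root;
    }
}
```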


#### Generating the Physical Plan

Before looking at how the physical plan is generated, let's first see what information it contains; here are the physical plan and its related internal classes:
```Java
public class PhysicalPlan {
    private final List<SubPlan> pipelineList;
    private final AtomicInteger finishedPipelineNum = new AtomicInteger(0);
    private final AtomicInteger canceledPipelineNum = new AtomicInteger(0);
    private final AtomicInteger failedPipelineNum = new AtomicInteger(0);
    private final JobImmutableInformation jobImmutableInformation;
    private final IMap<Object, Object> runningJobStateIMap;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private CompletableFuture<JobResult> jobEndFuture;
    private final AtomicReference<String> errorBySubPlan = new AtomicReference<>();
    private final String jobFullName;
    private final long jobId;
    private JobMaster jobMaster;
    private boolean makeJobEndWhenPipelineEnded = true;
    private volatile boolean isRunning = false;

   }
```

The key field in this class is pipelineList, a list of SubPlan objects:

public class SubPlan {
    private final int pipelineMaxRestoreNum;
    private final int pipelineRestoreIntervalSeconds;
    private final List<PhysicalVertex> physicalVertexList;
    private final List<PhysicalVertex> coordinatorVertexList;
    private final int pipelineId;
    private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
    private final AtomicInteger canceledTaskNum = new AtomicInteger(0);
    private final AtomicInteger failedTaskNum = new AtomicInteger(0);
    private final String pipelineFullName;
    private final IMap<Object, Object> runningJobStateIMap;
    private final Map<String, String> tags;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private CompletableFuture<PipelineExecutionState> pipelineFuture;
    private final PipelineLocation pipelineLocation;
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
    private final ExecutorService executorService;
    private JobMaster jobMaster;
    private PassiveCompletableFuture<Void> reSchedulerPipelineFuture;
    private Integer pipelineRestoreNum;
    private final Object restoreLock = new Object();
    private volatile PipelineStatus currPipelineStatus;
    public volatile boolean isRunning = false;
    private Map<TaskGroupLocation, SlotProfile> slotProfiles;
 } 

A SubPlan in turn maintains lists of PhysicalVertex, split into physical task vertices and coordinator vertices.

public class PhysicalVertex {
    private final TaskGroupLocation taskGroupLocation;
    private final String taskFullName;
    private final TaskGroupDefaultImpl taskGroup;
    private final ExecutorService executorService;
    private final FlakeIdGenerator flakeIdGenerator;
    private final Set<URL> pluginJarsUrls;
    private final Set<ConnectorJarIdentifier> connectorJarIdentifiers;
    private final IMap<Object, Object> runningJobStateIMap;
    private CompletableFuture<TaskExecutionState> taskFuture;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private final NodeEngine nodeEngine;
    private JobMaster jobMaster;
    private volatile ExecutionState currExecutionState = ExecutionState.CREATED;
    public volatile boolean isRunning = false;
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();

  }  
public class TaskGroupDefaultImpl implements TaskGroup {
    private final TaskGroupLocation taskGroupLocation;

    private final String taskGroupName;
    // Holds the tasks this physical vertex needs to execute.
    // Each task may read data or write data,
    // or handle data splitting, checkpointing, and so on
    private final Map<Long, Task> tasks;
}

The PhysicalPlanGenerator constructor again only stores references:

public PhysicalPlanGenerator(
        @NonNull ExecutionPlan executionPlan,
        @NonNull NodeEngine nodeEngine,
        @NonNull JobImmutableInformation jobImmutableInformation,
        long initializationTimestamp,
        @NonNull ExecutorService executorService,
        @NonNull FlakeIdGenerator flakeIdGenerator,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull QueueType queueType) {
    this.pipelines = executionPlan.getPipelines();
    this.nodeEngine = nodeEngine;
    this.jobImmutableInformation = jobImmutableInformation;
    this.initializationTimestamp = initializationTimestamp;
    this.executorService = executorService;
    this.flakeIdGenerator = flakeIdGenerator;
    // the checkpoint of a pipeline
    this.pipelineTasks = new HashSet<>();
    this.startingTasks = new HashSet<>();
    this.subtaskActions = new HashMap<>();
    this.runningJobStateIMap = runningJobStateIMap;
    this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    this.queueType = queueType;
}

public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
    // Read the node tag filter from the user configuration; it selects the nodes the tasks will run on
    Map<String, String> tagFilter =
            (Map<String, String>)
                    jobImmutableInformation
                            .getJobConfig()
                            .getEnvOptions()
                            .get(EnvCommonOptions.NODE_TAG_FILTER.key());
    // TODO Determine which tasks do not need to be restored according to state
    CopyOnWriteArrayList<PassiveCompletableFuture<PipelineStatus>>
            waitForCompleteBySubPlanList = new CopyOnWriteArrayList<>();

    Map<Integer, CheckpointPlan> checkpointPlans = new HashMap<>();
    final int totalPipelineNum = pipelines.size();
    Stream<SubPlan> subPlanStream =
            pipelines.stream()
                    .map(
                            pipeline -> {
                                // Clear the per-pipeline state on each iteration
                                this.pipelineTasks.clear();
                                this.startingTasks.clear();
                                this.subtaskActions.clear();
                                final int pipelineId = pipeline.getId();
                                // Get the current pipeline's information
                                final List<ExecutionEdge> edges = pipeline.getEdges();
                                // Find all SourceActions
                                List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
                                // Create the source split tasks (SourceSplitEnumeratorTask);
                                // such a task drives the connector's SourceSplitEnumerator class, if the connector provides one
                                List<PhysicalVertex> coordinatorVertexList =
                                        getEnumeratorTask(
                                                sources, pipelineId, totalPipelineNum);
                                // Create the sink commit tasks (SinkAggregatedCommitterTask);
                                // such a task drives the connector's SinkAggregatedCommitter class, if the connector provides one.
                                // These two kinds of tasks run as coordinator tasks
                                coordinatorVertexList.addAll(
                                        getCommitterTask(edges, pipelineId, totalPipelineNum));
                                List<PhysicalVertex> physicalVertexList =
                                        getSourceTask(
                                                edges, sources, pipelineId, totalPipelineNum);
                                // Create the shuffle tasks, if any
                                physicalVertexList.addAll(
                                        getShuffleTask(edges, pipelineId, totalPipelineNum));

                                CompletableFuture<PipelineStatus> pipelineFuture =
                                        new CompletableFuture<>();
                                waitForCompleteBySubPlanList.add(
                                        new PassiveCompletableFuture<>(pipelineFuture));
                                // Register the checkpoint plan for this pipeline
                                checkpointPlans.put(
                                        pipelineId,
                                        CheckpointPlan.builder()
                                                .pipelineId(pipelineId)
                                                .pipelineSubtasks(pipelineTasks)
                                                .startingSubtasks(startingTasks)
                                                .pipelineActions(pipeline.getActions())
                                                .subtaskActions(subtaskActions)
                                                .build());
                                return new SubPlan(
                                        pipelineId,
                                        totalPipelineNum,
                                        initializationTimestamp,
                                        physicalVertexList,
                                        coordinatorVertexList,
                                        jobImmutableInformation,
                                        executorService,
                                        runningJobStateIMap,
                                        runningJobStateTimestampsIMap,
                                        tagFilter);
                            });

    PhysicalPlan physicalPlan =
            new PhysicalPlan(
                    subPlanStream.collect(Collectors.toList()),
                    executorService,
                    jobImmutableInformation,
                    initializationTimestamp,
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap);
    return Tuple2.tuple2(physicalPlan, checkpointPlans);
}

Generating the physical plan therefore means converting the execution plan into SeaTunnelTasks and adding the various coordination tasks along the way, such as data split tasks, data commit tasks, and checkpoint tasks.

Inside a SeaTunnelTask, the work is expressed as flow life cycles: SourceFlowLifeCycle, SinkFlowLifeCycle, TransformFlowLifeCycle, ShuffleSinkFlowLifeCycle, and ShuffleSourceFlowLifeCycle.

Let's take SourceFlowLifeCycle and SinkFlowLifeCycle as examples.

  • SourceFlowLifeCycle
    @Override
    public void init() throws Exception {
      this.splitSerializer = sourceAction.getSource().getSplitSerializer();
      this.reader =
              sourceAction
                      .getSource()
                      .createReader(
                              new SourceReaderContext(
                                      indexID,
                                      sourceAction.getSource().getBoundedness(),
                                      this,
                                      metricsContext,
                                      eventListener));
      this.enumeratorTaskAddress = getEnumeratorTaskAddress();
    }
    

@Override
public void open() throws Exception {
    reader.open();
    register();
}

public void collect() throws Exception {
    if (!prepareClose) {
        if (schemaChanging()) {
            log.debug("schema is changing, stop reader collect records");
            Thread.sleep(200);
            return;
        }

        reader.pollNext(collector);
        if (collector.isEmptyThisPollNext()) {
            Thread.sleep(100);
        } else {
            collector.resetEmptyThisPollNext();
            /**
             * The current thread obtain a checkpoint lock in the method {@link
             * SourceReader#pollNext(Collector)}. When trigger the checkpoint or savepoint,
             * other threads try to obtain the lock in the method {@link
             * SourceFlowLifeCycle#triggerBarrier(Barrier)}. When high CPU load, checkpoint
             * process may be blocked as long time. So we need sleep to free the CPU.
             */
            Thread.sleep(0L);
        }

        if (collector.captureSchemaChangeBeforeCheckpointSignal()) {
            if (schemaChangePhase.get() != null) {
                throw new IllegalStateException(
                        "previous schema changes in progress, schemaChangePhase: "
                                + schemaChangePhase.get());
            }
            schemaChangePhase.set(SchemaChangePhase.createBeforePhase());
            runningTask.triggerSchemaChangeBeforeCheckpoint().get();
            log.info("triggered schema-change-before checkpoint, stopping collect data");
        } else if (collector.captureSchemaChangeAfterCheckpointSignal()) {
            if (schemaChangePhase.get() != null) {
                throw new IllegalStateException(
                        "previous schema changes in progress, schemaChangePhase: "
                                + schemaChangePhase.get());
            }
            schemaChangePhase.set(SchemaChangePhase.createAfterPhase());
            runningTask.triggerSchemaChangeAfterCheckpoint().get();
            log.info("triggered schema-change-after checkpoint, stopping collect data");
        }
    } else {
        Thread.sleep(100);
    }
}


As you can see, the source's data reading is actually driven from the `collect` method of `SourceFlowLifeCycle`.

Records read from the source are handed to the `SeaTunnelSourceCollector`; when the collector receives a record, it updates the metrics and sends the record to the relevant downstream outputs:

```Java
@Override
public void collect(T row) {
    try {
        if (row instanceof SeaTunnelRow) {
            String tableId = ((SeaTunnelRow) row).getTableId();
            int size;
            if (rowType instanceof SeaTunnelRowType) {
                size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType);
            } else if (rowType instanceof MultipleRowType) {
                size = ((SeaTunnelRow) row).getBytesSize(rowTypeMap.get(tableId));
            } else {
                throw new SeaTunnelEngineException(
                        "Unsupported row type: " + rowType.getClass().getName());
            }
            sourceReceivedBytes.inc(size);
            sourceReceivedBytesPerSeconds.markEvent(size);
            flowControlGate.audit((SeaTunnelRow) row);
            if (StringUtils.isNotEmpty(tableId)) {
                String tableName = getFullName(TablePath.of(tableId));
                Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName);
                if (Objects.nonNull(sourceTableCounter)) {
                    sourceTableCounter.inc();
                } else {
                    Counter counter =
                            metricsContext.counter(SOURCE_RECEIVED_COUNT + "#" + tableName);
                    counter.inc();
                    sourceReceivedCountPerTable.put(tableName, counter);
                }
            }
        }
        sendRecordToNext(new Record<>(row));
        emptyThisPollNext = false;
        sourceReceivedCount.inc();
        sourceReceivedQPS.markEvent();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

public void sendRecordToNext(Record<?> record) throws IOException {
    synchronized (checkpointLock) {
        for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
            output.received(record);
        }
    }
}
```
  • SinkFlowLifeCycle

    @Override
    public void received(Record<?> record) {
      try {
          if (record.getData() instanceof Barrier) {
              long startTime = System.currentTimeMillis();
    
              Barrier barrier = (Barrier) record.getData();
              if (barrier.prepareClose(this.taskLocation)) {
                  prepareClose = true;
              }
              if (barrier.snapshot()) {
                  try {
                      lastCommitInfo = writer.prepareCommit();
                  } catch (Exception e) {
                      writer.abortPrepare();
                      throw e;
                  }
                  List<StateT> states = writer.snapshotState(barrier.getId());
                  if (!writerStateSerializer.isPresent()) {
                      runningTask.addState(
                              barrier, ActionStateKey.of(sinkAction), Collections.emptyList());
                  } else {
                      runningTask.addState(
                              barrier,
                              ActionStateKey.of(sinkAction),
                              serializeStates(writerStateSerializer.get(), states));
                  }
                  if (containAggCommitter) {
                      CommitInfoT commitInfoT = null;
                      if (lastCommitInfo.isPresent()) {
                          commitInfoT = lastCommitInfo.get();
                      }
                      runningTask
                              .getExecutionContext()
                              .sendToMember(
                                      new SinkPrepareCommitOperation<CommitInfoT>(
                                              barrier,
                                              committerTaskLocation,
                                              commitInfoSerializer.isPresent()
                                                      ? commitInfoSerializer
                                                              .get()
                                                              .serialize(commitInfoT)
                                                      : null),
                                      committerTaskAddress)
                              .join();
                  }
              } else {
                  if (containAggCommitter) {
                      runningTask
                              .getExecutionContext()
                              .sendToMember(
                                      new BarrierFlowOperation(barrier, committerTaskLocation),
                                      committerTaskAddress)
                              .join();
                  }
              }
              runningTask.ack(barrier);
    
              log.debug(
                      "trigger barrier [{}] finished, cost {}ms. taskLocation [{}]",
                      barrier.getId(),
                      System.currentTimeMillis() - startTime,
                      taskLocation);
          } else if (record.getData() instanceof SchemaChangeEvent) {
              if (prepareClose) {
                  return;
              }
              SchemaChangeEvent event = (SchemaChangeEvent) record.getData();
              writer.applySchemaChange(event);
          } else {
              if (prepareClose) {
                  return;
              }
              writer.write((T) record.getData());
              sinkWriteCount.inc();
              sinkWriteQPS.markEvent();
              if (record.getData() instanceof SeaTunnelRow) {
                  long size = ((SeaTunnelRow) record.getData()).getBytesSize();
                  sinkWriteBytes.inc(size);
                  sinkWriteBytesPerSeconds.markEvent(size);
                  String tableId = ((SeaTunnelRow) record.getData()).getTableId();
                  if (StringUtils.isNotBlank(tableId)) {
                      String tableName = getFullName(TablePath.of(tableId));
                      Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
                      if (Objects.nonNull(sinkTableCounter)) {
                          sinkTableCounter.inc();
                      } else {
                          Counter counter =
                                  metricsContext.counter(SINK_WRITE_COUNT + "#" + tableName);
                          counter.inc();
                          sinkWriteCountPerTable.put(tableName, counter);
                      }
                  }
              }
          }
      } catch (Exception e) {
          throw new RuntimeException(e);
      }
    }
```

Likewise, this is the class where the Sink's `write` method is actually invoked, writing the data out to the downstream system.
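To make that write/commit rhythm concrete, here is a minimal, self-contained sketch of a writer driven the same way: `write` on every record, then `prepareCommit` plus `snapshotState` when a barrier arrives. The `ToySinkWriter` interface and `ConsoleWriter` class below are simplified stand-ins invented for illustration; SeaTunnel's real `SinkWriter` API carries more context:

```Java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Simplified stand-ins for illustration only; the real SeaTunnel
// SinkWriter/commit types are richer than this sketch.
public class ToySinkWriterDemo {

    interface ToySinkWriter<T, CommitInfoT, StateT> {
        void write(T element) throws Exception;                 // one call per record
        Optional<CommitInfoT> prepareCommit() throws Exception; // on snapshot barrier
        List<StateT> snapshotState(long checkpointId);          // state stored with the barrier
        void abortPrepare();                                    // rollback if prepareCommit fails
    }

    // Buffers rows and "commits" them by printing, standing in for a real sink.
    static class ConsoleWriter implements ToySinkWriter<String, String, Long> {
        private final StringBuilder buffer = new StringBuilder();
        private long written = 0;

        @Override
        public void write(String element) {
            buffer.append(element).append('\n');
            written++;
        }

        @Override
        public Optional<String> prepareCommit() {
            String pending = buffer.toString();
            buffer.setLength(0); // everything before the barrier is now pending commit
            return pending.isEmpty() ? Optional.empty() : Optional.of(pending);
        }

        @Override
        public List<Long> snapshotState(long checkpointId) {
            return Collections.singletonList(written); // e.g. rows written so far
        }

        @Override
        public void abortPrepare() {
            buffer.setLength(0);
        }
    }

    public static void main(String[] args) throws Exception {
        ToySinkWriter<String, String, Long> writer = new ConsoleWriter();
        writer.write("row-1");
        writer.write("row-2");
        // A snapshot barrier arrives: prepareCommit + snapshotState,
        // mirroring the barrier branch of SinkFlowLifeCycle.received().
        Optional<String> commitInfo = writer.prepareCommit();
        List<Long> state = writer.snapshotState(1L);
        System.out.println("commit:\n" + commitInfo.orElse("<none>") + "state: " + state);
    }
}
```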

Task Execution

In `CoordinatorService`, the physical plan is produced by `jobMaster.init`, after which `jobMaster.run` is called to actually get the job running:

```Java
CoordinatorService {
    jobMaster.init(
            runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
    ...
    jobMaster.run();
}

JobMaster {
    public void run() {
        ...
        physicalPlan.startJob();
        ...
    }
}
```

Starting the job in `JobMaster` means calling `PhysicalPlan`'s `startJob` method:

```Java
public void startJob() {
    isRunning = true;
    log.info("{} state process is start", getJobFullName());
    stateProcess();
}

private synchronized void stateProcess() {
    if (!isRunning) {
        log.warn(String.format("%s state process is stopped", jobFullName));
        return;
    }
    switch (getJobStatus()) {
        case CREATED:
            updateJobState(JobStatus.SCHEDULED);
            break;
        case SCHEDULED:
            getPipelineList()
                    .forEach(
                            subPlan -> {
                                if (PipelineStatus.CREATED.equals(
                                        subPlan.getCurrPipelineStatus())) {
                                    subPlan.startSubPlanStateProcess();
                                }
                            });
            updateJobState(JobStatus.RUNNING);
            break;
        case RUNNING:
        case DOING_SAVEPOINT:
            break;
        case FAILING:
        case CANCELING:
            jobMaster.neverNeedRestore();
            getPipelineList().forEach(SubPlan::cancelPipeline);
            break;
        case FAILED:
        case CANCELED:
        case SAVEPOINT_DONE:
        case FINISHED:
            stopJobStateProcess();
            jobEndFuture.complete(new JobResult(getJobStatus(), errorBySubPlan.get()));
            return;
        default:
            throw new IllegalArgumentException("Unknown Job State: " + getJobStatus());
    }
}
```

In `PhysicalPlan`, starting the job first moves the job state to `SCHEDULED`, and then each `SubPlan`'s start method is invoked.

```Java
public void startSubPlanStateProcess() {
    isRunning = true;
    log.info("{} state process is start", getPipelineFullName());
    stateProcess();
}

private synchronized void stateProcess() {
    if (!isRunning) {
        log.warn(String.format("%s state process not start", pipelineFullName));
        return;
    }
    PipelineStatus state = getCurrPipelineStatus();
    switch (state) {
        case CREATED:
            updatePipelineState(PipelineStatus.SCHEDULED);
            break;
        case SCHEDULED:
            try {
                ResourceUtils.applyResourceForPipeline(jobMaster.getResourceManager(), this);
                log.debug(
                        "slotProfiles: {}, PipelineLocation: {}",
                        slotProfiles,
                        this.getPipelineLocation());
                updatePipelineState(PipelineStatus.DEPLOYING);
            } catch (Exception e) {
                makePipelineFailing(e);
            }
            break;
        case DEPLOYING:
            coordinatorVertexList.forEach(
                    task -> {
                        if (task.getExecutionState().equals(ExecutionState.CREATED)) {
                            task.startPhysicalVertex();
                            task.makeTaskGroupDeploy();
                        }
                    });

            physicalVertexList.forEach(
                    task -> {
                        if (task.getExecutionState().equals(ExecutionState.CREATED)) {
                            task.startPhysicalVertex();
                            task.makeTaskGroupDeploy();
                        }
                    });
            updatePipelineState(PipelineStatus.RUNNING);
            break;
        case RUNNING:
            break;
        case FAILING:
        case CANCELING:
            coordinatorVertexList.forEach(
                    task -> {
                        task.startPhysicalVertex();
                        task.cancel();
                    });

            physicalVertexList.forEach(
                    task -> {
                        task.startPhysicalVertex();
                        task.cancel();
                    });
            break;
        case FAILED:
        case CANCELED:
            if (checkNeedRestore(state) && prepareRestorePipeline()) {
                jobMaster.releasePipelineResource(this);
                restorePipeline();
                return;
            }
            subPlanDone(state);
            stopSubPlanStateProcess();
            pipelineFuture.complete(
                    new PipelineExecutionState(pipelineId, state, errorByPhysicalVertex.get()));
            return;
        case FINISHED:
            subPlanDone(state);
            stopSubPlanStateProcess();
            pipelineFuture.complete(
                    new PipelineExecutionState(
                            pipelineId, getPipelineState(), errorByPhysicalVertex.get()));
            return;
        default:
            throw new IllegalArgumentException("Unknown Pipeline State: " + getPipelineState());
    }
}
```

In `SubPlan`, when the state reaches `SCHEDULED`, resources are requested for the pipeline:

```Java
public static void applyResourceForPipeline(
        @NonNull ResourceManager resourceManager, @NonNull SubPlan subPlan) {
    Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>();
    Map<TaskGroupLocation, SlotProfile> slotProfiles = new HashMap<>();
    // TODO If there is no enough resources for tasks, we need add some wait profile
    subPlan.getCoordinatorVertexList()
            .forEach(
                    coordinator ->
                            futures.put(
                                    coordinator.getTaskGroupLocation(),
                                    applyResourceForTask(
                                            resourceManager, coordinator, subPlan.getTags())));

    subPlan.getPhysicalVertexList()
            .forEach(
                    task ->
                            futures.put(
                                    task.getTaskGroupLocation(),
                                    applyResourceForTask(
                                            resourceManager, task, subPlan.getTags())));

    futures.forEach(
            (key, value) -> {
                try {
                    slotProfiles.put(key, value == null ? null : value.join());
                } catch (CompletionException e) {
                    // do nothing
                }
            });
    // set it first, avoid can't get it when get resource not enough exception and need release
    // applied resource
    subPlan.getJobMaster().setOwnedSlotProfiles(subPlan.getPipelineLocation(), slotProfiles);
    if (futures.size() != slotProfiles.size()) {
        throw new NoEnoughResourceException();
    }
}

public static CompletableFuture<SlotProfile> applyResourceForTask(
        ResourceManager resourceManager, PhysicalVertex task, Map<String, String> tags) {
    // TODO custom resource size
    return resourceManager.applyResource(
            task.getTaskGroupLocation().getJobId(), new ResourceProfile(), tags);
}


public CompletableFuture<List<SlotProfile>> applyResources(
        long jobId, List<ResourceProfile> resourceProfile, Map<String, String> tagFilter)
        throws NoEnoughResourceException {
    waitingWorkerRegister();
    ConcurrentMap<Address, WorkerProfile> matchedWorker = filterWorkerByTag(tagFilter);
    if (matchedWorker.isEmpty()) {
        log.error("No matched worker with tag filter {}.", tagFilter);
        throw new NoEnoughResourceException();
    }
    return new ResourceRequestHandler(jobId, resourceProfile, matchedWorker, this)
            .request(tagFilter);
}
```

Within a `SubPlan`, resources are requested for all of its tasks, and the requests go through the `ResourceManager`. During the request, candidate workers are first filtered by the tags configured in the user's job, so tasks can be pinned to designated nodes and resource isolation is achieved; the tag-matching idea is sketched below, followed by the actual slot selection code.
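The tag match itself is a simple "the worker must carry every requested key/value pair" check. Below is a hypothetical, self-contained sketch of that idea; the real `filterWorkerByTag` operates on registered `WorkerProfile` objects rather than the plain maps used here:

```Java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TagFilterDemo {

    /**
     * Keep only the workers whose attributes contain every entry of the tag
     * filter. An empty filter matches all workers, mirroring the default
     * behavior when no tags are configured.
     */
    static Map<String, Map<String, String>> filterWorkerByTag(
            Map<String, Map<String, String>> workers, Map<String, String> tagFilter) {
        if (tagFilter == null || tagFilter.isEmpty()) {
            return workers;
        }
        return workers.entrySet().stream()
                .filter(w -> w.getValue().entrySet().containsAll(tagFilter.entrySet()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> workers = new HashMap<>();
        workers.put("worker-1", Map.of("group", "offline"));
        workers.put("worker-2", Map.of("group", "realtime", "zone", "a"));

        // Only worker-2 matches, so the task would be scheduled there.
        System.out.println(filterWorkerByTag(workers, Map.of("group", "realtime")));
    }
}
```

The actual slot pre-check and the per-worker slot request then look like this: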

```Java
public Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
    // Shuffle the order to ensure random selection of workers
    List<WorkerProfile> workerProfiles =
            Arrays.asList(registerWorker.values().toArray(new WorkerProfile[0]));
    Collections.shuffle(workerProfiles);
    // Check if there are still unassigned slots
    Optional<WorkerProfile> workerProfile =
            workerProfiles.stream()
                    .filter(
                            worker ->
                                    Arrays.stream(worker.getUnassignedSlots())
                                            .anyMatch(
                                                    slot ->
                                                            slot.getResourceProfile()
                                                                    .enoughThan(r)))
                    .findAny();

    if (!workerProfile.isPresent()) {
        // Check if there are still unassigned resources
        workerProfile =
                workerProfiles.stream()
                        .filter(WorkerProfile::isDynamicSlot)
                        .filter(worker -> worker.getUnassignedResource().enoughThan(r))
                        .findAny();
    }

    return workerProfile;
}

private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember(
        int i, ResourceProfile r, WorkerProfile workerProfile) {
    CompletableFuture<SlotAndWorkerProfile> future =
            resourceManager.sendToMember(
                    new RequestSlotOperation(jobId, r), workerProfile.getAddress());
    return future.whenComplete(
            withTryCatch(
                    LOGGER,
                    (slotAndWorkerProfile, error) -> {
                        if (error != null) {
                            throw new RuntimeException(error);
                        } else {
                            resourceManager.heartbeat(slotAndWorkerProfile.getWorkerProfile());
                            addSlotToCacheMap(i, slotAndWorkerProfile.getSlotProfile());
                        }
                    }));
}
```

After collecting all candidate workers, the list is shuffled and one worker whose free resources exceed the requested profile is picked at random; the master then communicates with that worker by sending it a `RequestSlotOperation`:

```Java
@Override
public synchronized SlotAndWorkerProfile requestSlot(
        long jobId, ResourceProfile resourceProfile) {
    initStatus = false;
    SlotProfile profile = selectBestMatchSlot(resourceProfile);
    if (profile != null) {
        profile.assign(jobId);
        assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
        unassignedResource.accumulateAndGet(
                profile.getResourceProfile(), ResourceProfile::subtract);
        unassignedSlots.remove(profile.getSlotID());
        assignedSlots.put(profile.getSlotID(), profile);
        contexts.computeIfAbsent(
                profile.getSlotID(),
                p -> new SlotContext(profile.getSlotID(), taskExecutionService));
    }
    LOGGER.fine(
            String.format(
                    "received slot request, jobID: %d, resource profile: %s, return: %s",
                    jobId, resourceProfile, profile));
    return new SlotAndWorkerProfile(getWorkerProfile(), profile);
}
```

When the worker's `SlotService` receives the `requestSlot` request, it updates its own bookkeeping and returns its profile to the master node. If the requested resources ultimately cannot be satisfied, a `NoEnoughResourceException` is thrown and the job fails. Once the resource request succeeds, deployment begins: `task.makeTaskGroupDeploy()` ships the task to the chosen worker node for execution.

```Java
TaskDeployState deployState =
        deploy(jobMaster.getOwnedSlotProfiles(taskGroupLocation));


public TaskDeployState deploy(@NonNull SlotProfile slotProfile) {
    try {
        if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
            return deployOnLocal(slotProfile);
        } else {
            return deployOnRemote(slotProfile);
        }
    } catch (Throwable th) {
        return TaskDeployState.failed(th);
    }
}


private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) {
    return deployInternal(
            taskGroupImmutableInformation -> {
                try {
                    return (TaskDeployState)
                            NodeEngineUtil.sendOperationToMemberNode(
                                            nodeEngine,
                                            new DeployTaskOperation(
                                                    slotProfile,
                                                    nodeEngine
                                                            .getSerializationService()
                                                            .toData(
                                                                    taskGroupImmutableInformation)),
                                            slotProfile.getWorker())
                                    .get();
                } catch (Exception e) {
                    if (getExecutionState().isEndState()) {
                        log.warn(ExceptionUtils.getMessage(e));
                        log.warn(
                                String.format(
                                        "%s deploy error, but the state is already in end state %s, skip this error",
                                        getTaskFullName(), currExecutionState));
                        return TaskDeployState.success();
                    } else {
                        return TaskDeployState.failed(e);
                    }
                }
            });
}
```

When deploying, the task information is sent to the worker node that was obtained during resource allocation:

```Java
public TaskDeployState deployTask(@NonNull Data taskImmutableInformation) {
    TaskGroupImmutableInformation taskImmutableInfo =
            nodeEngine.getSerializationService().toObject(taskImmutableInformation);
    return deployTask(taskImmutableInfo);
}

public TaskDeployState deployTask(@NonNull TaskGroupImmutableInformation taskImmutableInfo) {
    logger.info(
            String.format(
                    "received deploying task executionId [%s]",
                    taskImmutableInfo.getExecutionId()));
    TaskGroup taskGroup = null;
    try {
        Set<ConnectorJarIdentifier> connectorJarIdentifiers =
                taskImmutableInfo.getConnectorJarIdentifiers();
        Set<URL> jars = new HashSet<>();
        ClassLoader classLoader;
        if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
            // Prioritize obtaining the jar package file required for the current task execution
            // from the local, if it does not exist locally, it will be downloaded from the
            // master node.
            jars =
                    serverConnectorPackageClient.getConnectorJarFromLocal(
                            connectorJarIdentifiers);
        } else if (!CollectionUtils.isEmpty(taskImmutableInfo.getJars())) {
            jars = taskImmutableInfo.getJars();
        }
        classLoader =
                classLoaderService.getClassLoader(
                        taskImmutableInfo.getJobId(), Lists.newArrayList(jars));
        if (jars.isEmpty()) {
            taskGroup =
                    nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
        } else {
            taskGroup =
                    CustomClassLoadedObject.deserializeWithCustomClassLoader(
                            nodeEngine.getSerializationService(),
                            classLoader,
                            taskImmutableInfo.getGroup());
        }

        logger.info(
                String.format(
                        "deploying task %s, executionId [%s]",
                        taskGroup.getTaskGroupLocation(), taskImmutableInfo.getExecutionId()));

        synchronized (this) {
            if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
                throw new RuntimeException(
                        String.format(
                                "TaskGroupLocation: %s already exists",
                                taskGroup.getTaskGroupLocation()));
            }
            deployLocalTask(taskGroup, classLoader, jars);
            return TaskDeployState.success();
        }
    } catch (Throwable t) {
        logger.severe(
                String.format(
                        "TaskGroupID : %s  deploy error with Exception: %s",
                        taskGroup != null && taskGroup.getTaskGroupLocation() != null
                                ? taskGroup.getTaskGroupLocation().toString()
                                : "taskGroupLocation is null",
                        ExceptionUtils.getMessage(t)));
        return TaskDeployState.failed(t);
    }
}
```

After the worker node receives the task, it calls the `deployTask` method of `TaskExecutionService` to submit the task to the thread pool that was created at startup.

Inside the pool, each task is wrapped in a `BlockingWorker`:

```Java
private final class BlockingWorker implements Runnable {

    private final TaskTracker tracker;
    private final CountDownLatch startedLatch;

    private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) {
        this.tracker = tracker;
        this.startedLatch = startedLatch;
    }

    @Override
    public void run() {
        TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker =
                tracker.taskGroupExecutionTracker;
        ClassLoader classLoader =
                executionContexts
                        .get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
                        .getClassLoader();
        ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        final Task t = tracker.task;
        ProgressState result = null;
        try {
            startedLatch.countDown();
            t.init();
            do {
                result = t.call();
            } while (!result.isDone()
                    && isRunning
                    && !taskGroupExecutionTracker.executionCompletedExceptionally());
        ...
    }
}
```

The worker thread repeatedly invokes the `Task.call` method, and it is through this loop that the data-synchronization work is actually driven forward.
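The do/while loop above is the heart of Zeta's cooperative execution model: `call()` is expected to do a bounded slice of work and return quickly, so a pooled thread can drive the task without being monopolized by it. Here is a toy version of that contract, with simplified stand-ins for the real `Task` and `ProgressState` types:

```Java
public class CooperativeTaskDemo {

    enum ProgressState {
        MADE_PROGRESS, DONE;

        boolean isDone() {
            return this == DONE;
        }
    }

    interface Task {
        void init() throws Exception;

        ProgressState call() throws Exception; // one bounded slice of work
    }

    // Processes a few elements per call() instead of looping internally,
    // so the driving thread regains control between slices.
    static class CountingTask implements Task {
        private int next = 0;

        @Override
        public void init() {
            System.out.println("task init");
        }

        @Override
        public ProgressState call() {
            for (int i = 0; i < 3 && next < 10; i++) {
                System.out.println("processed " + next++);
            }
            return next < 10 ? ProgressState.MADE_PROGRESS : ProgressState.DONE;
        }
    }

    public static void main(String[] args) throws Exception {
        Task t = new CountingTask();
        t.init();
        ProgressState result;
        do { // same shape as BlockingWorker.run()
            result = t.call();
        } while (!result.isDone());
    }
}
```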

ClassLoader

SeaTunnel replaces the default `ClassLoader` with a child-first variant, which avoids class conflicts with other components:

```Java
@Override
public synchronized ClassLoader getClassLoader(long jobId, Collection<URL> jars) {
    log.debug("Get classloader for job {} with jars {}", jobId, jars);
    if (cacheMode) {
        // with cache mode, all jobs share the same classloader if the jars are the same
        jobId = 1L;
    }
    if (!classLoaderCache.containsKey(jobId)) {
        classLoaderCache.put(jobId, new ConcurrentHashMap<>());
        classLoaderReferenceCount.put(jobId, new ConcurrentHashMap<>());
    }
    Map<String, ClassLoader> classLoaderMap = classLoaderCache.get(jobId);
    String key = covertJarsToKey(jars);
    if (classLoaderMap.containsKey(key)) {
        classLoaderReferenceCount.get(jobId).get(key).incrementAndGet();
        return classLoaderMap.get(key);
    } else {
        ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(jars);
        log.info("Create classloader for job {} with jars {}", jobId, jars);
        classLoaderMap.put(key, classLoader);
        classLoaderReferenceCount.get(jobId).put(key, new AtomicInteger(1));
        return classLoader;
    }
}
```
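The method above only covers caching and reference counting; the child-first behavior itself comes from inverting the usual parent-delegation order in `loadClass`. A minimal sketch of the pattern follows. Note that the real `SeaTunnelChildFirstClassLoader` additionally keeps JDK classes and certain framework packages on the normal parent-first path to avoid linkage problems:

```Java
import java.net.URL;
import java.net.URLClassLoader;

// Minimal child-first class loader: look in our own jars before the parent.
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // 1. try this loader's own URLs first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // 2. fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical usage: a connector jar whose classes shadow the host's.
        URL[] pluginJars = {new URL("file:///tmp/connector-demo.jar")};
        ClassLoader cl =
                new ChildFirstClassLoader(pluginJars, ChildFirstClassLoader.class.getClassLoader());
        System.out.println(cl.loadClass("java.lang.String")); // not in the jar, parent serves it
    }
}
```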

Submitting Jobs via the REST API

SeaTunnel also supports submitting jobs through a REST API. To enable it, first add the following configuration to the `hazelcast.yaml` file:

```yaml
network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
```

With this configuration in place, the Hazelcast node can accept HTTP requests after it starts.

Let's again take job submission as the example and walk through the execution flow.

When submitting through the REST API, the client is simply whichever node sends the HTTP request, and the server is the SeaTunnel cluster, so a request can be issued from any HTTP client, as sketched below.
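For reference, here is what such a request can look like using the JDK's built-in `HttpClient`. This is a hedged sketch: it assumes the default cluster port 5801 and the v1 submit endpoint `/hazelcast/rest/maps/submit-job`, so check the REST documentation of your SeaTunnel version for the exact path and parameters:

```Java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestSubmitJobDemo {

    public static void main(String[] args) throws Exception {
        // The job config in JSON form: the same env/source/sink content
        // that would otherwise live in a .conf file.
        String jobConfig =
                "{\n"
                        + "  \"env\":    { \"job.mode\": \"BATCH\" },\n"
                        + "  \"source\": [ { \"plugin_name\": \"FakeSource\", \"row.num\": 10 } ],\n"
                        + "  \"sink\":   [ { \"plugin_name\": \"Console\" } ]\n"
                        + "}";

        // Host, port and query parameters assume a default local cluster.
        HttpRequest request =
                HttpRequest.newBuilder()
                        .uri(URI.create(
                                "http://localhost:5801/hazelcast/rest/maps/submit-job?jobName=demo"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(jobConfig))
                        .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // On success the server responds with the generated jobId and the jobName.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```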

When the server receives a request, it dispatches to the matching handler based on the request URI:

```Java
public void handle(HttpPostCommand httpPostCommand) {
    String uri = httpPostCommand.getURI();
    try {
        if (uri.startsWith(SUBMIT_JOB_URL)) {
            handleSubmitJob(httpPostCommand, uri);
        } else if (uri.startsWith(STOP_JOB_URL)) {
            handleStopJob(httpPostCommand, uri);
        } else if (uri.startsWith(ENCRYPT_CONFIG)) {
            handleEncrypt(httpPostCommand);
        } else {
            original.handle(httpPostCommand);
        }
    } catch (IllegalArgumentException e) {
        prepareResponse(SC_400, httpPostCommand, exceptionResponse(e));
    } catch (Throwable e) {
        logger.warning("An error occurred while handling request " + httpPostCommand, e);
        prepareResponse(SC_500, httpPostCommand, exceptionResponse(e));
    }

    this.textCommandService.sendResponse(httpPostCommand);
}
```

The corresponding method is looked up by path; for job submission it is `handleSubmitJob`:

```Java
private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
        throws IllegalArgumentException {
    Map<String, String> requestParams = new HashMap<>();
    RestUtil.buildRequestParams(requestParams, uri);
    Config config = RestUtil.buildConfig(requestHandle(httpPostCommand), false);
    ReadonlyConfig envOptions = ReadonlyConfig.fromConfig(config.getConfig("env"));
    String jobName = envOptions.get(EnvCommonOptions.JOB_NAME);

    JobConfig jobConfig = new JobConfig();
    jobConfig.setName(
            StringUtils.isEmpty(requestParams.get(RestConstant.JOB_NAME))
                    ? jobName
                    : requestParams.get(RestConstant.JOB_NAME));

    boolean startWithSavePoint =
            Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
    String jobIdStr = requestParams.get(RestConstant.JOB_ID);
    Long finalJobId = StringUtils.isNotBlank(jobIdStr) ? Long.parseLong(jobIdStr) : null;
    SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
    RestJobExecutionEnvironment restJobExecutionEnvironment =
            new RestJobExecutionEnvironment(
                    seaTunnelServer,
                    jobConfig,
                    config,
                    textCommandService.getNode(),
                    startWithSavePoint,
                    finalJobId);
    JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build();
    long jobId = jobImmutableInformation.getJobId();
    if (!seaTunnelServer.isMasterNode()) {

        NodeEngineUtil.sendOperationToMasterNode(
                        getNode().nodeEngine,
                        new SubmitJobOperation(
                                jobId,
                                getNode().nodeEngine.toData(jobImmutableInformation),
                                jobImmutableInformation.isStartWithSavePoint()))
                .join();

    } else {
        submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
    }

    this.prepareResponse(
            httpPostCommand,
            new JsonObject()
                    .add(RestConstant.JOB_ID, String.valueOf(jobId))
                    .add(RestConstant.JOB_NAME, jobConfig.getName()));
}
```

The logic here is much like the command-line client's, except that there is no local mode, so no local service needs to be created.

On the client side, logical-plan parsing and related work is done by `ClientJobExecutionEnvironment`; here `RestJobExecutionEnvironment` plays the same role.

At final submission time, if the current node is not the master, the information is sent to the master node; from that point on, the master handles it exactly as it handles a submission from the command-line client.

If the current node is the master, `submitJob` is invoked directly, which simply delegates to `coordinatorService.submitJob` for the follow-up processing:

```Java
private void submitJob(
        SeaTunnelServer seaTunnelServer,
        JobImmutableInformation jobImmutableInformation,
        JobConfig jobConfig) {
    CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
    Data data =
            textCommandService
                    .getNode()
                    .nodeEngine
                    .getSerializationService()
                    .toData(jobImmutableInformation);
    PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
            coordinatorService.submitJob(
                    Long.parseLong(jobConfig.getJobContext().getJobId()),
                    data,
                    jobImmutableInformation.isStartWithSavePoint());
    voidPassiveCompletableFuture.join();
}
```

As we can see, both submission paths parse the logical plan on the submitting side and then send the result to the master node, which performs physical-plan generation, resource allocation, and the remaining steps.

Publication of this article is supported by 白鲸开源科技 (WhaleOps).
