A Brief Analysis of the BuildFarm Worker

For more articles on BuildFarm, Bazel, and the Remote Execution API, see my personal blog:

  • Notes on the Bazel error: /tmp/external/gcc_toolchain_x86_64_files/bin/x86_64-linux-gcc: No such file or directory
  • How to build Java code into a standalone runnable jar with Bazel
  • A Brief Analysis of the BuildFarm Server
  • A Brief Analysis of the BuildFarm Worker
  • Configuring a custom image for the BuildFarm Worker
  • A Brief Analysis of the Buildfarm task queue
  • RE API compatibility issues encountered with older BuildFarm versions
  • ByteStream API (proto version) explained
  • Logstream API (go version) explained
  • Remote Execution API (go version) explained
  • A pitfall when using find to locate Bazel coverage files

This note focuses only on the task queue and the actual execution of actions.

Official worker documentation: https://bazelbuild.github.io/bazel-buildfarm/docs/architecture/workers/

Startup

Startup flow

main() -> SpringApplication.run(Worker.class, args)
   -> @PostConstruct init() -> start()
      -> check and load the configuration
      -> initialize DigestUtil, Backplane, CAS, ExecFileSystem
      -> create and start the gRPC server
      -> start the Pipeline and failsafe registration

backplane

backplane.start sets up the task execution state and the Redis client, and starts the subscription thread and the failsafe thread.

    if (SHARD.equals(configs.getBackplane().getType())) {
      backplane =
          new RedisShardBackplane(identifier, this::stripOperation, this::stripQueuedOperation);
      backplane.start(configs.getWorker().getPublicName());
    } else {
      throw new IllegalArgumentException("Shard Backplane not set in config");
    }
  @Override
  public void start(String clientPublicName) throws IOException {
    // Construct a single redis client to be used throughout the entire backplane.
    // We wish to avoid various synchronous and error handling issues that could occur when using
    // multiple clients.
    client = new RedisClient(jedisClusterFactory.get());
    // Create containers that make up the backplane
    state = DistributedStateCreator.create(client);

    if (configs.getBackplane().isSubscribeToBackplane()) {
      startSubscriptionThread();
    }
    if (configs.getBackplane().isRunFailsafeOperation()) {
      startFailsafeOperationThread();
    }

    // Record client start time
    client.call(
        jedis -> jedis.set("startTime/" + clientPublicName, Long.toString(new Date().getTime())));
  }

The method below starts the thread that listens on the channel; the worker channel is named "WorkerChannel", and the thread runs a RedisShardSubscription.

  private void startSubscriptionThread() {
    ListMultimap<String, TimedWatchFuture> watchers =
        Multimaps.synchronizedListMultimap(
            MultimapBuilder.linkedHashKeys().arrayListValues().build());
    subscriberService = BuildfarmExecutors.getSubscriberPool();
    subscriber =
        new RedisShardSubscriber(
            watchers,
            storageWorkerSet,
            configs.getBackplane().getWorkerChannel(),
            subscriberService);

    operationSubscription =
        new RedisShardSubscription(
            subscriber,
            /* onUnsubscribe=*/ () -> {
              subscriptionThread = null;
              if (onUnsubscribe != null) {
                onUnsubscribe.runInterruptibly();
              }
            },
            /* onReset=*/ this::updateWatchedIfDone,
            /* subscriptions=*/ subscriber::subscribedChannels,
            client);

    // use Executors...
    subscriptionThread = new Thread(operationSubscription, "Operation Subscription");

    subscriptionThread.start();
  }

RedisShardSubscriber: subscribes to channels and handles channel messages.
RedisShardSubscription: manages the lifecycle of the subscription, including starting, stopping, and resetting it.
When a message arrives on a subscribed channel, RedisShardSubscriber invokes the corresponding handling logic; it extends JedisPubSub, and its onMessage method processes the messages received from the Redis channels.

  • The onWorkerMessage and onWorkerChange methods handle worker-change messages, including adding and removing workers.
  • The onOperationMessage and onOperationChange methods handle operation-change messages, including resetting and expiring operations.

  @Override
  public void onMessage(String channel, String message) {
    if (channel.equals(workerChannel)) {
      onWorkerMessage(message);
    } else {
      onOperationMessage(channel, message);
    }
  }

  void onWorkerMessage(String message) {
    try {
      onWorkerChange(parseWorkerChange(message));
    } catch (InvalidProtocolBufferException e) {
      log.log(Level.INFO, format("invalid worker change message: %s", message), e);
    }
  }

  void onWorkerChange(WorkerChange workerChange) {
    switch (workerChange.getTypeCase()) {
      case TYPE_NOT_SET:
        log.log(
            Level.SEVERE,
            format(
                "WorkerChange oneof type is not set from %s at %s",
                workerChange.getName(), workerChange.getEffectiveAt()));
        break;
      case ADD:
        addWorker(workerChange.getName());
        break;
      case REMOVE:
        removeWorker(workerChange.getName());
        break;
    }
  }

  void onOperationMessage(String channel, String message) {
    try {
      onOperationChange(channel, parseOperationChange(message));
    } catch (InvalidProtocolBufferException e) {
      log.log(
          Level.INFO, format("invalid operation change message for %s: %s", channel, message), e);
    }
  }

  void onOperationChange(String channel, OperationChange operationChange) {
    switch (operationChange.getTypeCase()) {
      case TYPE_NOT_SET:
        log.log(
            Level.SEVERE,
            format(
                "OperationChange oneof type is not set from %s at %s",
                operationChange.getSource(), operationChange.getEffectiveAt()));
        break;
      case RESET:
        resetOperation(channel, operationChange.getReset());
        break;
      case EXPIRE:
        terminateExpiredWatchers(
            channel,
            toInstant(operationChange.getEffectiveAt()),
            operationChange.getExpire().getForce());
        break;
    }
  }

At this point we can see that Redis is not used here to hand out actual action tasks; rather, it takes part in worker scheduling and in the lifecycle rotation of operations. That is not what we care about here; the code involved in actual action execution needs further analysis.

pipeline

The createServer method defines the stages that make up the execution pipeline:

  private Server createServer(
      ServerBuilder<?> serverBuilder,
      ContentAddressableStorage storage,
      Instance instance,
      Pipeline pipeline,
      ShardWorkerContext context) {
    serverBuilder.addService(healthStatusManager.getHealthService());
    serverBuilder.addService(new ContentAddressableStorageService(instance));
    serverBuilder.addService(new ByteStreamService(instance));
    serverBuilder.addService(new ShutDownWorkerGracefully(this));

    // We will build a worker's server based on it's capabilities.
    // A worker that is capable of execution will construct an execution pipeline.
    // It will use various execution phases for it's profile service.
    // On the other hand, a worker that is only capable of CAS storage does not need a pipeline.
    if (configs.getWorker().getCapabilities().isExecution()) {
      PipelineStage completeStage =
          new PutOperationStage((operation) -> context.deactivate(operation.getName()));
      PipelineStage errorStage = completeStage; /* new ErrorStage(); */
      PipelineStage reportResultStage = new ReportResultStage(context, completeStage, errorStage);
      PipelineStage executeActionStage =
          new ExecuteActionStage(context, reportResultStage, errorStage);
      PipelineStage inputFetchStage =
          new InputFetchStage(context, executeActionStage, new PutOperationStage(context::requeue));
      PipelineStage matchStage = new MatchStage(context, inputFetchStage, errorStage);

      pipeline.add(matchStage, 4);
      pipeline.add(inputFetchStage, 3);
      pipeline.add(executeActionStage, 2);
      pipeline.add(reportResultStage, 1);

      serverBuilder.addService(
          new WorkerProfileService(
              storage, inputFetchStage, executeActionStage, context, completeStage, backplane));
    }
    GrpcMetrics.handleGrpcMetricIntercepts(serverBuilder, configs.getWorker().getGrpcMetrics());
    serverBuilder.intercept(new ServerHeadersInterceptor());

    return serverBuilder.build();
  }

The most critical stage is ExecuteActionStage, which is responsible for the actual execution.
Its overridden iterate method takes an operation from the stage's queue and hands it to an executor.

  @Override
  protected void iterate() throws InterruptedException {
    OperationContext operationContext = take();
    ResourceLimits limits = workerContext.commandExecutionSettings(operationContext.command);
    Executor executor = new Executor(workerContext, operationContext, this);
    Thread executorThread = new Thread(() -> executor.run(limits), "ExecuteActionStage.executor");

    synchronized (this) {
      executors.add(executorThread);
      int slotUsage = executorClaims.addAndGet(limits.cpu.claimed);
      executionSlotUsage.set(slotUsage);
      logStart(operationContext.operation.getName(), getUsage(slotUsage));
      executorThread.start();
    }
  }

The take() method returns an operation context containing the concrete Action object to execute, the Command object, and so on, all of which are structures defined by the Remote Execution API protos.
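As a rough illustration of what those structures look like, here is a minimal sketch that builds a Command from the Remote Execution API protos. The field values are made up; in buildfarm the worker fetches the real Action and Command messages from the CAS using the digests carried by the queue entry.

  import build.bazel.remote.execution.v2.Command;
  import build.bazel.remote.execution.v2.Command.EnvironmentVariable;

  public class CommandSketch {
    public static void main(String[] args) {
      // Illustrative values only; real Commands are produced by the client (e.g. Bazel).
      Command command =
          Command.newBuilder()
              .addArguments("/usr/bin/gcc")
              .addArguments("-c")
              .addArguments("hello.c")
              .addEnvironmentVariables(
                  EnvironmentVariable.newBuilder().setName("PATH").setValue("/usr/bin"))
              .build();
      // executeCommand later reads exactly these fields: the arguments and environment variables.
      System.out.println(command.getArgumentsList());
    }
  }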
Back in iterate, the actual execution is then started inside the synchronized block.
From here we need to look upward at how the operation context is obtained, and downward at how it is executed.

Obtaining the operation context

  @Override
  public OperationContext take() throws InterruptedException {
    return takeOrDrain(queue);
  }
  protected OperationContext takeOrDrain(BlockingQueue<OperationContext> queue)
      throws InterruptedException {
    boolean interrupted = false;
    InterruptedException exception;
    try {
      while (!isClosed() && !output.isClosed()) {
        OperationContext context = queue.poll(10, TimeUnit.MILLISECONDS);
        if (context != null) {
          return context;
        }
      }
      exception = new InterruptedException();
    } catch (InterruptedException e) {
      // only possible way to be terminated
      exception = e;
      // clear interrupted flag
      interrupted = Thread.interrupted();
    }
    waitForReleaseOrCatastrophe(queue);
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    throw exception;
  }

queue is a private member of the ExecuteActionStage class and is a blocking queue.

  private final BlockingQueue<OperationContext> queue = new ArrayBlockingQueue<>(1);

This queue has only a single enqueue point: the operationContext returned by the match stage.
The match stage keeps trying to obtain an operationContext from the message queue.

  @Override
  protected void iterate() throws InterruptedException {
    // stop matching and picking up any works if the worker is in graceful shutdown.
    if (inGracefulShutdown) {
      return;
    }
    Stopwatch stopwatch = Stopwatch.createStarted();
    OperationContext operationContext = OperationContext.newBuilder().build();
    if (!output.claim(operationContext)) {
      return;
    }
    MatchOperationListener listener = new MatchOperationListener(operationContext, stopwatch);
    try {
      logStart();
      workerContext.match(listener);
    } finally {
      if (!listener.wasMatched()) {
        output.release();
      }
    }
  }

The workerContext injected here is an instance of ShardWorkerContext, so the match method actually invoked is the following.

  @Override
  public void match(MatchListener listener) throws InterruptedException {
    RetryingMatchListener dedupMatchListener =
        new RetryingMatchListener() {
          boolean matched = false;

          @Override
          public boolean getMatched() {
            return !matched;
          }

          @Override
          public void onWaitStart() {
            listener.onWaitStart();
          }

          @Override
          public void onWaitEnd() {
            listener.onWaitEnd();
          }

          @Override
          public boolean onEntry(QueueEntry queueEntry) throws InterruptedException {
            if (queueEntry == null) {
              matched = true;
              return listener.onEntry(null);
            }
            String operationName = queueEntry.getExecuteEntry().getOperationName();
            if (activeOperations.putIfAbsent(operationName, queueEntry) != null) {
              log.log(Level.WARNING, "matched duplicate operation " + operationName);
              return false;
            }
            matched = true;
            boolean success = listener.onEntry(queueEntry);
            if (!success) {
              requeue(operationName);
            }
            return success;
          }

          @Override
          public void onError(Throwable t) {
            Throwables.throwIfUnchecked(t);
            throw new RuntimeException(t);
          }

          @Override
          public void setOnCancelHandler(Runnable onCancelHandler) {
            listener.setOnCancelHandler(onCancelHandler);
          }
        };
    while (dedupMatchListener.getMatched()) {
      try {
        matchInterruptible(dedupMatchListener);
      } catch (IOException e) {
        throw Status.fromThrowable(e).asRuntimeException();
      }
    }
  }

This method obtains tasks from the queue by calling matchInterruptible and handles them through dedupMatchListener.
dedupMatchListener is a RetryingMatchListener that overrides onEntry, which is invoked when the MatchListener receives a new entry; it ensures that an operation is not matched more than once.
matchInterruptible fetches tasks (QueueEntry messages) from the backplane.

  @SuppressWarnings("ConstantConditions")
  private void matchInterruptible(MatchListener listener) throws IOException, InterruptedException {
    listener.onWaitStart();
    QueueEntry queueEntry = null;
    try {
      queueEntry =
          backplane.dispatchOperation(
              configs.getWorker().getDequeueMatchSettings().getPlatform().getPropertiesList());
    } catch (IOException e) {
      // ....
      // transient backplane errors will propagate a null queueEntry
    }
    listener.onWaitEnd();

    if (queueEntry == null
        || DequeueMatchEvaluator.shouldKeepOperation(matchProvisions, queueEntry)) {
      listener.onEntry(queueEntry);
    } else {
      backplane.rejectOperation(queueEntry);
    }
    // ...
  }

listener.onWaitStart() notifies the MatchListener that matching has started; backplane.dispatchOperation is then called to try to fetch a QueueEntry from the task queue.

  @SuppressWarnings("ConstantConditions")
  @Override
  public QueueEntry dispatchOperation(List<Platform.Property> provisions)
      throws IOException, InterruptedException {
    return client.blockingCall(jedis -> dispatchOperation(jedis, provisions));
  }

  private @Nullable QueueEntry dispatchOperation(
      JedisCluster jedis, List<Platform.Property> provisions) throws InterruptedException {
    String queueEntryJson = state.operationQueue.dequeue(jedis, provisions);
    if (queueEntryJson == null) {
      return null;
    }

    QueueEntry.Builder queueEntryBuilder = QueueEntry.newBuilder();
    try {
      JsonFormat.parser().merge(queueEntryJson, queueEntryBuilder);
    } catch (InvalidProtocolBufferException e) {
      log.log(Level.SEVERE, "error parsing queue entry", e);
      return null;
    }
    QueueEntry queueEntry = queueEntryBuilder.build();

    String operationName = queueEntry.getExecuteEntry().getOperationName();
    Operation operation = keepaliveOperation(operationName);
    publishReset(jedis, operation);

    long requeueAt =
        System.currentTimeMillis() + configs.getBackplane().getDispatchingTimeoutMillis();
    DispatchedOperation o =
        DispatchedOperation.newBuilder().setQueueEntry(queueEntry).setRequeueAt(requeueAt).build();
    boolean success = false;
    try {
      String dispatchedOperationJson = JsonFormat.printer().print(o);

      /* if the operation is already in the dispatch list, fail the dispatch */
      success =
          state.dispatchedOperations.insertIfMissing(jedis, operationName, dispatchedOperationJson);
    } catch (InvalidProtocolBufferException e) {
      log.log(Level.SEVERE, "error printing dispatched operation", e);
      // very unlikely, printer would have to fail
    }

    if (success) {
      if (!state.operationQueue.removeFromDequeue(jedis, queueEntryJson)) {
        log.log(
            Level.WARNING,
            format(
                "operation %s was missing in %s, may be orphaned",
                operationName, state.operationQueue.getDequeueName()));
      }
      state.dispatchingOperations.remove(jedis, operationName);

      // Return an entry so that if it needs re-queued, it will have the correct "requeue attempts".
      return queueEntryBuilder.setRequeueAttempts(queueEntry.getRequeueAttempts() + 1).build();
    }
    return null;
  }

Here client is a RedisClient instance. dispatchOperation is the core method that pulls work off the task queue (String queueEntryJson = state.operationQueue.dequeue(jedis, provisions);). Tasks are stored in the Redis cluster.
This confirms that buildfarm's default implementation does use Redis as the action queue. Once the queue name in use is known, the queue can be monitored, for example along the lines of the sketch below.
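A minimal monitoring sketch, assuming the operation queue is stored as a Redis list and that QUEUE_KEY matches the queue name configured for the backplane; both the key name and the cluster address below are placeholders rather than values taken from buildfarm:

  import redis.clients.jedis.HostAndPort;
  import redis.clients.jedis.JedisCluster;

  public class QueueDepthProbe {
    // Hypothetical key; the real name comes from the backplane queue configuration
    // and differs between buildfarm versions and deployments.
    private static final String QUEUE_KEY = "{Execution}:QueuedOperations";

    public static void main(String[] args) {
      // Connect to one node of the Redis cluster used by the backplane (placeholder address).
      try (JedisCluster jedis = new JedisCluster(new HostAndPort("127.0.0.1", 6379))) {
        // Queued entries are JSON-encoded QueueEntry messages pushed onto a list,
        // so LLEN reports how many actions are waiting to be dispatched.
        System.out.println("pending actions: " + jedis.llen(QUEUE_KEY));
      }
    }
  }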

Actual execution

Recall the ExecuteActionStage.iterate method. It combines thread management, resource limits, and task execution, and is the most critical part of the class:

  @Override
  protected void iterate() throws InterruptedException {
    OperationContext operationContext = take();
    ResourceLimits limits = workerContext.commandExecutionSettings(operationContext.command);
    Executor executor = new Executor(workerContext, operationContext, this);
    Thread executorThread = new Thread(() -> executor.run(limits), "ExecuteActionStage.executor");

    synchronized (this) {
      executors.add(executorThread);
      int slotUsage = executorClaims.addAndGet(limits.cpu.claimed);
      executionSlotUsage.set(slotUsage);
      logStart(operationContext.operation.getName(), getUsage(slotUsage));
      executorThread.start();
    }
  }

The synchronized block there handles thread bookkeeping and resource accounting; executors is a hash set holding Thread objects.
The executor object carries out the actual work. The operationContext is assigned to a private final field in its constructor, and its run method, which executes in a new thread, is the core of the Executor class.

public void run(ResourceLimits limits) {
  long stallUSecs = 0;
  Stopwatch stopwatch = Stopwatch.createStarted();
  String operationName = operationContext.operation.getName();
  try {
    stallUSecs = runInterruptible(stopwatch, limits);
  } catch (InterruptedException e) {
  // ...
  } catch (Exception e) {
  // ...
  } finally {
    boolean wasInterrupted = Thread.interrupted();
    try {
      owner.releaseExecutor(operationName, limits.cpu.claimed, stopwatch.elapsed(MICROSECONDS), stallUSecs, exitCode);
    } finally {
      if (wasInterrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

The runInterruptible method manages the whole execution process, including status updates, resource setup, timing, and timeout control.

  private long runInterruptible(Stopwatch stopwatch, ResourceLimits limits)
      throws InterruptedException {
    // ...
    Operation operation =
        operationContext
            .operation
            .toBuilder()
            .setMetadata(
                Any.pack(
                    ExecutingOperationMetadata.newBuilder()
                        .setStartedAt(startedAt)
                        .setExecutingOn(workerContext.getName())
                        .setExecuteOperationMetadata(executingMetadata)
                        .setRequestMetadata(
                            operationContext.queueEntry.getExecuteEntry().getRequestMetadata())
                        .build()))
            .build();

    boolean operationUpdateSuccess = false;
    try {
      operationUpdateSuccess = workerContext.putOperation(operation);
    } catch (IOException e) {}
    // ...
    try {
      return executePolled(operation, limits, policies, timeout, stopwatch);
    } finally {
      operationContext.poller.pause();
    }
  }

executePolled performs the concrete task, including running the command line and handling the results.
While doing so it calls the executeCommand method, which is explained in detail below.

The executeCommand method

Method signature

@SuppressWarnings("ConstantConditions")
private Code executeCommand(  // returns a gRPC Code representing the execution status
    String operationName,  // operation name
    Path execDir,  // execution directory; the task runs inside this directory
    List<String> arguments,  // command-line arguments
    List<EnvironmentVariable> environmentVariables,  // list of environment variables
    ResourceLimits limits,  // resource limits, including CPU and memory
    Duration timeout,  // execution timeout
    ActionResult.Builder resultBuilder)  // builder used to populate the action result
    throws IOException, InterruptedException {

arguments is the concrete command to be executed.
Initialize the ProcessBuilder and the environment variables

    ProcessBuilder processBuilder =
        new ProcessBuilder(arguments).directory(execDir.toAbsolutePath().toFile());

    Map<String, String> environment = processBuilder.environment();
    environment.clear();
    for (EnvironmentVariable environmentVariable : environmentVariables) {
      environment.put(environmentVariable.getName(), environmentVariable.getValue());
    }
    for (Map.Entry<String, String> environmentVariable :
        limits.extraEnvironmentVariables.entrySet()) {
      environment.put(environmentVariable.getKey(), environmentVariable.getValue());
    }

ProcessBuilder is a standard Java class for creating and managing operating-system processes; it can run external commands and executables and interact with them. The code first clears the inherited default environment, then adds the variables from environmentVariables and the extra variables from limits, which together form the final environment.
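As a standalone illustration of the same pattern (not buildfarm code; the command, directory, and variables are arbitrary examples):

  import java.io.File;
  import java.io.IOException;

  public class ProcessBuilderSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
      ProcessBuilder pb = new ProcessBuilder("echo", "hello").directory(new File("/tmp"));
      pb.environment().clear();                  // drop the inherited environment
      pb.environment().put("PATH", "/usr/bin");  // keep only what the action declares
      Process process = pb.start();
      int exitCode = process.waitFor();          // block until the child process exits
      System.out.println("exit code: " + exitCode);
    }
  }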
Check whether Docker is enabled

if (limits.debugBeforeExecution) {  // if debug-before-execution is enabled in the resource limits, run the corresponding debug path
  return ExecutionDebugger.performBeforeExecutionDebug(processBuilder, limits, resultBuilder);
}

if (limits.containerSettings.enabled) {
    // ...
}

If the resource limits enable a Docker container, the command is executed through Docker.

The code below was updated in buildfarm 2.10.2.

Check whether to use a persistent worker

boolean usePersistentWorker =
    !limits.persistentWorkerKey.isEmpty() && !limits.persistentWorkerCommand.isEmpty();

if (usePersistentWorker) {
  log.fine(
      "usePersistentWorker; got persistentWorkerCommand of : "
          + limits.persistentWorkerCommand);

  Tree execTree = operationContext.tree;

  WorkFilesContext filesContext =
      WorkFilesContext.fromContext(execDir, execTree, operationContext.command);

  return PersistentExecutor.runOnPersistentWorker(
      limits.persistentWorkerCommand,
      filesContext,
      operationName,
      ImmutableList.copyOf(arguments),
      ImmutableMap.copyOf(environment),
      limits,
      timeout,
      PersistentExecutor.defaultWorkRootsDir,
      resultBuilder);
}

A persistent worker is a long-running worker process used for highly repetitive tasks with a large startup overhead. Its main purpose is to keep the worker process alive so that per-task setup and teardown time is reduced, improving the efficiency of the build system.

Without a Docker image

If no image is specified, the Docker branch is skipped and the method continues; at this point the execution context for this run is fully assembled.
Start the process
ProcessUtils.threadSafeStart is used to start the process, and a possible IOException is caught for error handling.
If the start fails, an error message is set and the INVALID_ARGUMENT status code is returned.

    long startNanoTime = System.nanoTime();
    Process process;
    try {
      process = ProcessUtils.threadSafeStart(processBuilder);
      process.getOutputStream().close();
    } catch (IOException e) {
      log.log(Level.SEVERE, format("error starting process for %s", operationName), e);
      resultBuilder.setExitCode(INCOMPLETE_EXIT_CODE);
      Throwable t = e.getCause();
      String message;
      if (t != null) {
        message = "Cannot run program \"" + processBuilder.command().get(0) + "\": " + t.getMessage();
      } else {
        message = e.getMessage();
      }
      resultBuilder.setStderrRaw(ByteString.copyFromUtf8(message));
      return Code.INVALID_ARGUMENT;
    }

Create and start reader threads to read the process's output streams asynchronously
A ByteStringWriteReader is created for standard output (stdout) and another for standard error (stderr).

    final Write stdoutWrite = new NullWrite();
    final Write stderrWrite = new NullWrite();
    ByteStringWriteReader stdoutReader = new ByteStringWriteReader(process.getInputStream(), stdoutWrite, (int) workerContext.getStandardOutputLimit());
    ByteStringWriteReader stderrReader = new ByteStringWriteReader(process.getErrorStream(), stderrWrite, (int) workerContext.getStandardErrorLimit());
    Thread stdoutReaderThread = new Thread(stdoutReader, "Executor.stdoutReader");
    Thread stderrReaderThread = new Thread(stderrReader, "Executor.stderrReader");
    stdoutReaderThread.start();
    stderrReaderThread.start();

Wait for the process to finish or time out

    Code statusCode = Code.OK;
    boolean processCompleted = false;
    try {
      if (timeout == null) {
        exitCode = process.waitFor();
        processCompleted = true;
      } else {
        long timeoutNanos = timeout.getSeconds() * 1000000000L + timeout.getNanos();
        long remainingNanoTime = timeoutNanos - (System.nanoTime() - startNanoTime);
        if (process.waitFor(remainingNanoTime, TimeUnit.NANOSECONDS)) {
          exitCode = process.exitValue();
          processCompleted = true;
        } else {
          log.log(Level.INFO, format("process timed out for %s after %ds", operationName, timeout.getSeconds()));
          statusCode = Code.DEADLINE_EXCEEDED;
        }
      }
    } finally {
      if (!processCompleted) {
        process.destroy();
        int waitMillis = 1000;
        while (!process.waitFor(waitMillis, TimeUnit.MILLISECONDS)) {
          log.log(Level.INFO, format("process did not respond to termination for %s, killing it", operationName));
          process.destroyForcibly();
          waitMillis = 100;
        }
      }
    }

Read the process's standard output and standard error

    ByteString stdout = ByteString.EMPTY;
    ByteString stderr = ByteString.EMPTY;
    try {
      stdoutReaderThread.join();
      stderrReaderThread.join();
      stdout = stdoutReader.getData();
      stderr = stderrReader.getData();
    } catch (Exception e) {
      log.log(Level.SEVERE, "error extracting stdout/stderr: ", e.getMessage());
    }
    resultBuilder.setExitCode(exitCode).setStdoutRaw(stdout).setStderrRaw(stderr);

Using a Docker image

If an image is specified, execution goes through DockerExecutor.runActionWithDocker.

if (limits.containerSettings.enabled) {
  DockerClient dockerClient = DockerClientBuilder.getInstance().build();
  DockerExecutorSettings settings = new DockerExecutorSettings();
  settings.fetchTimeout = Durations.fromMinutes(1);
  settings.operationContext = operationContext;
  settings.execDir = execDir;
  settings.limits = limits;
  settings.envVars = environment;
  settings.timeout = timeout;
  settings.arguments = arguments;
  return DockerExecutor.runActionWithDocker(dockerClient, settings, resultBuilder);
}

For how to specify the Docker image, see https://aliyuque.antfin.com/g/cloudstorage/devops/gz4qlwgrw2uywiff/collaborator/join?token=xqbNIsKdMdR15gxn&source=doc_collaborator# ("Configuring a custom image for the BuildFarm Worker").
runActionWithDocker is a static method that runs a build action using the supplied Docker client. It pulls any required Docker images, starts a container, executes the build action inside the container, extracts the execution results, and cleans up the Docker resources.

public class DockerExecutor {
  /**
   * @brief Run the action using the docker client and populate the results.
   * @details This will fetch any images as needed, spawn a container for execution, and clean up
   *     docker resources if requested.
   * @param dockerClient Client used to interact with docker.
   * @param settings Settings used to perform action execition.
   * @param resultBuilder The action results to populate.
   * @return Grpc code as to whether buildfarm was able to run the action.
   * @note Suggested return identifier: code.
   */
  public static Code runActionWithDocker(
      DockerClient dockerClient,
      DockerExecutorSettings settings,
      ActionResult.Builder resultBuilder)
      throws InterruptedException, IOException {
    String containerId = prepareRequestedContainer(dockerClient, settings);
    String execId = runActionInsideContainer(dockerClient, settings, containerId, resultBuilder);
    extractInformationFromContainer(dockerClient, settings, containerId, execId, resultBuilder);
    cleanUpContainer(dockerClient, containerId);
    return Code.OK;
  }

The four methods it calls are described in turn below.
prepareRequestedContainer sets up the Docker container in which the build action will run: it ensures the Docker image has been pulled locally, starts the container, and copies the necessary files into it.

  /**
   * @brief Setup the container for the action.
   * @details This ensures the image is fetched, the container is started, and that the container
   *     has proper visibility to the action's execution root. After this call it should be safe to
   *     spawn an action inside the container.
   * @param dockerClient Client used to interact with docker.
   * @param settings Settings used to perform action execition.
   * @return The ID of the started container.
   * @note Suggested return identifier: containerId.
   */
  private static String prepareRequestedContainer(
      DockerClient dockerClient, DockerExecutorSettings settings) throws InterruptedException {
    // this requires network access.  Once complete, "docker image ls" will show the downloaded
    // image
    fetchImageIfMissing(
        dockerClient, settings.limits.containerSettings.containerImage, settings.fetchTimeout);

    // build and start the container.  Once complete, "docker container ls" will show the started
    // container
    String containerId = createContainer(dockerClient, settings);
    dockerClient.startContainerCmd(containerId).exec();

    // copy files into it
    populateContainer(dockerClient, containerId, settings.execDir);

    // container is ready for running actions
    return containerId;
  }

runActionInsideContainer uses execCreateCmd and execStartCmd to run the actual build action inside the container, and returns the exec ID.

  /**
   * @brief Assuming the container is already created and properly populated/mounted with data, this
   *     can be used to spawn an action inside of it.
   * @details The stdout / stderr of the action execution are populated to the results.
   * @param dockerClient Client used to interact with docker.
   * @param settings Settings used to perform action execition.
   * @param containerId The ID of the container.
   * @param resultBuilder The results to populate.
   * @return The ID of the container execution.
   * @note Suggested return identifier: execId.
   */
  private static String runActionInsideContainer(
      DockerClient dockerClient,
      DockerExecutorSettings settings,
      String containerId,
      ActionResult.Builder resultBuilder)
      throws InterruptedException {
    // decide command to run
    ExecCreateCmd execCmd = dockerClient.execCreateCmd(containerId);
    execCmd.withWorkingDir(settings.execDir.toAbsolutePath().toString());
    execCmd.withAttachStderr(true);
    execCmd.withAttachStdout(true);
    execCmd.withCmd(settings.arguments.toArray(new String[0]));
    String execId = execCmd.exec().getId();
    // execute command (capture stdout / stderr)
    ExecStartCmd execStartCmd = dockerClient.execStartCmd(execId);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ByteArrayOutputStream err = new ByteArrayOutputStream();
    execStartCmd.exec(new ExecStartResultCallback(out, err)).awaitCompletion();
    // store results
    resultBuilder.setStdoutRaw(ByteString.copyFromUtf8(out.toString()));
    resultBuilder.setStderrRaw(ByteString.copyFromUtf8(err.toString()));

    return execId;
  }

extractInformationFromContainer gathers the results after the action has run: the exit code (via extractExitCode) and the output artifacts, which are copied out of the container.

  /**
   * @brief Extract information from the container after the action ran.
   * @details This can include exit code, output artifacts, and various docker information.
   * @param dockerClient Client used to interact with docker.
   * @param settings Settings used to perform action execition.
   * @param containerId The ID of the container.
   * @param execId The ID of the execution.
   * @param resultBuilder The results to populate.
   */
  private static void extractInformationFromContainer(
      DockerClient dockerClient,
      DockerExecutorSettings settings,
      String containerId,
      String execId,
      ActionResult.Builder resultBuilder)
      throws IOException {
    extractExitCode(dockerClient, execId, resultBuilder);
    copyOutputsOutOfContainer(dockerClient, settings, containerId);
  }

cleanUpContainer removes the container (forcibly, together with its volumes) via removeContainerCmd to free resources.

  /**
   * @brief Delete the container.
   * @details Forces container deletion.
   * @param dockerClient Client used to interact with docker.
   * @param containerId The ID of the container.
   */
  private static void cleanUpContainer(DockerClient dockerClient, String containerId) {
    try {
      dockerClient.removeContainerCmd(containerId).withRemoveVolumes(true).withForce(true).exec();
    } catch (Exception e) {
      log.log(Level.SEVERE, "couldn't shutdown container: ", e);
    }
  }

Downloading input files and pre-creating output directories

In the input-fetch stage, the call chain InputFetcher.run() -> runInterruptibly() -> fetchPolled() -> ShardWorkerContext.createExecDir() -> CFCExecFileSystem.createExecDir() eventually reaches the following method:

  public Path createExecDir(
      String operationName, Map<Digest, Directory> directoriesIndex, Action action, Command command)
      throws IOException, InterruptedException {
    Digest inputRootDigest = action.getInputRootDigest();
    OutputDirectory outputDirectory =
        OutputDirectory.parse(
            command.getOutputFilesList(),
            concat(
                command.getOutputDirectoriesList(),
                realDirectories(directoriesIndex, inputRootDigest)),
            command.getEnvironmentVariablesList());

    Path execDir = root.resolve(operationName);
    if (Files.exists(execDir)) {
      Directories.remove(execDir, fileStore);
    }
    Files.createDirectories(execDir);

    ImmutableList.Builder<String> inputFiles = new ImmutableList.Builder<>();
    ImmutableList.Builder<Digest> inputDirectories = new ImmutableList.Builder<>();

    log.log(
        Level.FINER, "ExecFileSystem::createExecDir(" + operationName + ") calling fetchInputs");
    Iterable<ListenableFuture<Void>> fetchedFutures =
        fetchInputs(
            execDir,
            execDir,
            inputRootDigest,
            directoriesIndex,
            outputDirectory,
            key -> {
              synchronized (inputFiles) {
                inputFiles.add(key);
              }
            },
            inputDirectories);
// ...
  }

execDir records the root directory of this build.
fetchInputs is then called to download the input files into execDir and to create the output directories.

  private Iterable<ListenableFuture<Void>> fetchInputs(
      Path root,
      Path path,
      Digest directoryDigest,
      Map<Digest, Directory> directoriesIndex,
      OutputDirectory outputDirectory,
      Consumer<String> onKey,
      ImmutableList.Builder<Digest> inputDirectories)
      throws IOException {
    Directory directory = directoriesIndex.get(directoryDigest);
    if (directory == null) {
      // not quite IO...
      throw new IOException(
          "Directory " + DigestUtil.toString(directoryDigest) + " is not in directories index");
    }

    Iterable<ListenableFuture<Void>> downloads =
        directory.getFilesList().stream()
            .map(
                fileNode ->
                    catchingPut(
                        fileNode.getDigest(),
                        root,
                        path.resolve(fileNode.getName()),
                        fileNode.getIsExecutable(),
                        onKey))
            .collect(ImmutableList.toImmutableList());

    downloads =
        concat(
            downloads,
            directory.getSymlinksList().stream()
                .map(symlinkNode -> putSymlink(path, symlinkNode))
                .collect(ImmutableList.toImmutableList()));

    for (DirectoryNode directoryNode : directory.getDirectoriesList()) {
      Digest digest = directoryNode.getDigest();
      String name = directoryNode.getName();
      OutputDirectory childOutputDirectory =
          outputDirectory != null ? outputDirectory.getChild(name) : null;
      Path dirPath = path.resolve(name);
      if (childOutputDirectory != null || !linkInputDirectories) {
        Files.createDirectories(dirPath);
        downloads =
            concat(
                downloads,
                fetchInputs(
                    root,
                    dirPath,
                    digest,
                    directoriesIndex,
                    childOutputDirectory,
                    onKey,
                    inputDirectories));
      } else {
        downloads =
            concat(
                downloads,
                ImmutableList.of(
                    transform(
                        linkDirectory(dirPath, digest, directoriesIndex),
                        (result) -> {
                          // we saw null entries in the built immutable list without synchronization
                          synchronized (inputDirectories) {
                            inputDirectories.add(digest);
                          }
                          return null;
                        },
                        fetchService)));
      }
      if (Thread.currentThread().isInterrupted()) {
        break;
      }
    }
    return downloads;
  }

This method handles the command's inputs and outputs. It walks the Directory identified by the given directoryDigest, processing its files, symlinks, and subdirectories, and recurses into each subdirectory; while doing so it checks whether each directory needs to exist and makes sure it has been created.
Note: buildfarm v2.6.1 does not support command.outputPaths; v2.7 or later is required to correctly parse the incoming command.outputPaths.
See: https://aliyuque.antfin.com/g/cloudstorage/devops/zzv5tra5rlk8xak6/collaborator/join?token=TgrjTlIzqde7uqHh&source=doc_collaborator# ("Problems encountered with older BuildFarm versions").

Graceful shutdown

The config/Worker.java class has a gracefulShutdownSeconds member, which can be set from the configuration file and defaults to 0.
The corresponding code is as follows:

public void prepareWorkerForGracefulShutdown() {
    if (configs.getWorker().getGracefulShutdownSeconds() == 0) {
        log.info(
            "Graceful Shutdown is not enabled. Worker is shutting down without finishing executions"
            + " in progress.");
    } else {
        inGracefulShutdown = true;
        log.info(
            "Graceful Shutdown - The current worker will not be registered again and should be"
            + " shutdown gracefully!");
        pipeline.stopMatchingOperations();
        int scanRate = 30; // check every 30 seconds
        int timeWaited = 0;
        int timeOut = configs.getWorker().getGracefulShutdownSeconds();
        try {
            if (pipeline.isEmpty()) {
                log.info("Graceful Shutdown - no work in the pipeline.");
            } else {
                log.info("Graceful Shutdown - waiting for executions to finish.");
            }
            while (!pipeline.isEmpty() && timeWaited < timeOut) {
                SECONDS.sleep(scanRate);
                timeWaited += scanRate;
                log.info(
                    String.format(
                        "Graceful Shutdown - Pipeline is still not empty after %d seconds.", timeWaited));
            }
        } catch (InterruptedException e) {
            log.info(
                "Graceful Shutdown - The worker gracefully shutdown is interrupted: " + e.getMessage());
        } finally {
            log.info(
                String.format(
                    "Graceful Shutdown - It took the worker %d seconds to %s",
                    timeWaited,
                    pipeline.isEmpty()
                    ? "finish all actions"
                    : "gracefully shutdown but still cannot finish all actions"));
        }
    }
}

The main function guarantees that the method above is invoked when the program is interrupted:

  public static void main(String[] args) throws Exception {
    // ...
    try {
      worker.start();
      worker.awaitTermination();
    } catch (IOException e) {
      log.severe(formatIOError(e));
    } catch (InterruptedException e) {
      log.log(Level.WARNING, "interrupted", e);
    } finally {
      worker.stop();  // eventually calls prepareWorkerForGracefulShutdown()
    }
  }
