Apache DolphinScheduler 1.3.9 Source Code Analysis (Part 2)

Introduction

With the development of big data, task scheduling systems have become a critical part of data processing and management. Apache DolphinScheduler is an excellent open-source distributed workflow scheduling platform that is widely used in big data scenarios.

In this article we dive into the source code of Apache DolphinScheduler 1.3.9, focusing on the interaction design between the Master and the Worker.

Readers who are interested can also review our previous article: Apache DolphinScheduler 1.3.9 Source Code Analysis (Part 1).

Worker Configuration File

# worker listener port
worker.listen.port=1234

# worker execute thread number to limit task instances in parallel
# limit on the number of task instances the worker runs in parallel
worker.exec.threads=100

# worker heartbeat interval, the unit is second
# interval (in seconds) at which the worker sends heartbeats
worker.heartbeat.interval=10

# worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
# maximum allowed cpu load average: tasks are dispatched to this worker only when the system load average is below this value
# the default -1 means the threshold is (number of cpu cores * 2)
worker.max.cpuload.avg=-1

# worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
# reserved memory for the worker: tasks are dispatched only when available system memory is above this value, in GB
# default 0.3 GB
worker.reserved.memory=0.3

# default worker groups separated by comma, like 'worker.groups=default,test'
# worker group names; separate multiple groups with commas
worker.groups=default
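
The last two resource-related settings decide whether a worker may accept new tasks. Below is a minimal, self-contained sketch of the rule described in the comments above; the class and method names are made up for this article, and DolphinScheduler performs the real check through its own OS utility methods.

public class ResourceCheckSketch {

    // hypothetical helper written for this article; DolphinScheduler obtains the
    // real load/memory figures through its own OS utility methods
    public static boolean canAcceptTasks(double maxCpuLoadAvg, double reservedMemoryGb,
                                         double currentLoadAvg, double availableMemoryGb) {
        // -1 means "use cpu cores * 2" as the load threshold
        double loadThreshold = maxCpuLoadAvg < 0
                ? Runtime.getRuntime().availableProcessors() * 2.0
                : maxCpuLoadAvg;
        return currentLoadAvg < loadThreshold && availableMemoryGb > reservedMemoryGb;
    }

    public static void main(String[] args) {
        // example: default thresholds on a lightly loaded machine with 4 GB free
        System.out.println(canAcceptTasks(-1, 0.3, 1.5, 4.0)); // true
    }
}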

WorkerServer Startup

public void run() {
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(workerConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
    this.nettyRemotingServer.start();

    // worker registry
    try {
        this.workerRegistry.registry();
        this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
        Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
        this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new RuntimeException(e);
    }

    // retry report task status
    this.retryReportTaskStatusThread.start();

    /**
     * register hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("shutdownHook");
        }
    }));
}
Four command processors are registered:
  1. TASK_EXECUTE_REQUEST: task execution request
  2. TASK_KILL_REQUEST: task kill request
  3. DB_TASK_ACK: the master's confirmation that the task ACK has been persisted to the database, so the worker can drop it from its cache
  4. DB_TASK_RESPONSE: the master's confirmation that the task result has been persisted to the database, so the worker can drop it from its cache

Besides registering these processors, run() also:
  • registers the WorkerServer in ZooKeeper and sends heartbeats
  • starts RetryReportTaskStatusThread to report task execution status

RetryReportTaskStatusThread

This is a fallback mechanism: it periodically polls and re-reports task status to the Master until the Master replies with a status ACK, so that task status is not lost.

Every 5 minutes it checks whether the ACK cache and the response cache in ResponceCache are empty; if they are not, it resends the ack command / response command to the Master.

public void run() {
    ResponceCache responceCache = ResponceCache.get();

    while (Stopper.isRunning()){

        // sleep 5 minutes
        ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);

        try {
            if (!responceCache.getAckCache().isEmpty()){
                Map<Integer,Command> ackCache =  responceCache.getAckCache();
                for (Map.Entry<Integer, Command> entry : ackCache.entrySet()){
                    Integer taskInstanceId = entry.getKey();
                    Command ackCommand = entry.getValue();
                    taskCallbackService.sendAck(taskInstanceId,ackCommand);
                }
            }

            if (!responceCache.getResponseCache().isEmpty()){
                Map<Integer,Command> responseCache =  responceCache.getResponseCache();
                for (Map.Entry<Integer, Command> entry : responseCache.entrySet()){
                    Integer taskInstanceId = entry.getKey();
                    Command responseCommand = entry.getValue();
                    taskCallbackService.sendResult(taskInstanceId,responseCommand);
                }
            }
        }catch (Exception e){
            logger.warn("retry report task status error", e);
        }
    }
}
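
The ResponceCache used above is essentially a pair of in-memory maps keyed by task instance id: one for ACK commands and one for result commands. Below is a minimal, self-contained sketch written for this article; the method names mirror how the cache is used in the surrounding code, but it is not the original class.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the role ResponceCache plays for the retry thread above:
// keep the ACK / RESULT command per task instance until the master confirms it
// has been persisted, then drop it. The real class differs in detail.
public class ResponseCacheSketch {

    public enum Event { ACK, RESULT }

    private final Map<Integer, Object> ackCache = new ConcurrentHashMap<>();
    private final Map<Integer, Object> responseCache = new ConcurrentHashMap<>();

    // called by the worker right before sending a command to the master
    public void cache(Integer taskInstanceId, Object command, Event event) {
        if (event == Event.ACK) {
            ackCache.put(taskInstanceId, command);
        } else {
            responseCache.put(taskInstanceId, command);
        }
    }

    // called when the worker receives DB_TASK_ACK from the master
    public void removeAckCache(Integer taskInstanceId) {
        ackCache.remove(taskInstanceId);
    }

    // called when the worker receives DB_TASK_RESPONSE from the master
    public void removeResponseCache(Integer taskInstanceId) {
        responseCache.remove(taskInstanceId);
    }

    public Map<Integer, Object> getAckCache() { return ackCache; }

    public Map<Integer, Object> getResponseCache() { return responseCache; }
}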

Interaction Design Between Master and Worker

The Apache DolphinScheduler Master and Worker modules are two independent JVM processes that can be deployed on different servers. All communication between Master and Worker is RPC over Netty, and seven processors are involved in total.

Module | Processor | Purpose
master | TaskResponseProcessor | handles TaskExecuteResponseCommand messages and adds them to the TaskResponseService response queue
master | TaskAckProcessor | handles TaskExecuteAckCommand messages and adds them to the TaskResponseService response queue
master | TaskKillResponseProcessor | handles TaskKillResponseCommand messages and logs their content
worker | TaskExecuteProcessor | handles TaskExecuteRequestCommand messages, sends a TaskExecuteAckCommand back to the master, and submits the task for execution
worker | TaskKillProcessor | handles TaskKillRequestCommand messages, kills the task's process with kill -9, and sends a TaskKillResponseCommand to the master
worker | DBTaskAckProcessor | handles DBTaskAckCommand messages and, on success, removes the corresponding ACK from the ResponseCache
worker | DBTaskResponseProcessor | handles DBTaskResponseCommand messages and, on success, removes the corresponding result from the ResponseCache

How Task Dispatching Works

master#TaskPriorityQueueConsumer

The Master runs a TaskPriorityQueueConsumer, which takes tasks from the TaskPriorityQueue (3 per batch) and dispatches them to Workers for execution; this is where the TaskExecuteRequestCommand is created.

TaskPriorityQueueConsumer#run()

@Override
public void run() {
    List<TaskPriority> failedDispatchTasks = new ArrayList<>();
    while (Stopper.isRunning()){
        try {
            // number of tasks dispatched per batch, master.dispatch.task.num = 3
            int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
            failedDispatchTasks.clear();
            for(int i = 0; i < fetchTaskNum; i++){
                if(taskPriorityQueue.size() <= 0){
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                // take a task from the queue (blocks if the queue is empty)
                TaskPriority taskPriority = taskPriorityQueue.take();
                // dispatch it to a worker for execution
                boolean dispatchResult = dispatch(taskPriority);
                if(!dispatchResult){
                    failedDispatchTasks.add(taskPriority);
                }
            }
            if (!failedDispatchTasks.isEmpty()) {
                // tasks that failed to dispatch are put back into the queue and wait to be dispatched again
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
                // If there are tasks in a cycle that cannot find the worker group,
                // sleep for 1 second
                if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            }
        }catch (Exception e){
            logger.error("dispatcher task error",e);
        }
    }
}

dispatcher

/**
 * dispatch task
 *
 * @param taskPriority taskPriority
 * @return result
 */
protected boolean dispatch(TaskPriority taskPriority) {
    boolean result = false;
    try {
        int taskInstanceId = taskPriority.getTaskId();
        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
        // the TaskExecuteRequestCommand is created here (context.toCommand())
        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());

        if (taskInstanceIsFinalState(taskInstanceId)){
            // when task finish, ignore this task, there is no need to dispatch anymore
            return true;
        }else{
            // dispatch the task
            // supported dispatch algorithms: lower-weight (low load first), random, round-robin
            result = dispatcher.dispatch(executionContext);
        }
    } catch (ExecuteException e) {
        logger.error("dispatch error: {}",e.getMessage());
    }
    return result;
}

TaskExecutionContext

// excerpted from org.apache.dolphinscheduler.server.entity.TaskExecutionContext#toCommand
public Command toCommand(){
    TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
    requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
    return requestCommand.convert2Command();
}

Dispatch Algorithm Implementations

Random algorithm

public class RandomSelector<T> implements Selector<T> {

    private final Random random = new Random();
    public T select(final Collection<T> source) {

        if (source == null || source.size() == 0) {
            throw new IllegalArgumentException("Empty source.");
        }

        if (source.size() == 1) {
            return (T) source.toArray()[0];
        }

        int size = source.size();
        int randomIndex = random.nextInt(size);

        return (T) source.toArray()[randomIndex];
    }

}

Round-robin algorithm

public class RoundRobinSelector<T> implements Selector<T> {

    private final AtomicInteger index = new AtomicInteger(0);

    public T select(Collection<T> source) {
        if (source == null || source.size() == 0) {
            throw new IllegalArgumentException("Empty source.");
        }
        if (source.size() == 1) {
            return (T)source.toArray()[0];
        }

        int size = source.size();
        return (T) source.toArray()[index.getAndIncrement() % size];
    }
}

Lower-weight (low-load-first) algorithm

public class LowerWeightRoundRobin implements Selector<HostWeight>{
    public HostWeight select(Collection<HostWeight> sources){
        int totalWeight = 0;
        int lowWeight = 0;
        HostWeight lowerNode = null;
        for (HostWeight hostWeight : sources) {
            totalWeight += hostWeight.getWeight();
            hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
            if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight() ) {
                lowerNode = hostWeight;
                lowWeight = hostWeight.getCurrentWeight();
            }
        }
        lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
        return lowerNode;

    }
}
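
To see a selector in action, here is a small, self-contained demo written for this article: the worker addresses are made up, the Selector interface is reduced to a single method, and the round-robin logic shown above is reused with an extra guard against the counter overflowing into a negative index.

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SelectorDemo {

    // reduced stand-in for the Selector<T> interface used above
    interface SimpleSelector<T> {
        T select(Collection<T> source);
    }

    // same idea as RoundRobinSelector, plus an overflow guard on the counter
    static class RoundRobin<T> implements SimpleSelector<T> {
        private final AtomicInteger index = new AtomicInteger(0);

        @Override
        @SuppressWarnings("unchecked")
        public T select(Collection<T> source) {
            if (source == null || source.isEmpty()) {
                throw new IllegalArgumentException("Empty source.");
            }
            Object[] hosts = source.toArray();
            int i = Math.floorMod(index.getAndIncrement(), hosts.length);
            return (T) hosts[i];
        }
    }

    public static void main(String[] args) {
        // hypothetical worker addresses
        List<String> workers = Arrays.asList("192.168.0.1:1234", "192.168.0.2:1234", "192.168.0.3:1234");
        SimpleSelector<String> selector = new RoundRobin<>();
        for (int i = 0; i < 5; i++) {
            // prints the three workers in order, then wraps around
            System.out.println(selector.select(workers));
        }
    }
}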

TaskExecuteRequestCommand

TaskExecuteProcessor
Constructor
public TaskExecuteProcessor() {
    this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
    this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
    // worker.exec.threads, default 100
    this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
    this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
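
ThreadUtils.newDaemonFixedThreadExecutor is a small convenience factory. The sketch below shows one common way to build such a factory (a fixed-size pool of named daemon threads); it illustrates the idea and is not the utility's actual source.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a "daemon fixed thread pool" factory in the spirit of
// ThreadUtils.newDaemonFixedThreadExecutor: a fixed number of daemon threads
// with a readable name prefix.
public class DaemonExecutorSketch {

    public static ExecutorService newDaemonFixedThreadExecutor(String namePrefix, int threadCount) {
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, namePrefix + "-" + counter.getAndIncrement());
                t.setDaemon(true); // daemon threads do not block JVM shutdown
                return t;
            }
        };
        return Executors.newFixedThreadPool(threadCount, factory);
    }

    public static void main(String[] args) {
        ExecutorService pool = newDaemonFixedThreadExecutor("Worker-Execute-Thread", 4);
        pool.submit(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
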
The process() method
public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
                                String.format("invalid command type : %s", command.getType()));

    // deserialize the TaskExecuteRequestCommand
    TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
        command.getBody(), TaskExecuteRequestCommand.class);

    logger.info("received command : {}", taskRequestCommand);

    if (taskRequestCommand == null) {
        logger.error("task execute request command is null");
        return;
    }

    String contextJson = taskRequestCommand.getTaskExecutionContext();
    TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);

    if (taskExecutionContext == null) {
        logger.error("task execution context is null");
        return;
    }
    // store the context in taskExecutionContextCacheManager
    setTaskCache(taskExecutionContext);
    // create the task logger
    Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                                                                        taskExecutionContext.getProcessDefineId(),
                                                                        taskExecutionContext.getProcessInstanceId(),
                                                                        taskExecutionContext.getTaskInstanceId()));

    taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
    taskExecutionContext.setStartTime(new Date());
    taskExecutionContext.setLogPath(getTaskLogPath(taskExecutionContext));

    // local execute path
    String execLocalPath = getExecLocalPath(taskExecutionContext);
    logger.info("task instance local execute path : {}", execLocalPath);
    taskExecutionContext.setExecutePath(execLocalPath);

    // store the task logger in a ThreadLocal
    FileUtils.taskLoggerThreadLocal.set(taskLogger);
    try {
        // create the execution directory (and the tenant user if absent)
        FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
    } catch (Throwable ex) {
        String errorLog = String.format("create execLocalPath : %s", execLocalPath);
        LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
        LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
        taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
    }
    FileUtils.taskLoggerThreadLocal.remove();

    taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
                                         new NettyRemoteChannel(channel, command.getOpaque()));

    // send TaskExecuteAckCommand to the master
    this.doAck(taskExecutionContext);

    // submit task
    workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
}

private void doAck(TaskExecutionContext taskExecutionContext){
    // tell master that task is in executing
    TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
    ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),ackCommand.convert2Command(),Event.ACK);
    taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}

TaskExecuteThread

Constructor
public TaskExecuteThread(TaskExecutionContext taskExecutionContext
                         , TaskCallbackService taskCallbackService
                         , Logger taskLogger) {
    this.taskExecutionContext = taskExecutionContext;
    this.taskCallbackService = taskCallbackService;
    this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
    this.taskLogger = taskLogger;
}
The run() method
public void run() {

    TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
    try {
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        // task node
        TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);

        // copy hdfs/minio file to local
        // download required resources, e.g. Spark/Flink jars, UDFs, etc.
        downloadResource(taskExecutionContext.getExecutePath(),
                         taskExecutionContext.getResources(),
                         logger);

        taskExecutionContext.setTaskParams(taskNode.getParams());
        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
        taskExecutionContext.setDefinedParams(getGlobalParamsMap());

        // set task timeout
        setTaskTimeout(taskExecutionContext, taskNode);

        taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
                                                        taskExecutionContext.getProcessDefineId(),
                                                        taskExecutionContext.getProcessInstanceId(),
                                                        taskExecutionContext.getTaskInstanceId()));

        // create the task
        task = TaskManager.newTask(taskExecutionContext, taskLogger);

        // initialize the task
        task.init();
        // build the business parameters required by the task
        preBuildBusinessParams();
        // execute the task
        task.handle();

        // post-execution actions
        task.after();
        responseCommand.setStatus(task.getExitStatus().getCode());
        responseCommand.setEndTime(new Date());
        responseCommand.setProcessId(task.getProcessId());
        responseCommand.setAppIds(task.getAppIds());
        logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
    } catch (Exception e) {
        logger.error("task scheduler failure", e);
        // if an exception occurs, kill the task
        kill();
        responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
        responseCommand.setEndTime(new Date());
        responseCommand.setProcessId(task.getProcessId());
        responseCommand.setAppIds(task.getAppIds());
    } finally {
        // remove the task execution context from the cache
        taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        // cache the responseCommand
        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
        // send the responseCommand to the master
        taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
        // clean up the task execution path
        clearTaskExecPath();
    }
}

master#TaskResponseService

While executing a dispatched task, the Worker sends an ACK command and a response command to the Master.

On the Master side they are handled by TaskAckProcessor and TaskResponseProcessor.

TaskAckProcessor

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
    TaskExecuteAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);
    logger.info("taskAckCommand : {}", taskAckCommand);

    // cache the task instance
    taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);

    String workerAddress = ChannelUtils.toAddress(channel).getAddress();

    ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());

    // TaskResponseEvent
    TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
            taskAckCommand.getStartTime(),
            workerAddress,
            taskAckCommand.getExecutePath(),
            taskAckCommand.getLogPath(),
            taskAckCommand.getTaskInstanceId(),
            channel);

    // the main processing logic is in TaskResponseService
    taskResponseService.addResponse(taskResponseEvent);
}

TaskResponseProcessor

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));

    TaskExecuteResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);
    logger.info("received command : {}", responseCommand);

    // cache the task instance
    taskInstanceCacheManager.cacheTaskInstance(responseCommand);

    // TaskResponseEvent
    TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
            responseCommand.getEndTime(),
            responseCommand.getProcessId(),
            responseCommand.getAppIds(),
            responseCommand.getTaskInstanceId(),
            channel);
    // the main processing logic is in TaskResponseService
    taskResponseService.addResponse(taskResponseEvent);
}

TaskResponseService

As TaskResponseProcessor and TaskAckProcessor show, their main logic lives in the TaskResponseService class, and TaskResponseService processes events through a TaskResponseWorker thread.

// the TaskResponseEvent queue is a bounded blocking queue
private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<>(5000);


class TaskResponseWorker extends Thread {

        @Override
        public void run() {

            while (Stopper.isRunning()){
                try {
                    // blocks here when there are no task events
                    TaskResponseEvent taskResponseEvent = eventQueue.take();
                    // persist the task instance state to the database
                    persist(taskResponseEvent);
                } catch (InterruptedException e){
                    break;
                } catch (Exception e){
                    logger.error("persist task error",e);
                }
            }
            logger.info("TaskResponseWorker stopped");
        }
    }

    /**
     * persist  taskResponseEvent
     * @param taskResponseEvent taskResponseEvent
     */
    private void persist(TaskResponseEvent taskResponseEvent){
        Event event = taskResponseEvent.getEvent();
        Channel channel = taskResponseEvent.getChannel();

        switch (event){
            case ACK:
                try {
                    TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
                    if (taskInstance != null) {
                        ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
                        processService.changeTaskState(status,
                            taskResponseEvent.getStartTime(),
                            taskResponseEvent.getWorkerAddress(),
                            taskResponseEvent.getExecutePath(),
                            taskResponseEvent.getLogPath(),
                            taskResponseEvent.getTaskInstanceId());
                    }
                    // send a DB_TASK_ACK request back to the worker
                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
                    channel.writeAndFlush(taskAckCommand.convert2Command());
                }catch (Exception e){
                    logger.error("worker ack master error",e);
                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1);
                    channel.writeAndFlush(taskAckCommand.convert2Command());
                }
                break;
            case RESULT:
                try {
                    TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
                    if (taskInstance != null){
                        processService.changeTaskState(taskResponseEvent.getState(),
                                taskResponseEvent.getEndTime(),
                                taskResponseEvent.getProcessId(),
                                taskResponseEvent.getAppIds(),
                                taskResponseEvent.getTaskInstanceId());
                    }
                    // send a DB_TASK_RESPONSE request back to the worker
                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
                    channel.writeAndFlush(taskResponseCommand.convert2Command());
                }catch (Exception e){
                    logger.error("worker response master error",e);
                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1);
                    channel.writeAndFlush(taskResponseCommand.convert2Command());
                }
                break;
            default:
                throw new IllegalArgumentException("invalid event type : " + event);
        }
    }
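
The structure here is a classic single-consumer blocking queue: Netty processor threads enqueue events, and a single TaskResponseWorker thread drains the queue and persists each event. The stand-alone demo below, written for this article, strips the pattern down to its essentials.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-alone illustration of the TaskResponseService pattern: processors
// enqueue events, a single worker thread takes them and persists them.
public class EventQueueDemo {

    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(5000);

    public void addResponse(String event) {
        // called from Netty processor threads
        eventQueue.offer(event);
    }

    public void start() {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // blocks while the queue is empty
                    String event = eventQueue.take();
                    persist(event);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "TaskResponseWorker-demo");
        worker.setDaemon(true);
        worker.start();
    }

    private void persist(String event) {
        // in DolphinScheduler this is where the task state is written to the DB
        System.out.println("persisting " + event);
    }

    public static void main(String[] args) throws InterruptedException {
        EventQueueDemo demo = new EventQueueDemo();
        demo.start();
        demo.addResponse("ACK for task 1");
        demo.addResponse("RESULT for task 1");
        Thread.sleep(200); // give the worker thread time to drain the queue
    }
}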

Worker#DBTaskAckProcessor and DBTaskResponseProcessor

When the Worker receives the Master's db_task_ack command or db_task_response command, the corresponding processors are DBTaskAckProcessor and DBTaskResponseProcessor; both simply remove the corresponding task instance command from the ResponceCache.

DBTaskAckProcessor

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),
            String.format("invalid command type : %s", command.getType()));

    DBTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
            command.getBody(), DBTaskAckCommand.class);

    if (taskAckCommand == null){
        return;
    }

    if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
        ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
    }
}

DBTaskResponseProcessor

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(),
                                String.format("invalid command type : %s", command.getType()));

    DBTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize(
        command.getBody(), DBTaskResponseCommand.class);

    if (taskResponseCommand == null){
        return;
    }

    if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
        ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
    }
}

How Task Killing Works

MasterTaskExecThread#waitTaskQuit

public Boolean waitTaskQuit(){
    // query new state
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());

    while (Stopper.isRunning()){
        try {
            // code omitted...

            // task instance added to the queue, waiting for the worker to kill it
            // if the master has received a cancel request, or the process instance is in the READY_STOP state,
            // the master sends a kill request command to the worker
            if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
                cancelTaskInstance();
            }

            // code omitted...
        } catch (Exception e) {
            // code omitted...
        }
    }
    return true;
}

private void cancelTaskInstance() throws Exception{
    if(alreadyKilled){
        return;
    }
    alreadyKilled = true;
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    if(StringUtils.isBlank(taskInstance.getHost())){
        taskInstance.setState(ExecutionStatus.KILL);
        taskInstance.setEndTime(new Date());
        processService.updateTaskInstance(taskInstance);
        return;
    }

    // build the TaskKillRequestCommand
    TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
    killCommand.setTaskInstanceId(taskInstance.getId());

    ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);

    Host host = Host.of(taskInstance.getHost());
    executionContext.setHost(host);

    nettyExecutorManager.executeDirectly(executionContext);

    logger.info("master kill taskInstance name :{} taskInstance id:{}",
            taskInstance.getName(), taskInstance.getId() );
}

Worker#TaskKillProcessor

TaskKillProcessor handles the kill request command sent by the Master.

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
    TaskKillRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
    logger.info("received kill command : {}", killCommand);

    Pair<Boolean, List<String>> result = doKill(killCommand);

    taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
            new NettyRemoteChannel(channel, command.getOpaque()));

    // send the kill response command to the master
    TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result);
    taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
    taskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
}


private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand){
    boolean processFlag = true;
    List<String> appIds = Collections.emptyList();
    int taskInstanceId = killCommand.getTaskInstanceId();
    TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
    try {
        Integer processId = taskExecutionContext.getProcessId();

        if (processId.equals(0)) {
            taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
            logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
            return Pair.of(true, appIds);
        }

        // run kill -9 to kill the process (tree) directly
        // Spark/Flink jobs submitted to a cluster cannot be killed this way for now
        String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId());
        if (StringUtils.isNotEmpty(pidsStr)) {
            String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
            logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
            OSUtils.exeCmd(cmd);
        }

    } catch (Exception e) {
        processFlag = false;
        logger.error("kill task error", e);
    }
    // find log and kill yarn job
    Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
            taskExecutionContext.getLogPath(),
            taskExecutionContext.getExecutePath(),
            taskExecutionContext.getTenantCode());
    return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());
}
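
ProcessUtils.getPidsStr collects the whole process tree of the task (the shell started by the worker plus its children), so the kill -9 reaches every descendant. As a rough, self-contained illustration of the same idea on a modern JVM (this is not DolphinScheduler's implementation, which shells out to OS tools), the pid string can be built with ProcessHandle:

import java.util.stream.Collectors;

// Sketch: collect a process and all of its descendants into a "pid1 pid2 ..."
// string, which is what a command template like "sudo kill -9 %s" expects.
public class PidTreeSketch {

    public static String getPidsStr(long rootPid) {
        return ProcessHandle.of(rootPid)
                .map(root -> {
                    String descendants = root.descendants()
                            .map(p -> String.valueOf(p.pid()))
                            .collect(Collectors.joining(" "));
                    return descendants.isEmpty() ? String.valueOf(rootPid) : rootPid + " " + descendants;
                })
                .orElse("");
    }

    public static void main(String[] args) {
        // demo: print the pid tree rooted at the current JVM process
        long self = ProcessHandle.current().pid();
        System.out.println(getPidsStr(self));
    }
}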

master#TaskKillResponseProcessor

TaskKillResponseProcessor is used by the master to handle the worker's response to a task kill request.

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));

    TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
    logger.info("received task kill response command : {}", responseCommand);
}

Through this analysis of the Apache DolphinScheduler 1.3.9 source code, we have gained a deeper understanding of the design and implementation of its core modules.

If you are interested in the Apache DolphinScheduler source code, you can dig deeper into the details of its task scheduling strategies, or do secondary development for your own business scenarios to get the most out of DolphinScheduler's scheduling capabilities.

That's all for this article!

This article is published with the support of 白鲸开源科技.
