海豚调度调优 | 正在运行的工作流(DAG)如何重新拉起失败的任务(Task)

💡  本系列文章是DolphinScheduler由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。

*祝开卷有益。 *

本系列教程基于 DolphinScheduler 2.0.5 做的优化。(稳定版推荐使用3.1.9

file

先抛出问题

1.场景描述

工作流 A 正在运行,里面有很多节点,依赖关系比较复杂。凌晨用户接到报警,a 节点失败了,此时其他分支的任务还在运行。此时工作流是不会是失败的,需要等待其他分支的任务运行结束,整个工作流才会失败。

失败的任务:是失败状态且重试次数用完了。

2.目前的处理流程

目前的做法是,先 kill 工作流,等待工作流变成失败状态,然后再点击恢复失败按钮,即可把失败的节点重新拉起来。

画外音:kill 工作流我也做了优化,后面会有文章介绍,kill之后,工作流会变成失败状态,这样做是为了可以恢复失败。

3.困惑

这种做法会影响正在运行的任务,强制把正在跑的任务 kill 掉,对一些运行时间比较久的任务来说,会降低执行效率。跑的好好的,被干掉了,恢复失败,又得重新跑。

这非常不划算。

优化建议:如何在不停止工作流的情况下,单独把失败的节点重新拉起来呢?

解决方案

后端优化:

分析了工作流启动、停止、恢复失败等操作类型 Master 和 Worker 的原理,打算新增一个操作类型、命令类型枚举值:RUN_FAILED_ONLY。

优化后的大致流程如下:

  1. 用户在页面上点击按钮,提交 executeType  = RUN_FAILED_ONLY、processInstanceId=xxxx的请求。

  2. API服务收到请求,判断是 RUN_FAILED_ONLY 操作,就封装一个 StateEventChangeCommand 命令,进行RPC请求。

  3. Master服务的 StateEventProcessor 监听到命令,提交给 StateEventResponseService ,它负责找到对应的工作流 WorkflowExecuteThread ,然后把这个stateEvent 给这个 WorkflowExecuteThread.

  4. WorkflowExecuteThread处理 stateEvent,判断这个 stateEvent 的 StateEventType  是 RUN_FAILED_ONLY_EVENT ,进行下面的处理:

找到改工作流失败且重试次数用完的任务列表,然后依次处理它们的执行记录(标记为失效,从失败列表移除,添加到待提交队列),最后提交到等待队列。

后端流程结束。

前端页面优化:

比较简单:新增一个按钮,文案是【重新失败节点】,在工作流列表上展示,用户可以点击。

源码

其中,新增了三个枚举类的值: 

org.apache.dolphinscheduler.api.enums.ExecuteType 

RUN_FAILED_ONLY

org.apache.dolphinscheduler.common.enums.CommandType

RUN_FAILED_ONLY(44, "run failed only");

org.apache.dolphinscheduler.common.enums.StateEventType

RUN_FAILED_ONLY_EVENT(4, "run failed only event");

几个关键步骤的代码,为了方便查看,上下文的代码、涉及改动的方法也会贴出来。

提示:序号对应上面的流程。

②  org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl#execute

switch (executeType) {
    case REPEAT_RUNNING:
        result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
        break;
    case RECOVER_SUSPENDED_PROCESS:
        result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
        break;
    // 新增 9-11 行代码
    case RUN_FAILED_ONLY:
        result = sendRunFailedOnlyMsg(processInstance, CommandType.RUN_FAILED_ONLY);
        break;
    case START_FAILURE_TASK_PROCESS:
        result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
        break;
    case STOP:
        if (processInstance.getState() == ExecutionStatus.READY_STOP) {
            putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
        } else {
            result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
        }
        break;
    case PAUSE:
        if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
            putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
        } else {
            result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
        }
        break;
    default:
        logger.error("unknown execute type : {}", executeType);
        putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");

        break;
}

sendRunFailedOnlyMsg 方法的逻辑,封装 stateEventChangeCommand,提交 RPC 请求。

 /**
     * send msg to master, run failed only
     *
     * @param processInstance process instance
     * @param commandType command type
     * @return update result
     */
    private Map<String, Object> sendRunFailedOnlyMsg(ProcessInstance processInstance, CommandType commandType) {
        Map<String, Object> result = new HashMap<>();
        String host = processInstance.getHost();
        String address = host.split(":")[0];
        int port = Integer.parseInt(host.split(":")[1]);
        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0,StateEventType.RUN_FAILED_ONLY_EVENT
        );
        stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
        putMsg(result, Status.SUCCESS);
        return result;
    }

这里也给 StateEventChangeCommand 家了一个状态类型的字段,对应枚举类:StateEventType

方便下游判断状态。

查看下面 ③ 处的代码,用这个状态赋值给 stateEvent 的 type。

③ org.apache.dolphinscheduler.server.master.processor.StateEventProcessor#process 用上面 ② 处的 StateEventType,赋值给 stateEvent 的 type,然后提交给 stateEventResponseService 的 BlockingQueueeventQueue 队列。

@Override
    public void process(Channel channel, Command command) {
        Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));

        StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class);
        StateEvent stateEvent = new StateEvent();
        stateEvent.setKey(stateEventChangeCommand.getKey());
        if (stateEventChangeCommand.getSourceProcessInstanceId() != stateEventChangeCommand.getDestProcessInstanceId()) {
            stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        } else {
            stateEvent.setExecutionStatus(stateEventChangeCommand.getSourceStatus());
        }
        stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId());
        stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId());
        // TODO 修改
        StateEventType stateEventType = stateEventChangeCommand.getStateEventType();
        if (stateEventType != null){
            stateEvent.setType(stateEventType);
        }else {
            StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
            stateEvent.setType(type);
        }

        logger.info("received command : {}", stateEvent);
        stateEventResponseService.addResponse(stateEvent);
    }

StateEventResponseWorker 线程一直扫描这个队列,拿到 stateEvent,找到要处理的工作流对应的 WorkflowExecuteThread 线程,把这个事件提交给 WorkflowExecuteThread 线程。

 /**
 * task worker thread
 */
class StateEventResponseWorker extends Thread {

    @Override
    public void run() {

        while (Stopper.isRunning()) {
            try {
                // if not task , blocking here
                StateEvent stateEvent = eventQueue.take();
                persist(stateEvent);
            } catch (InterruptedException e) {
                logger.warn("persist task error", e);
                Thread.currentThread().interrupt();
                break;
            }
        }
        logger.info("StateEventResponseWorker stopped");
    }
}

private void persist(StateEvent stateEvent) {
    try {
        if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) {
            writeResponse(stateEvent, ExecutionStatus.FAILURE);
            return;
        }

        WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId());
        workflowExecuteThread.addStateEvent(stateEvent);
        writeResponse(stateEvent, ExecutionStatus.SUCCESS);
    } catch (Exception e) {
        logger.error("persist event queue error, event: {}", stateEvent, e);
    }
}

④WorkflowExecuteThread 内部循环扫描事件列表。

private void handleEvents() {
    while (!this.stateEvents.isEmpty()) {

        try {
            StateEvent stateEvent = this.stateEvents.peek();
            if (stateEventHandler(stateEvent)) {
                this.stateEvents.remove(stateEvent);
            }
        } catch (Exception e) {
            logger.error("state handle error:", e);

        }
    }
}

stateEventHandler 处理 RUN_FAILED_ONLY_EVENT 类型的事件,处理方法是:runFailedHandler

private boolean stateEventHandler(StateEvent stateEvent) {
        logger.info("process event: {}", stateEvent.toString());

        if (!checkStateEvent(stateEvent)) {
            return false;
        }
        boolean result = false;
        switch (stateEvent.getType()) {
            case RUN_FAILED_ONLY_EVENT:
                result = runFailedHandler(stateEvent);
                break;
            case PROCESS_STATE_CHANGE:
                result = processStateChangeHandler(stateEvent);
                break;
            case TASK_STATE_CHANGE:
                result = taskStateChangeHandler(stateEvent);
                break;
            case PROCESS_TIMEOUT:
                result = processTimeout();
                break;
            case TASK_TIMEOUT:
                result = taskTimeout(stateEvent);
                break;
            default:
                break;
        }

        if (result) {
            this.stateEvents.remove(stateEvent);
        }
        return result;
    }

runFailedHandler 的内部逻辑如下:找到改工作流失败且重试次数用完的任务列表,然后依次处理它们的执行记录(标记为失效,从失败列表移除,添加到待提交队列),最后提交到等待队列。

private boolean runFailedHandler(StateEvent stateEvent) {
    try {
        logger.info("process:{} will do {}", processInstance.getId(), stateEvent.getExecutionStatus());
        // find failed tasks with max retry times and init these tasks
        List<Integer> failedList = processService.queryTaskByProcessIdAndStateWithMaxRetry(processInstance.getId(), ExecutionStatus.FAILURE);
        logger.info("run failed task size is : {}", failedList.size());
        for (Integer taskId : failedList) {
            logger.info("run failed task id is : {}", taskId);
            TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
            taskInstance.setFlag(Flag.NO);
            // remove it from errorTaskList
            errorTaskList.remove(Long.toString(taskInstance.getTaskCode()));
            processService.updateTaskInstance(taskInstance);
            // submit current task nodes
            if (readyToSubmitTaskQueue.contains(taskInstance)) {
                continue;
            }
            logger.info("run failed task ,submit current task nodes : {}", taskInstance.toString());
            addTaskToStandByList(taskInstance);
        }
        submitStandByTask();
//            updateProcessInstanceState();
    } catch (Exception e) {
        logger.error("process only run failed task error:", e);
    }
    return true;
}

最终效果

再次回到文章开头的场景:

工作流 A 正在运行,里面有很多节点,依赖关系比较复杂。凌晨用户接到报警,a 节点失败了,此时其他分支的任务还在运行。

此时用户可以直接点击【重新拉起失败任务】按钮,失败的任务就会重新进入等待队列,后续流程就像任务正常运行一样,也会继续拉起下游任务。

画外音:本次优化简化了失败任务运维的复杂度,提高了效率。

作者从1.x开始使用海豚调度,那是还叫做 Easy Scheduler,是一个忠实用户,我们基于 2.x版本做了很多内部的改造,后续会分享出来,同样社区也推荐大家使用3.1.9版本,这是相对比较稳定的版本。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/732561.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

javaSE字符串学习笔记

API和API帮助文档 API API(Application Programming Interface)&#xff1a;应用程序编程接口简单理解&#xff1a;API酒啊别人已经写好的东西&#xff0c;我们不需要自己编写&#xff0c;直接使用即可。 API这个术语在编程圈中非常常见.我第一次接触API这个词语是在大一下。老…

SQLCMD完全指南:掌控 SQL Server

SQL Server 拥有被广泛认可的一流管理工具——SQL Server Management Studio&#xff08;简称 SSMS&#xff09;。它提供了丰富的功能&#xff0c;极大地简化了开发人员和数据库管理员&#xff08;DBA&#xff09;的工作。 目录 SQLCMD 入门使用 SQLCMD 连接 SQL ServerSQLCMD …

进程、线程的区别

进程、线程的关系 开工厂生产手机&#xff0c;制作一条生产线&#xff0c;这个生产线上有很多的器件以及材料。一条生产线就是一个进程。 只有生产线是不够的&#xff0c;使用找五个工人来进行生产&#xff0c;这个工人能够利用这些材料最终一步步的将手机做出来&#xff0c;这…

Python xlrd库:读excel表格

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

【Apache Doris】周FAQ集锦:第 7 期

【Apache Doris】周FAQ集锦&#xff1a;第 7 期 SQL问题数据操作问题运维常见问题其它问题关于社区 欢迎查阅本周的 Apache Doris 社区 FAQ 栏目&#xff01; 在这个栏目中&#xff0c;每周将筛选社区反馈的热门问题和话题&#xff0c;重点回答并进行深入探讨。旨在为广大用户和…

【机器学习 复习】第7章 集成学习(小重点,混之前章节出题但小题)

一、概念 1.集成学习&#xff0c;顾名思义&#xff0c;不是一个玩意&#xff0c;而是一堆玩意混合到一块。 &#xff08;1&#xff09;基本思想是先 生成一定数量基学习器&#xff0c;再采用集成策略 将这堆基学习器的预测结果组合起来&#xff0c;从而形成最终结论。 &#x…

MicroBlaze IP核中Local Memory Bus (LMB)接口描述

LMB&#xff08;Local Memory Bus&#xff09;是一种同步总线&#xff0c;主要用于访问FPGA上的块RAM&#xff08;Block RAM&#xff0c;BRAM&#xff09;。LMB使用最少的控制信号和一个简单的协议&#xff0c;以保证块RAM能在一个时钟周期内被存取。所有的LMB信号都是高电平有…

计算机网络5:运输层

概述 进程间基于网络的通信 计算机网络中实际进行通信的真正实体&#xff0c;是位于通信两端主机中的进程。 如何为运行在不同主机上的应用进程提供直接的逻辑通信服务&#xff0c;就是运输层的主要任务。运输层协议又称为端到端协议。 运输层向应用层实体屏蔽了下面网络核心…

k8s资源的基本操作

文章目录 一、Namespace1、概述2、预定义的k8s命名空间2.1、default2.2、kube-public2.3、kube-system2.4、kube-node-lease 3、命名空间基本操作3.1、查看3.1.1、查看所有的命名空间3.1.2、查看指定的命名空间3.1.3、指定输出格式3.1.4、查看ns详情 3.2、创建3.2.1、命令行创建…

VMware vSphere Bitfusion 4.5.4 - 面向 AI 和 ML 应用提供弹性基础架构

VMware vSphere Bitfusion 4.5.4 - 面向 AI 和 ML 应用提供弹性基础架构 请访问原文链接&#xff1a;VMware vSphere Bitfusion 4.5.4 - 面向 AI 和 ML 应用提供弹性基础架构&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;sysin.org VM…

Android图片圆角转换 RoundedImageView开源项目 小记(1)

android:background“#7f000000” android:paddingLeft“8dp” android:paddingRight“8dp” android:textAppearance“?android:attr/textAppearanceMediumInverse” /> <TextView android:id“id/textView1” android:layout_width“wrap_content” android:la…

十一、数据结构(图的最短路)

文章目录 基础部分最短路径问题用 D F S DFS DFS搜索所有的路径用 B F S BFS BFS求最短路径 最短路算法 F l o y d Floyd Floydcode(Floyd的实现): S P F A SPFA SPFAcode(基于邻接表的 S P F A ) SPFA) SPFA) D i j k s t r a Dijkstra Dijkstracode&#xff08;dijkstra的实现…

Excel导出实例

在上一节的基础上&#xff0c;本文演示下如何导出excel数据。 Excel导出操作演示 继承ocean-easyexcel SDK <dependency><groupId>com.angel.ocean</groupId><artifactId>ocean-easyexcel</artifactId><version>1.0.0</version> …

2024头歌数据库期末综合(部分题)

目录 第1关&#xff1a;数据表结构修改1 任务描述 学习补充 答案 第2关&#xff1a;数据记录删除 任务描述 学习补充 答案 第3关&#xff1a;数据表结构修改2 任务描述 学习补充 答案 第5关&#xff1a;数据查询一 任务描述 学习补充 答案 本篇博客声明&…

【ARMv8/ARMv9 硬件加速系列 4 -- 加解密 Cryptographic Extension 介绍】

文章目录 ARMv8.0 Cryptographic ExtensionFEAT_AESFEAT_PMULLFEAT_SHA1FEAT_SHA256ARMv8.2 扩展FEAT_SHA512FEAT_SHA3FEAT_SM3FEAT_SM4ARMv8.0 Cryptographic Extension ARMv8.0引入了加密扩展(Cryptographic Extension),旨在加速加密和解密操作。这一扩展通过新增专用指令…

【Linux】 yum学习

yum介绍 在Linux系统中&#xff0c;yum&#xff08;Yellowdog Updater, Modified&#xff09;是一个用于管理软件包的命令行工具&#xff0c;特别适用于基于RPM&#xff08;Red Hat Package Manager&#xff09;的系统&#xff0c;如CentOS、Fedora和Red Hat Enterprise Linux…

一、docker简介及卸载、安装

目录 一、Docker 简介 二、dockers三要素 1、Docker镜像&#xff08;image&#xff09; 2、Docker仓库 3、Docker容器 三、docker架构图 四. Docker 运行的基本流程 五、docker 卸载 1、停止docker服务 2、查看yum安装的docker文件包 3、查看docker相关的rpm源文件 …

力扣随机一题 模拟+字符串

博客主页&#xff1a;誓则盟约系列专栏&#xff1a;IT竞赛 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 1910.删除一个字符串中所有出现的给定子字符串【中等】 题目&#xff1a; …

利用LabVIEW项目管理和组织LabVIEW应用程序

如何利用LabVIEW项目管理和组织LabVIEW应用程序&#xff0c;提供了关于文件定义、磁盘上的文件组织、LabVIEW项目浏览器、交叉链接和相关资源的建议。这些推荐在开发前就应建立&#xff0c;以确保应用程序能扩展到大量VIs并适应多开发者环境。 目录 定义和识别应用程序文件 磁…

第五篇:构建与维护私有Docker Registry: 企业级实践指南

构建与维护私有Docker Registry: 企业级实践指南 1. 引言&#xff1a;解析私有Docker仓库的必要性 1.1 Docker Registry简介与私有化的好处 Docker Registry是一个用于存储和分发Docker镜像的系统。在Docker生态系统中&#xff0c;Registry扮演着至关重要的角色&#xff0c;为…