海豚调度监控:使用图关系网络解决核心链路告警,减轻任务运维负担!

💡 本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。
祝开卷有益。
大数据学习指南

大家好,我是小陶,之前分享了如果自动检测依赖缺失:文章在这,今天依然是有关依赖关系的分享。DolphinScheduler 在使用过程中,肯定会有任务出现失败的情况,那么问题来了:调度任务的告警是需要人为配置的,在生产环境中,面对海量的任务,如何找到重要的任务,并且在失败的时候,第一时间告警呢?

先思考一下。

先看思路

本文提供一个思路,接着往下看吧。

不卖关子了。

本质是路径查找,本文这里使用了图数据库,或者你也可以自己使用Java实现路径查找。

下面是需要实现的目标,看一组任务的关系,如下图所示,存在 A/B/C/D/E 五个任务,E 任务被配置为核心任务,当 B 任务报错时,检测到 B 和 E 之前存在路径,则需要电话告警。
截屏2024-06-20 13.39.53.png

所以在配置核心链路告警的时候,我们只需要配置叶子节点,在实际生产中,一般是应用层的任务,比如报表、标签、接口数据等任务。

清洗依赖数据

核心逻辑就是把所有工作流内部、跨工作流以及跨项目的依赖全部清洗出来,生成一张关系表。具体清洗逻辑,可以看:文章在这
image.png
最终生成了
t_ds_task_node_base_data 任务基础表,后续会用于 Nebula Graph,这个后面会讲。
t_ds_dag_task_relation_data_df 关系最终表,后续会用于 Nebula Graph,这个后面会讲。

t_ds_dag_task_relation_data_df 这个表结构如下:
截屏2024-06-20 13.53.00.png

关系导入图数据库

这里用的国产图数据库 Nebula Graph,当然你也可以自己使用 Java 实现路径查找

为什么我们一定要引入图数据库呢?有下面几方面考虑:

  • 可以减轻调度系统Mysql的压力,把负责的路径计算放在图数据库里面。
  • 探索更多调度任务数据治理和运维的可能性,比如任务权重,影响分析等。(不久的将来我也会分享这一块的实践。)

截屏2024-06-20 13.59.23.png
用到的组件是 Nebula Graph,最关键的函数是 find path 查询最短链路
① 用到的语法是:FIND SHORTEST PATH需要注意的是,注意查询步长,UPTO <N> {STEP|STEPS}:路径的最大跳数。默认值为5。
② 3.3.0 开始,子图支持了边的条件限制了,查询的时候只拿最新的一批关系。

创建图空间

CREATE SPACE s_schedule_job (partition_num = 225, replica_factor = 3, vid_type = FIXED_STRING(180)) COMMENT = "大数据平台调度系统任务的血缘关系";

创建边和点

## 任务标签
DROP tag if exists t_task;
CREATE tag if not exists t_task(  id string NULL COMMENT "project_code,dag_code,task_code,拼接,",  project_name string NULL COMMENT "project_name",  project_code string NULL COMMENT "project_code",  dag_name string NULL COMMENT "dag_name",  dag_code string NULL COMMENT "dag_code",  dag_version string NULL COMMENT "dag_version",  task_code string NULL COMMENT "task_code",  task_version string NULL COMMENT "task_version",  task_name string NULL COMMENT "task_name",  task_type string NULL COMMENT "task_type",  create_time string NULL COMMENT "时间戳") comment='调度任务节点';

## 调度任务关系
drop edge if exists e_task;
create edge if not exists e_task(  pre_project_name string NULL COMMENT "project_name",  pre_project_code string NULL COMMENT "project_code",  pre_dag_name string NULL COMMENT "dag_name",  pre_dag_code string NULL COMMENT "dag_code",  pre_dag_version string NULL COMMENT "dag_version",  pre_task_code string NULL COMMENT "task_code",  pre_task_version string NULL COMMENT "task_version",  pre_task_name string NULL COMMENT "task_name",  pre_task_type string NULL COMMENT "task_type",  post_project_name string NULL COMMENT "project_name",  post_project_code string NULL COMMENT "project_code",  post_dag_name string NULL COMMENT "dag_name",  post_dag_code string NULL COMMENT "dag_code",  post_dag_version string NULL COMMENT "dag_version",  post_task_code string NULL COMMENT "task_code",  post_task_version string NULL COMMENT "task_version",  post_task_name string NULL COMMENT "task_name",  post_task_type string NULL COMMENT "task_type",  create_time string NULL COMMENT "时间戳") comment='调度任务关系';

导入数据

同步点:

{
  spark: {
    app: {
      name: Nebula_Exchange_t_task
    }
    driver: {
      cores: 2
      maxResultSize: 5G
    }
  }

  nebula: {
    address:{
      graph:["10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx3:9669","10.1.x.xx:9669"]
      meta:["10.1.x.xx:9559","10.1.x.xx:9559","10.1.x.xx:9559"]
    }
    user: root
    pswd: "nebula密码"
    space: s_schedule_job
    connection {
      timeout: 60000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors/t_task
    }
    rate: {
      limit: 1024
      timeout: 10000
    }
  }
  tags: [

    {
      name: t_task
      type: {
        source: mysql
        sink: client
      }
      host:"调度系统MYSQL数据库IP"
      port:3307
      database:"调度系统MYSQL数据库"
      table:"t_ds_task_node_base_data"
      user:"调度系统MYSQL用户"
      password:"调度系统MYSQL用户密码"
      sentence:"SELECT concat(project_code,'_',dag_code,'_',task_code) as id,project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time FROM t_ds_task_node_base_data"
      fields: [project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time]
      nebula.fields: [project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time]
      vertex:{
        field:id
      }
      batch: 256
      partition: 32
    }

  ]


}

同步边:

{
  spark: {
    app: {
      name: Nebula_Exchange_e_task
    }
    driver: {
      cores: 2
      maxResultSize: 5G
    }
  }

  nebula: {
    address:{
      graph:["10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx3:9669","10.1.x.xx:9669"]
      meta:["10.1.x.xx:9559","10.1.x.xx:9559","10.1.x.xx:9559"]
    }
    user: root
    pswd: "aD@VX2018#"
    space: s_schedule_job
    connection {
      timeout: 60000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors/e_task
    }
    rate: {
      limit: 1024
      timeout: 10000
    }
  }
  edges: [

    {
      name: e_task
      type: {
        source: mysql
        sink: client
      }
      host:"调度系统MYSQL数据库IP"
      port:3307
      database:"调度系统MYSQL数据库"
      table:"t_ds_task_node_base_data"
      user:"调度系统MYSQL用户"
      password:"调度系统MYSQL用户密码"
      sentence:"SELECT concat(pre_project_code,'_',pre_dag_code,'_',pre_task_code) as from_id,concat(post_project_code,'_',post_dag_code,'_',post_task_code) as to_id,pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time FROM t_ds_dag_task_relation_data_df"
      fields: [pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time]
      nebula.fields: [pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time]
      source: {
        field: from_id
      }
      target: {
        field: to_id
      }
      batch: 256
      partition: 225
    }

  ]

}

定时脚本: 使用 Nebula Graph 社区提供的 exchange 工具把数据从 mysql 导入 Nebula Graph。

#!/bin/bash
# 作业参数
basepath='/opt/vcredit-graph-db/s_schedule_job/exchange'
tmpdir='/tmp/nebula/s_schedule_job'
mkdir -p $tmpdir
sourcefile=${basepath}/${jobname}.conf
targetfile=${tmpdir}/${jobname}_${vardate}.conf
cat ${sourcefile} > ${targetfile}
sed -i "s/vardate/${vardate}/g" ${targetfile}
sed -i "s/varhivetable/${varhivetable}/g" ${targetfile}

# 运行环境
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
spark_submit="/opt/spark-2.4.8-bin-hadoop2.7/bin/spark-submit"
# 开始运行
${spark_submit} \
--principal hive@VCREDIT.COM \
--keytab /etc/security/hive.keytab \
--master "local[*]" \
--class com.vesoft.nebula.exchange.Exchange /opt/nebula/nebula-exchange_spark_2.4-3.0.0.jar  -c ${targetfile} -h

Java 服务

/**
 * 判断这个任务是否会影响核心任务
 * @param projectName
 * @param dagName
 * @param taskName
 * @return
 */
@ApiOperation(value = "dolphinTaskIsOnCall", notes = "判断这个任务是否会影响核心任务,是 1 ,否 0")
@ApiImplicitParams({
        @ApiImplicitParam(name = "projectName", value = "T-1", required = false, dataType = "String", example = "BigData"),
        @ApiImplicitParam(name = "dagName", value = "T-1", required = false, dataType = "String", example = "公共和自定义域(pub)_daily"),
        @ApiImplicitParam(name = "taskName", value = "T-1", required = false, dataType = "String", example = "dwd_pub_screen_zxd_cust_df")
})
@GetMapping("/dolphinTaskIsOnCall")
@ResponseBody
public DataResult dolphinTaskIsOnCall(
        @RequestParam(value = "projectName", required = true) String projectName,
        @RequestParam(value = "dagName", required = true) String dagName,
        @RequestParam(value = "taskName", required = true) String taskName) throws GraphDatabaseException, UnsupportedEncodingException {

    HashMap<String,Object> res = dolphinService.dolphinTaskIsOnCall(projectName, dagName, taskName);
    return DataResult.ok(res);
}

核心代码,在第 17 行:

@Override
public HashMap<String, Object> dolphinTaskIsOnCall(String projectName, String dagName, String taskName) throws GraphDatabaseException, UnsupportedEncodingException {
    HashMap<String,Object> resMap = new HashMap<>();
    // 查询该任务 codes
    HashMap<String,Object> task = dolphinTaskInstanceMapper.getTaskCode(projectName,dagName,taskName);
    if (task == null){
        resMap.put("res","任务不存在!");
        return resMap;
    }
    String fromCodes = task.get("project_code") + "_" + task.get("dag_code") + "_" + task.get("task_code");
    // 查询核心任务 codes
    List<HashMap<String,Object>> tasks = dolphinTaskInstanceMapper.getOnCallTasks();
    // 查询最短链路
    for (HashMap<String,Object> t : tasks){
        String toCodes = t.get("project_code") + "_" + t.get("dag_code") + "_" + t.get("task_code");
        // 查询Nebula
        String NgSql = "FIND SHORTEST PATH with PROP FROM \"" + fromCodes + "\" TO \"" + toCodes + "\" OVER * WHERE e_task.create_time > '" + DateUtils.dayToString(DateUtils.getSomeDay(new Date(), -1)) + "' UPTO 100 STEPS  YIELD path AS p;";
        int res = nebulaService.isOnCallTask("s_schedule_job",NgSql);
        if (res > 0){
            resMap.put("res",res);
            return resMap;
        }
    }
    resMap.put("res",0);
    return resMap;
}

返回值说明:

① 影响核心任务,需要打电话
{“data”:{“res”:1},“code”:0,“msg”:“success”}
② 不影响核心任务,不需要打电话
{“data”:{“res”:0},“code”:0,“msg”:“success”}
③ 任务不存在,忽略
{“data”:{“res”:“任务不存在!”},“code”:0,“msg”:“success”}
④ code 不等于 0 ,接口异常,忽略。

封装好接口之后,任务失败的程序调这个接口,判断失败任务是否影响核心任务,如果影响就打电话。

钉钉告警样式:
image.png
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
电话告警,直接给对应负责人打电话。

至此,我们减少了很多任务告警的配置工作,只需要关注核心的叶子节点是什么,也就是核心的应用任务是什么,大大提高了任务告警的配置效率!!!

注意:清洗数据 和 导入图数据库,在每天的 23:30 分进行,一天初始化一次,确保凌晨的任务关系是最新的,主要是用于凌晨告警。

以上就使用图关系网络解决核心链路告警的全部内容,如果有任何疑问,都可以与我交流,希望可以帮到你,下次见。


**大数据学习指南 **专注于大数据技术分享与交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/726921.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Centos7.9安装openldap

文章目录 一、背景二、正文2.1 openldap服务端必要软件安装2.1.1使用yum命令安装2.1.2安装libdb相关依赖2.1.3复制一个默认配置到指定目录下&#xff0c;并授权&#xff0c;这一步一定要做&#xff0c;然后再启动服务&#xff0c;不然生成密码时会报错2.1.4授权给ldap用户&…

【第13章】进阶调试思路:如何安装复杂节点IP-Adapter?(安装/复杂报错/节点详情页/精读)ComfyUI基础入门教程

🎈背景 IP-Adapter这个名字,大家可能听说过,可以让生成的结果从参考图中学习人物、画风的一致性,在目前是比较实用的一个节点,广泛的用于照片绘制、电商作图等方面。 但同时,这个节点也是比较难安装的一个节点。 所以,这节课,我们就通过一个案例,来学习如何在Comf…

电子期刊制作教程:跟着步骤轻松学会制作

随着数字时代的快速发展&#xff0c;电子期刊以其独特的便捷性和互动性&#xff0c;已经成为信息传播的重要载体。你是否也想掌握制作电子期刊的技能呢&#xff1f;今天&#xff0c;就让我来为你一步步解析电子期刊的制作过程&#xff0c;带你轻松学会制作属于自己的电子期刊。…

地瓜网络技术综合助手教你一键下载腾讯会议高清视频

当您错过腾讯会议的直播课程&#xff0c;不必担心&#xff0c;地瓜网络技术综合助手帮您轻松获取视频回放。 只需几个简单步骤&#xff0c;即可在手头保留珍贵的学习资料。 首先&#xff0c;启动地瓜网络技术综合助手&#xff0c; 进行软件初始化并开启监测功能。 接下来&…

PyTorch中实现Transformer模型

前言 对于论文给出的模型架构&#xff0c;使用 PyTorch 分别实现各个部分。 命名transformer.py&#xff0c;引入的相关库函数&#xff1a; import copy import torch import math from torch import nn from torch.nn.functional import log_softmax# module: 需要深拷贝的模…

DLS Markets:日本银行豪赌美债巨亏,危机是否重演?

摘要 日本第五大银行Norinchukin Bank农林中央金库宣布大规模抛售美债&#xff0c;以弥补因利率上升导致的巨额损失。这一决定引发市场关注&#xff0c;担忧是否会引发一场美债“清仓甩卖”。本文详细分析了Norinchukin的背景、抛售美债的原因及其对全球金融市场的潜在影响。 …

2024年,计算机相关专业依旧是热门选择吗?未来趋势大揭秘!

文章目录 引言一、行业竞争现状二、专业与个人的匹配度判断三、专业前景分析总结 引言 在科技日新月异的今天&#xff0c;计算机专业一直以其强大的实用性和广阔的就业前景吸引着无数学子的目光。然而&#xff0c;随着人工智能、大数据、云计算等领域的飞速发展&#xff0c;我…

设计软件有哪些?景观插件篇,渲染100邀请码1a12

建立大型景观也是设计师常用的设计方法&#xff0c;我们介绍一些景观插件。 1、AutoGrass AutoGrass是用于快速生成逼真的草地和植被场景的3ds Max插件&#xff0c;它提供了大量的草地预设和工具&#xff0c;使用户能够轻松地创建各种各样的草地效果&#xff0c;包括草地、草…

磁盘未格式化:深度解析、恢复策略与预防措施

一、磁盘未格式化的定义与现象 在计算机存储领域&#xff0c;磁盘未格式化通常指的是磁盘分区或整个磁盘的文件系统信息出现丢失或损坏的情况&#xff0c;导致操作系统无法正确读取和识别磁盘上的数据。当尝试访问这样的磁盘时&#xff0c;系统往往会弹出一个警告框&#xff0…

package.json简介

1、package.json简介 通过 npm init 初始化一个项目&#xff0c;会生成3个目录/文件&#xff0c; node_modules, package.json和 package.lock.json。其中package-lock.json文件是为了锁版本。 2、package.json常用属性 1&#xff09;name name是项目的名称&#xff0c;命名…

Linux下手动修改服务器时间(没网环境下)

在客户服务器上更新程序时&#xff0c;发现服务器时间不对&#xff0c;现在应该是下午13:44:00&#xff0c;但服务器却显示为&#xff1a;21:40:53&#xff0c;所有是不对的。 date解决办法&#xff1a; 1、由于服务器是没有网的&#xff0c;只能手动设置时间&#xff0c;输入…

森林火灾扑救特类车辆有哪些_鼎跃安全

森林消防是在森林火灾发生时&#xff0c;为了保护森林资源&#xff0c;防止火势蔓延&#xff0c;采取了一系列的应用措施&#xff0c;针对自然环境中的火灾消防工作。森林灭火主要包括预警、预防措施、火情监测、火势控制和灭火等&#xff0c;森林火灾发生的地形往往复杂崎岖&a…

XMind 2024软件最新版下载及详细安装教程

​人所共知的是XMind 在公司和教育领域都有很广泛的应用&#xff0c;在公司中它能够用来进行会议管理、项目管理、信息管理、计划和XMind 被认为是一种新一代演示软件的模式。也就是说XMind不仅能够绘制思维导图&#xff0c;还能够绘制鱼骨图、二维图、树形图、逻辑图、组织结构…

Ubuntu server 24 (Linux) 安装lua + 卸载软件

1 apt 安装 # sudo apt update #查看可安装的lua版本 sudo apt-get install luaversion # sudo apt-get install lua5.3 #查看版本 testiZbp1g7fmjea77vsqc5hmmZ:/data/soft$ lua -v Lua 5.3.6 Copyright (C) 1994-2020 Lua.org, PUC-Rio2 Ubuntu卸载软件 #查找lua已安装…

fyne的VBox布局02

VBox布局02 最常用的布局是layout.BoxLayout&#xff0c;它有两种变体&#xff0c;水平和垂直。box布局将所有元素排列在单行或单列中&#xff0c;并带有可选的空格以帮助对齐。 一步一步实现一个如下界面布局&#xff0c;这个界面可以使用VBox布局来实现。 这次添加了2个复…

吴恩达机器学习 第三课 week1 无监督机器学习(下)

目录 01 学习目标 02 异常检测算法 2.1 异常检测算法的概念 2.2 基于高斯模型的异常检测 03 利用异常检测算法检测网络服务器的故障 3.1 问题描述 3.2 算法实现 3.3 问题升级 04 总结 01 学习目标 &#xff08;1&#xff09;理解异常检测算法&#xff08;Anomaly Dete…

vue2实现打印功能(vue-print-nb的实现)

实现效果&#xff1a; 引入插件 npm install vue-print-nb --save import Print from vue-print-nb Vue.use(Print) <div ref"printTest" id"printTest"><div style"text-align: center; page-break-after: always"><div style…

强化学习——基本概念

何为强化学习 机器学习的一大分支 强化学习&#xff08;Reinforcement Learning&#xff09;是机器学习的一种&#xff0c;它通过与环境不断地交互&#xff0c;借助环境的反馈来调整自己的行为&#xff0c;使得累计回报最大。强化学习要解决的是决策问题——求取当前状态下最…

HNU-计算机系统(CSAPP)实验三 BombLab

前言 BombLab来自《深入理解计算机系统》&#xff08;CSAPP&#xff09;一书的第三章“程序的机器级表示”的配套实验&#xff0c;该实验的目的是通过反汇编可执行程序bomb&#xff0c;来反推出程序执行内容&#xff0c;进而能够正确破解“密码”&#xff0c;拆除“炸弹”。 …

【人机交互 复习】第8章 交互设计模型与理论

一、引文 1.模型&#xff1a; 有的人成功了&#xff0c;他把这一路的经验中可以供其他人参考的部分总结了出来&#xff0c;然后让别人套用。 2.本章模型 &#xff08;1&#xff09;计算用户完成任务的时间&#xff1a;KLM &#xff08;2&#xff09;描述交互过程中系统状态的变…