海豚调度清理:使用 API 轻松清理历史工作流实例以及日志文件

💡 本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。
祝开卷有益。
大数据学习指南

大家好,我是小陶,DolphinScheduler 运行一段时间之后,会积累大量的历史运行记录,这些记录主要包括:工作流实例记录(MySQL)、任务实例记录(MySQL)、任务日志(本地磁盘),其中 MySQL 的记录越来越多,会影响页面分页查询的速度,进而影响用户使用体验和 MySQL 服务。

所以,需要清理以上历史记录,保证页面影响速度和 MySQL 服务。

本文的内容也比较简单,先是说明 API 的逻辑、存在的bug和修复方法,最后再介绍如何使用一个 Python 脚本来调用 API 删除历史实例。

1.API 逻辑介绍

DolphinScheduler 本身提供了批量删除工作流实例的接口,**process-instances/batch-delete,**接口逻辑这里简单描述一下就是,找到工作流下面的任务实例,依次删除任务日志和 Mysql 记录。

在这里插入图片描述

2.API bug说明和修复

但是这里需要注意的是,海豚调度 3.2.0(不包含)以前的版本,这里有一个 bug,在查询工作流实例下面的任务实例的时候,只查询了 flag =1 的任务实例,所以就导致了在清理日志和记录的时候,漏掉了一部分。

ProcessServiceImpl.java 中的 removeTaskLogFile 方法,在查询任务实例集合的时候,引用了 findValidTaskListByProcessId(processInstanceId); 而 findValidTaskListByProcessId 中仅查询了 Flag.YES 也就是 flag = 1 的记录。如下图所示:

在这里插入图片描述

这里解释一下 flag = 1 是标识该任务的最新的运行记录,表示任务多次重试之后,最新的运行记录。如果任务第一次失败了,第二次重试之后成功了,那么这个任务就会有两条运行记录,flag = 0 和 falg = 1,flag =1 的则标识最新的运行记录。

所以,如果你在使用海豚调度 3.2.0(不包含)以前的版本的时候,需要自行修复一下,或者升级到 3.2.0 。

修复的方式,也比较简单,新增 findAllTaskListByProcessId 方法,把工作流实例所有的运行实例都拿出来,不要加 flag 这个过滤条件。


3.使用 Python 脚本调用API

Python脚本的逻辑比较简单,使用了三个API,按照顺序是:

1.获取项目列表
2.获取工作流列表
3.批量删除工作流实例

入参是:日期

具体的代码如下:

#!/usr/bin/python
# -*- coding: utf8 -*-
## 定时清理调度工作流记录,入参是日期

import io
import subprocess
import requests
import json
import time
import datetime
from optparse import OptionParser
from optparse import OptionGroup

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(module)s : %(message)s', level=logging.INFO,
                    stream=sys.stdout)
logger = logging.getLogger(__name__)

# 配置信息: ip 端口 token自行修改
base_url = 'http://IP:端口'
token = 'xxxxxxxxxxxxx'

# get args
def get_option_parser(params):
    usage = "usage: %prog [options] json-url"
    parser = OptionParser(usage=usage)
    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    for k in params:
        prodEnvOptionGroup.add_option("--" + k, metavar="<" + k + ">", dest=k, action="store", default="",
                                      help="" + params[k])

    parser.add_option_group(prodEnvOptionGroup)
    return parser
  
# 获取项目列表
def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=&_t=0.3741042528841678".format(base_url=base_url)
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    totalList = response_data['data']['totalList']
    return totalList

def get_page_detail(code,dt):
    url = "{base_url}/dolphinscheduler/projects/{code}/process-instances?searchVal=&pageSize=50&pageNo=1&host=&stateType=&startDate=2000-01-01 00:00:00&endDate={dt} 23:59:59&executorName=".format(code=code,dt=dt,base_url=base_url)
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    page = response_data['data']['totalList']
    page_del = 'processInstanceIds='
    if len(page) == 0:
        print('列表为空,退出程序')
        return '0'
    for p in page:
        page_del = page_del + str(p['id']) + ','
    # print(page_del)
    return page_del

def delete(project,ids):
    print('即将删除如下工作流实例:')
    print(project)
    print(ids)
    url = "{base_url}/dolphinscheduler/projects/{project}/process-instances/batch-delete".format(base_url=base_url,project = project)
    # 'processInstanceIds=89767'
    payload= ids
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Content-Type': 'application/x-www-form-urlencoded',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    print('执行结果如下:')
    print(response.text)

if __name__ == '__main__':
    #获取请求参数()
    params = {"dt": "dt"};
    parser = get_option_parser(params)
    options, args = parser.parse_args(sys.argv[1:])
    logger.info('开始执行删除任务实例...' + " ".join(sys.argv))
    # 清理的日期
    dt = options.dt
    if dt == '' or len(dt) == 0:
        logger.error('调度系统-运维任务:日期为空,请输入日期')
        sys.exit(1)

    today_91 = (datetime.datetime.now()+datetime.timedelta(days=-61)).strftime("%Y-%m-%d")

    short_dt = dt.replace('-','')
    short_today_91 = today_91.replace('-','')
    if int(short_dt) > int(short_today_91):
        logger.error('调度系统-运维任务:不能删除最近90天之内的任务实例')
        sys.exit(1)
    # # 需要处理的项目
    projects = get_project_list()
    # 依次处理项目
    for project in projects:
        code = project['code']
        print('正在处理:'+ str(code))
        while True:
            page_del = get_page_detail(code,dt)
            if page_del == '0':
                break
            delete(code,page_del)
            time.sleep(1)

使用示例:dolphin_clean_process.py 是上面的脚本。

python  dolphin_clean_process.py 2024-01-01

**脚本在 GitHub 也维护了一份,欢迎 star **
https://github.com/aikuyun/dolphin_practices/blob/main/dolphin_clean_process.py

4.注意事项

1.token 获取的方式

在这里插入图片描述

2.可以删除的工作流的状态是一定要是完成状态的。否则,接口就会报错,非完成状态的工作流是不可以删除的。可以通过下面的SQL查看某个日期之前是否存在非完成状态的工作流实例。

SELECT *
FROM t_ds_process_instance
where state not in (7 ,13 ,6 ,8 ,5 ,9 ,3)
and start_time < '2024-01-01'

以上就使用 API 轻松清理历史工作流实例以及日志文件的全部内容,如果有任何疑问,都可以与我交流,希望可以帮到你,下次见。


大数据学习指南 专注于大数据技术分享与交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/704526.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AI落地好项目,一张照片秒生成个人写真

AI变现好项目&#xff0c;秒生成个人写真 只需要输入1张照片&#xff0c;无需训练&#xff0c;就能秒级生成个人写真&#xff1f; 你没听错&#xff0c;就是这个神奇的AI工具–InstantID。 众所周知&#xff0c;AI图生图面临的挑战之一是无法保持较高的角色一致性&#xff0c…

敏捷项目管理工具排行榜:打造高效敏捷开发环境的利器

最常见的敏捷项目管理工具包括&#xff1a;Leangoo领歌、Trello、Asana、ClickUp等 在敏捷开发的世界里&#xff0c;项目管理工具如同指挥棒&#xff0c;引领着团队快速响应变化&#xff0c;持续交付价值。介绍几款业内领先的敏捷项目管理工具&#xff0c;帮组大家选择最适合自…

男士内裤买便宜还是贵的?2024年高性价比男士内裤汇总分享

男生内裤&#xff0c;作为贴身衣物&#xff0c;承载着男性的私密与舒适。然而&#xff0c;许多男士的内裤状况却让人大跌眼镜&#xff1a;穿到变形、腰部松垮无弹性&#xff0c;屁股后面甚至出现破洞&#xff0c;这样的景象已然屡见不鲜。更有些男士的内裤&#xff0c;中间一个…

【Pandas】可视化plot()参数kind

Pandas是一个强大的数据分析库&#xff0c;它内置了基于matplotlib的数据可视化功能&#xff0c;使得直接在DataFrame和Series上进行绘图变得非常方便。在pandas中&#xff0c;.plot()方法允许用户通过kind参数灵活地选择多种图表类型。 导库 import numpy as np import pand…

「51媒体」江苏有哪些媒体-参会-宣发-专访-直播

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 在江苏省&#xff0c;媒体资源丰富&#xff0c;涵盖了参会、宣发、专访和直播等多个方面。以下是对这些媒体资源的详细归纳&#xff1a; 一、参会媒体 本地主流媒体&#xff1a;如无锡日…

HTTP3版本和实现验证

HTTP3协议基于Google的 QUIC 协议&#xff0c;由互联网工程任务组&#xff08;IETF&#xff09;来制定。目录还是草案&#xff0c;已经进行到第33版。 HTTP3 是基于 QUIC 协议的 http。传输层是UDPQUIC&#xff0c;应用层仍是HTTP&#xff0c;即request/respose, request里也仍…

学会情感化设计,让用户舍不得离开!

万物皆有个性&#xff0c;万物都会发出情感信号。即使这不是设计者的初衷&#xff0c;用户在浏览网站时也会推断出网站的个性&#xff0c;体验到网站的情感。 — 用户体验设计大师 Don Norman 正如设计大师唐诺曼所说&#xff0c;情感是我们生活重要的组成部分&#xff0c;情感…

Postgresql源码(135)生成执行计划——Var的调整set_plan_references

1 总结 set_plan_references主要有两个功能&#xff1a; 拉平&#xff1a;生成拉平后的RTE列表&#xff08;add_rtes_to_flat_rtable&#xff09;。调整&#xff1a;调整前每一层计划中varno的引用都是相对于本层RTE的偏移量。放在一个整体计划后&#xff0c;需要指向一个统一…

【区块链】记账的千年演化:从泥板到区块链

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 记账的千年演化&#xff1a;从泥板到区块链引言一、古代记账&#xff1a;泥板与…

深度学习(三)——Transforms的使用

一、Transforms的结构及用法 导入transforms from torchvision import transforms作用&#xff1a;图片输入transforms后&#xff0c;可以得到一些预期的变换 1. Transforms的python用法 写在前面&#xff1a;tensor数据类型 通过transforms.ToTensor去说明两个问题&#…

如何更新 iOS 18 Beta 版本?具体步骤总结

如何更新 iOS 18 Beta 想必有一些用户已经迫不及待的想要知道怎么更新 iOS 18 Beta 版本了吧&#xff0c;下面就给大家总结了具体的操作步骤&#xff1a; 在更新 iOS 18 Beta 版本之前记得我们需要将手机的数据进行备份&#xff0c;大家可以自行选用备份软件比如 iCloud 等。…

vb.net小demo(计算器、文件处理等/C#也可看)

Demo1&#xff1a;使用窗体控件实现一个简易版计算器 Public Class Form1Private Sub Button_1_Click(sender As Object, e As EventArgs) Handles Button_1.ClickCalSubBox.Text Button_1.TextEnd SubPrivate Sub Button_2_Click(sender As Object, e As EventArgs) Handles …

使用RV1126交叉编译工具链交叉编译opencv,c++代码直接调用VideoCapture 读取摄像头数据

使用RV1126交叉编译工具链交叉编译opencv&#xff0c;rv1126直接调用VideoCapture 读取摄像头数据 前言环境一、ubantu安装二、交叉编译工具安装三、cmake升级四、ffmpeg安装五、opencv安装六、c代码测试&#xff08;上板运行&#xff09; 前言 交叉编译是一种将软件在操作系统…

超详解——​深入理解Python中的位运算与常用内置函数/模块——基础篇

目录 ​编辑 1.位运算 2.常用内置函数/模块 math模块 random模块 decimal模块 常用内置函数 3.深入理解和应用 位运算的实际应用 1.权限管理 2.位图 3.图像处理 2.math模块的高级应用 统计计算 几何计算 总结 1.位运算 位运算是对整数在内存中的二进制表示进行…

Android Uri转File path路径,Kotlin

Android Uri转File path路径&#xff0c;Kotlin /*** URI转化为file path路径*/private fun getFilePathFromURI(context: Context, contentURI: Uri): String? {val result: String?var cursor: Cursor? nulltry {cursor context.contentResolver.query(contentURI, null…

设计模式-创建型-04-建造者模式

1、盖房项目需求 1&#xff09;需要建房子&#xff1a;这一过程为打桩、砌墙、封顶2&#xff09;房子有各种各样的&#xff0c;比如普通房&#xff0c;高楼&#xff0c;别墅&#xff0c;各种房子的过程虽然一样&#xff0c;但是要求不要相同的3&#xff09;请编写程序&#xf…

模拟信号转RS-485/232,数据采集A/D转换模块 YL21

特点&#xff1a; ● 模拟信号采集&#xff0c;隔离转换 RS-485/232输出 ● 采用12位AD转换器&#xff0c;测量精度优于0.1% ● 通过RS-485/232接口可以程控校准模块精度 ● 信号输入 / 输出之间隔离耐压3000VDC ● 宽电源供电范围&#xff1a;8 ~ 32VDC ● 可靠性高&…

【Axure高保真原型】拖拉拽动态编辑可视化页面

今天和大家分享拖拉拽动态编辑可视化页面的原型模板&#xff0c;我们可以拖动左侧工具列表的图表&#xff0c;添加到页面&#xff0c;可以多次添加&#xff0c;添加后可以拖动图表的位置&#xff0c;或者鼠标移入图表后点击delete键删除多余的图表&#xff0c;案例中提供10中常…

我国间二甲苯零售规模逐渐扩大 进口量有所下滑

我国间二甲苯零售规模逐渐扩大 进口量有所下滑 间二甲苯&#xff08;MX&#xff09;又称为1,3-二甲苯&#xff0c;是苯的两个氢基被两个甲基取代后形成的一种有机化合物。间二甲苯的化学方程式为C8H10&#xff0c;多表现为一种无色透明的液体&#xff0c;不溶于水&#xff0c;但…

Pikachu上的CSRF以及NSSCTF上的[NISACTF 2022]bingdundun~、 [SWPUCTF 2022 新生赛]xff

目录 一、CSRF CSRF(get) login CSRF(post) CSRF Token 二、CSRF的相关知识点 &#xff08;1&#xff09;什么是CSRF&#xff1f; &#xff08;2&#xff09;工作原理 &#xff08;3&#xff09;CSRF漏洞形成的条件 1、用户要在登录状态&#xff08;即浏览器保存了该…