使用DolphinScheduler接口实现批量导入工作流并上线

使用DS接口实现批量导入工作量并上线脚本

前面实现了批量生成DS的任务,当导入时发现只能逐个导入,因此通过接口实现会更方便。

DS接口文档

DS是有接口文档的地址是

http://IP:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn

不过这文档写的比较简略,不太能懂,那么只能自己去找了。

token

所有的接口都需要用到token
在这里插入图片描述
在安全中心-令牌管理 创建一个token 。记住这个token,后面所有的接口都需要用到 。

header

根据上面的token组成请求要用的header

token = ''
headers = {
    'Accept': 'application/json',
    'token': token
}

项目ID project_id 可以在查看项目工作流时,在url中找到。

DS导入任务接口

导入任务的接口是

import_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition/import'

知道接口 就可以导入了

def import_job(file_path):
# 打开文件并读取为二进制数据
    with open(file_path, 'rb') as file:
        files = {'file': file}
        # 导入工作流
        response = requests.post(import_url, headers=headers, files=files)
        print(response.status_code)
        if response.status_code != 200:
            print('上传失败  '+file_path)

需要注意的是,导入任务时 只支持二进制 。
file_path 是工作流文件,具体实现 可以工作流中导出一个作为参考。
重复使用上述方法,就可以实现批量导入任务。

工作流上线

使用上述方法批量完成任务上传后,依旧有问题,逐个上线工作量也是个不小的工作量,因此继续使用接口。
经过研究发现,上线工作流需要先获取工作流的调度ID 。

获取工作流列表 - > 获取工作流code -> 获取所有工作流的调度ID -> 工作流上线

获取工作流列表

这是接口地址

jobs_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition'

不过这个要分页查询,稍微有一点点麻烦

def get_jobs_list():
    # 分页查询
    # 初始化分页参数
    pageNo = 1
    pageSize = 10
    url = f'{jobs_url}?pageSize=10&pageNo=1&searchVal='
    # 构建完整的URL
    # 存储所有结果
    all_items = list()
    while True:
        # 构建完整的URL
        url = f'{jobs_url}?pageSize={pageSize}&pageNo={pageNo}&searchVal='

        # 发送GET请求
        response = requests.get(url, headers=headers)

        # 检查响应状态码
        if response.status_code == 200:
            # 请求成功,处理响应数据
            items = response.content.decode()
            total = json.loads(items)["data"]["total"]
            item = json.loads(items)["data"]["totalList"]
            # 将当前页的数据添加到结果列表中
            for i in item:
                all_items.append(i)

            # 如果当前页没有数据,退出循环
            if pageNo * pageSize > total:
                break
            if not items:
                break
            # 增加页码
            pageNo += 1
        else:
            # 请求失败,打印错误信息
            print('请求失败:', response.status_code, response.text)
            break

    return all_items

all_items 是所有工作流的具体内容,需要提取一下

 all_jobs = get_jobs_list()
        job_codes = [job['code'] for job in all_jobs]

这样就是所有的工作流code

获取调度ID

下面是调度ID的接口,因为不想分页,直接一页1000个。

schedules_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules?pageSize=1000&pageNo=1&processDefinitionCode='

使用这个接口就能拿到所有的调度ID

def schedule_id(job_code):
    url = schedules_url+str(job_code)
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        data = response.content.decode()
        js = json.loads(data)
        if len(js['data']['totalList'])>0 and js['data']['totalList'][0]['releaseState']=='OFFLINE':
            return js['data']['totalList'][0]['id']
    else:return ''

这里过滤了已经上线的调度ID 。

上线

万事俱备 终于可以上线了

online_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules/{scheduler_id}/online'

具体实现

def online_job(scheduler_id):
    url = online_url.format(scheduler_id=scheduler_id)
    response = requests.post(url, headers=headers)
    if response.status_code == 200:
        print('success')
    else:
        print('online job failed')

到此 就可以实现导入-批量全自动了。

打完收工,祝你不加班。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/900703.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

安全见闻---清风

注:本文章源于泷羽SEC,如有侵权请联系我,违规必删 学习请认准泷羽SEC学习视频:https://space.bilibili.com/350329294 安全见闻1 泷哥语录:安全领域什么都有,不要被表象所迷惑,无论技术也好还是其他方面…

网站建设中需要注意哪些安全问题?----雷池社区版

服务器与应用安全指南 1. 服务器安全 1.1 操作系统安全 及时更新补丁:确保操作系统始终安装最新补丁,以防范系统漏洞。例如,Windows Server 定期推送安全更新,修复如远程代码执行等潜在威胁。优化系统服务配置:关闭不…

PoissonRecon学习笔记

1. Screened Poisson Reconstruction (SPR) 源码:https://github.com/mkazhdan/PoissonRecon However, as noted by several researchers, it suffers from a tendency to over-smooth the data. 泊松重建存在过度平滑的现象。 方法:position and gradi…

【C++】一文带你深入理解C++异常机制

⭐️个人主页:小羊 ⭐️所属专栏:C 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~ 目录 前言一、C语言处理错误的方式二、C异常三、异常的使用3.1 异常的抛出和捕获3.2 异常的重新抛出3.3 异常安全3.4 异常规范 四、自定义异…

擎创科技声明

近日,我司陆续接到求职者反映,有自称是擎创科技招聘人员,冒用“上海擎创信息技术有限公司”名义,用“126.com”的邮箱向求职者发布招聘信息,要求用户下载注册APP,进行在线测评。 对此,我司郑重…

7、哈希表

7、哈希表 哈希表最主要的作用就是把一个比较庞大的空间或者值域 映射到比较小的值域 (0-n) 就是将-10^9 ~10^9 映射到 0 ~10^5 一、存储结构 映射的方法可以是 h(x) x mod 10^5 但是这样映射会出现一个问题 可能会有重复的数字出现 所以就引出了两个方法 开放寻址法 和…

【Javaee】网络原理—TCP协议的核心机制

前言 TCP/IP五层协议是互联网中的主流模型,为网络通信提供了一个稳固的框架。 主要包含了应用层,传输层,网络层,数据链路层,物理层。 本篇主要介绍传输层的TCP协议的核心机制 一. 确认应答(ack&#xf…

SQL 干货 | SQL 半连接

大多数数据库开发人员和管理员都熟悉标准的内、外、左和右连接类型。虽然可以使用 ANSI SQL 编写这些连接类型,但还有一些连接类型是基于关系代数运算符的,在 SQL 中没有语法表示。今天我们将学习一种这样的连接类型:半连接(Semi …

内网穿透:如何借助Cloudflare连接没有公网的电脑的远程桌面(RDP)

内网穿透:如何借助Cloudflare连接没有公网的电脑的远程桌面(RDP)-含详细原理配置说明介绍 前言 远程桌面协议(RDP, Remote Desktop Protocol)可用于远程桌面连接,Windows系统(家庭版除外)也是支持这种协议的,无需安装…

[RK3566-Android11] 使用SPI方式点LED灯带-JE2815/WS2812,实现呼吸/渐变/随音量变化等效果

问题描述 之前写了一篇使用GPIO方式点亮LED灯带的文章 https://blog.csdn.net/jay547063443/article/details/134688745?fromshareblogdetail&sharetypeblogdetail&sharerId134688745&sharereferPC&sharesourcejay547063443&sharefromfrom_link 使用GPIO…

C++20中头文件ranges的使用

<ranges>是C20中新增加的头文件&#xff0c;提供了一组与范围(ranges)相关的功能&#xff0c;此头文件是ranges库的一部分。包括&#xff1a; 1.concepts: (1).std::ranges::range:指定类型为range&#xff0c;即它提供开始迭代器和结束标记(it provides a begin iterato…

博弈论 C++

前置知识 若一个游戏满足&#xff1a; 由两名玩家交替行动在游戏进行的任意时刻&#xff0c;可以执行的合法行动与轮到哪位玩家无关不能行动的玩家判负 则称该游戏为一个公平组合游戏。 尼姆游戏&#xff08;NIM&#xff09;属于公平组合游戏&#xff0c;但常见的棋类游戏&…

idea(2017版)创建项目的搭建方式

目录 一、普通Java项目 二、普通JavaWeb项目 三、maven的Java项目 四、maven的JavaWeb项目 一、普通Java项目 1.创建新项目 2.因为是普通的java项目&#xff0c;所以先点最上面的Java&#xff0c;然后确定jdk&#xff0c;然后next 3.这里直接点next 4.写好项目名称和路径…

互联网系统的微观与宏观架构

互联网系统的架构设计&#xff0c;通常会根据项目的体量、业务场景以及技术需求被划分为微观架构&#xff08;Micro-Architecture&#xff09;和宏观架构&#xff08;Macro-Architecture&#xff09;。这两者的概念与职责既独立又相互关联。本文将通过一些系统案例&#xff0c;…

Vue3 学习笔记(五)Vue3 模板语法详解

在 Vue3 的世界里&#xff0c;模板语法是我们构建用户界面的基石。今天&#xff0c;让我们一起深入了解 Vue3 的模板语法&#xff0c;我将用通俗易懂的语言和实用的例子&#xff0c;带你掌握这项必备技能。 1、文本插值&#xff1a;最基础的开始 想在页面上显示数据&#xff1f…

《探索 HarmonyOS NEXT(5.0):开启构建模块化项目架构奇幻之旅 —— 模块化基础篇》

从无到有&#xff0c;打造模块化项目。构建一个开箱即用的项目&#xff0c;从 Git 上拉取下来即可直接进行开发&#xff0c;其中涵盖路由通信、上下拉刷新、网络请求、事件通知、顶部tab封装等功能&#xff0c;项目里调用API为鸿洋大佬的wanAndroidAPI。后期将持续完善&#xf…

【C】数组(array)

数组(array) 数组的概念 数组是一组相同类型元素的集合 数组中存放的是1个或者多个数据&#xff0c;但是数组元素个数不能为0数组中存放的多个数据&#xff0c;类型是相同的 数组分为一维数组和多维数组&#xff0c;多维数组一般比较多见的是二维数组 一维数组的创建和初始…

JAVA面试八股文(五)

#1024程序员节&#xff5c;征文# 在1024程序员节这个特别的日子里&#xff0c;首先&#xff0c;我想对每一位程序员表示最诚挚的祝贺&#xff01;祝愿大家在未来的日子里&#xff0c;能够继续热爱编程、追求卓越&#xff0c;携手共创更美好的科技未来&#xff01;让我们共同庆祝…

Redis Search系列 - 第六讲 基准测试 - Redis Search VS. MongoDB VS. ElasticSearch

目录 一、引言二、Redis Search 2.x版本的性能提升三、Redis Search VS. MongoDB VS. ElasticSearch3.1 测试环境3.2 100%写 - 基准测试3.3 100%读 - 基准测试3.4 混合读/写/搜索 - 基准测试2.5 搜索延迟分析3.6 读延迟分析3.7 写延迟分析3.8 Redis Search VS. ElasticSearch3.…

混个1024勋章

一眨眼毕业工作已经一年了&#xff0c;偶然进了游戏公司成了一名初级游戏服务器开发。前两天总结的时候&#xff0c;本来以为自己这一年没学到多少东西&#xff0c;但是看看自己的博客其实也有在进步&#xff0c;虽然比不上博客里的众多大佬&#xff0c;但是回头看也算是自己的…