Apache Airflow (四) :Airflow 调度shell命令

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


上文说到使用Airflow进行任务调度大体步骤如下:

  1. 创建python文件,根据实际需要,使用不同的Operator
  2. 在python文件不同的Operator中传入具体参数,定义一系列task
  3. 在python文件中定义Task之间的关系,形成DAG
  4. 将python文件上传执行,调度DAG,每个task会形成一个Instance
  5. 使用命令行或者WEBUI进行查看和管理

以上python文件就是Airflow python脚本,使用代码方式指定DAG的结构。

下面我们以调度执行shell命令为例,来讲解Airflow使用。

1. 首先我们需要创建一个python文件,导入需要的类库

# 导入 DAG 对象,后面需要实例化DAG对象
from airflow import DAG

# 导入BashOperator Operators,我们需要利用这个对象去执行流程
from airflow.operators.bash import BashOperator

注意:以上代码可以在开发工具中创建,但是需要在使用的python3.7环境中导入安装Airflow包。

D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

2. 实例化DAG

from datetime import datetime, timedelta

# default_args中定义一些参数,在实例化DAG时可以使用,使用python dic 格式定义
default_args = {
    'owner': 'airflow', # 拥有者名称
    'start_date': datetime(2021, 9, 4),  # 第一次开始执行的时间,为 UTC 时间
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}

dag = DAG(
    dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成
    default_args = default_args, #外部定义的 dic 格式的参数
    schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)

注意:

  • 实例化DAG有三种方式

第一种方式:

with DAG("my_dag_name") as dag:
    op=XXOperator(task_id="task")

第二种方式(以上采用这种方式):

my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)

第三种方式:

@dag(start_date=days_ago(2))
def generate_dag():
    op = XXOperator(task_id="task")
dag = generate_dag()
  • baseoperator基础参数说明:

可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator查看baseopartor中更多参数。

  • DAG参数说明

可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html 查看DAG参数说明,也可以直接在开发工具点击DAG进入源码看下对应参数有哪些。

3. 定义Task

当实例化Operator时会生成Task任务,从一个Operator中实例化出来对象的过程被称为一个构造方法,每个构造方法中都有“task_id”充当任务的唯一标识符。

下面我们定义三个Operator,也就是三个Task,每个task_id 不能重复。

# operator 支持多种类型, 这里使用 BashOperator
first = BashOperator(
    task_id='first',
    bash_command='echo "run first task"',
    dag=dag
)

middle = BashOperator(
    task_id='middle',
    bash_command='echo "run middle task"',
    dag=dag
)

last = BashOperator(
    task_id='last',
    bash_command='echo "run last task"',
    dag=dag,
    retries=3
)

注意:

  • 每个operator中可以传入对应的参数,覆盖DAG默认的参数,例如:last task中“retries”=3 就替代了默认的1。任务参数的优先规则如下:①.显示传递的参数 ②.default_args字典中存在的值③.operator的默认值(如果存在)。
  • BashOperator使用方式参照:http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html#howto-operator-bashoperator

4. 设置task依赖关系

#使用 set_upstream、set_downstream 设置依赖关系,不能出现环形链路,否则报错
# middle.set_upstream(first) # middle会在first执行完成之后执行
# last.set_upstream(middle) # last 会在 middle执行完成之后执行

#也可以使用位移符来设置依赖关系
first >> middle >>last # first 首先执行,middle次之,last最后
# first >> [middle,last] # first首先执行,middle ,last并行执行

注意:当执行脚本时,如果在DAG中找到一条环形链路(例如:A->B->C-A)会引发异常。更多DAG task依赖关系可参照官网:http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies

5. 上传python配置脚本

到目前为止,python配置如下:

# 导入 DAG 对象,后面需要实例化DAG对象
from airflow import DAG

# 导入BashOperator Operators,我们需要利用这个对象去执行流程
from airflow.example_dags.example_bash_operator import dag

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

# default_args中定义一些参数,在实例化DAG时可以使用,使用python dic 格式定义
default_args = {
    'owner': 'airflow', # 拥有者名称
    'start_date': datetime(2021, 9, 4),  # 第一次开始执行的时间,为 UTC 时间
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}

dag = DAG(
    dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成
    default_args = default_args, #外部定义的 dic 格式的参数
    schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)

# operator 支持多种类型, 这里使用 BashOperator
first = BashOperator(
    task_id='first',
    bash_command='echo "run first task"',
    dag=dag
)

middle = BashOperator(
    task_id='middle',
    bash_command='echo "run middle task"',
    dag=dag
)

last = BashOperator(
    task_id='last',
    bash_command='echo "run last task"',
    dag=dag,
    retries=3
)

#使用 set_upstream、set_downstream 设置依赖关系,不能出现环形链路,否则报错
# middle.set_upstream(first) # middle会在first执行完成之后执行
# last.set_upstream(middle) # last 会在 middle执行完成之后执行

#也可以使用位移符来设置依赖关系
first >> middle >>last # first 首先执行,middle次之,last最后
# first >> [middle,last] # first首先执行,middle ,last并行执行

将以上python配置文件上传到$AIRFLOW_HOME/dags目录下,默认$AIRFLOW_HOME为安装节点的“/root/airflow”目录,当前目录下的dags目录需要手动创建。

6. 重启Airflow

“ps aux|grep webserver”和“ps aux|grep scheduler”找到对应的airflow进程杀掉,重新启动Airflow。重启之后,可以在airflow webui看到对应的DAG ID ”myairflow_execute_bash”。

7. 执行airflow

按照如下步骤执行DAG,首先打开工作流,然后“Trigger DAG”执行,随后可以看到任务执行成功。

查看task执行日志:


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/130911.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Unity 制作血量滑动条(Slider)

1.创建UI slider 层级面板点击右键-UI-slider 2.调整UI位置 选择2D视图,调整锚点和滑动条位置 3.PS中制作UI 导出2个图层,PNG格式。 4.改成精灵模式(sprite2d) 把两个PNG导入Unity仓库中,选中两个图,右…

【LeetCode刷题笔记】堆和优先级队列

358. K 距离间隔重排字符串 解题思路: 大根堆 + 队列 , 1)首先 计数数组 统计 每个字符出现的次数 ,然后将 计数 > 0 的 字符 和 次数 一起放入 大根堆 ,大根堆中

Matlab的多项式留数与极点的计算

Matlab的多项式留数与极点的计算 以下面的多项式为例: 运算代码: clc clear closesyms p % 定义多项式 Zp(5*p^571*p^370*p)/(2*p^635*p^4117*p^236); % 提取分子与分母 [I,D]numden(Zp); Idouble(coeffs(I,p,"All"));%分子 Ddouble(coeffs…

Git系列之Git集成开发工具及git扩展使用

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《Git实战开发》。🎯🎯 &a…

基于若依的ruoyi-nbcio流程管理系统仿钉钉流程json转bpmn的flowable的xml格式(支持并行网关)

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio 演示地址:RuoYi-Nbcio后台管理系统 这个章节来完成并行网关,前端无需修改,直接后端修改就可以了。 1、并行网关后端修…

C#开发的OpenRA游戏之世界存在的属性CombatDebugOverlay(3)

C#开发的OpenRA游戏之世界存在的属性CombatDebugOverlay(3) 这次来分析CombatDebugOverlay属性,这个属性只有在调试游戏的时候才会使用。当你设置这个属性的时候,就可以看到如下图的结果: 可以看到物品的周边都有一个圆圈,以及有一些十字的点位标志。 那些十字表示的点…

【ATTCK】MITRE Caldera 朴素贝叶斯规划器

CALDERA是一个由python语言编写的红蓝对抗工具(攻击模拟工具)。它是MITRE公司发起的一个研究项目,该工具的攻击流程是建立在ATT&CK攻击行为模型和知识库之上的,能够较真实地APT攻击行为模式。 通过CALDERA工具,安全…

C++源文件的编译过程 学习 CMake 文档的前置知识

OHHHH,发现自己的基础知识真他妈的是呼呼漏风,,,,,,,,,,, 尴尬得意识到,不仅是英语水平有问题,他码的基础知识…

Web实验总

目录 网站需求: 思路: 实验步骤: 第一步:准备工作 第二步:新建一个存储网页的目录 第三步:修改本地hosts映射 第四步:修改配置文件,建立基于http服务的网站 1)创建用户song和…

iOS移动应用安全加固:保护您的App免受恶意攻击的重要步骤

目录 iOS移动应用安全加固:保护您的App免受恶意攻击的重要步骤 摘要 引言 一、APP加固的概念 二、APP加固方案的比较 三、保护iOS应用的安全 四、总结 参考资料 摘要 本文介绍了移动应用程序(App)加固的概念和流程,以及市…

[模版总结] - 树的基本算法1 - 遍历

树结构定义 一种非线性存储结构,具有存储“一对多”关系的数据元素集合 种类 General Tree TrieB/B 树二叉树 满/完满/完全二叉树 完美BT : 除了叶子结点外所有节点都有两个字节点,每一层都完满填充完全BT: 除最后一层以外其他每一层都完美…

单元测试工具-Junit

文章目录 一. 认识Junit二. Junit中常用的注解1. Test2. Disabled3. BeforeAll & AfterAll4. BeforeEach & AfterEach 三. ParameterizedTest参数化1. 单参数2. 多参数2.1. CSV 获取参数2.2. 方法获取参数 四. Order控制测试用例的执行顺序五. 断言六. 测试套件1. 通过…

Docker进阶——再次认识docker的概念 Docker的结构 Docker镜像结构 镜像的构建方式

前言 在微服务大量应用的互联网时代,经常能看到docker的身影。作为docker的爱好者(在服务器安装MySQL,Redis。。。我用的都是docker),我也会持续深入学习和认识docker。 本篇博客再次介绍docker的基本概念&#xff0…

SmartBear正式收购Stoplight,并计划在核心API设计、文档和门户产品中集成其功能

不久前,软件开发和可视化工具提供商SmartBear正式宣布收购全球领先的API设计公司Stoplight。这一收购是为了打造业内最全面的API开发平台,为寻求现代化API实践的开发团队提供更好的透明度、自动化与生产力。将Stoplight在API方面的优势(包括治…

吴恩达《机器学习》7-1->7-4:过拟合问题、代价函数、线性回归的正则化、正则化的逻辑回归模型

一、过拟合的本质 过拟合是指模型在训练集上表现良好,但在新数据上的泛化能力较差。考虑到多项式回归的例子,我们可以通过几个模型的比较来理解过拟合的本质。 线性模型(欠拟合): 第一个模型是一个线性模型&#xff0…

Elasticsearch:Lucene 中引入标量量化

作者:BENJAMIN TRENT 我们如何将标量量化引入 Lucene。 Lucene 中的自动字节量化 虽然 HNSW 是一种强大而灵活的存储和搜索向量的方法,但它确实需要大量内存才能快速运行。 例如,查询 768 维的 1MM float32 向量大约需要 1,000,000*4*(7681…

多维时序 | MATLAB实现TCN时间卷积神经网络多变量时间序列预测

多维时序 | MATLAB实现TCN时间卷积神经网络多变量时间序列预测 目录 多维时序 | MATLAB实现TCN时间卷积神经网络多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现TCN时间卷积神经网络多变量时间序列预测 模型描述 MATLAB实现TCN时间卷…

3.前端调式(断点调式)

1. Elements 先来看这张图最上头的一行是一个功能菜单,每一个菜单都有它相应的功能和使用方法,依次从左往右来看 箭头按钮 用于在页面选择一个元素来审查和查看它的相关信息,当我们在Elements这个按钮页面下点击某个Dom元素时,箭…

ubuntu16.04安装vscode遇到的code 依赖于 libnss3 (>= 2:3.30)解决

1、ubuntu16.04安装最新版本vscode失败原因 ubuntu16.04安装最新版本的vscode会遇到依赖libnss3(>2:3.30)的问题,原因是ubuntu16.04安装的库libnss3版本更低,与vscode需要的更高版本的libnss3库不兼容,只需要升级libnss3库版本高于2:3.30…

PROFINET和UDP、MODBUS-RTU通信速度对比实验

这篇博客我们介绍PROFINET 和MODBUS-RTU通信实验时的数据刷新速度,以及这种速度不同对控制系统带来的挑战都有哪些,在介绍这篇对比实验之前大家可以参考下面的文章链接: S7-1200PLC和SMART PLC的PN智能从站通信 S7-200 SMART 和 S7-1200PLC进行PROFINET IO通信-CSDN博客文…