Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. BashOperator 调度Shell命令案例

2. BashOperator 调度Shell脚本案例


Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记



owner(str):任务的所有者,建议使用linux用户名



email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。



email_on_retry(bool):当任务重试时是否发送电子邮件



email_on_failure(bool):当任务执行失败时是否发送电子邮件



retries(int):在任务失败之前应该重试的次数



retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象



start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。



end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。



depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。



dag(airflow.models.DAG):指定的dag。



execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。



trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)

1. BashOperator 调度Shell命令案例

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'email':'kettle_test1@163.com', #pwd:kettle123456
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_cmd',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

t1=BashOperator(
    task_id='print_date',
    bash_command='date',
    dag = dag
)

t2=BashOperator(
    task_id='print_helloworld',
    bash_command='echo "hello world!"',
    dag=dag
)

t3=BashOperator(
    task_id='tempplated',
    bash_command="""
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ params.name}}"
        echo "{{ params.age}}"
    {% endfor %}
    """,
    params={'name':'wangwu','age':10},
    dag=dag
)

t1 >> t2 >> t3

注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5

此外,配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

2. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bash

dt=$1

echo "==== execute second shell ===="

echo "---- second : time is ${dt}"

编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second

执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/159168.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

248: vue+openlayers 以静态图片作为底图,并在上面绘制矢量多边形

第248个 点击查看专栏目录 本示例是演示如何在vue+openlayers项目中以静态图片作为底图,并在上面绘制矢量多边形。这里主要通过pixels的坐标作为投射,将静态图片作为底图,然后通过正常的方式在地图上显示多边形。注意的是左下角为[0,0]。 直接复制下面的 vue+openlayers源代…

【蓝桥杯省赛真题01】C++水下探测器 第十届蓝桥杯中小学生创意编程大赛C++编程比赛省赛真题解析

目录 C/C++水下探测器 一、题目要求 1、编程实现 2、输入输出 二、算法分析

The import xxx.xxx.xxxx is never used

CTRL SHIFT O 就完成了,懒人,代码没洁癖啊,几千上万的代码没用的。

再学动态规划

先用一张图来理一下动态规划大纲 参考:https://www.zhihu.com/question/291280715/answer/1007691283 动态规划五个步骤 参考:https://www.zhihu.com/question/25814123 ①判断题目能否用动规解法 ②确定状态 最后一步 子问题 ③转移方程 ④确定初始条…

SSL证书哪个品牌最好用?

现在市面上的SSL证书品牌有很多,选购SSL证书时有很多人并不是很清楚,因此有很多伙伴对于选择哪个SSL证书品牌而感到疑惑。今天JoySSL小编就专门介绍下哪些比较好用的SSL证书品牌。 SSL证书兼容性主要包含操作系统、浏览器、服务器三个方面,好…

Python----图像的手绘效果

图像的数组表示 图像是有规则的二维数据,可以用numpy 库将图像转换成数组对象 : from PIL import Image import numpy as np imnp.array(Image.open("D://np.jpg")) print(im.shape,im.dtype)结果: 图像转换对应的ndarray 类型是3 维数据&am…

OpenAI的Whisper蒸馏:速度提升6倍的Distil-Whisper

1 Distil-Whisper诞生 Whisper 是 OpenAI 研发并开源的一个自动语音识别(ASR,Automatic Speech Recognition)模型,他们通过从网络上收集了 68 万小时的多语言(98 种语言)和多任务(multitask&am…

基于STM32的蓝牙低功耗(BLE)通信方案设计与实现

蓝牙低功耗(Bluetooth Low Energy,简称BLE)是一种能够在低功耗环境下实现无线通信的技术。在物联网应用中,BLE被广泛应用于传感器数据采集、健康监测设备、智能家居等领域。本文将基于STM32微控制器,设计并实现一个简单…

【FPGA】Verilog:升降计数器 | 波纹计数器 | 约翰逊计数器 | 实现 4-bit 升降计数器的 UP/DOWN

目录 Ⅰ. 理论部分 0x00 升降计数器(UP DOWN Counter) 0x01 波纹计数器(Ripple Counter) 0x02 约翰逊计数器(Johnson Counter) Ⅱ. 实践部分 0x00 实现:升降计数器(4-bit&…

cvf_使用lora方法增强能力

cvf_使用lora方法增强能力 实验对比图最终代码简介详细解析实验对比图 最终代码 import paddle import numpy as np import pandas as pd from tqdm import tqdmclass FeedFroward(paddle.nn.Layer)

基于SDN技术构建多平面业务承载网络

随着企业数字化的浪潮席卷各个行业,传统网络架构面临着更为复杂和多样化的挑战。企业正在寻找一种全面适应数字化需求的网络解决方案。随着软件定义网络(SDN)的发展,“多业务SDN一张网”解决方案为企业提供了一种全新的网络架构&a…

python趣味编程-5分钟实现一个F1 赛车公路游戏(含源码、步骤讲解)

Python 中的 F1 赛车公路游戏及其源代码 F1 Race Road Game是用Python编程语言开发的,它是一个桌面应用程序。 这款 Python 语言的 F1 赛道游戏可以免费下载开源代码,它是为想要学习 Python 的初学者创建的。 该项目系统使用了 Pygame 和 Random 函数。 Pygame 是一组跨平…

Bert浅谈

优点 首先,bert的创新点在于利用了双向transformer,这就跟openai的gpt有区别,gpt是采用单向的transformer,而作者认为双向transformer更能够融合上下文的信息。这里双向和单向的区别在于,单向只跟当前位置之前的tocke…

Pandas+Matplotlib 数据分析

利用可视化探索图表 一、数据可视化与探索图 数据可视化是指用图形或表格的方式来呈现数据。图表能够清楚地呈现数据性质, 以及数据间或属性间的关系,可以轻易地让人看图释义。用户通过探索图(Exploratory Graph)可以了解数据的…

《视觉SLAM十四讲》-- 后端 2

文章目录 09 后端 29.1 滑动窗口滤波和优化9.1.1 实际环境下的 BA 结构9.1.2 滑动窗口法 9.2 位姿图9.2.1 位姿图的意义9.2.2 位姿图优化 09 后端 2 9.1 滑动窗口滤波和优化 9.1.1 实际环境下的 BA 结构 由于计算机算力的限制,我们必须控制 BA 的规模&#xff0c…

【2022改良版】学法减分助手PRO小程序源码

【2022改良版】学法减分助手PRO小程序源码 ,交管推出个学法减分,每个驾驶员可以把被扣的6分,以看视频答题的形式学习回来,然后答题这个一共二十道题每道题60秒, 有好多人不会,用咱们的小程序就可以模拟练习…

gorm的简单操作

1. 什么是orm ORM全称是:Object Relational Mapping(对象关系映射),其主要作用是在编程中,把面向对象的概念跟数据库中表的概念对应起来。举例来说就是,我定义一个对象,那就对应着一张表,这个对象的实例&a…

故障发现、定位提效超 70%,去哪儿可观测体系做了哪些优化?

一分钟精华速览 去哪儿网的原有监控系统在指标数量上展现出了强大实力——上亿指标量和百万级的告警量,但在故障数据方面却稍显不足——订单类故障平均发现时间长达 4 分钟,仅有 20%的订单类故障能在 1 分钟内被发现,近半数的故障处理时长超…

Jenkins自动化部署一个Maven项目

Jenkins自动化部署 提示:本教程基于CentOS Linux 7系统下进行 Jenkins的安装 1. 下载安装jdk11 官网下载地址:https://www.oracle.com/cn/java/technologies/javase/jdk11-archive-downloads.html 本文档教程选择的是jdk-11.0.20_linux-x64_bin.tar.g…

接口测试实战工具如何选择?这6个工具首选(建议收藏)

常见接口类型 • HTTP/HTTPS 类型接口 基于HTTP协议开发的接口现在应用是最为广泛的,这类API使用起来简单明了,因为它是轻量级的、跨平台、跨语言的, 但凡是第三方提供的API都会有HTTP版本的接口。 RESTful API也是基于HTTP协议的&#xff0c…