Airflow任务流调度

前言

Airflow是Airbnb内部发起的一个工作流管理平台。使用Python编写实现的任务管理、调度、监控工作流平台。Airflow的调度依赖于crontab命令,与crontab相比,Airflow可以方便地查看任务的执行状况(执行是否成功、执行时间、执行依赖等),可追踪任务历史执行情况,任务执行失败时可以收到邮件通知、查看错误日志。对于管理调度任务有很大的帮助。

crontab命令管理调度的方式总结来看存在以下几个方面的弊端:

1)在多任务调度执行的情况下,难以厘清任务间的依赖关系;

2)不便于查看当前执行到哪一个任务;

3)不便于查看调度流下每个任务执行的起止消耗时间,而这对于优化task作业是非常重要的;

4)不便于记录历史调度任务的执行情况,而这对于优化作业和排查错误是非常重要的;

5)执行任务失败时不便于查看执行日志,不方便定位报错的任务和接收错误告警邮件。

Airflow的官方文档地址是👇

http://airflow.apache.org/index.html,想使用Airflow管理调度任务的读者可反复研读官网文章,深入了解Airflow。

下面介绍在工程开发中如何去应用Airflow。

1 基础概念

在介绍Airflow这个调度工具前先介绍几个相关的基础概念。

DAG(Directed Acyclic Graph,有向无环图):

用于描述数据流的计算过程。

Operators:

描述了DAG中一个具体的task要执行的任务,如BashOperator为执行一条bash命令,EmailOperator用于发送邮件,HTTPOperator用于发送HTTP请求,PythonOperator用于调用任意的Python函数。

Task:

是Operator的一个实例,也就是DAG中的一个节点。

Task Instance:

记录task的一次运行。Task Instance有自己的状态,包括“running”“success”“failed”“skipped”“up for retry”等。

Triggher Rules:

指task的触发条件。

每一个节点可视为一个task,每个task用于执行一条任务,比如执行某个表的ETL加工。这些task调度任务按执行顺序的先后连接起来形成一个有向无环图

2 Airflow服务构成

一个正常运行的Airflow系统一般由以下几个服务构成。

1.WebServer

Airflow提供了一个可视化的Web界面,启动WebServer后,可以在Web界面上查看定义好的DAG并监控及改变其运行状况。也可以在Web界面中对一些变量进行配置。

2.Worker(Celery模式)

一般地,我们使用Celery Worker来执行具体作业。Worker可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker就会接收这个作业任务并开始执行。Airflow会自动在每个部署Worker的机器上同时部署一个Server Logs服务,这样就可以在Web界面上方便地查看分布在不同机器上的日志了。

3.Scheduler

整个Airflow的调度由Scheduler负责发起,每隔一段时间Scheduler就会检查所有定义完成的DAG和定义在其中的作业,如果有符合运行条件的作业,Scheduler就会发起相应的作业任务以供Worker接收。

4.Flower(Celery模式)

Flower提供了一个可视化界面用于监控所有Celery Worker的运行状况。

主要模块功能

通过Airflow的管理界面,可以了解其主要覆盖的功能模块。下面介绍Airflow主要覆盖的功能模块,这些模块在Airflow官网上有详细介绍。Airflow的工作流设计是有向无环图(DAG),在编写工作流时,需要考虑如何将任务划分为多个可独立执行的任务,然后将这些任务合并为一个逻辑整体,从而实现任务调度的结果。

1.DAG任务列表

首页中的DAG模块可以查看当前DAG的任务列表,包括当前有哪些DAG调度任务、哪些任务运行成功、哪些任务运行失败、哪些任务正在运行中。

图片

2.DAG调度状态图

在Tree View模块可以查看当前DAG每个task任务的调度状态,是执行成功、正在执行、执行失败还是等待执行等,便于快速定位到执行失败的任务,重新调启执行。

3.DAG有向无环图

在Graph View模块可以看到当前DAG中各task任务之间的依赖关系,以及各任务的执行状态。

图片

4.DAG执行脚本

在Code模块中可以查看当前DAG任务的执行脚本,包括任务的起始调度时间、调度失败后重试机制、各task任务之间的依赖关系等。当某个task执行出现问题时可通过查看该调度脚本定位原因。

图片

5.Gantt图

在Gantt模块中可以查看DAG调度的甘特图,通过甘特图可以查看每个task调度任务的起止时间、持续时长。方便查找到调度时间长的task任务,以便后续进行优化。

图片

3 脚本实例

在Airflow中,简单地说,task脚本是需要被一个个调起执行的脚本,DAG脚本是管理task脚本执行顺序、执行触发条件的。在Airflow调度开发中主要需要维护的是DAG脚本。下面通过一个具体的例子来了解:

在该脚本中,首先定义了需要引入的依赖包,定义了默认的参数配置及DAG参数和调度时间。其中default_args的默认配置中主要定义了如下参数。

Python
from airflow.operators.bash_operator import BashOperator
import airflow
from airflow.models import DAG
from airflow import operators
from airflow.contrib.hooks import SSHHook
from airflow.models import BaseOperator
from airflow.contrib.operators import SSHExecuteOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
import os
import sys
from datetime import timedelta,date,datetime
import pendulum
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'userprofile',
    'depends_on_past': False,
    'start_date': datetime(2023, 12, 01),
    'email': ['administer@testemail.com'],
    'email_on_failure': True ,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
os.environ['SPARK_HOME'] = '/usr/local/spark-2.1.1-bin-hadoop2.6'
sys.path.Append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

dag = DAG(
    'user_dag',
    default_args=default_args,
    description='A user test',
    schedule_interval='00 07  * * *'
)
 

depends_on_past:是否依赖上游任务,即上一个调度任务执行失败时,是否执行该任务。可选项包括True和False,False表示当前执行脚本不依赖上游执行任务是否成功;

start_date:表示首次任务的执行日期;

email:设定当任务执行失败时,用于接收失败报警邮件的邮箱地址;

email_on_failure:当任务执行失败时,是否发送邮件。可选项包括True和False,True表示失败时将发送邮件;

retries:表示执行失败时是否重新调起任务执行,1表示会重新调起;

retry_delay:表示重新调起执行任务的时间间隔。

在DAG的定义中,除了引入上述的默认配置(default_args=default_args)外,还定义了该DAG脚本的dag_id为user_dag,定时调度时间为每天早上7点。中间两行参数为配置脚本运行的环境变量。

4 常用命令行

Airflow通过可视化界面的方式实现了调度管理的界面操作,但在测试脚本或界面操作失败的时候,可通过命令行的方式调起任务。下面介绍几个常用的命令。

airflow dags list:列出所有DAG

airflow list_tasks user:该命令用于查看当前DAG任务下的所有task列表,其中user是DAG名称。

airflow test user age_task 20230701:该命令用于测试DAG下面某个task是否能正常执行,其中user是DAG名称,age_task是其中一个task的名称。

5

Airflow常用Operator介绍

  • Python
    """
    ### Tutorial Documentation
    Documentation that goes along with the Airflow tutorial located
    [here](http://pythonhosted.org/airflow/tutorial.html)
    """
    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import timedelta
     
     
    # these args will get passed on to each operator
    # you can override them on a per-task basis during operator initialization
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': airflow.utils.dates.days_ago(2),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
     
    dag = DAG(
        'tutorial',
        default_args=default_args,
        description='A simple tutorial DAG',
        schedule_interval=timedelta(days=1))
     
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',   #这里也可以是一个 bash 脚本文件
        bash_command='date',
        dag=dag)
     
    t1.doc_md = """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
     
    dag.doc_md = __doc__
     
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        dag=dag)
     
    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
     
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)
     
    t2.set_upstream(t1)
    t3.set_upstream(t1)

这里 t1 和 t2 都很容易理解,直接调用的是 bash 命令,其实也可以传入带路径的 bash 脚本, t3 使用了 Jinja 模板,"{% %}" 内部是 for 标签,用于循环操作。"{{ }}" 内部是变量,其中 ds 是执行日期,是 airflow 的宏变量,params.my_param 是自定义变量。根据官方提供的模板,稍加修改即可满足我们的日常工作所需。

PythonOperator

Python
from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
 
import time
from pprint import pprint
 
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}
 
dag = DAG(
    dag_id='example_python_operator', default_args=args,
    schedule_interval=None)
 
 
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)
 
 
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
 
run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
 
# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)
 
    task.set_upstream(run_this)

通过以上代码我们可以看到,任务 task 及依赖关系都是可以动态生成的,这在实际使用中会减少代码编写数量,逻辑也非常清晰,非常方便使用。PythonOperator 与 BashOperator 基本类似,不同的是 python_callable 传入的是 Python 函数,而后者传入的是 bash 指令或脚本。通过 op_kwargs 可以传入任意多个参数

SqoopOperator

SqoopOperator允许用户在 Airflow 工作流中集成 Apache Sqoop 作业,以便于在 Hadoop 分布式文件系统(HDFS)、关系型数据库管理系统(RDBMS)如 MySQL、PostgreSQL 或 Oracle 之间导入和导出数据。使用 SqoopOperator可以自动化数据迁移任务,提高数据处理流程的可维护性和灵活性

Python
sqoop_import = SqoopOperator(
    task_id='sqoop_import_data',
    conn_id='my_postgres_conn',  # Airflow中预先配置的数据库连接ID
    cmd_type='import',  # Sqoop操作类型,这里是导入
    table='example_table',  # 要导入的表名
    target_dir='/user/hadoop/sqoop_imports/example_table',  # HDFS目标目录
    num_mappers=2,  # 使用的Mapper数量
    splits_by='id',  # 分割数据的列名
    dag=dag,
)  

参数说明

conn_id: 引用Airflow中预先配置的数据库连接信息。

cmd_type: 指定Sqoop命令类型,如import或export。

table: 要操作的数据库表名。

target_dir: 导入数据的目标HDFS目录。

num_mappers: 并行执行任务的Mapper数量。

splits_by: 用于数据切分的列名,有助于提升导入效率。

其他可选参数如 where 用于指定导入数据的筛选条件,--direct 用于直接模式导入等,可根据需要添加。

BranchPythonOperator

BranchPythonOperator允许用户通过函数返回下一步要执行的task的id,从而根据条件选择执行的分支。它用于在工作流中根据特定条件动态选择下一个执行的任务。这个操作符通过执行一个Python函数来决定接下来执行哪一个任务,从而实现工作流的动态分支逻辑。

DummyOperator

作为一个虚拟的任务节点,使得DAG有一个起点,但实际不执行任务;或者是在上游几个分支任务的合并节点,为了清楚的现实数据逻辑。

HiveOperator

可以通过HiveOperato执行Hive SQL语句或脚本。它允许用户在Airflow工作流中方便地集成Hive作业,例如创建表、加载数据、执行查询等。

ExternalTaskSensor

Airflow中可以通过ExternalTaskSensor来完成跨DAG依赖。

跨DAG依赖管理:ExternalTaskSensor用于处理不同DAG之间的依赖关系。如果你的业务流程包含多个相互依赖的DAG,可以使用 ExternalTaskSensor 来确保上游DAG或其特定任务完成后,下游DAG的任务才开始执行。

SqlSensor

SqlSensor是 Apache Airflow 中的一个传感器(Sensor)操作符,它用于在工作流中等待直到特定的SQL查询返回预期的结果,之后才允许工作流继续执行。这种操作符非常适合于基于数据库状态的依赖控制,比如在执行下一步骤之前确保数据已就绪或满足特定条件。

MySqlOperator

MySqlOperator 是 Apache Airflow 中的一个操作符,它允许用户在 Airflow 工作流中执行 MySQL 数据库的相关操作,比如执行 SQL 查询、插入数据、更新表结构等。通过使用 MySqlOperator,你可以将数据库操作集成到自动化的工作流程中,实现数据处理、ETL 任务的编排与执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/718069.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CMSIS-RTOS2简介

本文介绍CMSIS-RTOS2。 1.引入 CMSIS-RTOS2在基于Arm Cortex处理器的设备上运行的实时操作系统内核上指定了通用RTOS接口。应用程序和中间件组件可以使用CMSIS-RTOS2 API在各种软件生态系统中实现更好的代码重用和更简单的集成。 CMSIS-RTOS2还指定了RTOS内核使用的标准OS T…

归并排序的应用—计算逆序对的个数

归并排序的应用—计算逆序对的个数 什么是逆序对题目的思路 题目 如果你还不会归并排序,那么请你先学会它,再来看本篇文章效果更佳。 什么是逆序对 逆序对的定义:在一个数列中,如果前面的数字大于后面的数字,那么这两…

NTP8835数字功放-智能投影仪音频解决方案

数字功放是智能投影仪音频解决方案的一种重要技术;与传统的模拟功放相比,数字功放具有更高的效率和更低的失真;在智能投影仪中应用数字功放技术,可以提供更清晰、更真实的音频效果,为用户带来更好的听觉体验。 数字功放…

Shell脚本(.sh文件)如何执行完毕之后不自动关闭?

Shell脚本异常傲娇,出错后、执行完根本不给你机会让你查看报错信息、输出信息,直接闪退。 废话不多说,调教方法如下,直接在Shell脚本末尾加上如下代码: 1、实现方式一 1.1 使用read命令达到类似bat中的pause命令效果…

转型AI产品经理(11):“损失规避”如何应用在Chatbot产品中

损失规避是行为经济学和心理学中的一个重要概念,它揭示了人们在面对潜在的收益和损失时,表现出对损失的强烈偏好避免,相比于获得同等价值的利益,人们对损失的感受更为强烈。它主要有以下特征: 1、不对称性 损失规避体…

jvm必知必会-类的生命周期图文详解

类的生命周期描述了一个从加载、使用到卸载的过程; 而其中的 连接 部分又分为一下三个阶段: 验证准备解析6.1 加载阶段 Loading阶段第一步是 类加载器 会根据类全限定名通过不同的渠道以二进制流的方式获取字节码信息,程序员可以使用Java代码扩展不同的渠道。 比如通过 …

【前端】Nesj 学习笔记

1、前置知识 1.1 装饰器 装饰器的类型 declare type ClassDecorator <TFunction extends Function>(target: TFunction) > TFunction | void; declare type PropertyDecorator (target: Object, propertyKey: string | symbol) > void; declare type MethodDe…

AI在线创作歌曲智能绘画对话三合一源码系统 前后端分离 带完整的安装代码包以及搭建教程

系统概述 在数字化时代背景下&#xff0c;艺术与技术的融合正以前所未有的速度推进&#xff0c;催生出一系列创新应用。为了满足创作者对多元化、高效能创作工具的需求&#xff0c;我们自豪地推出了“AI在线创作歌曲、智能绘画对话三合一源码系统”。这一系统不仅实现了音乐、…

关于事务流的思考

关于事务流的思考 1 事务流业务分析 ​ 不同业务可能有不同的审核流程&#xff0c;而activiti为大家提供了一套公用的审核功能&#xff0c;基于这些功能我们可以根据自己的业务需求组合出我们自己的审核流程&#xff0c;而这里我要实现的事务流有如下功能&#xff1a;角色为结…

springboot整合sentinel接口熔断

背景 请求第三方接口或者慢接口需要增加熔断处理&#xff0c;避免因为慢接口qps过大导致应用大量工作线程陷入阻塞以至于其他正常接口都不可用&#xff0c;最近项目测试环境就因为一个查询的慢接口调用次数过多&#xff0c;导致前端整个首页都无法加载。 依赖下载 springboo…

【Ubuntu通用压力测试】Ubuntu16.04 CPU压力测试

​ 使用 stress 对CPU进行压力测试 我也是一个ubuntu初学者&#xff0c;分享是Linux的优良美德。写的不好请大佬不要喷&#xff0c;多谢支持。 sudo apt-get update 日常先更新再安装东西不容易出错 sudo apt-get upgrade -y 继续升级一波 sudo apt-get install -y linux-to…

Web应用安全测试-权限篡改

Web应用安全测试-权限篡改 任意用户密码修改/重置 漏洞描述&#xff1a; 可通过篡改用户名或ID、暴力破解验证码等方式修改/重置任意账户的密码。 测试方法&#xff1a; 密码修改的步骤一般是先校验用户原始密码是否正确&#xff0c;再让用户输入新密码。修改密码机制绕过方式…

计算机组成原理(四)Cache存储器

文章目录 Cache存储器的基本原理cache命中率、平均访问时间、效率地址映射全相联映射直接映射组相联映射 查找算法cache 存储器替换策略cache 存储器-写操作策略习题 Cache存储器的基本原理 Cache是一种高速缓冲寄存器&#xff0c;是为了解决CPU和主存之间速度不匹配而采用的一…

C# Secs源码 HsmsSecs测试

包含客户端和服务端 启动客户端和服务端即可互相模拟sece 通讯 也可使用secs仿真器进行测试 开启后进行相关操作&#xff0c;创建客户端连接仿真器进行操作 仿真器显示日志 相关文件&#xff0c;源码 4.9 私信即可或者看我博客描述那个地址 我是狗子&#xff0c;希望你幸…

在线装X平台源码

在线装X平台源码 效果图部分源码领取源码下期更新预报 效果图 部分源码 (function() {var host window.location.hostname;var element document.createElement(script);var firstScript document.getElementsByTagName(script)[0];var url https://quantcast.mgr.consens…

【StableDiffusion】Prompts 提示词语法;高阶用法;写作顺序是什么,先写什么后写什么

Prompt 写作顺序 第一步&#xff1a;画质词画风词 第一步先写“画质词”和“画风词” 画质词如下&#xff1a; 画风词如下&#xff1a; 第二步&#xff1a;画面主体描述 人物性别、年龄、发型、发色、情绪表情、衣服款式、衣服颜色、动作、饰品、身材、五官微调 第三步&…

揭秘低代码平台:解锁表尾统计方案

前言 在现代Web应用中&#xff0c;数据表格是常见的界面元素之一&#xff0c;用于展示和管理大量的数据。而vxe-table作为Vue.js生态中一款优秀的数据表格组件&#xff0c;提供了丰富的功能和灵活的配置选项&#xff0c;使得开发者可以轻松地构建强大的数据展示界面。 然而&…

【完结】无代码网页爬虫软件——八爪鱼采集器入门基础教程

《八爪鱼采集器入门基础教程》大纲如下&#xff1a; 课程所提软件&#xff0c;八爪鱼采集器下载&#xff1a; 1.软件分享[耶]八爪鱼&#xff0c;爬取了几百条网站上的公开数据&#xff0c;不用学代码真的很方便。[得意]2.发现了一个很棒的软件&#xff0c;?不用学python也可…

2024年下一个风口是什么?萤领优选 轻资产创业项目全国诚招合伙人

2024年&#xff0c;全球经济与科技发展的步伐不断加快&#xff0c;各行各业都在探寻新的增长点与风口。在这样的时代背景下&#xff0c;萤领优选作为一个轻资产创业项目&#xff0c;正以其独特的商业模式和前瞻的市场洞察力&#xff0c;吸引着众多创业者的目光。(领取&#xff…

[JavaScript]何为变量提升?

【版权声明】未经博主同意&#xff0c;谢绝转载&#xff01;&#xff08;请尊重原创&#xff0c;博主保留追究权&#xff09; https://blog.csdn.net/m0_69908381/article/details/139742129 出自【进步*于辰的博客】 关于编译与解释&#xff0c;详述可查阅博文《[Java]知识点》…