Airflow:TimeSensor感知时间条件

在数据管道工作流中,任务可能需要在特定的时间执行,或者在继续之前等待一定的时间。为了满足这些需求,Apache Airflow提供了TimeSensor,这是一种内置Sensor,可以监控当前时间,并在达到指定时间时触发后续任务。在这篇博文中,我们将深入研究TimeSensor,涵盖它的特性、用例、实现、定制和最佳实践。

介绍TimeSensor

  • 时间感知与触发:Airflow 的TimesSensor是一种传感器(Sensor)类型的操作符(Operator)。它的主要作用是按照指定的时间规则来感知时间,并且根据时间条件来触发后续的任务流程。例如,它可以等待到某个特定的时间点,如每天的下午 3 点,然后再启动相关的数据处理任务。
  • 阻塞任务执行:在满足时间条件之前,TimesSensor会阻塞当前任务实例(Task Instance)所在的分支流程。这意味着在指定时间到来之前,后续依赖于该传感器输出的任务不会被执行,从而确保任务是在期望的时间之后才开始运行。
    在这里插入图片描述

TimesSensor应用场景

定时数据提取与加载(ETL)

场景描述:在数据仓库的 ETL(Extract - Transform - Load)流程中,假设我们需要每天在特定时间从源数据库提取数据。例如,业务要求每天凌晨 2 点开始提取数据,因为这个时候源系统的负载较低,数据更新也基本完成。

实现方式:可以使用TimesSensor来设置等待时间为凌晨 2 点。在 Airflow 的 DAG(Directed Acyclic Graph)中,TimeSensor任务会在开始时检查时间,如果当前时间还没到凌晨 2 点,它会一直等待。一旦到达凌晨 2 点,它就会触发后续的数据提取任务,然后依次执行数据转换和加载任务,确保整个 ETL 流程按照预定的时间计划执行。

定期报表生成

场景描述:对于企业的定期报表任务,比如每周五下午 5 点生成本周的销售报表。销售数据在一天内不断更新,需要在特定时间点整合并生成报表。

实现方式:利用TimeSensor设置等待时间为每周五下午 5 点。当时间到达时,传感器触发报表生成任务,该任务可以从数据库中获取本周的销售数据,进行统计和格式化处理,最终生成销售报表,满足企业对数据时效性和定期性的要求。

系统维护任务调度

场景描述:在系统维护场景中,例如需要在每月的第一天凌晨进行系统日志清理和备份。这个任务需要在特定的日期和时间启动,以避免对日常业务产生影响。

实现方式:通过TimesSensor设定为每月 1 日凌晨的具体时间,如 0 点。在到达设定时间后,触发日志清理和备份任务,对系统日志进行清理和备份操作,保证系统的稳定性和数据的安全性。

TimeSensor举例

首先,假设已经安装并配置好 Airflow。在 Airflow 中,ETL 流程通常是通过定义 DAG(Directed Acyclic Graph)来实现的。DAG 是一组任务的集合,这些任务按照依赖关系和执行顺序排列。

例如,我们有一个简单的 ETL DAG,它包含三个任务:extract_data(提取数据)、transform_data(转换数据)和load_data(加载数据)。其中extract_data任务依赖于TimesSensor的触发。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.time_sensor import TimesSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('etl_with_timesensor', default_args=default_args, schedule_interval='0 2 * * *')

这里的schedule_interval='0 2 * * *'表示 DAG 每天在凌晨 2 点(按照 cron 表达式的格式)被调度。但我们还希望在 DAG 内部使用TimesSensor来更精细地控制任务启动时间。

wait_for_two_am = TimesSensor(
    task_id='wait_for_two_am',
    target_time='02:00:00',
    dag=dag
)

这个TimesSensor任务(wait_for_two_am)会等待直到时间到达凌晨 2 点(target_time='02:00:00')。

下面定义数据提取任务:

def extract_data():
    # 这里可以是实际的从数据库或其他数据源提取数据的代码
    print("Extracting data...")
    return "extracted_data"
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

数据转换和加载任务:

def transform_data(data):
    # 实际的数据转换逻辑,如清洗、聚合等
    print("Transforming data...")
    return "transformed_data"
transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    op_args=[extract_task.output],
    dag=dag
)
def load_data(data):
    # 实际的加载数据到目标位置的逻辑,如数据库插入
    print("Loading data...")
load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    op_args=[transform_task.output],
    dag=dag
)

任务之间的依赖关系通过以下方式设置:

wait_for_two_am >> extract_task >> transform_task >> load_task

这意味着extract_data任务会在wait_for_two_am任务完成(即到达凌晨 2 点)后才开始执行,然后依次执行transform_dataload_data任务。

当 Airflow 调度器启动这个 DAG 时,TimeSensor首先检查当前时间。如果当前时间还没到凌晨 2 点,它会定期检查(由 Airflow 的内部机制控制检查间隔)。

一旦到达凌晨 2 点,TimesSensor任务完成,触发extract_data任务。extract_data任务提取数据后,将数据传递给transform_data任务进行转换,转换后的数据再传递给load_data任务进行加载,从而完成整个 ETL 流程。这种方式确保了 ETL 流程在每天凌晨 2 点准时开始,符合业务对数据提取时间的要求。

自定义TimeSensor行为

TimeSensor提供了几个参数,你可以使用它们来定制它的行为:

  • target_time:Sensor定义一天中触发条件为成功的时间(作为datetime.time对象)。
  • mode:Sensor的工作模式。默认情况下,它使用“poke”模式,定期检查所需的条件。
  • timeout:传感器在失败前等待所需条件满足的最大时间(以秒为单位)。缺省情况下,没有超时。
  • poke_interval:检查所需条件的时间间隔(以秒为单位)。默认值是60秒。

最佳实践

  • 使用描述性的task_id:为TimeSensor定义清晰和有意义的task_id,以提高dag的可读性和可维护性。

  • 设置适当的超时时间:为TimeSensor设置合理的超时时间,以避免任务无限期地等待特定的时间到达。这有助于防止资源耗尽,并确保如果在预期的时间范围内没有满足所需的条件,管道可以正常失败。

  • 调整戳间隔:根据您的具体用例自定义poke_interval。如果目标时间很遥远,您可能希望使用更长的时间间隔来避免过多的轮询。相反,如果您希望很快达到目标时间,那么较短的间隔可能更合适。

  • •考虑时区差异:当处理依赖于时间敏感事件或数据的任务时,请确保考虑到Airflow实例和时间敏感数据来源之间的任何时区差异。

总结

Apache Airflow TimeSensor是强大而通用的工具,用于管理数据管道中基于时间的依赖关系。通过了解它的各种用例和参数,你可以创建在特定时间执行任务的高效工作流,或者在继续之前等待一定数量的时间。当你继续使用Apache Airflow时,请记住利用TimeSensor的强大功能,来有效地监控和管理dag中时间驱动的依赖项,并构建健壮的、可扩展的数据管道。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/951836.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JS爬虫实战演练

在这个小红书私信通里面进行一个js的爬虫 文字发送 async function sendChatMessage(content) {const url https://pro.xiaohongshu.com/api/edith/ads/pro/chat/chatline/msg;const params new URLSearchParams({porch_user_id: 677e116404ee000000000001});const messageD…

Center Loss 和 ArcFace Loss 笔记

一、Center Loss 1. 定义 Center Loss 旨在最小化类内特征的离散程度,通过约束样本特征与其类别中心之间的距离,提高类内特征的聚合性。 2. 公式 对于样本 xi​ 和其类别yi​,Center Loss 的公式为: xi​: 当前样本的特征向量&…

【Maui】动态菜单实现(绑定数据视图)

前言 .NET 多平台应用 UI (.NET MAUI) 是一个跨平台框架,用于使用 C# 和 XAML 创建本机移动和桌面应用。 使用 .NET MAUI,可从单个共享代码库开发可在 Android、iOS、macOS 和 Windows 上运行的应用。 .NET MAUI 是一款开放源代码应用,是 X…

【json】

JSON JSON是一种轻量级的,按照指定的格式去组织和封装数据的数据交互格式。 本质上是一个带有特定格式的字符串(py打印json时认定为str类型) 在各个编程语言中流通的数据格式,负责不同编程语言中的数据传递和交互,类似于计算机普通话 python与json关系及相互转换…

51单片机——中断(重点)

学习51单片机的重点及难点主要有中断、定时器、串口等内容,这部分内容一定要认真掌握,这部分没有学好就不能说学会了51单片机 1、中断系统 1.1 概念 中断是为使单片机具有对外部或内部随机发生的事件实时处理而设置的,中断功能的存在&#…

易支付二次元网站源码及部署教程

易支付二次元网站源码及部署教程 引言 在当今数字化时代,二次元文化逐渐成为年轻人生活中不可或缺的一部分。为了满足这一庞大用户群体的需求,搭建一个二次元主题网站显得尤为重要。本文将为您详细介绍易支付二次元网站源码的特点及其部署教程&#xf…

开源生成式物理引擎Genesis,可模拟世界万物

这是生成大模型时代 —— 它们能生成文本、图像、音频、视频、3D 对象…… 而如果将所有这些组合到一起,我们可能会得到一个世界! 现在,不管是 LeCun 正在探索的世界模型,还是李飞飞想要攻克的空间智能,又或是其他研究…

【fly-iot飞凡物联】(19):开源飞凡物联项目重启,使用go重写后端代码,感兴趣的小伙伴可以一起参加,使用apache协议开源,招募感兴趣的小伙伴!!

目录 前言fly-iot飞凡物联,感兴趣的小伙伴可以一起参加,使用apache协议开源使用go重写后端代码 前言 fly-iot飞凡物联专栏: https://blog.csdn.net/freewebsys/category_12219758.html fly-iot飞凡物联,感兴趣的小伙伴可以一起参…

用于与多个数据库聊天的智能 SQL 代理问答和 RAG 系统(3) —— 基于 LangChain 框架的文档检索与问答功能以及RAG Tool的使用

介绍基于 LangChain 框架的文档检索与问答功能,目标是通过查询存储的向量数据库(VectorDB),为用户的问题检索相关内容,并生成自然语言的答案。以下是代码逻辑的详细解析: 代码结构与功能 初始化环境与加载…

消息中间件类型介绍

消息中间件是一种在分布式系统中用于实现消息传递的软件架构模式。它能够在不同的系统或应用之间异步地传输数据,实现系统的解耦、提高系统的可扩展性和可靠性。以下是几种常见的消息中间件类型及其介绍: 1.RabbitMQ 特点: • 基于AMQP&#…

uniapp使用scss mixin抽离css常用的公共样式

1、编写通用scss样式文件 // 通用 Flex Mixin mixin flex($direction: row, $justify: flex-start, $align: stretch, $wrap: nowrap) {display: flex;flex-direction: $direction;justify-content: $justify;align-items: $align;flex-wrap: $wrap; }// 水平居中 mixin flex-…

Matlab Steger算法提取条纹中心线(亚像素位置)

文章目录 一、简介二、实现代码三、实现效果参考文献一、简介 Steger 算法是一种常用的图像边缘检测算法,可以用于提取图像中的中心线或边缘信息。它的理论假设是:条纹的亮度是按照高斯分布呈现的,即中心亮两侧渐暗。 其计算过程如下所述: 1、首先,我们需要计算每个点Hess…

PySide6 Qt for Python Qt Quick参考网址

Qt QML BOOK: 《Qt for Python》 -Building an Application https://www.qt.io/product/qt6/qml-book/ch19-python-build-app#signals-and-slots Qt for Python:与C版本的差异即BUG处理(常见的DLL文件确实的问题等) Qt for Pyt…

【大数据】Apache Superset:可视化开源架构

Apache Superset是什么 Apache Superset 是一个开源的现代化数据可视化和数据探索平台,主要用于帮助用户以交互式的方式分析和展示数据。有不少丰富的可视化组件,可以将数据从多种数据源(如 SQL 数据库、数据仓库、NoSQL 数据库等&#xff0…

ELK实战(最详细)

一、什么是ELK ELK是三个产品的简称:ElasticSearch(简称ES) 、Logstash 、Kibana 。其中: ElasticSearch:是一个开源分布式搜索引擎Logstash :是一个数据收集引擎,支持日志搜集、分析、过滤,支持大量数据…

汽车物资拍卖系统架构与功能分析

2015工作至今,10年资深全栈工程师,CTO,擅长带团队、攻克各种技术难题、研发各类软件产品,我的代码态度:代码虐我千百遍,我待代码如初恋,我的工作态度:极致,责任&#xff…

利用 Python 爬虫从义乌购根据关键词获取商品列表

在当今数字化商业时代,数据是企业获取竞争优势的关键。对于从事国际贸易的商家而言,能够及时、准确地获取商品信息至关重要。义乌购作为知名的国际贸易批发平台,汇集了海量的商品资源。通过 Python 爬虫技术,我们可以高效地从义乌…

HDFS编程 - 使用HDFS Java API进行文件操作

文章目录 前言一、创建hdfs-demo项目1. 在idea上创建maven项目2. 导入hadoop相关依赖 二、常用 HDFS Java API1. 简介2. 获取文件系统实例3. 创建目录4. 创建文件4.1 创建文件并写入数据4.2 创建新空白文件 5. 查看文件内容6. 查看目录下的文件或目录信息6.1 查看指定目录下的文…

直流无刷电机控制(FOC):电流模式

目录 概述 1 系统框架结构 1.1 硬件模块介绍 1.2 硬件实物图 1.3 引脚接口定义 2 代码实现 2.1 软件架构 2.2 电流检测函数 3 电流环功能实现 3.1 代码实现 3.2 测试代码实现 4 测试 概述 本文主要介绍基于DengFOC的库函数,实现直流无刷电机控制&#x…

51单片机——串口通信(重点)

1、通信 通信的方式可以分为多种,按照数据传送方式可分为串行通信和并行通信; 按照通信的数据同步方式,可分为异步通信和同步通信; 按照数据的传输方向又可分为单工、半双工和全双工通信 1.1 通信速率 衡量通信性能的一个非常…