🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客
🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。
🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频
在Airflow的工作计划中,一个重要的概念就是catchup(追赶),在实现DAG具体逻辑后,如果将catchup设置为True(默认就为True),Airflow将“回填”所有过去的DAG run,如果将catchup设置为False,Airflow将从最新的DAG run时刻前一时刻开始执行 DAG run,忽略之前所有的记录。
例如:现在某个DAG每隔1分钟执行一次,调度开始时间为2001-01-01 ,当前日期为2021-10-01 15:23:21,如果catchup设置为True,那么DAG将从2001-01-01 00:00:00 开始每分钟都会运行当前DAG。如果catchup 设置为False,那么DAG将从2021-10-01 15:22:20(当前2021-10-01 15:23:21前一时刻)开始执行DAG run。
举例:有first ,second,third三个shell命令任务,按照顺序调度,每隔1分钟执行一次,首次执行时间为2000-01-01。
设置catchup 为True(默认),DAG python配置如下:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # 拥有者名称
'start_date': datetime(2001, 1, 1), # 第一次开始执行的时间,为 UTC 时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5), # 失败重试间隔
}
dag = DAG(
dag_id = 'catchup_test1 ', #DAG id ,必须完全由字母、数字、下划线组成
default_args = default_args, #外部定义的 dic 格式的参数
schedule_interval = timedelta(minutes=1), # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
catchup=True # 执行DAG时,将开始时间到目前所有该执行的任务都执行,默认为True
)
first = BashOperator(
task_id='first',
bash_command='echo "run first task"',
dag=dag
)
middle = BashOperator(
task_id='second',
bash_command='echo "run second task"',
dag=dag
)
last = BashOperator(
task_id='third',
bash_command='echo "run third task"',
dag=dag,
retries=3
)
first >> middle >>last
上传python配置文件到$AIRFLOW_HOME/dags下,重启airflow,DAG执行调度如下:
设置catchup 为False,DAG python配置如下:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # 拥有者名称
'start_date': datetime(2001, 1, 1), # 第一次开始执行的时间,为 UTC 时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5), # 失败重试间隔
}
dag = DAG(
dag_id = 'catchup_test2', #DAG id ,必须完全由字母、数字、下划线组成
default_args = default_args, #外部定义的 dic 格式的参数
schedule_interval = timedelta(minutes=1), # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
catchup=False # 执行DAG时,将开始时间到目前所有该执行的任务都执行,默认为True
)
first = BashOperator(
task_id='first',
bash_command='echo "run first task"',
dag=dag
)
middle = BashOperator(
task_id='second',
bash_command='echo "run second task"',
dag=dag
)
last = BashOperator(
task_id='third',
bash_command='echo "run third task"',
dag=dag,
retries=3
)
first >> middle >>last
上传python配置文件到$AIRFLOW_HOME/dags下,重启airflow,DAG执行调度如下:
有两种方式在Airflow中配置catchup:
- 全局配置
在airflow配置文件airflow.cfg的scheduler部分下,设置catchup_by_default=True(默认)或False,这个设置是全局性的设置。
- DAG文件配置
在python代码配置中设置DAG对象的参数:dag.catchup=True或False。
dag = DAG(
dag_id = 'myairflow_execute_bash',
default_args = default_args,
catchup=False,
schedule_interval = timedelta(days=1))