🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客
🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。
🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频
PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。
关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation
python_callable(python callable):调用的python函数
op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。
op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。
PythonOperator调度案例
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# python中 * 关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):
print(a)
print(b)
print("hello airflow1")
# 返回的值只会打印到日志中
return{"sss1":"xxx1"}
def print__hello2(random_base):
print(random_base)
print("hello airflow2")
# 返回的值只会打印到日志中
return{"sss2":"xxx2"}
default_args = {
'owner':'maliu',
'start_date':datetime(2021, 10, 1),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_pythoncode',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=PythonOperator(
task_id='first',
#填写 print__hello1 方法时,不要加上“()”
python_callable=print__hello1,
# op_args 对应 print_hello1 方法中的a参数
op_args=[1,2,3,"hello","world"],
# op_kwargs 对应 print__hello1 方法中的b参数
op_kwargs={"id":"1","name":"zs","age":18},
dag = dag
)
second=PythonOperator(
task_id='second',
#填写 print__hello2 方法时,不要加上“()”
python_callable=print__hello2,
# random_base 参数对应 print_hello2 方法中参数“random_base”
op_kwargs={"random_base":random.randint(0,9)},
dag=dag
)
first >> second