在Dataworks调度里检查上游表的分区是否已经产出
新建PyOdps3节点,贴如如下代码:
import sys
import time
from datetime import datetime
bizdate = args['bizdate']
if not o.exist_table(args['table']):
sys.exit(1)
# 设置结束时间为今天的20:00
end_time = datetime.now().replace(hour=20, minute=0, second=0, microsecond=0)
table = o.get_table(args['table'])
while True:
if table.exist_partition('dt=' + bizdate):
print('partition dt=' + bizdate + ' exists')
sys.exit(0)
# 检查是否达到结束时间
if datetime.now() >= end_time:
print('达到结束时间,退出循环。')
break
formatted_time = datetime.now().strftime('%H:%M:%S')
print('[' + formatted_time + '] partition dt=' + bizdate + ' not exists, wait')
time.sleep(300) # 300秒 = 5分钟
sys.exit(0)
在调度配置
里填入如下参数:
提交任务即可。