介绍
Celery 是异步任务调度工具就,或分布式任务队列(Distributed Task Queue),有多个worker。
Broker:中间人,所有任务放在broker中,worker到broker中提取任务执行。
这样系统为开环系统,broker无法知道任务执行情况,可使用backend broker获取任务执行结果,
broker:消息传输的中间件,即消息队列,发送和接收结果;
backend:数据库,存储消息接celery执行的消息及结果;可以是database 或cached。
celery 组成:消息中间件、任务执行单元、执行结果存储。
celery不提供消息服务,但可与其他消息中间件集成;
报错:daemonic processes are not allowed to have children
原代码使用了多进程(multiprocessing),在与其他代码合并使用时产生了如上报错。
根据新逻辑可以不使用multiprocessing处理,删除后报错解决。worker是celery提供的任务执行单元,并发的运行在分布式系统节点中。
如果不方便修改,可参考如下方法
方法一:通过-P设置其他celery运行方式,但其他方式不是多进程模式,可能会影响项目性能
-P POOL, --pool POOL Pool implementation: prefork (default), eventlet, gevent or solo.
方法二:设置环境变量,允许守护进程创建子进程
先执行 export PYTHONOPTIMIZE=1,再运行celery;如果在supervisord中托管celery,可在supervisord的配置文件中添加environment=PYTHONOPTIMIZE=1。
注意:PYTHONOPTIMIZE=1允许守护进程创建子进程,当主进程结束,会带走守护子进程,但守护子进程还来不及带走守护子进程的子进程(即使设置了daemon = True),守护子进程的子进程将会成为孤儿进程,需要特殊处理。
放到celery运行的代码需要添加装饰器
启动celery前需要对计划运行在celery队列中的函数添加装饰器,如@app_celery.task,这样celery才能找到该函数,否则在运行该函数时报错:Received unregistered task of type 'tasks.add'.,KeyError: 'tasks.add'。
task_time_limit及soft_task_time_limit的使用
时间限制的配置方法:
在装饰器中添加time_limit的时间显示,其优先级高于配置文件,如:@app.task(time_limit=10)
在配置文件中设置超时的字段如下:
CELERYD_TASK_SOFT_TIME_LIMIT CELERYD_TASK_TIME_LIMIT
soft_time_limit 会在内部抛一个 Exception, hard time limit 没法被 catch.
celery pool 用的是 gevent, 现在的实现里 gevent 做 worker pool 的时候会忽略 soft_time_limit, 只有 hard_time_limit 会被触发(通过 gevent.Timeout 实现).
@app.task def add(x, y): try: time.sleep(10) return x + y except SoftTimeLimitExceeded as stle: print(f"error: {stle}") except Exception as e: print(f"error: {e}")
如果出现soft超时异常,其报错信息显示在启动celery的界面,如果设置了保存结果的地址,如在redis中,因其异常已经捕获,其状态为success,但结果为空,如下:
如果出现hard超时异常,执行结果的状态为failure,异常被抛出,无法做后续处理:
查看celery中任务的数量及状态
1. 使用celery命令查看启动的应用(tasks)下正在执行的任务:celery -A tasks inspect active
2. 加入celery的任务,在执行delay()方法后会返回一个AsyncResult的类,包含任务的celery_task_id,可通过遍历的方法获取现有任务的状态。
3. 使用Flower监控工具
pip install flower,启动:celery -A tasks flower --port=5555
可在http://localhost:5555中查看到celery的状态和信息。
失败任务重试
参考:
Celery中文文档
Configuration and defaults — Celery 4.4.7 documentation
解决celery报错daemonic processes are not allowed to have children-CSDN博客
查询celery正在执行的任务_celery查看任务状态-CSDN博客
python celery 失败任务重试 python的celery_mob6454cc6ff2b9的技术博客_51CTO博客
Celery Time Limit 的坑 · Shining Moon (monsterxx03.com)
python 异步任务 开始 停止 python异步任务框架_mob64ca140d61c6的技术博客_51CTO博客
Celery 分布式任务队列_celery查看任务队列-CSDN博客