在Celery在python中的应用除了实现异步任务(async task
)外也可以执行定时任务(beat
)
1.Celery定时任务是什么?
Celery默认任务单元由任务生产者触发,但有时可能需要其自动触发, 而beat
进程正是负责此类任务,能够自动触发定时/周期性任务.
只需要在配置中配置好周期任务,然后在运行一个周期任务触发器(beat)即可
2.直接上代码
目录结构如下:
celery_app.py 文件代码如下:
import os
import sys
import time
import celery
from pathlib import Path
from datetime import timedelta
# 实例化celery对象
app = celery.Celery(
"celery_worker",
backend="redis://:@127.0.0.1:6379/4",
broker="redis://:@127.0.0.1:6379/5",
include=[
"celery_worker.email.tasks"
],
)
# celery beat 定时任务
beat_schedule = {
'periodic_task-every-minute': {
# 'task': 'celery_worker.email.tasks.add',
'task': 'chain.send_chains',
'schedule': timedelta(seconds=10),
'args': (11, 22)
},
}
# 配置文件
app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
task_default_queue="normal",
timezone="Asia/Shanghai",
enable_utc=False,
task_ignore_result=True,
redis_max_connections=100,
result_expires=3600,
beat_schedule=beat_schedule
)
"""
celery -A celery_worker.celery_app worker -l info
celery -A celery_worker.celery_app beat
"""
email.tasks.py 代码如下:
from loguru import logger
# 模块化之后
from celery_worker.celery_app import app
@app.task(name='chain.send_chains')
def add(x, y):
logger.info(f'number_add 进来了...x:{x}, y:{y}')
return x + y
然后顺序启动 worker 和 beat 定时任务(记得两个都必须启动)
执行如下命令:
celery -A celery_worker.celery_app worker -l info (启动干活的人)
celery -A celery_worker.celery_app beat (启动定时任务 类似crontab)
效果如下:
其实简单的来说就是这点代码:
beat_schedule = {
'periodic_task-every-minute': {
# 'task': 'celery_worker.email.tasks.add',
'task': 'chain.send_chains',
'schedule': timedelta(seconds=10),
'args': (11, 22)
},
}
periodic_task-every-minute
这个就是定时任务的名字 ,随便起无所谓。
重点是这个 "task"
,经过实际测试,如果这个工作函数没有指定name
名字的话,默认就是 函数路径+函数名称
也就是 'celery_worker.email.tasks.add'
。
但是如果这函数添加name
属性值的话 直接用名字也是可以的,也就是'chain.send_chains'
。
好了 小伙伴们也自己实操下吧!