文章目录
- celery的介绍
- celery的架构
- celery的快速使用
- celery 包结构
- celery 定时 异步 延迟任务
- django使用celery
celery的介绍
celery是什么?
-翻译过来是芹菜
-官网:https://docs.celeryq.dev/en/stable/
-吉祥物:芹菜
-分布式的异步任务框架
- 分布式:一个任务,拆成多个任务在不同机器上做
- 异步任务:后台执行,不阻塞主任务
- 框架:集成到项目中
celery主要的几个功能
1 异步任务
异步发邮件,短信,通知
\
2 延迟任务
延迟 几秒 再执行某个任务
订单提交后,延迟半小时,把订单取消
\
3 定时任务
-每隔多长事件 执行某个任务
- 定时更新缓存
celery的架构
django和celery是一个服务
celery的三个模块
1 broker:消息中间件,消息队列,任务中间件
-存储任务(函数):发送短信任务,统计在线人数。。。
-redis,reabbitmq 存储
-字符串形式,能把任务表示出来即可
函数名,函数参数,函数位置
2 worker:任务执行单元
-从消息队列[broker–》redis]—》取出任务执行—>程序(进程)
3 backend:结果存储 Result Stores
-任务执行完成后的结果存储在这里
-redis存储,关系型数据库。。
执行流程
1 其他程序—》提交任务(函数)—》任务序列化后存到celery的broker中
2 接下来:worker执行—》从broker中取任务–》执行
3 任务执行完后,把结果存到 bancked中
celery和其他程序是 独立运行的
"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务(django),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人(django)是一个独立运行的服务 | 医院(celery)也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""
celery的快速使用
pip celery install
# 可以把这个文件封装成一个包 以后直接调用即可
from celery import Celery
import time
# 提交任务的地址 注意 必须是单引号 不然会报错
broker = 'redis://127.0.0.1:6379/1'
# 完成任务之后的结果存储地址
backend = 'redis://127.0.0.1:6379/2'
# add是任务名字 broker提交任务地址 任务完成地址
# 写任务---》写函数---》必须用 @app.task 装饰---》装饰后,就变成了celery的任务了
app = Celery('app',broker=broker,backend=backend)
# 自己设置任务
@app.task
def add():
time.sleep(2)
return 'hello'
@ap.task
def add1(a,b):
time.sleep(2)
return a+b
# 在调用的文件
from demo import add,add1
# 同步执行 只需要加上括号即可 直接等待之后得到结果
# res = add()
# print(res)
# 异步执行 提交上去得到一个任务id号
res = add.delay()
print(res)
# 如果有参数 直接传即可
res1 = add1.delay(3,4)
print(res1)
# 启动执行任务命令
如果是win系统 需要在安装一个模块 pip3 install eventlet
###win运行:
pip3 install eventlet
celery -A 是指定命令 deno 是执行函数的文件名记得切换路径 worker -l info 指定级别
celery -A demo worker -l info -P eventlet
任务执行完成这个 会阻塞在哪里等待新任务的提交
####非win运行:mac linux
celery -A demo worker -l info
# 查询结果 可以查询执行状态和结果
from demo import app
from celery.result import AsyncResult
id = '5a7af383-6a4a-456e-b33b-43d5618f1208'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get() # hello world
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
celery 包结构
-celery_task
-celery.py
-user_task.py
-order_task.py
-goods_task.py
包下必须有一个celery的文件 所有任务一定要注册到 include中
from celery import Celery
#####1 实例化得到对象#######
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task'])
#######2 写任务 ##########以后各种类型任务,单独写在py文件中
-其他程序中提交任务
add_task_package.py
-其他程序中查询结果
get_result_package.py
启动 workera切换路径到路径上一级启动 比如 cd到celery_task的上一级即可 包名文件
celery 定时 异步 延迟任务
from celery_task.user_task import send
from celery_task.order_task import order_of
# 异步任务 任务函数.delay(参数)
res = send.delay('15800089521','3216')
print(res)
# 延迟任务
from datetime import datetime, timedelta
# # 当前时间 加 上 自己设置的时间 后面需要自己修改一下配置文件 当前时间是utc时间
eta = datetime.utcnow() + timedelta(seconds=20) # minutes=3 可以修改时间单位
# # args指定参数 eta指定时间延迟的时间
res = order_of.apply_async(args=['100860'], eta=eta)
print(res)
# 定时任务 单独写一个文件
# 定时任务
# 定时任务--》一定要启动beat
# 在celery.py 中写
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'send_sms': {
'task': 'celery_task.user_task.send',
'schedule': timedelta(seconds=8), # 每隔多久发一次
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': ('1896388888', '6666'), # 执行任务的函数 需要几个参数传几个
}
# 'cancle_order': {
# 'task': 'celery_task.order_task.cancel_order',
# # 'schedule': timedelta(minutes=30),
# 'schedule': crontab(hour=11, day_of_week=1, minute=21), # 每周一早八点
# 'args': ('9999999',),
# },
}
# 启动beat---》每隔一段时间,就提交任务
# celery -A celery_task beat -l info
# 启动worker
# celery -A celery_task worker -l info -P eventlet
# 两个都要启动 一个是执行任务 一个是提交任务 就不需要我们手动启了
django使用celery
# celery中如果用到django的配置文件 必须加上一句话
from celery import Celery
import os
# 异步任务使用到django的配置 必须要添加这一句加载配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
# 提交任务的地址 注意 必须是单引号 不然会报错
broker = 'redis://127.0.0.1:6379/1'
# 完成任务之后的结果存储地址
backend = 'redis://127.0.0.1:6379/2'
# add是任务名字 broker提交任务地址 任务完成地址
app = Celery('app',broker=broker,backend=backend,include=['celery_task.order_task','celery_task.user_task'])
# 任务函数
from .celery import app
from libs.sms_sen import common_send_sms
@app.task
def send(phone, code):
common_send_sms(mobile=phone,code=code).delay() # 掉用封装好的短信
return '手机号:%s,发送验证码:%s,成功' % (phone, code)
# 视图类中
# 使用celery异步发送短信
class Celery_send_sms(ViewSet):
def create(self,request):
mobile = request.data.get('mobile')
code = '8888'
res = send.delay(phone=mobile,code=code)
print(res)
return APIResponse('手机号%s短信发送成功' % mobile)