Celery异步网络框架
定义
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列,同时也支持任务调度。中文官网:http://docs.jinkan.org/docs/celery/
在线安装 sudo pip3 install -U Celery -i https://pypi.tuna.tsinghua.edu.cn/simple/
离线安装
tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python3 setup.py build
python3 setup.py install
名词解释
broker 消息传输的中间件,生产者一旦有消息发送,将发至broker;【RQ,redis】
backend 用于存储消息/任务结果,如果需要跟踪和查询任务状态,则需添加要配置相关
worker 工作者 - 消费/执行broker中消息/任务的进程
使用Celery
创建woker
#创建 tasks.py 文件
[root@vm ~]# mkdir celery
[root@vm ~]# cd celery/
[root@vm celery]# vim tasks.py
import time
from celery import Celery
#初始化celery, 指定broker
app = Celery('myselery', broker='redis://:@127.0.0.1:6379/1')
# 如果redis有密码,可添加password
# app = Celery('myselery', broker='redis://:123456@192.168.1.11:6379/1')
# 创建任务函数
@app.task
def task_test():
print("task is running....")
time.sleep(10) #阻塞住
print("task is over.")
[root@vm celery]# celery -A tasks worker --loglevel=info
...
-------------- celery@vm v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Linux-3.10.0-1160.el7.x86_64-x86_64-with-centos-7.9.2009-Core
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: myselery:0x7f85ddef3198 # selery名字
- ** ---------- .> transport: redis://:**@192.168.1.11:6379/1 #redis
- ** ---------- .> results: disabled:// #不使用存储
- *** --- * --- .> concurrency: 2 (prefork) #2个并发,根据cpu核数
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.task_test #异步的任务
...: INFO/MainProcess] celery@vm ready. #执行后终端显示
创建生产者推送任务
在tasks.py文件的同级目录进入 ipython3 执行 如下代码
[root@vm celery]# python3
>>> from tasks import task_test
>>> task_test.delay()
<AsyncResult: 882c3877-d0a4-4529-8361-e4c5e3723a0d>
>>> print(1) #不会阻塞,可以继续操作
1
查看celery执行
#celery 日志窗口
... INFO/MainProcess] Task tasks.task_test[f2c727f2-aa0d-4174-b766-9d525aa651e1] received
... WARNING/ForkPoolWorker-2] task is running....
... WARNING/ForkPoolWorker-2]
... WARNING/ForkPoolWorker-2] task is over.
... Task tasks.task_test[f2c727f2-aa0d-4174-b766-9d525aa651e1] succeeded in 10.06993673299439s: None
存储结果的celery
(一般不存储)
[root@vm celery]# vim backend.py
from celery import Celery
app = Celery("backend",
broker="redis://:123456@192.168.1.1:6379/1",
backend="redis://:123456@192.168.1.1:6379/2")
@app.task
def backend_task(m, n):
print("backend_task...")
return m + n
[root@vm celery]# celery -A backend worker --loglevel=info
[root@vm celery]# python3
>>> from backend import backend_task
>>> backend_task.delay(1,2)
<AsyncResult: 7375fd0f-e5fb-4e56-9343-3aafd6c80b26>
celery日志查看执行,redis里查看2号库
192.168.1.11:6379[2]> keys *
1) "celery-task-meta-7375fd0f-e5fb-4e56-9343-3aafd6c80b26"
#了解
192.168.1.11:6379[1]> keys *
1) "_kombu.binding.celeryev"
2) "_kombu.binding.celery"
3) "_kombu.binding.celery.pidbox"
192.168.1.11:6379[1]> SMEMBERS _kombu.binding.celeryev
1) "worker.#\x06\x16\x06\x16celeryev.a7473af2-c0b2-4b87-a781-14150476718c"
或存储mysql 或Memcached 等
Celery提供存储任务执行结果的方案,需借助 redis 或 mysql 或Memcached 等
详情可见 http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result
Django + Celery
创建项目+应用
[root@vm ~]# django-admin startproject test_celery
[root@vm ~]# cd test_celery
[root@vm test_celery]# python3 manage.py startapp user
[root@vm test_celery]# vim test_celery/settings.py
...
ALLOWED_HOSTS = ['*',]
INSTALLED_APPS = [
...
'user',
MIDDLEWARE = [
...
#先注释了,问题暂不解决 'django.middleware.csrf.CsrfViewMiddleware',
LANGUAGE_CODE = 'zh-Hans'
TIME_ZONE = "Asia/Shanghai"
[root@vm test_celery]# python3 manage.py runserver 0.0.0.0:8000
创建celery.py
[root@vm test_celery]# vim test_celery/celery.py
#在settings.py同级目录下 创建 celery.py文件
from celery import Celery
from django.conf import settings
import os
# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test_celery.settings')
# 指定消息中间件[broker
app = Celery("test_celery", broker='redis://:123456@192.168.1.11:6379/2')
# 设置自动发现异步任务
app.autodiscover_tasks(settings.INSTALLED_APPS)
主路由&分布式路由
[root@vm test_celery]# vim test_celery/urls.py
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urlcs),
path('v1/users/', include('user.urls')),
]
[root@vm test_celery]# vim user/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('test_celery', views.test_celery),
]
app创建tasks.py文件
#在应用模块【user目录下】创建tasks.py文件
[root@vm test_celery]# vim user/tasks.py
from test_celery.celery import app
import time
@app.task
def task_test():
print("task begin....")
time.sleep(10)
print("task over....")
app视图函数
[root@vm test_celery]# vim user/views.py
from django.http import HttpResponse
from .tasks import task_test
import datetime
def test_celery(request):
task_test.delay()
now = datetime.datetime.now()
html = "return at %s"%(now.strftime('%H:%M:%S'))
return HttpResponse(html)
启动celery worker
在项目路径下,即test_celery 下 执行如下
[root@vm test_celery]# ls
db.sqlite3 manage.py test_celery user
[root@vm test_celery]# celery -A test_celery worker -l info
...
[tasks]
. user.tasks.task_test
... INFO/MainProcess] celery@vm ready.
浏览器访问测试
http://192.168.1.11:8000/v1/users/test_celery return at 17:31:19
#celery日志
...Task user.tasks.task_test[bf48dfbf-2711-422d-9cc8-a361a6e293ea] received
... 17:31:19,357: WARNING/ForkPoolWorker-2] task begin....
... 17:31:19,357: WARNING/ForkPoolWorker-2] #sleep 10
... 17:31:29,374: WARNING/ForkPoolWorker-2] task over....
django和celery结合步骤
创建celery配置文件【和settings.py同路径】
创建tasks.py存放异步任务函数【各自app应用体系下】
app视图函数中调用celery异步任务【delay()】
终端启动celery worker
浏览器触发测试
生产环境 启动
并发模式切换
默认并发采用 prefork – 多进程模式, 推荐采用 gevent – 协程模式
协程:纤程,微线程 ,一个线程可以有多个协程 函数
python中 进程(计算密集)、线程因gil锁,对高io的有效,计算型无效, 进程协程
celery -A proj worker -P gevent -c 1000
# P POOL Pool implementation: 支持 perfork or eventlet or gevent
# C CONCURRENCY 并发数 一般1000个
后台启动命令
nohup celery -A proj worker -P gevent -c 1000 > celery.log 2>&1 &
#1,nohup: 忽略所有挂断(SIGHUP)信号
#2,标准输入是文件描述符0。它是命令的输入,缺省是键盘,也可以是文件或其他命令的输出。
#标准输出是文件描述符1。它是命令的输出,缺省是屏幕,也可以是文件。
#标准错误是文件描述符2。这是命令错误的输出,缺省是屏幕,同样也可以是文件。
#3,&符号:代表将命令在后台执行
ent
C CONCURRENCY 并发数 一般1000个
### 后台启动命令
```shell
nohup celery -A proj worker -P gevent -c 1000 > celery.log 2>&1 &
#1,nohup: 忽略所有挂断(SIGHUP)信号
#2,标准输入是文件描述符0。它是命令的输入,缺省是键盘,也可以是文件或其他命令的输出。
#标准输出是文件描述符1。它是命令的输出,缺省是屏幕,也可以是文件。
#标准错误是文件描述符2。这是命令错误的输出,缺省是屏幕,同样也可以是文件。
#3,&符号:代表将命令在后台执行