pt34-python-Celery

Celery异步网络框架

定义

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列,同时也支持任务调度。中文官网:http://docs.jinkan.org/docs/celery/

在线安装  sudo pip3 install -U Celery -i https://pypi.tuna.tsinghua.edu.cn/simple/

离线安装
    tar xvfz celery-0.0.0.tar.gz
    cd celery-0.0.0
    python3 setup.py build
    python3 setup.py install

名词解释

broker   消息传输的中间件,生产者一旦有消息发送,将发至broker;【RQ,redis】
backend  用于存储消息/任务结果,如果需要跟踪和查询任务状态,则需添加要配置相关
worker   工作者 - 消费/执行broker中消息/任务的进程

在这里插入图片描述

使用Celery

创建woker
#创建 tasks.py 文件
[root@vm ~]#  mkdir celery
[root@vm ~]# cd celery/
[root@vm celery]# vim tasks.py
import time 
from celery import Celery
#初始化celery, 指定broker
app = Celery('myselery', broker='redis://:@127.0.0.1:6379/1')

# 如果redis有密码,可添加password
# app = Celery('myselery', broker='redis://:123456@192.168.1.11:6379/1')

# 创建任务函数
@app.task
def task_test():
    print("task is running....") 
    time.sleep(10) #阻塞住
    print("task is over.")    
[root@vm celery]# celery -A tasks worker --loglevel=info
...
 -------------- celery@vm v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Linux-3.10.0-1160.el7.x86_64-x86_64-with-centos-7.9.2009-Core 
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         myselery:0x7f85ddef3198     # selery名字
- ** ---------- .> transport:   redis://:**@192.168.1.11:6379/1  #redis
- ** ---------- .> results:     disabled://  #不使用存储
- *** --- * --- .> concurrency: 2 (prefork)    #2个并发,根据cpu核数
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.task_test   #异步的任务

...: INFO/MainProcess] celery@vm ready.   #执行后终端显示
创建生产者推送任务

在tasks.py文件的同级目录进入 ipython3 执行 如下代码

[root@vm celery]# python3
>>> from tasks import task_test
>>> task_test.delay()
<AsyncResult: 882c3877-d0a4-4529-8361-e4c5e3723a0d>
>>> print(1)   #不会阻塞,可以继续操作
1
查看celery执行
#celery 日志窗口
... INFO/MainProcess] Task tasks.task_test[f2c727f2-aa0d-4174-b766-9d525aa651e1] received
... WARNING/ForkPoolWorker-2] task is running....
... WARNING/ForkPoolWorker-2]
... WARNING/ForkPoolWorker-2] task is over.
... Task tasks.task_test[f2c727f2-aa0d-4174-b766-9d525aa651e1] succeeded in 10.06993673299439s: None

存储结果的celery

(一般不存储)

[root@vm celery]# vim backend.py
from celery import Celery

app = Celery("backend",
             broker="redis://:123456@192.168.1.1:6379/1",
             backend="redis://:123456@192.168.1.1:6379/2")

@app.task
def backend_task(m, n):
    print("backend_task...")

    return m + n

[root@vm celery]# celery -A backend worker --loglevel=info
[root@vm celery]# python3
>>> from backend import backend_task
>>> backend_task.delay(1,2)
<AsyncResult: 7375fd0f-e5fb-4e56-9343-3aafd6c80b26>


celery日志查看执行,redis里查看2号库
192.168.1.11:6379[2]> keys *
1) "celery-task-meta-7375fd0f-e5fb-4e56-9343-3aafd6c80b26"
#了解
192.168.1.11:6379[1]> keys *
1) "_kombu.binding.celeryev"
2) "_kombu.binding.celery"
3) "_kombu.binding.celery.pidbox"
192.168.1.11:6379[1]> SMEMBERS _kombu.binding.celeryev
1) "worker.#\x06\x16\x06\x16celeryev.a7473af2-c0b2-4b87-a781-14150476718c"

或存储mysql 或Memcached 等

​ Celery提供存储任务执行结果的方案,需借助 redis 或 mysql 或Memcached 等

​ 详情可见 http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result

Django + Celery

创建项目+应用

[root@vm ~]# django-admin startproject test_celery
[root@vm ~]# cd test_celery
[root@vm test_celery]# python3 manage.py startapp user

[root@vm test_celery]# vim test_celery/settings.py
...
ALLOWED_HOSTS = ['*',]
INSTALLED_APPS = [
...
    'user',
    
MIDDLEWARE = [
...
#先注释了,问题暂不解决  'django.middleware.csrf.CsrfViewMiddleware',

LANGUAGE_CODE = 'zh-Hans'

TIME_ZONE = "Asia/Shanghai"

[root@vm test_celery]# python3 manage.py runserver 0.0.0.0:8000

创建celery.py

[root@vm test_celery]# vim test_celery/celery.py
#在settings.py同级目录下 创建 celery.py文件
from celery import Celery
from django.conf import settings
import os

# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test_celery.settings')

# 指定消息中间件[broker
app = Celery("test_celery", broker='redis://:123456@192.168.1.11:6379/2')

# 设置自动发现异步任务
app.autodiscover_tasks(settings.INSTALLED_APPS)

主路由&分布式路由

[root@vm test_celery]# vim test_celery/urls.py

from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urlcs),
    path('v1/users/', include('user.urls')),
]

[root@vm test_celery]# vim user/urls.py

from django.urls import path
from . import views

urlpatterns = [
    path('test_celery', views.test_celery),
]

app创建tasks.py文件

#在应用模块【user目录下】创建tasks.py文件
[root@vm test_celery]# vim user/tasks.py
from test_celery.celery import app
import time

@app.task
def task_test():
    print("task begin....")
    time.sleep(10)
    print("task over....")

app视图函数

[root@vm test_celery]# vim user/views.py
from django.http import HttpResponse
from .tasks import task_test
import datetime

def test_celery(request):
    task_test.delay()
    now = datetime.datetime.now()
    html = "return at %s"%(now.strftime('%H:%M:%S'))
    return HttpResponse(html)

启动celery worker

在项目路径下,即test_celery 下 执行如下

[root@vm test_celery]# ls
db.sqlite3  manage.py  test_celery  user
[root@vm test_celery]# celery -A test_celery worker -l info
...
[tasks]
  . user.tasks.task_test
... INFO/MainProcess] celery@vm ready.

浏览器访问测试

http://192.168.1.11:8000/v1/users/test_celery    return at 17:31:19


#celery日志
...Task user.tasks.task_test[bf48dfbf-2711-422d-9cc8-a361a6e293ea] received
... 17:31:19,357: WARNING/ForkPoolWorker-2] task begin....
... 17:31:19,357: WARNING/ForkPoolWorker-2]    #sleep  10
... 17:31:29,374: WARNING/ForkPoolWorker-2] task over....

django和celery结合步骤

创建celery配置文件【和settings.py同路径】

创建tasks.py存放异步任务函数【各自app应用体系下】

app视图函数中调用celery异步任务【delay()】

终端启动celery worker

浏览器触发测试

生产环境 启动

并发模式切换

默认并发采用 prefork – 多进程模式, 推荐采用 gevent – 协程模式

协程:纤程,微线程 ,一个线程可以有多个协程 函数

python中 进程(计算密集)、线程因gil锁,对高io的有效,计算型无效, 进程协程

celery -A proj worker -P gevent -c 1000
# P POOL Pool implementation: 支持 perfork or eventlet or gevent
# C CONCURRENCY 并发数  一般1000个

后台启动命令

nohup celery -A proj worker -P gevent -c 1000 > celery.log 2>&1 &

#1,nohup: 忽略所有挂断(SIGHUP)信号
#2,标准输入是文件描述符0。它是命令的输入,缺省是键盘,也可以是文件或其他命令的输出。
#标准输出是文件描述符1。它是命令的输出,缺省是屏幕,也可以是文件。
#标准错误是文件描述符2。这是命令错误的输出,缺省是屏幕,同样也可以是文件。
#3,&符号:代表将命令在后台执行

ent

C CONCURRENCY 并发数 一般1000个


### 	后台启动命令

```shell
nohup celery -A proj worker -P gevent -c 1000 > celery.log 2>&1 &

#1,nohup: 忽略所有挂断(SIGHUP)信号
#2,标准输入是文件描述符0。它是命令的输入,缺省是键盘,也可以是文件或其他命令的输出。
#标准输出是文件描述符1。它是命令的输出,缺省是屏幕,也可以是文件。
#标准错误是文件描述符2。这是命令错误的输出,缺省是屏幕,同样也可以是文件。
#3,&符号:代表将命令在后台执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/238290.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

语音验证码有哪些优势

稳定高效 链接各国运营商语音通道&#xff0c;不受时间地域限制&#xff0c;到达率较高&#xff0c;验证效果稳定高效&#xff0c;大大提高了客户回填率和转化率&#xff0c;减少客户流失。 海量并发 智能运维系统&#xff0c;实时监控&#xff0c;自动切换&#xff0c;无需…

【二分查找】【滑动窗口】LeeCode2528:最大化城市的最小电量

作者推荐 【动态规划】【广度优先】LeetCode2258:逃离火灾 本文涉及的基础知识点 二分查找算法合集 滑动窗口 题目 给你一个下标从 0 开始长度为 n 的整数数组 stations &#xff0c;其中 stations[i] 表示第 i 座城市的供电站数目。 每个供电站可以在一定 范围 内给所有城…

PR分屏模板|Premiere动态多画面多屏特效视频模板剪辑素材

这一个很棒的分屏效果PR幻灯片模板视频素材&#xff01;为您的视频制作多屏幕动画&#xff01; 非常易于定制、更改颜色。 仅支持Premier Pro 2024及最新版本。 高清分辨率&#xff1a;19201080/30fps。 持续时间–00:37。 21媒体占位符&#xff08;照片或视频&#xff09;。 包…

Mongdb常用复杂语句(nosql)总结

➡️ ➡️ 关于 MongoDB和MongoTemplate 嵌套数据判空查询 的讨论 ⬅️ ⬅️ 在本篇文章中小名会时常维护些来不及分类的日工作常用的复杂语句&#xff1a; 1、按照表id查询 db.getCollection(TABLE_NAME).find({"_id":ObjectId("62947c8fe2a399286a7259f7&q…

Web网站服务(二)

1、客户机地址限制。 Require all granted&#xff1a;表示允许所有主机访问。 Require all denied&#xff1a;表示拒绝所有主机访问。 Require local&#xff1a;表示仅允许本地主机访问。 Require [not] host <主机名或域名列表>&#xff1a;表示允许或拒绝指定主机或…

2、快速搞定Kafka术语

快速搞定Kafka术语 Kafka 服务端3层消息架构 Kafka 客户端Broker 如何持久化数据小结 Kafka 服务端 3层消息架构 第 1 层是主题层&#xff0c;每个主题可以配置 M 个分区&#xff0c;而每个分区又可以配置 N 个副本。第 2 层是分区层&#xff0c;每个分区的 N 个副本中只能有…

【sqli靶场】第二关和第三关通关思路

目录 前言 一、sqli靶场第二关 1.1 判断注入类型 1.2 判断数据表中的列数 1.3 使用union联合查询 1.4 使用group_concat()函数 1.5 爆出users表中的列名 1.6 爆出users表中的数据 二、sqli靶场第三关 2.1 判断注入类型 2.2 观察报错 2.3 判断数据表中的列数 2.4 使用union联合…

视频目标检测 yolov

目录 yolov相关介绍 预训练是检测动物世界的动物的&#xff0c;1060显卡单帧需要300毫秒 yolov相关介绍 论文地址&#xff1a; https://arxiv.org/pdf/2208.09686.pdf 代码地址&#xff1a; https://github.com/YuHengsss/YOLOV ModelsizemAP50valSpeed 2080Ti(batch size1)…

BGD 实战

梯度下降方法 2.1、三种梯度下降不同 梯度下降分三类&#xff1a;批量梯度下降BGD&#xff08;Batch Gradient Descent&#xff09;、小批量梯度下降MBGD&#xff08;Mini-Batch Gradient Descent&#xff09;、随机梯度下降SGD&#xff08;Stochastic Gradient Descent&…

ROB端口需求

对于一个 4-way 的超标量处理器来说&#xff0c;在重排序缓存&#xff08;ROB&#xff09;中每周期可以退休的指令个数应该是不小于四条的&#xff1b;如图给出的ROB,在那些最旧的指令中,有三条指令都已经变为了complete状态,那么在一个周期内就可以将这三条指令都退休。 要实现…

Vue路由跳转重定向动态路由VueCli

Vue路由跳转&重定向&动态路由&VueCli 一、声明式导航-导航链接 1.需求 实现导航高亮效果 如果使用a标签进行跳转的话&#xff0c;需要给当前跳转的导航加样式&#xff0c;同时要移除上一个a标签的样式&#xff0c;太麻烦&#xff01;&#xff01;&#xff01; …

抖店一件代发怎么做?保姆级运营教程!

我是电商珠珠 抖店是抖音发展的电商平台&#xff0c;一些没有电商经验的新手&#xff0c;不想囤货&#xff0c;担心卖不出去。 听说可以一件代发&#xff0c;但却不知道怎么做。 今天&#xff0c;我就来详细的跟大家讲一下。 一、选品上架 在选品的时候可以参考后台的电商…

GZ029 智能电子产品设计与开发赛题第5套

2023年全国职业院校技能大赛高职组 “GZ029智能电子产品设计与开发”赛项赛卷五 题目&#xff1a;模拟工业传送带物品检测系统的设计与开发 1 竞赛任务 在智能电视机上播放工业传送带传输物品视频&#xff0c;模拟工业传送带物品检测系统&#xff08;以下简称物品检测系统&…

wpf TelerikUI使用DragDropManager

首先&#xff0c;我先创建事务对象ApplicationInfo&#xff0c;当暴露出一对属性当例子集合对于构成ListBoxes。这个类在例子中显示如下代码&#xff1a; public class ApplicationInfo { public Double Price { get; set; } public String IconPath { get; set; } public …

Nature 子刊| 单细胞转录组揭示从猴到人的进化过程

期刊&#xff1a;Nature Ecology & Evolution IF&#xff1a;16.8 发表时间&#xff1a;2023 DOI号&#xff1a;10.1038/s41559-023-02186-7 研究背景 据推测&#xff0c;人类认知功能的增强是由于皮质扩张和细胞多样性增加所致。然而&#xff0c;推动这些表型创新的机…

产品入门第一讲:Axure的安装以及基本使用

&#x1f4da;&#x1f4da; &#x1f3c5;我是默&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; ​​​ &#x1f31f;在这里&#xff0c;我要推荐给大家我的专栏《Axure》。&#x1f3af;&#x1f3af; &#x1f680;无论你是编程小白&#xff0c;还是有…

【开源】基于Vue.js的就医保险管理系统

文末获取源码&#xff0c;项目编号&#xff1a; S 085 。 \color{red}{文末获取源码&#xff0c;项目编号&#xff1a;S085。} 文末获取源码&#xff0c;项目编号&#xff1a;S085。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 科室档案模块2.2 医生档案模块2.3 预…

竞赛保研 LSTM的预测算法 - 股票预测 天气预测 房价预测

0 简介 今天学长向大家介绍LSTM基础 基于LSTM的预测算法 - 股票预测 天气预测 房价预测 这是一个较为新颖的竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng-senior/postgraduate 1 基于 Ke…

yolov8学习

1.yolov8代码结构 1.ultralytics文件夹 &#xff08;1&#xff09;assets 待评估的图片 &#xff08;2&#xff09;cfg datasets:不同数据集对应yaml文件 models:不同模型对应的yaml文件 &#xff08;3&#xff09;data 用于图像预处理的py代码

【安卓】安卓xTS之Media模块 学习笔记(3) VTS测试

因为本文内容较多&#xff0c;将目录放在这里&#xff0c;便于检索。 文章目录 文章系列1. 背景2. 参考文档3. 测试步骤3.1 VTS测试包获取3.1.1 自行编译 3.2 VTS测试包文件结构3.3 测试命令3.3.1 常见命令3.3.2 一些常见的option&#xff1a; 4. 测试结果说明4.1 结果result4…