[ Tool ] celery分布式任务框架基本使用

celery官方
Celery 官网:www.celeryproject.org/

Celery 官方文档英文版:docs.celeryproject.org/en/latest/i…

Celery 官方文档中文版:docs.jinkan.org/docs/celery…

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

专注于实时处理的异步任务队列

同时也支持任务调度

注意:

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

celery异步任务框架
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务 正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题 人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

celery架构
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务,比如每天数据统计

celery的安装配置

pip install celery

pip install eventlet

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)

两种celery的任务结构

基本结构

只写一个py文件,如celery_task.py

# worker文件
from celery import Celery
broker='redis://127.0.0.1:6379/1'  # broker任务队列
backend='redis://127.0.0.1:6379/2' # 结果存储
app=Celery(__name__,broker=broker,backend=backend)
 
#添加任务(使用这个装饰器装饰,@app.task)
@app.task
def add(x,y):
    print(x,y)
    return x+y
启动worker:
    需要将路径切换到该py文件的路径下,用终端执行
    - 非windows:celery worker -A celery_task -l info
    - windows:celery worker -A celery_task -l info -P eventlet
        
# 提交任务到broker文件
from celery_task import add
add(3,4)  # 直接执行,不会被添加到broker中
ret = add.delay(1,2)  # 向broker中添加一个任务
print(ret)  # ret是任务编号,后期需要通过任务编号获取任务执行结果
 
# 查看任务执行结果文件
from celery_task import app-
from celery.result import AsyncResult
id = '3e397fd7-e0c1-4c5c-999c-2655a96793bb'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
        elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
 
包结构
  • 新建一个包,包名随意,比如celery_task

包内结构

-celery_task

​ -__init__.py

​ -celery.py(必须叫celery)

​ -task01.py(任务文件,每个任务单独写一个py文件)

​ -task02.py

-add_task.py:为worker中提交任务

_get_res.py:获取任务执行结果

  • celery.py
from celery import Celery
broker='redis://127.0.0.1:6379/1'  # broker任务队列
backend='redis://127.0.0.1:6379/2' # 结果存储
app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.task01','celery_task.task02'])
 
  • add_task.py
from celery_task.task01 import add
from celery_task.task02 import mul
 
 
ret = add.delay(1,2)
print(ret)
 
ret = mul.delay(10,10)
print(ret)
 
  • get_res.py
from celery_task.celery import app
from celery.result import AsyncResult
id = '3d3779be-fe75-4ad4-ab31-d7dbe13d3e63'
 
 
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
 
执行延时任务
 

执行延时任务

  • add_task.py
from celery_task.task01 import add
from celery_task.task02 import mul
 
# 非延迟任务
ret = add.delay(1,2)
print(ret)
ret = mul.delay(10,10)
print(ret)
 
 
# 将任务做成延迟任务
from datetime import datetime,timedelta
# 延迟时间必须是utc时间,timedelta是根据utc时间向后延迟
eta = datetime.utcnow() + timedelta(seconds=10)
ret = add.apply_async(args=(200,50),eta=eta)
print(ret)

执行定时任务
执行定时任务时,任务的配置需要写在celery.py中,就是worker,因为定时任务的配置需要在worker开启之前加载,否则无法执行定时任务。

执行定时任务除了需要woker以外,还需要一位‘工人’定时为worker送任务,这个工人就是beat

执行定时任务的时候,需要启动worker及beat,一个负责执行任务,一个负责定时运送任务

启动worker,启动beat -celery worker -A celery_task -l info -P eventlet -celery beat -A celery_task -l info

  • celery.py
from celery import Celery
 
 
broker='redis://127.0.0.1:6379/1'  # broker任务队列
backend='redis://127.0.0.1:6379/2' # 结果存储
app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.task01','celery_task.task02'])
 
 
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'add-task': {
        # 需要定时执行的任务
        'task': 'celery_task.task01.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        # 任务需要的参数
        'args': (300, 150),
    }
}
 
总结:

感谢每一个认真阅读我文章的人!!!

作为一位过来人也是希望大家少走一些弯路,如果你不想再体验一次学习时找不到资料,没人解答问题,坚持几天便放弃的感受的话,在这里我给大家分享一些自动化测试的学习资源,希望能给你前进的路上带来帮助。

软件测试面试文档

我们学习必然是为了找到高薪的工作,下面这些面试题是来自阿里、腾讯、字节等一线互联网大厂最新的面试资料,并且有字节大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。

   视频文档获取方式:
这份文档和视频资料,对于想从事【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴我走过了最艰难的路程,希望也能帮助到你!以上均可以分享,点下方小卡片即可自行领取。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/288898.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

DRF从入门到精通七(djangorestframework-simplejwt、定制返回格式、多方式登录)

文章目录 一、djangorestframework-simplejwt快速使用1.基础使用步骤2.自己配置视图校验访问局部配置认证及权限类全局配置认证及权限类 3.关于双token认证问题 二、定制返回格式三、多方式登录 一、djangorestframework-simplejwt快速使用 JWT主要用于签发登录接口需要配合认证…

阿里云服务器云盘ESSD Entry、SSD、高效云盘性能测评

阿里云服务器系统盘或数据盘支持多种云盘类型,如高效云盘、ESSD Entry云盘、SSD云盘、ESSD云盘、ESSD PL-X云盘及ESSD AutoPL云盘等,阿里云百科aliyunbaike.com详细介绍不同云盘说明及单盘容量、最大/最小IOPS、最大/最小吞吐量、单路随机写平均时延等性…

yolo增加Shape-IoU,完美超越SIoU/EIoU/CIoU

论文地址:https://arxiv.org/pdf/2312.17663.pdf 代码地址:GitHub - malagoutou/Shape-IoU 摘要 作为检测定位分支的重要组成部分,边界框回归损失在目标检测任务中起着重要作用。现有的边界框回归方法通常考虑GT框和预测框之间的几何关系&…

C语言实例_math.h库函数功能及其用法详解

一、前言 数学在计算机编程中扮演着至关重要的角色,C语言的math.h头文件提供了一系列的函数和工具,用于数学计算和常用数学函数的实现。这些函数包括数值运算、三角函数、指数对数函数等,为开发人员提供了强大的数学处理能力。本文将对math.…

从零开始了解大数据(七):总结

系列文章目录 从零开始了解大数据(一):数据分析入门篇-CSDN博客 从零开始了解大数据(二):Hadoop篇-CSDN博客 从零开始了解大数据(三):HDFS分布式文件系统篇-CSDN博客 从零开始了解大数据(四):MapReduce篇-CSDN博客 从零开始了解大…

多模态大模型Vary:扩充视觉Vocabulary,实现更细粒度的视觉感知

前言 现代大型视觉语言模型(LVLMs)具有相同的视觉词汇- CLIP,它可以涵盖大多数常见的视觉任务。然而,对于一些需要密集和细粒度视觉感知的特殊视觉任务,例如文档级OCR或图表理解,特别是在非英语场景下,clip风格的词汇…

电子电路快速入门

参考: 电子电路设计入门篇一 https://www.bilibili.com/video/BV18C4y1p7p9Proteus与protel的区别 https://www.zhihu.com/question/385796380数字集成电路的设计流程简介 https://zhuanlan.zhihu.com/p/24476011?tt_fromweixin硬件电路设计的基本流程、作用和注意…

StratifiedGroupKFold解释和代码实现

StratifiedGroupKFold解释和代码实现 文章目录 一、StratifiedGroupKFold解释和代码实现是什么?二、 实验数据设置2.1 实验数据生成代码2.2 代码结果 三、实验代码3.1 实验代码3.2 实验结果3.3 结果解释 四、样本类别类别不平衡 一、StratifiedGroupKFold解释和代码…

简单Diff算法

简单Diff算法 渲染器的核心 Diff算法 解决的问题 比较新旧虚拟节点的子节点,实现最小化更新。 虚拟节点key属性的作用 就像虚拟节点的“身份证号”,在更新时,渲染器会通过key属性找到可复用的节点,然后尽可能地通过DOM移动操…

Hexo 部署 Github Pages, Github Actions自动部署

想整个静态的博客部署在github pages 历经两天的折磨终于是摸索成功了,官网的文档太简陋了,很多东西没说清楚。 欢迎大家访问我的博客! CanyueThis is Canyues blog.https://mobeicanyue.github.io/ 最终实现的效果,一个项目仓库…

polar CTF 简单rce

一、题目 <?php /*PolarD&N CTF*/ highlight_file(__FILE__); function no($txt){if(!preg_match("/cat|more|less|head|tac|tail|nl|od|vim|uniq|system|proc_open|shell_exec|popen| /i", $txt)){return $txt;}else{ die("whats up");}} $yyds(…

【openGauss服务器端工具的使用】

【openGauss服务器端工具的使用】 gs_checkperf openGauss 不仅提供了gs_checkperf工具来帮助用户了解openGauss的负载情况。 使用数据库安装用户登录服务器&#xff0c;执行如下命令进行查看数据库性能&#xff1a; 简要信息展示&#xff1a;[ommopengauss03 ~]$ gs_checkperf…

Ubuntu Server 22.04 连接Wifi并配置静态IP

Ubuntu Server 22.04 连接Wifi并配置静态IP 前言&#xff1a;我家最近好几台电脑&#xff0c;我都想跑着Ubuntu Server做服务器&#xff0c;但是近几年的超级本已经不自带网口了&#xff0c;所以我就考虑用Wifi来联网&#xff0c;速度也还可以&#xff0c;但是既然是跑服务&…

《算法导论》复习——CHP1、CHP2 算法基础

基本定义&#xff1a; 算法是一组有穷的规则&#xff0c;规定了解决某一特定类型问题的一系列运算。 关心算法的正确性和效率。 算法的五个重要特性&#xff1a;确定性、能行性、输入、输出、有穷性。 基础方法&#xff1a; 伪代码&#xff08;Pseudocode&#xff09;&#xff…

Springboot集成RabbitMq二

接上一篇&#xff1a;Springboot集成RabbitMq一-CSDN博客 1、搭建项目-消费者 与之前一样 2、创建配置类 package com.wym.rabbitmqconsumer.utils;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.spring…

11.盛水最多的容器(双指针,C解法)

题目描述&#xff1a; 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;…

床垫选得好孩子睡得香!康姿百德学生床垫让孩子拥有甜美梦乡

睡眠与健康密切相关,而床垫的选购则直接关系到人们睡眠质量的好坏。而孩子的床垫选择更是重中之重,青少年儿童正处于生长发育的重要时期,床垫选不好很容易导致孩子睡眠不足,影响孩子的学习,甚至会影响孩子的脊椎发育。所以,给孩子挑张好用又合适的床垫十分重要,现在让我们来看看…

SpringMVC-域对象共享数据

一、request域对象共享数据 1.1 通过ServletAPI共享数据 RequestMapping("/servletAPI")public String servletAPI(HttpServletRequest request){request.setAttribute("requestAttribute","helloworld");return "servletAPI";}<!…

(学习打卡1)重学Java设计模式之设计模式介绍

前言&#xff1a;听说有本很牛的关于Java设计模式的书——重学Java设计模式&#xff0c;然后买了(*^▽^*) 开始跟着小傅哥学Java设计模式吧&#xff0c;本文主要记录笔者的学习笔记和心得。 打卡&#xff01;打卡&#xff01; 设计模式介绍 一、设计模式是什么&#xff1f; …

学习JavaEE的日子 day08 方法的重载,递归,万年历

day08 1.方法的重载 >理解&#xff1a;方法与方法之间的关系> 条件&#xff1a;> 1.方法必须在同一个类中> 2.方法名必须一致> 3.参数列表的个数或者类型不一致> 4.与返回值无关> 好处&#xff1a;系统会根据具体实参类型自动匹配到对应的方法…