分布式异步任务框架celery

Celery介绍

github地址:GitHub - celery/celery: Distributed Task Queue (development branch)

文档地址:Celery - Distributed Task Queue — Celery 5.3.6 documentation

1.1 Celery是什么

celery时一个灵活且可靠的处理大量消息的分布式系统,可以在多个节点之间处理某个任务

celery时一个专注于实时处理的任务队列,支持任务调度

celery是开源的,有很多的使用者

celery完全基于python语言编写的

celery本质上是一个【分布式的异步任务调度框架】,类似于Apache的airflow

celery只是用来调度任务的,但是它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来,因此要使用celery的话,还需要搭配一些具有存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等。官方推荐是消息队列RabbitMQ,我们使用Redis

同步调用函数 --》add--》执行5s钟--》数据返回了

异步调用函数--》add--》执行5s钟--》执行完的数据,找个地方存起来

调用方--》去存的地方看一下--》任务有没有执行完

1.2 应用场景

1)异步任务

 视频转码、邮件发送、消息推送等一些耗时操作

2)定时任务

定时推送消息、定时爬取一些数据、定时统计一些数据

3)延时任务

提交任务后,等一段时间再执行任务

1.3 celery架构

celery架构,它采用典型的生产者-消费者模式,主要由以下部分组成

生产者生产---消费者进行消费

producer:它负责把任务提交到broker钟

celery Beat:会读取文件、周期性的向broker中提交任务

broker:消息中间件,放任务的地点,celery本身不提供,借助redis等消息队列

worker:工人、消费者,负责从消息中间件中取出任务--》执行

backend:worker执行完,会有结果,结果存储再backend,celery不提供,借助redis等消息队列。

Celery使用

2.1 安装

pip install celery

使用redis作为消息队列

pip install redis

如果是Windows系统还需要安装eventlet

pip install eventlet

2.2 使用

创建main.py文件

import time

from celery import Celery

# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
# 结果消息队列
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend)


# 编写任务
@app.task  # 被装饰器装饰了,才是celery任务
def add(a, b):
    print("a+b的结果是", a + b)
    time.sleep(1)  # 模拟耗时
    return a + b

创建add_task.py文件编写消费者代码

"""这个程序用来提交任务 producer"""
from main import add

# # 同步任务
# res = add(1, 2)
# print(res)
# 异步任务
# 像消息队列中提交了一个任务,计算1+5的任务,但是没有执行  ceec680b-e0fb-4636-9244-1fa7ca0c570c
res = add.delay(1, 5)  # 没有耗时,直接返回,但是没有返回值,而是返回一个uuid号
print(res)
# 启动worker 再终端使用命令启动,执行完成后会把结果存到redis的2库中
# win :celery -A main worker -l info -P eventlet
# mac/linux:celery -A main worker -l info

启动worker需要再终端下方进行启动

win :celery -A main worker -l info -P eventlet
mac/linux:celery -A main worker -l info

如果报错celery库找不到的问题,使用python -m celery -A main worker -l info -P eventlet进行启动

2.3 包结构

后续的项目越来越大,task任务越来越多,希望把任务拆分再多个py文件中

目录结构

celery.py

import time

from celery import Celery

# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,
             include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])

# 任务分到不同的py文件中

user_task.py

import time
from .celery import app


@app.task
def send_email(to):
    print("发送邮件")
    time.sleep(3)
    print(f"向{to}发送邮件成功")
    return f"向{to}发送邮件成功"

order_task.py

import time
from .celery import app


@app.task
def pay_order():
    print("开始下单")
    time.sleep(5)
    print("下单完成")
    return "下单完成"

crawl_task.py

import time
from .celery import app


@app.task
def crawl_baidu():
    print("开始爬虫百度")
    time.sleep(2)
    print("爬虫完毕百度")
    return "爬虫完毕百度"


@app.task
def crawl_dewu():
    print("开始爬虫得物")
    time.sleep(2)
    print("爬虫完毕百度")
    return "爬虫完毕得物"

add_task.py

from celery_task.crawl_task import crawl_baidu

res = crawl_baidu.delay()  # 没有参数,这里就不传
print(res)  # 得到uuid

get_result.py(查询是否被执行)

from .celery_task.celery import app
from celery.result import AsyncResult

id = "你的任务uuid"
if __name__ == '__main__':
    result = AsyncResult(id=id, app=app)
    if result.successful():
        result = result.get()
        print(result)
    elif result.failed():
        print("任务失败")
    elif result.status == "PENDING":
        print("任务等待被执行")
    elif result.status == "RETRY":
        print("任务异常后正在重试")
    elif result.status == "STARTED":
        print("任务已经开始被执行")

启动worker命令

celery -A celery_task(包名) worker -l info -P eventlet

异步任务-延时任务-定时任务

异步任务

上述介绍的均为异步任务

使用delay()

延时任务

from celery_task.user_task import send_email
from datetime import datetime, timedelta

eta = datetime.utcnow() + timedelta(seconds=5)  # 默认时区为utc时区
res = send_email.apply_async(args=['邮箱'], eta=eta)
print(res)

apply_async(args=['参数'],eta=延时时间)

如果延迟任务提交了,但是worker没启动,等延迟的时间,worker再启动,任务会立马启动

定时任务

在celery.py中

import time

from celery import Celery

# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,
             include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])

# 任务分到不同的py文件中


# 加入定时任务
# 指定了时区,中国时区,以后延时任务
app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False
# 每隔5s爬取百度
from datetime import datetime, timedelta

app.conf.beat_schedule = {
    'low-task': {
        'task': 'celery_task.crawl_task.crawl_baidu',
        'schedule': timedelta(seconds=5), # 每5秒发送一次
        'args': ()  # 参数
    }
}
# 必须启动beat

启动beat命令

celery -A celery_task beat -l info

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/476359.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

hcip复习总结1

OSI----------- 定义了数据的产生标准 。 7 层 应用 ------- 表示 会话 传输 -----Telnet - 23 ssh---22 http---80 https-443 TCP ---- 传输控制卋议。是一种面向连接的可靠的传输卋议。 UDP---- 用户数据报卋议。是一种非面向连接的丌可靠传输卋议。 保证可靠性&…

鸿蒙开发-UI-动画-页面间动画

鸿蒙开发-UI-组件导航-Navigation 鸿蒙开发-UI-组件导航-Tabs 鸿蒙开发-UI-图形-图片 鸿蒙开发-UI-图形-绘制几何图形 鸿蒙开发-UI-图形-绘制自定义图形 鸿蒙开发-UI-图形-页面内动画 鸿蒙开发-UI-图形-组件内转场动画 鸿蒙开发-UI-图形-弹簧曲线动画 文章目录 前言 一、放大缩…

Springboot+vue的医疗挂号管理系统+数据库+报告+免费远程调试

效果介绍: Springbootvue的医疗挂号管理系统,Javaee项目,springboot vue前后端分离项目 本文设计了一个基于Springbootvue的前后端分离的医疗挂号管理系统,采用M(model)V(view)C(con…

Kubernetes集群搭建 kubernetes集群安装

Kubeadm kubeadm 是 Kubernetes 社区提供的集群构建工具,它能够以最佳实践的方式部署一个最小化的可用 Kubernetes 集群。 但是 kubeadm 在设计上并未安装网络解决方案,所以需要用户自行安装第三方符合 CNI 的网络解决方案,如 flanal&#…

7个方便快速使用的Tkinter控件源码分享,赶快收藏

文章目录 7个快速使用的Tkinter控件源码分享1. 按钮 Button2. 开关 Checkbutton3. 显示文本 Label4. 带名称、数值显示的划动条5. 带标签的复选框6. 带名称的输入框7. 带名称的微调框7个快速使用的Tkinter控件源码分享 tkinter 是一个简单入手,但是功能十分强大的GUI编程库,学…

阶乘的强悍溢出技能

【题目描述】 输入n,计算S=1!+2!+3!+…+n!的末6位(不含前导0)。,n!表示 前n个正整数之积。 【样例输入】 …

[python]bar_chart_race设置日期格式

1、设置日期标签的时间格式 # 设置日期格式,默认为%Y-%m-%dbcr.bar_chart_race(df, covid19_horiz.gif, period_fmt%b %-d, %Y) 2、更改日期标签为数值 # 设置日期标签为数值bcr.bar_chart_race(df.reset_index(dropTrue), covid19_horiz.gif, interpolate_period…

基于springboot+vue的中山社区医疗综合服务平台

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战,欢迎高校老师\讲师\同行交流合作 ​主要内容:毕业设计(Javaweb项目|小程序|Pyt…

第二十八天-ES6标准入门和Flex布局

目录 1.ES6标准入门 2.ES6与JavaScript关系 3.ES6常用新特性 1.变量与常量 1.let三大特性 2.常量三大特征 2.解构赋值 1.数组解构赋值 2.对象解构赋值 3.字符串解构赋值 3.函数与箭头函数 1.函数 2.箭头函数 4.JS的面向对象编程 5.模块化 export使用 import使用…

QT界面制作

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);this->setWindowFlag(Qt::FramelessWindowHint);//接收动图QMovie *mv new QMovie(":/pictrue/th.gif…

二分查找算法(2)

852.山脉数组的峰顶索引 一、题目描述 即下标 i 前的所有元素都升序、后的所有元素都降序&#xff0c; i 是最大值 OJ题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09; 二、思路解析 三、代码 class Solution { public:int peakIndexInMountainArray(vector<in…

算法打卡day11

今日任务&#xff1a; 1&#xff09;239. 滑动窗口最大值 2&#xff09;347.前 K 个高频元素 239. 滑动窗口最大值 题目链接&#xff1a;239. 滑动窗口最大值 - 力扣&#xff08;LeetCode&#xff09; 给定一个数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移…

利用Python如何实现数据驱动的接口自动化测试

前言 大家在接口测试的过程中&#xff0c;很多时候会用到对CSV的读取操作&#xff0c;本文主要说明Python3对CSV的写入和读取。下面话不多说了&#xff0c;来一起看看详细的介绍吧。 1、需求 某API&#xff0c;GET方法&#xff0c;token,mobile,email三个参数 token为必填项…

【项目】基于YOLOv8和RotNet实现圆形滑块验证码(拼图)自动识别(通过识别中间圆形的角度实现)

TOC 一、引言 1.1 实现目标 要达到的效果是使用算法预测中间圆形的角度&#xff0c;返回给服务器&#xff0c;实现自动完成验证码的问题。要实现的内容如下图所示。 1.2 实现思路 思路1&#xff08;效果较差&#xff09;&#xff1a;以RotNet要实现的验证码识别为灵感&…

微信小程序(五十九)使用鉴权组件时原页面js自动加载解决方法(24/3/14)

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.使用覆盖函数的方法阻止原页面的自动执行方法 2.使用判断实现只有当未登录时才进行方法覆盖 源码&#xff1a; app.json {"pages": ["pages/index/index","pages/logs/logs"],…

python中如何生成词云

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂 今天给大家看看&#xff0c;如何使用python实现根据记录创建生成词云 首先我们看下效果图。 一个是生成了新闻的词云&#xff0c;另一个是生成了聊天记录的词云。下面是代码&#xff1a; …

Java学习笔记(19)

双列集合 键值对 一一对应 键值对对象 entry Map Put第一次给不存在的key&#xff0c;会把键值对添加到map中&#xff0c;返回null Put给同一个key是会覆盖value的&#xff0c;返回被覆盖的值value Remove根据key删除键值对&#xff0c;返回被删除的值value Map遍历 键找值 …

python语法踩坑 | list的append操作如何从in-place改为out-of-place

背景 博主写python遇到一个问题&#xff0c;需要把对list添加元素改为非原地操作&#xff0c;即不修改原list。 但是由于列表中的元素是字典类型&#xff0c;无法直接用运算符。 于是写出了下面这行代码 query_list message_list.copy().append(one_question) 其中message…

Anaconda安装 (windowsLinux)

文章目录 Anaconda简介设置国内源pip || conda 一、Anaconda &#xff08;Windows系统&#xff09;1.1 下载及安装1.2 虚拟环境创建1.3 在Pycharm中配置conda的环境 二、Anaconda&#xff08;Linux系统&#xff09; Anaconda简介 conda是一个开源的包、环境管理器&#xff0c;可…

探索 Atlassian 云平台:组织、站点、产品架构解析

我们通常访问的是 Atlassian 的某个云站点&#xff0c;比如填空题-中国站点为&#xff1a;cloze-cn.atlassian.net。当我们访问该站点内的具体产品时&#xff0c;只需在该站点的 URL 后添加相应产品的缩写&#xff0c;例如&#xff1a; Confluence: cloze-cn.atlassian.net/wi…