Celery的任务流

在这里插入图片描述

Celery的任务流

在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此目标,Celery使用一种叫做signatures的东西

celery的简单使用

signature的引入

signature官方文档

可以简单理解为signature是将之前的异步任务以某种方式包装,包装后的异步任务仍可以使用之前的delay()和apply_async()方法,并且包装后的异步任务就可以以多种方式组合成复杂的工作流程

先创建一个tasks.py

import time

import celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)


# 加
@app.task
def add_num(a, b):
    print(f'{a}+{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a + b
    return c


# 减
@app.task
def subtract_num(a, b):
    print(f'{a}-{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a - b
    return c


# 乘
@app.task
def multiply_num(a, b):
    print(f'{a}*{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a * b
    return c


# 除
@app.task
def divide_num(a, b):
    print(f'{a}/{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a / b
    return c


@app.task
def test(args):
    print(args)
    return args

运行celery消费者

# 格式为:celery -A app对象所在的文件 worker -l 日志级别 -Q 队列名称(也可以不指定,默认为celry)
celery -A tasks worker -l info -Q test

在这里插入图片描述

用signature对上面的add_num包装

from celery import signature
from tasks import add_num, subtract_num, multiply_num, divide_num

# 方法1
sign = add_num.signature((1, 1), queue='test')
ret = sign.delay()
print(ret.get())

# 方法2
sign = signature('tasks.add_num', (1, 1), queue='test')
ret = sign.delay()
print(ret.get())

# 方法3
sign = signature(add_num, (1, 1), queue='test')
ret = sign.delay()
print(ret.get())

chain的使用

chain官方链接

chain可以将signature包装的任务函数一个一个执行,一个执行完将执行return结果传递给下一个任务函数

from celery import signature, chain
from tasks import add_num, subtract_num, multiply_num, divide_num

add_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (2,), queue='test')
multiply_sign = signature(multiply_num, (2,), queue='test')
divide_sign = signature(divide_num, (2,), queue='test')

# 对某个数依次做加减乘除处理
chain1 = chain(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = chain1.delay()
print(ret.get())

在这里插入图片描述

可以看到异步任务依次执行,并将上一个异步任务的结果作为参数传递给下一个,形成一个链条

group的使用

group官方链接

group可以将signature包装的任务函数并行执行,返回一组返回值

from celery import signature, chain, group
from tasks import add_num, subtract_num, multiply_num, divide_num

add_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')

# 对某个数分别做加减乘除处理

group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = group1.delay()
print(ret.get())
#[8, 4, 12, 3.0]

在这里插入图片描述
可以看到相比于chain,group里的任务是同时执行

chord的使用

chord官方链接
依赖一个group任务,group任务结束后,将所有子任务的返回值作为参数传递给chord的回调函数,即chord由group任务组与回调函数组成

上代码

from celery import signature, chain, group, chord
from tasks import add_num, subtract_num, multiply_num, divide_num, test

add_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')

group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)

#包装test异步任务函数
test_sign = signature(test, queue='test')

c1 = chord(group1, test_sign)
c1.delay()

在这里插入图片描述
可以看出,在执行完加减乘除所有异步任务后,chord会将任务组的结果作为list交给test函数,这里的test有点像回调函数

PS:根据我的观察,chain,group,chord在执行完后都会返回一个任务id,其中chain的任务id为任务链里最后一个任务的id,group的任务id是一个临时的任务id(group任务都结束后就会消失),chord的任务id是回调函数的任务id。因此chain和chord在任务结束后,任务结果还是可以查到的,而group则查询不到,因此group的任务结果可能无法用AsyncResult查询到

最后附上celery关于任务工作流的官方链接
celery工作流

PS

有的时候我们可能需要在celery的task函数中调用其他的celery函数,并且需要同步的获取结果(其实着本质上就是把异步的celery函数变成同步运行),具体如下,先创建一个tasks.py

import time

import celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)


# 加
@app.task
def add_num(a, b):
    print(f'{a}+{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a + b
    return c


@app.task
def test():#在test函数里调用add_num函数,并且同步获取结果,将结果作为test函数的返回值
    ret = add_num.delay(1,2)
    ret = ret.get()
    return ret
#启动消费者
celery -A tasks worker -l info

调用test异步函数

from tasks import test
ret = test.delay()

在这里插入图片描述

结果就是出错了,因为官方不建议在一个异步任务中区等待另一个异步任务的返回结果,所以这个时候就可以通过上面的chain方法实现这个需求。当然还有一种不建议的方法就是在同步获取celery任务结果的get方法中添加参数disable_sync_subtasks=False,具体如下

import time

import celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)


# 加
@app.task
def add_num(a, b):
    print(f'{a}+{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a + b
    return c


@app.task
def test():
    ret = add_num.delay(1, 2)
    ret = ret.get(disable_sync_subtasks=False)#在这添加disable_sync_subtasks=False
    return ret

再调用一次test方法

在这里插入图片描述
成功调用

详见celery官方链接
链接传送门

结语

写这些,仅记录自己学习使用celery的过程。如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/514155.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Flutter中setState函数的使用注意事项

文章目录 Flutter中setState函数的使用注意事项只能在具有State对象的类中使用不要在build方法中使用将状态更新逻辑放在setState方法内部避免频繁调用使用回调函数更新状态 Flutter中setState函数的使用注意事项 setState()函数是Flutter中非常重要的一个函数,它用…

【教程】宝塔default.db占用空间几十g解决方法|宝塔占用磁盘空间特别大解决方法|宝塔磁盘被占满怎么清理

目录 一、前言二、排查问题三、解决方法 一、前言 用过宝塔创建网站,大家应该都非常熟悉,但是用随着用的时间越来越多,宝塔所占用的空间也越来越多,不停的加大数据盘都没有用,我原先买了30G够用了,随着时间…

基于单片机的盆栽自动浇花系统

**单片机设计介绍,基于单片机的盆栽自动浇花系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的盆栽自动浇花系统设计概要主要涵盖了通过单片机技术实现盆栽植物的自动化、智能化浇水管理。下面将从系统…

四川古力未来科技抖音小店:安全便捷,购物新体验

在数字化浪潮席卷全球的今天,电商平台的安全性与便捷性成为了消费者最为关心的问题。四川古力未来科技有限公司,凭借其强大的技术实力和深厚的行业经验,为广大消费者带来了一个安全可靠的购物新选择——古力未来科技抖音小店。 古力未来科技抖…

【论文速读】| MASTERKEY:大语言模型聊天机器人的自动化越狱

本次分享论文为:MASTERKEY: Automated Jailbreaking of Large Language Model Chatbots 基本信息 原文作者:Gelei Deng, Yi Liu, Yuekang Li, Kailong Wang, Ying Zhang, Zefeng Li, Haoyu Wang, Tianwei Zhang, Yang Liu 作者单位:南洋理工…

三维VR虚拟展馆打破传统展览的时间与空间限制

探索绿色未来,见证生态转型——这是长江经济带在国家发展蓝图中的承诺。如今,通过线上长江经济带发展阶段性成效展厅,这一承诺正以创新的互动体验呈现给公众,彰显了环境保护与经济增长的和谐统一。 深圳VR公司华锐视点精心策划的长…

网络攻防中json序列化漏洞案例,fastjson远程命令执行漏洞原理

网络攻防中json序列化漏洞案例,fastjson远程命令执行漏洞原理。 网络攻防中的JSON序列化漏洞是指当应用程序使用JSON(JavaScript Object Notation)格式来序列化和反序列化对象时,由于不当处理或不安全的编程实践,导致攻击者能够执行恶意操作的安全漏洞。这些操作可能包括远…

说说对排序算法的一些理解

对排序 - 冒泡排序的理解 冒泡排序是一种简单的排序算法,其基本思想是通过多次遍历数组,每次比较相邻的两个元素。如果前一个元素大于后一个元素,则交换它们的位置。这样,每一次遍历都会将当前未排序部分的最大元素“冒泡”到数组…

【Flutter】windows环境配置

windows 11 环境 官方教程 配置了flutter 环境变量在系统的path里 bin 路径。 死活没反应 关闭了git关闭了dart.exe关闭了vs还是不行卸载重新来 新版git flutter doctor 还需要android 环境

Redis性能管理及集群三种模式(一)

一、前期准备 至少准备三台服务器为主从复制、哨兵的实验做准备 一台主redis、两台从redis 二、Redis性能管理 2.1 查看Redis内存使用 查看Redis内存使用——info memory 2.2 内存使用率 1<内存碎片<1.5表示合理的内存碎片大于>1.5&#xff0c;需要输入shutdown save…

ES学习日记(十)-------Java操作ES之连接客户端

Elasticsearch有两种连接方式: transport、rest。transport 通过TCP方式访问ES(只支持iava)&#xff0c;rest 方式通过http API 访问ES(没有语言限制)。 ES官方建议使用Iest 方式&#xff0c;transport 在7.8 版本中不建议使用&#xff0c;在8.x的版本中废弃。你可以用Java客户…

小白水平理解面试经典题目1431. Kids With the Greatest Number of Candies【Array类】

1431. 拥有最多糖果的孩子 小白渣翻译 一群孩子手里拿 着不同数目的糖果。你打算额外给每个孩子一些糖果&#xff0c;然后再确定哪些孩子拥有最多的糖果。 给你一个数组 candies &#xff0c;其中 candies[i] 代表第 i 个孩子拥有的糖果数目。另给你一个整数 extraCandies &…

MVCC详细总结

简介 MVCC&#xff08;Multi-Version Concurrency Control&#xff09;是一种多版本并发控制机制&#xff0c;主要用于数据库管理系统中&#xff0c;实现对数据库的并发访问。在编程语言中&#xff0c;MVCC可以实现事务内存。 MVCC的特点是读不加锁&#xff0c;读写不冲突。MVC…

基于Unet的BraTS 3d 脑肿瘤医学图像分割,从nii.gz文件中切分出2D图片数据

1、前言 3D图像分割一直是医疗领域的难题&#xff0c;在这方面nnunet已经成为了标杆&#xff0c;不过nnunet教程较少&#xff0c;本人之前跑了好久&#xff0c;一直目录报错、格式报错&#xff0c;反正哪里都是报错等等。并且&#xff0c;nnunet对于硬件的要求很高&#xff0c…

mac、windows 电脑安装使用多个版本的node

我们为啥要安装多个不同版本的node&#xff1f; 开发旧项目时&#xff0c;使用低版本Nodejs。开发新项目时&#xff0c;需使用高版本Node.js。可使用n同时安装多个版本Node.js&#xff0c;并切换到指定版本Node.js。 mac电脑安装 一、全局安装 npm install -g n 二、mac电脑…

内存和网卡压力测试

1.内存压力测试 1.1测试目的 内存压力测试的目的是评估开发板中的内存子系统性能和稳定性&#xff0c;以确保它能够满足特定的应用需求。开发板通常用于嵌入式系统、物联网设备、嵌入式智能家居等场景&#xff0c;这些场景对内存的要求通常比较高。 其内存压力测试的主要目的…

RK3568 RTC驱动实验

RK3568 RTC驱动实验 1. RTC简介 ​ RTC 也就是实时时钟&#xff0c;用于记录当前系统时间&#xff0c;对于 Linux 系统而言时间是非常重要的&#xff0c;使用 Linux 设备的时候也需要查看时间。RTC是Linux的时间系统。 ​ RTC 设备驱动是一个标准的字符设备驱动&#xff0c;…

强大缓存清理工具 NetShred X for Mac激活版

NetShred X for Mac是一款专为Mac用户设计的强大缓存清理工具&#xff0c;旨在帮助用户轻松管理和优化系统性能。这款软件拥有直观易用的界面&#xff0c;即使是初次使用的用户也能快速上手。 软件下载&#xff1a;NetShred X for Mac激活版下载 NetShred X能够深入扫描Mac系统…

【MATLAB】PSO_BP神经网络时序预测算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~ 1 基本定义 PSO_BP神经网络时序预测算法是一种结合了粒子群优化(PSO)算法和反向传播(BP)神经网络的时序预测方法。它利用了PSO算法的全局搜索能力和BP神经网络的优化能力&#xff0c;能够更准确地预测时序数据。 具体步…

idea maven 打包 内存溢出 报 GC overhead limit exceeded -> [Help 1]

idea 使用maven打包 报GC overhead limit exceeded -> [Help 1] 解决方法&#xff1a; 打开settings -> 点开如同所示 将 vm Options 参数 设为 -Xmx8g