分布式任务队列系统 celery 原理及入门

基本

Celery 是一个简单、灵活且可靠的分布式任务队列系统,用于在后台执行异步任务处理大量消息。支持任务调度、任务分发和结果存储,并且可以与消息代理(如 RabbitMQ、Redis 等)一起工作,以实现任务的队列管理和执行。

关键特性和概念:

  1. 分布式任务队列:Celery 允许你将任务分发到多个工作节点上,这些节点可以并行处理任务,从而提高系统的吞吐量和性能。
  2. 异步执行:Celery 支持异步执行任务,即任务可以在后台运行,而不阻塞主程序的执行。
  3. 定时任务:Celery 提供了定时任务功能,可以按照预定的时间间隔或特定时间点执行任务。
  4. 持久化:Celery 支持将任务结果持久化到数据库中,以便后续查询和分析。
  5. 多种消息传递协议:Celery 支持多种消息传递协议,如 RabbitMQ、Redis 等。

核心模块:

  1. 任务(Task):任务是 Celery 的基本单位,代表需要异步执行的函数。任务通过 @app.task 装饰器定义。
  2. 消息代理(Broker):消息代理是一个中间件,用于在客户端和 Worker 之间传递任务消息。常用的消息代理包括 RabbitMQ、Redis、Amazon SQS 等。
  3. Worker:Worker 是实际执行任务的进程。它从消息代理中获取任务并执行,然后将结果返回给结果后端(如果配置了结果后端)。
  4. 客户端(Client):客户端是发送任务到消息代理的部分,通常是你的应用程序代码。它调用任务并将其发送到消息代理。
  5. 结果后端(Result Backend):结果后端用于存储任务的执行结果,便于后续查询。常用的结果后端包括 Redis、数据库(如 PostgreSQL、MySQL)、MongoDB 等。
  6. Beat Scheduler:这是一个定时调度器,用于定期发送周期性任务到消息代理。它可以按照预定的时间间隔或特定时间点调度任务。

组成架构

请添加图片描述

Celery 的架构可以简化为三大核心组件:消息中间件(Message Broker)、任务执行单元(Worker)和任务执行结果存储(Task Result Store)

1. 消息中间件(Message Broker)

功能

  • 作为客户端和 Worker 之间的通信桥梁。
  • 接收来自客户端的任务消息,并将其分发给可用的 Worker。

常用实现

  • RabbitMQ:高性能、可靠性强,支持复杂路由规则。
  • Redis:轻量级、速度快,适合小规模应用。
  • Amazon SQS:托管服务,无需自行维护服务器。

2. 任务执行单元(Worker)

功能

  • 从消息中间件获取待处理的任务。
  • 执行实际的业务逻辑,即运行被装饰为 Celery 任务的函数。
  • 将执行结果发送到结果存储系统。

启动方式:

使用命令启动 Worker,例如:

celery -A tasks worker --loglevel=info

3. 任务执行结果存储(Task Result Store)

功能

  • 存储每个已完成任务的结果,以便后续查询或处理。

常用实现:

  1. Redis
  2. 数据库系统 (如 PostgreSQL, MySQL)
  3. MongoDB

入门

安装celery

pip install celery redis
或
pip3 install celery redis

定义和装饰任务

在代码中定义 Celery 应用和需要异步执行的任务函数。例如创建celery_task.py

from celery import Celery

# broker 为消息中间件配置,这里用的是redis
# backend 为任务执行结果存储,也用的是redis
app = Celery('celery_task', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1')


# 通过装饰器指定任务执行单元,即消息接受后的处理函数
@app.task
def add(x, y):
    return f'{x}{y} 的和为 {x + y}'


# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':
    app.worker_main(['worker', '--loglevel=info'])

启动 Worker

启动一个或多个 Worker 进程来处理任务。这些 Worker 会连接到指定的消息代理并等待新任务到达。

方式一:

# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':
    app.worker_main(['worker', '--loglevel=info'])

方式二:

确保你已经安装了celery并且正确配置环境变量

celery -A tasks worker --loglevel=info

启动完成:

请添加图片描述

发送任务

客户端代码调用定义好的 Celery 任务,并将其发送到消息代理。例如创建celery_add_task

from celery_task import add

# 发送任务
result = add.delay(4, 6)

print(result)
# 获取并打印结果(这会阻塞直到返回结果)
print(result.get())

执行结果:

请添加图片描述

(可选)获取结果

当你调用一个 Celery 任务时,你可以立即获取一个AsyncResult 实例,该实例可以用来检索任务的结果。

from celery_task import add
from celery.result import AsyncResult
import celery_task

"""
使用Task ID创建AsyncResult对象,用于检查状态与获取最终计算出的值。

第一个参数是发送任务时返回的task id
result = add.delay(4, 6)
print(result) # 直接打印出来的就是task_id
"""
# 第一种方式创建
# result = add.AsyncResult("16a6a1e4-9500-43b5-8a88-1058663d44b7")

# 第二种方式创建
result = AsyncResult("16a6a1e4-9500-43b5-8a88-1058663d44b7", app=celery_task.app)

status = result.status
if status == "SUCCESS":
    print('执行成功')
elif status == "FAILURE":
    print('执行失败')
elif status == "PENDING":
    print('任务等待中被执行')
elif status == "RETRY":
    print('任务异常后正在重试')
elif status == "STARTED":
    print('任务已经开始被执行')
else:
    print("未匹配到状态值")

if result.ready():
    print(result.result)  # 打印最终计算出的值,如果已完成。
else:
    print("Task is still running")

注意

celery_task.py文件名和

代码app = Celery('celery_task', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1')中的第一个参数名要一致,否则会报错

app端报错日志:

[2024-05-31 16:13:26,538: ERROR/MainProcess] Received unregistered task of type 'celery_task.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[4, 6], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (81b)

The full contents of the message headers:
{'lang': 'py', 'task': 'celery_task.add', 'id': 'c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6', 'parent_id': None, 'argsrepr': '(4, 6)', 'kwargsrepr': '{}', 'origin': 'gen13554@fangyirui.local', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}

The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_received
    strategy = strategies[type_]
KeyError: 'celery_task.add'

客户端报错日志:

c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6
Traceback (most recent call last):
  File "/Users/fangyirui/PycharmProjects/pythonProject/celery/celery_add_task.py", line 8, in <module>
    print(result.get())
  File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 251, in get
    return self.backend.wait_for_pending(
  File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/backends/asynchronous.py", line 223, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 365, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 358, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/vine/promises.py", line 235, in throw
    reraise(type(exc), exc, tb)
  File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/vine/utils.py", line 27, in reraise
    raise value
celery.exceptions.NotRegistered: 'celery_task.add'

工作流程

一个任务的生命周期

+-----------------+       +-----------------+       +-----------------+
|     Client      | ----> |   Message Broker| ----> |     Worker      |
| (Task Producer) |       |   (e.g., Redis) |       | (Task Consumer) |
+-----------------+       +-----------------+       +-----------------+
        ^                                                |
        |                                                v
+-----------------+                               +-----------------+
|  Result Backend | <-----------------------------|  Task Execution |
| (e.g., Redis)   |                               +-----------------+
+-----------------+
  1. 定义阶段:使用 @app.task 装饰器定义了一个简单函数 add
  2. 创建与发送:调用 add.delay(4, 6) 将请求转换成一条包含操作数和操作类型的信息,并放入Redis队列中。
  3. 排队阶段:Redis接收到这条信息,将其存储起来等待worker拉取。
  4. 获取并锁定:运行中的worker从Redis中拉取这条信息,并锁定它以防止其他workers重复执行同一项工作。
  5. 执行阶段:Worker根据信息内容计算,在此期间,这个信息处于处理中状态。
  6. 存储阶段:计算完毕后,将结果存储回Redis,以便以后查询。如果没有配置result backend,则跳过这一步骤直接进入下一个步骤。
  7. 查询阶段:客户端通过调用 result.get() 来阻塞式地等待并获取计算结果。在实际应用场景中,也可能是非阻塞式地检查状态,例如使用 result.status 或者轮询机制查看是否完成。
  8. 清理和过期管理: 根据系统设置,如果不再需要保存这些历史记录,可以由系统自动或者手动清除这些数据。结果默认在redis中86400秒后过期(24H)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/666857.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

对于vsc中的vue命令 vue.json

打开vsc 然后在左下角有一个设置 2.点击用户代码片段 3.输入 vue.json回车 将此代码粘贴 &#xff08;我的不一定都适合&#xff09; { "vue2 template": { "prefix": "v2", "body": [ "<template>", " <…

更新详情 | Flutter 3.22 与 Dart 3.4

作者 / Michael Thomsen 过去几个月&#xff0c;Dart & Flutter 部门可谓忙碌非凡&#xff0c;但我们很高兴地宣布&#xff0c;Flutter 3.22 和 Dart 3.4 已经在今年的 Google I/O 大会上精彩亮相&#xff01; Google I/Ohttps://io.google/2024/intl/zh/ 我们始终致力于提…

【调试笔记-20240530-Linux-在 OpenWRT-23.05 上为 nginx 配置 HTTPS 网站】

调试笔记-系列文章目录 调试笔记-20240530-Linux-在 OpenWRT-23.05 上为 nginx 配置 HTTPS 网站 文章目录 调试笔记-系列文章目录调试笔记-20240530-Linux-在 OpenWRT-23.05 上为 nginx 配置 HTTPS 网站 前言一、调试环境操作系统&#xff1a;OpenWrt 23.05.3调试环境调试目标…

CS61C | lecture2

# CS61C | lecture2 C 语言是一种编译语言。C 编译器将 C 程序映射到特定与体系结构的机器代码(实际上是一串 0 和 1)。 而 Java 会通过 JVM(Java 虚拟机) 将代码转换为独立于架构的字节码。 Python 则会直接解释代码。C 不会直接解释代码&#xff0c;而是将其编译成机器代码之…

计算机基础学习路线

计算机基础学习路线 整理自学计算机基础的过程&#xff0c;虽学习内容众多&#xff0c;然始终相信世上无难事&#xff0c;只怕有心人&#xff0c;期间也遇到许多志同道合的同学&#xff0c;现在也分享自己的学习过程来帮助有需要的。 一、数据结构与算法 视频方面我看的是青…

Bean作用域和生产周期已经Bean的线程安全问题

bean 的作用域 单例(Singletion) : Spring 容器中只有一个 bean &#xff0c;这个 bean 在整个应用程序内共享。 原话(Prototype) : 每次 getBean()&#xff0c; 都是不同的bean&#xff0c;都会创建一个实例。 请求(Request)&#xff1a;每个HTTP请求都会创建一个新的 Bean …

ARM虚拟机安装OMV

OMV(OpenMediaVault)是基于 Debian GNU/Linux 的网络连接存储&#xff08;network attached storage&#xff0c;NAS&#xff09;解决方案。它包含 SSH、(S) FTP、SMB/CIFS、DAAP 媒体服务器、rsync、 BitTorrent 等很多种服务。它可用于 x86-64 和 ARM 平台。 在x86-64平台上&…

推荐一款开源电子签章/电子合同系统

文章目录 前言一、项目介绍二、项目地址三、技术架构四、代码结构介绍五、功能模块六、功能界面首页面手写签名面板电子印章制作数字证书生成 总结 前言 大家好&#xff01;我是智航云科技&#xff0c;今天为大家分享一个免费开源的电子签字系统。 一、项目介绍 开放签电子签…

【Python】如何使用 Python 自动发送每日电子邮件报告

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

详解Spring IoCDI(二)

目录 承接上文&#xff1a;详解Spring IoC&DI &#xff08;一&#xff09; 1.IoC详解 1.1方法注解Bean 1.2方法注解要配合类注解使用 1.3定义多个对象 1.4重命名Bean 1.5扫描路径 2.DI详解 2.1DI与IoC的关系 2.2属性注入 2.3构造方法注入 2.4Setter注入 2.5 三…

【Endnote】如何在word界面加载Endnote

如何在word界面加载Endnote 方法1&#xff1a;方法2&#xff1a;从word入手方法3&#xff1a;从CWYW入手参考 已下载EndNote,但Word中没有显示EndNote&#xff0c;应如何加载显示呢&#xff1f; 方法1&#xff1a; 使用EndNote的Configure EndNote.exe 。 具体步骤为&#x…

内网安全:横向传递攻击(PTH || PTK || PTT 哈希票据传递)

内网安全&#xff1a;横向传递攻击. 横向移动就是在拿下对方一台主机后&#xff0c;以拿下的那台主机作为跳板&#xff0c;对内网的其他主机再进行后面渗透&#xff0c;利用既有的资源尝试获取更多的凭据、更高的权限&#xff0c;一步一步拿下更多的主机&#xff0c;进而达到控…

理解AdaBoost算法:简单流程概述(一)【流程理解、无数学推导】

什么是AdaBoost 算法&#xff1f; AdaBoost&#xff08;Adaptive Boosting&#xff09;算法&#xff0c;全称为 自适应提升 &#xff0c;是 一种在机器学习中用作集成方法的提升技术 。它之所以被称为自适应提升&#xff0c;因为每个实例的权重会重新分配&#xff0c;错误分类…

C语言 | Leetcode C语言题解之第123题买卖股票的最佳时机III

题目&#xff1a; 题解&#xff1a; #define max(a, b) ((a) < (b) ? (b) : (a))int maxProfit(int* prices, int pricesSize) {int buy1 -prices[0], sell1 0;int buy2 -prices[0], sell2 0;for (int i 1; i < pricesSize; i) {buy1 max(buy1, -prices[i]);sell…

解决:【无法安装“vue.volar“扩展,因为它与当前 VIsual Studio Code 版本不兼容(版本 1.80.0)】

目录 问题复现问题分析解决步骤1、进入VSCode插件市场&#xff0c;搜索Vue.volar2、点击搜索结果&#xff0c;进入详情页面3、下载.vsix文件完成后&#xff0c;用解压软件打开 4、复制package.json文件&#xff0c;修改vscode版本5、保存package.json文件&#xff0c;并更新.v…

【NPS】微软NPS配置802.1x,验证域账号,动态分配VLAN(有线网络篇)

上两篇中介绍了如何配置NPS和在WLC上如何配置802.1X来实现验证域账号和动态分配VLAN&#xff0c;802.1x协议作为一种成熟的身份验证框架&#xff0c;不仅适用于无线网络&#xff0c;同样也适用于有线网络环境。这里我们将介绍如何在有线网络中部署802.1x认证&#xff0c;以验证…

Visual Studio的桌面快捷方式图标不显示

1.问题描述 以下以Visual Studio 2019举例&#xff0c; 正常图标&#xff1a; 但是当前Visual Studio的桌面快捷方式图标不显示了&#xff1f; 2.问题原因分析 Visual Studio 2019桌面快捷方式图标不显示可能由以下几个原因造成&#xff1a; 图标缓存问题&#xff1a;Windo…

Spring boot 集成thymeleaf

Spring boot 集成thymeleaf 背景 自己通过Spring boot集成通义千问实现了一个智能问答系统。Spring boot集成通义千问已经完成&#xff0c;现在需要做一个简单的页面展示&#xff0c;作为一个八年没有摸过前端的后端开发人员&#xff0c;不得不又拿起了html和thymeleaf。 Sp…

笔记-docker基于ubuntu22.04安装Jitsi Meet

背景 利用JitsiMeet打造一个可以在线会议的环境&#xff0c;根据躺的坑&#xff0c;做个记录 参考 JitsMeet部署安装说明 开始操作 环境 docker run -it --name ubuntu22.04 ubuntu:22.04 /bin/bash问题 1、安装 openjdk-11 apt install openjdk-11-jdk配置环境变量&…

探索 Ollama: 你的本地 AI 助手

本期推荐的开源项目是 Ollama&#xff0c;它是一款本地大模型运行工具&#xff0c;可以帮助用户轻松下载和运行各种大型语言模型&#xff08;LLM&#xff09;&#xff0c;而无需将数据上传到云端。以下是关于 Ollama 的介绍以及安装和使用教程&#xff1a; Ollama 是什么&#…