基本
Celery 是一个简单、灵活且可靠的分布式任务队列系统,用于在后台执行异步任务处理大量消息。支持任务调度、任务分发和结果存储,并且可以与消息代理(如 RabbitMQ、Redis 等)一起工作,以实现任务的队列管理和执行。
关键特性和概念:
- 分布式任务队列:Celery 允许你将任务分发到多个工作节点上,这些节点可以并行处理任务,从而提高系统的吞吐量和性能。
- 异步执行:Celery 支持异步执行任务,即任务可以在后台运行,而不阻塞主程序的执行。
- 定时任务:Celery 提供了定时任务功能,可以按照预定的时间间隔或特定时间点执行任务。
- 持久化:Celery 支持将任务结果持久化到数据库中,以便后续查询和分析。
- 多种消息传递协议:Celery 支持多种消息传递协议,如 RabbitMQ、Redis 等。
核心模块:
- 任务(Task):任务是 Celery 的基本单位,代表需要异步执行的函数。任务通过
@app.task
装饰器定义。 - 消息代理(Broker):消息代理是一个中间件,用于在客户端和 Worker 之间传递任务消息。常用的消息代理包括 RabbitMQ、Redis、Amazon SQS 等。
- Worker:Worker 是实际执行任务的进程。它从消息代理中获取任务并执行,然后将结果返回给结果后端(如果配置了结果后端)。
- 客户端(Client):客户端是发送任务到消息代理的部分,通常是你的应用程序代码。它调用任务并将其发送到消息代理。
- 结果后端(Result Backend):结果后端用于存储任务的执行结果,便于后续查询。常用的结果后端包括 Redis、数据库(如 PostgreSQL、MySQL)、MongoDB 等。
- Beat Scheduler:这是一个定时调度器,用于定期发送周期性任务到消息代理。它可以按照预定的时间间隔或特定时间点调度任务。
组成架构
Celery 的架构可以简化为三大核心组件:消息中间件(Message Broker)、任务执行单元(Worker)和任务执行结果存储(Task Result Store)
1. 消息中间件(Message Broker)
功能:
- 作为客户端和 Worker 之间的通信桥梁。
- 接收来自客户端的任务消息,并将其分发给可用的 Worker。
常用实现:
- RabbitMQ:高性能、可靠性强,支持复杂路由规则。
- Redis:轻量级、速度快,适合小规模应用。
- Amazon SQS:托管服务,无需自行维护服务器。
2. 任务执行单元(Worker)
功能:
- 从消息中间件获取待处理的任务。
- 执行实际的业务逻辑,即运行被装饰为 Celery 任务的函数。
- 将执行结果发送到结果存储系统。
启动方式:
使用命令启动 Worker,例如:
celery -A tasks worker --loglevel=info
3. 任务执行结果存储(Task Result Store)
功能:
- 存储每个已完成任务的结果,以便后续查询或处理。
常用实现:
- Redis
- 数据库系统 (如 PostgreSQL, MySQL)
- MongoDB
入门
安装celery
pip install celery redis
或
pip3 install celery redis
定义和装饰任务
在代码中定义 Celery 应用和需要异步执行的任务函数。例如创建celery_task.py
:
from celery import Celery
# broker 为消息中间件配置,这里用的是redis
# backend 为任务执行结果存储,也用的是redis
app = Celery('celery_task', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1')
# 通过装饰器指定任务执行单元,即消息接受后的处理函数
@app.task
def add(x, y):
return f'{x} 和 {y} 的和为 {x + y}'
# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':
app.worker_main(['worker', '--loglevel=info'])
启动 Worker
启动一个或多个 Worker 进程来处理任务。这些 Worker 会连接到指定的消息代理并等待新任务到达。
方式一:
# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':
app.worker_main(['worker', '--loglevel=info'])
方式二:
确保你已经安装了celery并且正确配置环境变量
celery -A tasks worker --loglevel=info
启动完成:
发送任务
客户端代码调用定义好的 Celery 任务,并将其发送到消息代理。例如创建celery_add_task
:
from celery_task import add
# 发送任务
result = add.delay(4, 6)
print(result)
# 获取并打印结果(这会阻塞直到返回结果)
print(result.get())
执行结果:
(可选)获取结果
当你调用一个 Celery 任务时,你可以立即获取一个AsyncResult 实例,该实例可以用来检索任务的结果。
from celery_task import add
from celery.result import AsyncResult
import celery_task
"""
使用Task ID创建AsyncResult对象,用于检查状态与获取最终计算出的值。
第一个参数是发送任务时返回的task id
result = add.delay(4, 6)
print(result) # 直接打印出来的就是task_id
"""
# 第一种方式创建
# result = add.AsyncResult("16a6a1e4-9500-43b5-8a88-1058663d44b7")
# 第二种方式创建
result = AsyncResult("16a6a1e4-9500-43b5-8a88-1058663d44b7", app=celery_task.app)
status = result.status
if status == "SUCCESS":
print('执行成功')
elif status == "FAILURE":
print('执行失败')
elif status == "PENDING":
print('任务等待中被执行')
elif status == "RETRY":
print('任务异常后正在重试')
elif status == "STARTED":
print('任务已经开始被执行')
else:
print("未匹配到状态值")
if result.ready():
print(result.result) # 打印最终计算出的值,如果已完成。
else:
print("Task is still running")
注意
celery_task.py
文件名和
代码app = Celery('celery_task', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1')
中的第一个参数名要一致,否则会报错
app端报错日志:
[2024-05-31 16:13:26,538: ERROR/MainProcess] Received unregistered task of type 'celery_task.add'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[4, 6], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (81b)
The full contents of the message headers:
{'lang': 'py', 'task': 'celery_task.add', 'id': 'c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6', 'parent_id': None, 'argsrepr': '(4, 6)', 'kwargsrepr': '{}', 'origin': 'gen13554@fangyirui.local', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}
The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_received
strategy = strategies[type_]
KeyError: 'celery_task.add'
客户端报错日志:
c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6
Traceback (most recent call last):
File "/Users/fangyirui/PycharmProjects/pythonProject/celery/celery_add_task.py", line 8, in <module>
print(result.get())
File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 251, in get
return self.backend.wait_for_pending(
File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/backends/asynchronous.py", line 223, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 365, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 358, in throw
self.on_ready.throw(*args, **kwargs)
File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/vine/promises.py", line 235, in throw
reraise(type(exc), exc, tb)
File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/vine/utils.py", line 27, in reraise
raise value
celery.exceptions.NotRegistered: 'celery_task.add'
工作流程
一个任务的生命周期
+-----------------+ +-----------------+ +-----------------+
| Client | ----> | Message Broker| ----> | Worker |
| (Task Producer) | | (e.g., Redis) | | (Task Consumer) |
+-----------------+ +-----------------+ +-----------------+
^ |
| v
+-----------------+ +-----------------+
| Result Backend | <-----------------------------| Task Execution |
| (e.g., Redis) | +-----------------+
+-----------------+
- 定义阶段:使用
@app.task
装饰器定义了一个简单函数add
。 - 创建与发送:调用
add.delay(4, 6)
将请求转换成一条包含操作数和操作类型的信息,并放入Redis队列中。 - 排队阶段:Redis接收到这条信息,将其存储起来等待worker拉取。
- 获取并锁定:运行中的worker从Redis中拉取这条信息,并锁定它以防止其他workers重复执行同一项工作。
- 执行阶段:Worker根据信息内容计算,在此期间,这个信息处于处理中状态。
- 存储阶段:计算完毕后,将结果存储回Redis,以便以后查询。如果没有配置result backend,则跳过这一步骤直接进入下一个步骤。
- 查询阶段:客户端通过调用
result.get()
来阻塞式地等待并获取计算结果。在实际应用场景中,也可能是非阻塞式地检查状态,例如使用result.status
或者轮询机制查看是否完成。 - 清理和过期管理: 根据系统设置,如果不再需要保存这些历史记录,可以由系统自动或者手动清除这些数据。结果默认在redis中86400秒后过期(24H)。