目录
引言
Celery 是什么?
安装 Celery
配置 Celery
创建任务
运行 Celery Worker
调用任务
更多示例
示例 1:发送电子邮件
示例 2:图片处理
示例 3:数据处理
结论
引言
今天我们来分享一个超强的 python 库,celery
Github地址:https://github.com/celery/celery
在现代应用开发中,后台任务处理是一项非常常见的需求。无论是处理大批量的数据,还是发送电子邮件,亦或是执行耗时的计算任务,将这些操作放在后台执行可以显著提升应用的性能和用户体验。Celery 就是这样一个强大的分布式任务队列系统,它能够帮助我们轻松地实现后台任务处理。
在这篇文章中,我们将详细介绍 Celery 的基本概念、安装和配置过程,并通过一些简单的示例代码来展示如何使用 Celery 来处理后台任务。
Celery 是什么?
Celery 是一个简单、灵活且可靠的分布式系统,可以处理大量消息,用于实时操作、调度任务和并发执行等场景。Celery 的核心概念包括任务(task)和队列(queue),任务是你希望异步执行的功能,而队列是存放这些任务的地方。
安装 Celery
在开始使用 Celery 之前,我们需要先安装它。Celery 依赖于一个消息代理(例如 RabbitMQ 或 Redis)来协调任务的分发。在这里,我们选择使用 Redis 作为消息代理。首先,我们需要安装 Celery 和 Redis。
使用以下命令安装 Celery 和 Redis:
pip install celery[redis]
同时,我们需要确保系统中已经安装了 Redis。如果还没有安装 Redis,可以参考 Redis 的官方文档进行安装。
配置 Celery
安装完成后,我们需要配置 Celery。通常情况下,我们会在项目中创建一个专门用于 Celery 配置的文件,例如 celery.py
。
下面是一个简单的 Celery 配置示例:
from celery import Celery
# 创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0')
# 加载配置
app.conf.update(
result_backend='redis://localhost:6379/0',
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='UTC',
enable_utc=True,
)
在这个示例中,我们创建了一个名为 app
的 Celery 实例,并指定 Redis 作为消息代理和结果存储的后台。我们还配置了任务的序列化格式和时区等参数。
创建任务
有了 Celery 配置后,我们就可以开始创建任务了。任务是 Celery 的核心,它是我们希望异步执行的函数。在这里,我们定义一个简单的任务来演示 Celery 的使用。
创建一个名为 tasks.py
的文件,并添加以下代码:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
在这个示例中,我们定义了一个名为 add
的任务函数,它接受两个参数 x
和 y
,并返回它们的和。通过 @app.task
装饰器,我们将 add
函数声明为一个 Celery 任务。
运行 Celery Worker
要执行 Celery 任务,我们需要启动一个 Celery Worker。Worker 是负责执行任务的进程。使用以下命令启动 Celery Worker:
celery -A tasks worker --loglevel=info
在这个命令中,-A tasks
指定了我们之前创建的 tasks.py
文件,而 --loglevel=info
则设置了日志级别。
调用任务
现在,我们已经配置好了 Celery 并启动了 Worker,接下来我们就可以调用任务了。我们可以通过 Celery 的 delay
方法来异步调用任务。
创建一个新的 Python 文件(例如 main.py
),并添加以下代码:
from tasks import add
result = add.delay(4, 6)
print('Task result:', result.get(timeout=10))
在这个示例中,我们使用 add.delay(4, 6)
来异步调用 add
任务,并传入参数 4
和 6
。result.get(timeout=10)
用于获取任务的执行结果,如果在 10 秒内未完成,则抛出异常。
更多示例
除了简单的加法任务,Celery 还可以处理更复杂的任务,例如发送电子邮件、处理文件等。以下是一些更高级的示例:
示例 1:发送电子邮件
假设我们需要在用户注册后发送欢迎邮件,我们可以使用 Celery 来异步处理邮件发送任务。
from celery import Celery
import smtplib
from email.mime.text import MIMEText
app = Celery('email_tasks', broker='redis://localhost:6379/0')
@app.task
def send_welcome_email(email):
msg = MIMEText('Welcome to our service!')
msg['Subject'] = 'Welcome'
msg['From'] = 'noreply@example.com'
msg['To'] = email
with smtplib.SMTP('smtp.example.com') as server:
server.login('user', 'password')
server.sendmail('noreply@example.com', [email], msg.as_string())
在这个示例中,我们定义了一个 send_welcome_email
任务,用于发送欢迎邮件。通过 Celery,我们可以在用户注册后异步调用这个任务,从而避免阻塞主线程。
示例 2:图片处理
假设我们需要对用户上传的图片进行处理,例如生成缩略图,我们可以使用 Celery 来异步处理这些任务。
from celery import Celery
from PIL import Image
app = Celery('image_tasks', broker='redis://localhost:6379/0')
@app.task
def generate_thumbnail(image_path, thumbnail_path):
with Image.open(image_path) as img:
img.thumbnail((128, 128))
img.save(thumbnail_path, "JPEG")
在这个示例中,我们定义了一个 generate_thumbnail
任务,用于生成图片的缩略图。通过 Celery,我们可以在用户上传图片后异步调用这个任务,从而避免影响用户体验。
示例 3:数据处理
假设我们需要对大批量的数据进行处理,例如对数据库中的记录进行批量更新,我们可以使用 Celery 来异步处理这些任务。
from celery import Celery
import sqlite3
app = Celery('data_tasks', broker='redis://localhost:6379/0')
@app.task
def update_records():
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
cursor.execute("UPDATE records SET status = 'processed' WHERE status = 'pending'")
conn.commit()
conn.close()
在这个示例中,我们定义了一个 update_records
任务,用于批量更新数据库中的记录。通过 Celery,我们可以将这些耗时的操作放在后台处理,从而提升应用的性能。
结论
通过这篇文章,我们初步了解了 Celery 的基本概念和使用方法。Celery 是一个非常强大的工具,它可以帮助我们轻松地实现后台任务处理,从而提升应用的性能和用户体验。希望通过本文的介绍,大家能够对 Celery 有一个初步的认识,并能在实际项目中尝试使用 Celery 来处理后台任务。
在接下来的学习中,大家可以深入了解 Celery 的高级功能,例如任务调度、重试机制等,并尝试在实际项目中应用这些功能。祝大家学习愉快!