Celery,一个实时处理的 Python 分布式系统

大家好!我是爱摸鱼的小鸿,关注我,收看每期的编程干货。

一个简单的库,也许能够开启我们的智慧之门,
一个普通的方法,也许能在危急时刻挽救我们于水深火热,
一个新颖的思维方式,也许能激发我们无尽的创造力,
一个独特的技巧,也许能成为我们的隐形盾牌……


神奇的 Python 库之旅,第 9

目录

    • 一、什么是 Celery?
    • 二、为什么选择 Celery?
    • 三、Celery 编程示例
    • 四、总结
    • 五、作者Info

一、什么是 Celery?

Celery 是一个强大的工具,它能够帮助我们管理和调度复杂的任务。无论是处理异步任务、计划任务,还是分布式任务,Celery 都能轻松胜任。在这篇文章中,我们将深入探讨 Celery 的魅力,通过多个代码示例,带你全面了解这个神器。

Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,并提供了维护这些任务的工具。它的使用场景非常广泛,比如电子邮件发送、视频处理、数据分析、Web 爬虫等。

Github 项目地址:

https://github.com/celery/celery

在这里插入图片描述

二、为什么选择 Celery?

选择 Celery 的理由有很多,这里列出几个主要的优势:

  • 异步任务处理:能让你在不阻塞主进程的情况下处理任务;
  • 定时任务:支持类似 cron 的定时任务调度;
  • 分布式执行:支持将任务分发到多个机器上执行,提升系统性能;
  • 高可用性:可以与消息队列(如 RabbitMQ, Redis)结合,实现高可用性和可靠性。

在这里插入图片描述

在开始之前,我们需要安装 Celery 和一个消息代理(这里我们选择 Redis):

pip install celery redis


三、Celery 编程示例

快速开始:一个简单的任务
让我们从一个简单的例子开始,创建一个任务来演示 Celery 的基本用法。

# tasks.py
from celery import Celery

# 创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

在这个例子中,我们定义了一个名为 add 的任务,它接收两个参数并返回它们的和。接下来,我们可以启动一个 Celery worker 来处理这个任务:

celery -A tasks worker --loglevel=info

启动 Celery worker 后,我们可以在 Python 交互式环境中调用这个任务:

>>> from tasks import add
>>> result = add.delay(4, 6)
>>> result.get(timeout=10)
10


深入挖掘:任务链和组
Celery 的强大之处在于它能够处理复杂的工作流,比如任务链和任务组。

任务链
任务链允许我们将多个任务串联起来,前一个任务的输出作为下一个任务的输入:

from celery import chain

# 定义任务
@app.task
def multiply(x, y):
    return x * y

@app.task
def add_and_multiply(x, y, z):
    return chain(add.s(x, y), multiply.s(z))()

# 调用任务链
result = add_and_multiply(2, 3, 4)
print(result.get())  # 输出 20,因为 (2 + 3) * 4 = 20

任务组
任务组允许我们并行执行一组任务,并在所有任务完成后获得结果:

from celery import group

# 定义任务组
@app.task
def sum_list(numbers):
    return sum(numbers)

@app.task
def process_groups():
    return group(sum_list.s([1, 2, 3]), sum_list.s([4, 5, 6]), sum_list.s([7, 8, 9]))().get()

# 调用任务组
result = process_groups()
print(result)  # 输出 [6, 15, 24]



定时任务
Celery 还支持定时任务,这类似于 Unix 系统的 cron 作业:

from celery import Celery
from celery.schedules import crontab

app = Celery('periodic_tasks', broker='redis://localhost:6379/0')

@app.task
def scheduled_task():
    print("This task runs every 10 seconds")

app.conf.beat_schedule = {
    'run-every-10-seconds': {
        'task': 'periodic_tasks.scheduled_task',
        'schedule': 10.0,
    },
}

# 启动 Celery beat 进程
celery -A periodic_tasks beat --loglevel=info



错误处理与重试机制
在实际应用中,任务失败是不可避免的。Celery 提供了优雅的错误处理和重试机制:

@app.task(bind=True, max_retries=3)
def unreliable_task(self):
    try:
        # 可能失败的操作
        risky_operation()
    except Exception as exc:
        # 捕获异常并重试
        raise self.retry(exc=exc, countdown=60)

在这个例子中,unreliable_task 如果失败,会在 60 秒后重试,总共重试 3 次。


使用 Celery 在 Web 应用
Celery 常用于 Web 应用中来处理后台任务。下面是一个简单的 Flask 应用,展示了如何集成 Celery:

from flask import Flask, request, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    celery.Task = ContextTask
    return celery

celery = make_celery(app)

@celery.task
def long_task():
    import time
    time.sleep(10)
    return 'Task completed!'

@app.route('/start-task', methods=['POST'])
def start_task():
    task = long_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('get_status', task_id=task.id)}

@app.route('/status/<task_id>')
def get_status(task_id):
    task = long_task.AsyncResult(task_id)
    response = {
        'state': task.state,
        'result': task.result,
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

在这个示例中,我们定义了一个 Flask 应用,并将 Celery 集成到其中。long_task 是一个模拟长时间运行的任务,客户端可以通过 start-task 路由启动任务,并通过 status/<task_id> 路由查询任务状态。

更多功能、详细用法可参考官方文档:

https://docs.celeryq.dev/en/stable

四、总结

Celery 是一个强大的工具,它为我们处理异步任务、计划任务和分布式任务提供了极大的便利。在这篇文章中,我们通过多个代码示例,展示了 Celery 的基本用法、任务链和组、定时任务、错误处理和重试机制,以及如何在 Web 应用中集成 Celery。

如果你在日常工作中需要处理复杂的任务调度和分布式任务,那么 Celery 绝对是一个值得深入学习和使用的工具。希望这篇文章能够帮助你更好地理解和使用 Celery,让你的工作更加高效、顺畅。

试一试吧,Celery 会让你的任务调度变得简单又高效!
在这里插入图片描述

五、作者Info

Author:小鸿的摸鱼日常

Goal:让编程更有趣! 专注于 Web 开发、爬虫,游戏开发,数据分析、自然语言处理,AI 等,期待你的关注,让我们一起成长、一起Coding!

版权说明:本文禁止抄袭、转载,侵权必究!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/787347.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Start LoongArch64 Alpine Linux VM on x86_64

一、Build from source(build on x86_64) Obtain the latest libvirt, virt manager, and QEMU source code, compile and install them 1.1 Build libvirt from source sudo apt-get update sudo apt-get install augeas-tools bash-completion debhelper-compat dh-apparmo…

Python学习笔记33:进阶篇(二十二)pygame的使用之image模块

前言 基础模块的知识通过这么长时间的学习已经有所了解&#xff0c;更加深入的话需要通过完成各种项目&#xff0c;在这个过程中逐渐学习&#xff0c;成长。 我们的下一步目标是完成python crash course中的外星人入侵项目&#xff0c;这是一个2D游戏项目。在这之前&#xff…

Codeforces Round 954 (Div. 3) F. Non-academic Problem

思路&#xff1a;考虑缩点&#xff0c;因为是无向图&#xff0c;所以双连通分量缩完点后是一棵树&#xff0c;我们去枚举删除每一条树边的答案&#xff0c;然后取最小值即可。 #include <bits/stdc.h>using namespace std; const int N 3e5 5; typedef long long ll; …

Profibus转ModbusTCP网关模块实现Profibus_DP向ModbusTCP转换

Profibus和ModbusTCP是工业控制自动化常用的二种通信协议。Profibus是一种串口通信协议&#xff0c;它提供了迅速靠谱的数据传输和各种拓扑结构&#xff0c;如总线和星型构造。Profibus可以和感应器、执行器、PLC等各类设备进行通信。 ModbusTCP是一种基于TCP/IP协议的通信协议…

Clickhouse的联合索引

Clickhouse 有了单独的键索引&#xff0c;为什么还需要有联合索引呢&#xff1f;了解过mysql的兄弟们应该都知道这个事。 对sql比较熟悉的兄弟们估计看见这个联合索引心里大概有点数了&#xff0c;不过clickhouse的联合索引相比mysql的又有些不一样了&#xff0c;mysql 很遵循最…

Springboot各个版本维护时间

Springboot各个版本维护时间

【 正己化人】 把自己做好,能解决所有问题

阳明先生说&#xff1a;与朋友一起辩论学问&#xff0c;纵然有人言辞观点浅近粗疏&#xff0c;或者是炫耀才华、显扬自己&#xff0c;也都不过是毛病发作。只要去对症下药就好&#xff0c;千万不能怀有轻视别人的心理&#xff0c;因为那不是君子与人为善的心。 人会爱发脾气、…

微信服务里底部的不常用功能如何优化的数据分析思路

图片.png 昨天下午茶时光&#xff0c;和闺蜜偶然聊起&#xff0c;其实在微信服务底部&#xff0c;有很多被我们忽略遗忘&#xff0c;很少点过用过的功能服务&#xff0c;往往进入服务只为了收付款或进入钱包&#xff0c;用完就走了&#xff0c;很少拉到底部&#xff0c;看到和用…

Python函数 之 函数基础

print() 在控制台输出 input() 获取控制台输⼊的内容 type() 获取变量的数据类型 len() 获取容器的⻓度 (元素的个数) range() ⽣成⼀个序列[0, n) 以上都是我们学过的函数&#xff0c;函数可以实现⼀个特定的功能。我们将学习⾃⼰如何定义函数, 实现特定的功能。 1.函数是什么…

LiveNVR监控流媒体Onvif/RTSP用户手册-录像计划:批量配置、单个配置、录像保存(天)、配置时间段录像

TOC 1、录像计划 支持单个通道 或是 通道范围内配置支持快速滑选支持录像时间段配置 1.1、录像存储位置如何配置&#xff1f; 2、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 支持 Windows Linux 及其它CPU架构&#xff08;国产、嵌入式…&#xff09;操作系统安装包下载 、 安装…

亚马逊跟卖采集选品,2小时自动检索3000条商品数据与...

自动查商标局2个小时2928条数据。 ERP采集3000条数据需要多久&#xff1f;10&#xff1a;34开始的&#xff0c;12&#xff1a;52分&#xff0c;应该是两个小时多。采集3000条数据&#xff0c;2928条&#xff0c;平均每个就是3秒左右。 可以看一下采集出来的数据&#xff0c;打…

【C++知识点总结全系列 (08)】:面向对象编程OOP

这里写目录标题 1、OOP概述(1)面向对象四大特征A.抽象B.封装C.继承D.多态 (2)构造函数A.What&#xff08;什么是构造函数&#xff09;B.Why&#xff08;构造函数的作用&#xff09;C. Which&#xff08;有哪些构造函数&#xff09; (3)析构函数A.What&#xff08;什么是析构函数…

Python基础知识——(003)

文章目录 P12——11. 保留字和标识符 1. 保留字 2. Python标识符的命名规则&#xff08;必须遵守&#xff09; 3. Python标识符的命名规范&#xff08;建议遵守&#xff09; P13——12. 变量与常量 变量的语法结构 变量命名应遵循以下几条规则 常量 P14——13. 数值类型…

数据结构作业/2024/7/9

2>实现双向循环链表的创建、判空、尾插、遍历、尾删、销毁 fun.c #include "head.h" //1.双向循环链表的创建 doubleloop_ptr create_list() …

如何在 PostgreSQL 中确保数据的异地备份安全性?

文章目录 一、备份策略1. 全量备份与增量备份相结合2. 定义合理的备份周期3. 选择合适的备份时间 二、加密备份数据1. 使用 PostgreSQL 的内置加密功能2. 使用第三方加密工具 三、安全的传输方式1. SSH 隧道2. SFTP3. VPN 连接 四、异地存储的安全性1. 云存储服务2. 内部存储设…

Spring Cloud 引入

1.单体架构&#xff1a; 定义&#xff1a;所有的功能实现都打包成一个项目 带来的后果&#xff1a; ①后端服务器的压力越来越大&#xff0c;负载越来越高&#xff0c;甚至出现无法访问的情况 ②业务越来越复杂&#xff0c;为了满足用户的需求&#xff0c;单体应用也会越来越…

C#Modbus通信

目录 1&#xff0c;辅助工具。 2&#xff0c;初识Modbus。 3&#xff0c;基于ModbusRTU的通信。 3.1&#xff0c;RTU与ASCII模式区别 3.2&#xff0c;Modbus存储区 3.3&#xff0c;报文格式 3.4&#xff0c;异常代码 3.5&#xff0c;详细文档连接 。 3.6&#xff0c;代…

数据结构——顺序表【C】

顺序表 1. 顺序表的概念以及结构1.1概念1.2静态顺序表和动态顺序表 2. 顺序表接口模拟实现接口总览2.1 初始化数据和销毁容器 2.2 顺序表的尾插和尾删2.3 头插和头删2.4 任意位置插入和删除数据2.5 查找数据 3. 顺序表的问题 &#xff1a; 1. 顺序表的概念以及结构 1.1概念 顺…

生成多个ssh访问不同git

如果&#xff0c;你的git代码仓库&#xff0c;比如说腾讯云coding&#xff0c;通过ssh秘钥访问&#xff0c;一直用的好好的&#xff0c;有一天&#xff0c;你又增加一个aliyun云效的代码仓库&#xff0c;又配置了aliyun云效的秘钥并且&#xff0c;根据aliyun云效的官方文档上传…

DOM(文档对象模型)生命周期事件

前言 DOM 生命周期事件涉及到从创建、更新到销毁 DOM 元素的不同阶段。 ● 我们来看下当HTML文档加载完再执行JavaScript代码 document.addEventListener(DOMContentLoaded, function (e) {console.log(HTML parsed adn DOM tree built!, e); })● 除此之外&#xff0c;浏览…