基于Celery+Supervisord的异步任务管理方案

一、架构设计背景

1.1 需求场景分析

在Web应用中,当遇到以下场景时需要异步任务处理方案:

  • 高延迟操作(大文件解析/邮件发送/复杂计算)
  • 请求响应解耦(客户端快速响应)
  • 任务队列管理(任务优先级/失败重试)
  • 分布式任务调度(多Worker节点)

1.2 技术选型说明

组件作用版本要求
FastAPI构建高性能API接口>=0.68
Redis消息中间件+结果存储>=5.0
Celery分布式任务队列>=5.2
Supervisord进程监控与管理>=4.2

二、核心实现逻辑

2.1 异步任务处理流程

  1. 客户端上传文件到FastAPI
  2. API生成唯一任务ID并持久化任务信息
  3. 任务进入Redis队列
  4. Celery Worker消费队列任务
  5. 状态更新与结果存储
  6. 客户端轮询获取任务状态

2.2 代码实现优化

2.2.1 增强型FastAPI服务
# 文件校验中间件
def validate_file(file: UploadFile):
    if not file.filename.lower().endswith(('.csv', '.xlsx')):
        raise HTTPException(400, "仅支持CSV/XLSX格式")
    if file.size > 1024*1024*100:  # 100MB限制
        raise HTTPException(413, "文件超过大小限制")
    return file

# 上传接口
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    validated_file = validate_file(file)
    task_id = f"{uuid.uuid4().hex}_{secure_filename(file.filename)}"
    
    # 异步存储文件
    await file.seek(0)
    content = await file.read()
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, save_upload_file, content, task_id)
    
    task_data = {
        "task_id": task_id,
        "file_path": file_path,
    }         
    r.lpush("task_queue", json.dumps(task_data))
    r.hset(name="task_status", 
           key=task_id, 
           value="pending")
    
    return JSONResponse({
        "code": 200,
        "data": {"task_id": task_id},
        "msg": "任务创建成功"
    })
2.2.2 健壮型Celery Worker
@app.task(
    bind=True,
    max_retries=3,
    soft_time_limit=300,
    autoretry_for=(Exception,),
    retry_backoff=True
)
def process_file_task(self, task_data):
    try:
        logger.info(f"Processing {task_data['task_id']}")
        # 实际业务逻辑
        time.sleep(10)
        r.hset("task_status", task_data["task_id"], "completed")
    except Exception as exc:
        self.retry(exc=exc, countdown=2 ** self.request.retries) 
2.2.3 轮询任务队列
r = redis.Redis(host="localhost", port=6379, db=0,password="123456")

def main():
    logger.info("任务轮询启动,正在轮询 Redis...")
    while True:
        task_data = r.lpop("task_queue")
        if task_data:
            data = json.loads(task_data)  
            logger.info("轮询到任务ID:"+data["task_id"])
            r.hset("task_status", data["task_id"], "processing")               
            save_file_to_disk.delay(data)
            logger.info("已在后台执行,继续轮询")
        time.sleep(3)  

if __name__ == "__main__":
    main()

三、生产级Supervisord配置

3.1 配置文件

[supervisord]
logfile=/var/log/supervisord.log
logfile_maxbytes=50MB
logfile_backups=10
loglevel=info
pidfile=/tmp/supervisord.pid
nodaemon=false

[program:fastapi]
command=uvicorn main:app --host 0.0.0.0 --port 8000
directory=/opt/app
autostart=true
autorestart=unexpected
startsecs=5
stopwaitsecs=30
user=www-data
environment=PYTHONPATH="/opt/app"

[program:celery_worker]
command=celery -A worker.celery_app worker --concurrency=4 -O fair
directory=/opt/app
autostart=true
autorestart=true
stdout_logfile=/var/log/celery_worker.log
redirect_stderr=true
killasgroup=true
stopasgroup=true

3.2 关键配置说明

  • 进程分组管理:killasgroup/stopasgroup确保子进程被正确回收
  • 日志轮转:logfile_maxbytes和logfile_backups防止日志膨胀
  • 资源限制:通过concurrency参数控制Worker并发数
  • 环境隔离:指定运行用户和Python路径

一、总结

在Web应用开发中,为了应对诸如处理大文件上传、发送邮件、执行复杂计算等耗时操作,以及实现请求响应解耦和分布式任务调度的需求,我们通常需要采用异步任务处理方案。本文介绍了一种基于FastAPI、Redis、Celery和Supervisord构建的高效异步任务处理架构。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/982430.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[MySQL初阶]MySQL(5)内置函数详解

标题:[MySQL初阶]MySQL(5)内置函数详解 水墨不写bug 文章目录 一、日期函数1. current_date()2. current_time()3. current_timestamp()4. date(datetime)5. date_add(date, interval expr unit)6. date_sub(date, interval expr unit)7. dat…

【MySQL】事务(隔离性、MVCC)

文章目录 1. 事务的概念2. 事务的提交方式3. 事务常见操作4. 隔离性4.1 隔离级别4.2 查看与设置隔离性4.3 隔离级别的测试 5. 隔离性的原理5.1 MVCC5.1.1 3个隐藏字段5.1.2 undo日志5.1.3 模拟MVCC 5.2 Read view5.3 RR与RC的本质区别 1. 事务的概念 在之前所有的SQL操作中&am…

单细胞分析(22)——高效使用 Cell Ranger:安装、参数解析及 Linux 后台运行指南

高效使用 Cell Ranger:安装、参数解析及 Linux 后台运行指南 背景介绍 Cell Ranger 是 10x Genomics 开发的一套用于单细胞转录组测序数据处理的软件。它可以对 10x Genomics 平台生成的 FASTQ 文件进行对齐、UMI 计数和基因表达量计算,是单细胞 RNA-se…

IEEE paper submission

author guideline IEEE 文章模板:https://template-selector.ieee.org/ 1)Manuscripts that exceed eight pages will incur mandatory over-length page charges. (超过 8 页强制收费 $175/page) 2)Authors are invited to submit manus…

NET431-C协议网关:跨网段·零编程PLC工业通信终极方案

系统框架图解析 三层架构,一图读懂: 设备层: 4个网口2个网段:直连AB、西门子、三菱等18台PLC,覆盖4个网段(如10.1.1.0/24、192.168.2.0/24),协议转换。5路RS485串口:通过…

nvm 让 Node.js 版本切换更灵活

有很多小伙伴前端开发进程中,我们常常会遇到不同项目依赖不同版本 Node.js 的情况。我们不可能去卸载重新安装适应的版本去安装依赖或者启动项目。为了避免版本冲突带来的一系列麻烦,在这里给大家推荐一款Node.js 版本管理工具——nvm(Node V…

豆包大模型 MarsCode AI 刷题专栏 001

001.找单独的数 难度:易 问题描述 在一个班级中,每位同学都拿到了一张卡片,上面有一个整数。有趣的是,除了一个数字之外,所有的数字都恰好出现了两次。现在需要你帮助班长小C快速找到那个拿了独特数字卡片的同学手上…

迭代器模式:遍历集合的艺术

文章目录 什么是迭代器模式?现实中的例子迭代器模式的结构代码示例1. 定义Iterator接口2. 实现ConcreteIterator3. 定义Aggregate接口4. 实现ConcreteAggregate5. 客户端代码输出结果 迭代器模式的优缺点优点缺点 总结 在软件开发中,我们经常需要遍历集合…

通用文件模型

一、通用文件模型 通常一个完整的Linux系统有数千到数百万个文件组成,文件中存储了程序、数据和各种信息。层次化的目录结构用于对文件进行编排和分组。 1.ReiserFS(新型的文件系统) -->Reiser4 它通过一种与众不同的方式----完全平衡树来容纳数据,包…

DeepSeek + 飞书多维表格搭建你的高效工作流

众所周知,大模型DeepSeek擅长于处理大规模语言模型推理任务,特别是在成本降低和思维链推理方面表现出色‌,我们一般把大模型必做我们的大脑,但是一个人不能只有大脑,还需要其他输入输出以及操作支配的眼耳鼻嘴手足等。…

微服务架构下的 Node.js

Node.js 在微服务架构中的特点 轻量级和高效性 Node.js 以其轻量级和高效的特点,非常适合构建微服务架构。它具有事件驱动和非阻塞 I/O 模型,能够在处理高并发请求时表现出色。这意味着 Node.js 可以同时处理大量的并发连接,而不会因为阻塞…

用DeepSeek-R1-Distill-data-110k蒸馏中文数据集 微调Qwen2.5-7B-Instruct!

下载模型与数据 模型下载: huggingface: Qwen/Qwen2.5-7B-Instruct HF MirrorWe’re on a journey to advance and democratize artificial intelligence through open source and open science.https://hf-mirror.com/Qwen/Qwen2.5-7B-Instruct 魔搭&a…

flask-定时任务

文章目录 前言一、APScheduler是什么二、APScheduler 主要功能:三、主要组成部分:四、典型使用场景:五、具体使用1.安装 APScheduler2.假设我们有一个需要五分钟请求一次http接口的任务1.定义一个scheduler.py去专门处理定时2.启动文件处理3.…

Python的Pandas和matplotlib库:让数据可视化贼简单

在数据爆炸的时代,数据可视化已成为数据分析的关键环节。Python 作为强大的编程语言,拥有众多用于数据可视化的库,而 pandas 库在其中扮演着重要角色。它不仅能高效处理和分析数据,还具备强大的数据可视化功能,让我们轻…

rabbitmq版本升级并部署高可用

RabbitMQ版本升级 先检查是否已经安装rabbitmq rpm -qa|grep rabbitmq|wc -l //如果结果是0,表示没有安装 rpm -e --nodeps $(rpm -qa|grep rabbitmq) //如安装了,则进行卸载 先检查是否已经安装erlang rpm -qa|grep erlang|wc -l //如果结果…

Electron-Forge + Vue3 项目初始化

本人对Electron的浅薄理解如下图所示 由上图可以,如果你需要开发一个electron应用,你得具备基本的前端开发经验。对于electron相关的知识,建议先了解下基本的窗口操作,比如新建窗口、关闭窗口等简单的操作,这些内容在…

pinginfoview网络诊断工具中文版

介绍 pinginfoview中文版本是一款实用的网络诊断工具,它专为中文用户设计,提供了方便易用的界面,使得在Windows环境下进行ping测试变得更加简单。该工具是由NirSoft开发的一款免费的桌面应用程序,尽管官方可能并未正式发布中文版…

DeepSeek R1 + 飞书机器人实现AI智能助手

效果 TFChat项目地址 https://github.com/fish2018/TFChat 腾讯大模型知识引擎用的是DeepSeek R1,项目为sanic和redis实现,利用httpx异步处理流式响应,同时使用buffer来避免频繁调用飞书接口更新卡片的网络耗时。为了进一步减少网络IO消耗&…

Go学习笔记:基础语法3

1. 常量 Go语言中的常量使用关键字const定义,用于存储不会改变的数据,常量是在编译时被创建的,即使定义在函数内部也是如此,并且只能是布尔型、数字型(整数型、浮点型和复数)和字符串型。 由于编译时的限…

010---基于Verilog HDL的分频器设计

文章目录 摘要一、时序图二、程序设计2.1 rtl2.2 tb 三、仿真分析四、实用性 摘要 文章为学习记录。绘制时序图,编码。通过修改分频值参数,实现一定范围分频值内的任意分频器设计。 一、时序图 二、程序设计 2.1 rtl module divider #(parameter D…