每天40分玩转Django:Django Celery

Django Celery

一、知识要点概览表

模块知识点掌握程度要求
Celery基础配置、任务定义、任务执行深入理解
异步任务任务状态、结果存储、错误处理熟练应用
周期任务定时任务、Crontab、任务调度熟练应用
监控管理Flower、任务监控、性能优化理解应用

二、基础配置实现

1. 安装和配置

# requirements.txt
celery==5.3.1
django-celery-results==2.5.1
django-celery-beat==2.5.0
redis==4.6.0

# settings.py
INSTALLED_APPS = [
    ...
    'django_celery_results',
    'django_celery_beat',
]

# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'

# celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

三、异步任务实现

1. 基本任务定义

# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import User

@shared_task
def send_welcome_email(user_id):
    """发送欢迎邮件"""
    try:
        user = User.objects.get(id=user_id)
        send_mail(
            '欢迎加入我们的平台',
            f'你好 {user.username},欢迎使用我们的服务!',
            'noreply@example.com',
            [user.email],
            fail_silently=False,
        )
        return True
    except Exception as e:
        return str(e)

@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):
    """处理支付任务"""
    from .models import Order
    try:
        order = Order.objects.get(id=order_id)
        result = process_payment_gateway(order)
        if result.success:
            order.status = 'paid'
            order.save()
            return 'Payment processed successfully'
        else:
            raise ValueError('Payment failed')
    except Exception as exc:
        self.retry(exc=exc, countdown=60*5)  # 5分钟后重试

2. 任务链和组

from celery import chain, group, chord
from .tasks import process_payment, send_notification, update_inventory

def process_order(order):
    # 任务链:按顺序执行任务
    task_chain = chain(
        process_payment.s(order.id),
        update_inventory.s(order.id),
        send_notification.s(order.user.id)
    )
    return task_chain()

def process_bulk_orders(orders):
    # 任务组:并行执行多个任务
    task_group = group(
        process_payment.s(order.id)
        for order in orders
    )
    return task_group()

def process_orders_with_summary(orders):
    # 和弦:并行执行任务后执行回调
    def on_complete(results):
        successful = sum(1 for r in results if r == 'success')
        failed = len(results) - successful
        return f"处理完成:{successful}成功,{failed}失败"
    
    task_chord = chord(
        (process_payment.s(order.id) for order in orders),
        on_complete.s()
    )
    return task_chord()

3. 自定义任务类

from celery import Task
from django.core.cache import cache

class BaseTaskWithRetry(Task):
    abstract = True
    max_retries = 3
    default_retry_delay = 60  # 60秒后重试

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """任务失败时的处理"""
        print(f'Task {task_id} failed: {str(exc)}')
        super().on_failure(exc, task_id, args, kwargs, einfo)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """任务重试时的处理"""
        print(f'Task {task_id} retrying: {str(exc)}')
        super().on_retry(exc, task_id, args, kwargs, einfo)

@shared_task(base=BaseTaskWithRetry)
def process_data(data_id):
    try:
        # 处理数据的逻辑
        result = process_complex_data(data_id)
        return result
    except Exception as exc:
        raise self.retry(exc=exc)

四、周期任务实现

1. 基本周期任务

# tasks.py
from celery.schedules import crontab
from celery.task import periodic_task

@periodic_task(run_every=timedelta(hours=24))
def daily_cleanup():
    """每日清理任务"""
    cleanup_expired_tokens()
    cleanup_old_logs()
    return "Daily cleanup completed"

@periodic_task(
    run_every=crontab(hour=0, minute=0),
    name="generate_daily_report"
)
def generate_daily_report():
    """生成每日报告"""
    report = Report.objects.create(
        date=timezone.now(),
        type='daily'
    )
    report.generate()
    return f"Report generated: {report.id}"

2. 动态周期任务

# models.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.db import models

class ScheduledReport(models.Model):
    name = models.CharField(max_length=100)
    interval = models.IntegerField(help_text='间隔(分钟)')
    enabled = models.BooleanField(default=True)
    
    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        self.update_periodic_task()
    
    def update_periodic_task(self):
        schedule, _ = IntervalSchedule.objects.get_or_create(
            every=self.interval,
            period=IntervalSchedule.MINUTES,
        )
        
        PeriodicTask.objects.update_or_create(
            name=f'generate_report_{self.id}',
            defaults={
                'task': 'myapp.tasks.generate_report',
                'interval': schedule,
                'args': [self.id],
                'enabled': self.enabled
            }
        )

3. 任务调度器

from django_celery_beat.models import CrontabSchedule, PeriodicTask
import json

def schedule_report_task(name, hour, minute, day_of_week):
    """创建定时报告任务"""
    schedule, _ = CrontabSchedule.objects.get_or_create(
        hour=hour,
        minute=minute,
        day_of_week=day_of_week,
    )
    
    task = PeriodicTask.objects.create(
        crontab=schedule,
        name=f'generate_report_{name}',
        task='myapp.tasks.generate_report',
        args=json.dumps([name]),
    )
    return task

# 使用示例
schedule_report_task('weekly_summary', hour=0, minute=0, day_of_week='1')  # 每周一

五、监控和管理

1. Flower配置

# requirements.txt
flower==2.0.1

# settings.py
CELERY_FLOWER_USER = 'admin'
CELERY_FLOWER_PASSWORD = 'password'

# 启动Flower
# celery -A myproject flower --port=5555

2. 任务监控中间件

# middleware.py
from celery.signals import task_prerun, task_postrun
import time
import logging

logger = logging.getLogger('celery.tasks')

@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):
    """任务执行前的处理"""
    task.start_time = time.time()

@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **kw):
    """任务执行后的处理"""
    if hasattr(task, 'start_time'):
        duration = time.time() - task.start_time
        logger.info(
            f'Task {task.name}[{task_id}] '
            f'completed in {duration:.2f}s with state {state}'
        )

六、Celery工作流程图

在这里插入图片描述

七、实际应用示例

1. 图片处理任务

# tasks.py
from PIL import Image
import os
from celery import shared_task

@shared_task
def process_uploaded_image(image_path, sizes=[(800, 600), (400, 300)]):
    """处理上传的图片"""
    try:
        img = Image.open(image_path)
        filename = os.path.basename(image_path)
        name, ext = os.path.splitext(filename)
        
        results = []
        for size in sizes:
            resized = img.copy()
            resized.thumbnail(size)
            new_filename = f"{name}_{size[0]}x{size[1]}{ext}"
            new_path = os.path.join(os.path.dirname(image_path), new_filename)
            resized.save(new_path)
            results.append(new_path)
        
        return results
    except Exception as e:
        return str(e)

2. 站点监控任务

# tasks.py
import requests
from celery import shared_task
from django.core.mail import mail_admins
from .models import SiteMonitor

@shared_task(bind=True, max_retries=3)
def monitor_website(self, url):
    """监控网站可用性"""
    try:
        response = requests.get(url, timeout=10)
        status = response.status_code == 200
        response_time = response.elapsed.total_seconds()
        
        SiteMonitor.objects.create(
            url=url,
            status=status,
            response_time=response_time
        )
        
        if not status:
            mail_admins(
                f'网站{url}不可用',
                f'状态码:{response.status_code}'
            )
            
        return {
            'url': url,
            'status': status,
            'response_time': response_time
        }
    except Exception as exc:
        self.retry(exc=exc, countdown=60)

八、最佳实践建议

  1. 任务设计原则:

    • 保持任务原子性
    • 实现幂等性
    • 合理设置超时时间
    • 添加适当的重试机制
  2. 性能优化:

    • 使用合适的序列化方式
    • 控制任务粒度
    • 合理设置并发数
    • 监控任务执行情况
  3. 错误处理:

    • 完善的异常捕获
    • 详细的日志记录
    • 合适的重试策略
    • 失败通知机制

这就是关于Django Celery的详细内容,包括异步任务队列和周期任务的实现。通过实践这些内容,你将能够在Django项目中熟练使用Celery处理异步任务。如果有任何问题,欢迎随时提出!


怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/947946.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Web安全扫盲

1、建立网络思维模型的必要 1 . 我们只有知道了通信原理, 才能够清楚的知道数据的交换过程。 2 . 我们只有知道了网络架构, 才能够清楚的、准确的寻找漏洞。 2、局域网的简单通信 局域网的简单通信(数据链路层) 一般局域网都通…

【MATLAB APP Designer】小波阈值去噪(第一期)

代码原理及流程 小波阈值去噪是一种信号处理方法,用于从信号中去除噪声。这种方法基于小波变换,它通过将信号分解到不同的尺度和频率上来实现。其基本原理可以分为以下几个步骤: (1)小波变换:首先对含噪信…

CDP集群安全指南-动态数据加密

[〇]关于本文 集群的动态数据加密主要指的是加密通过网络协议传输的数据,防止数据在传输的过程中被窃取。由于大数据涉及的主机及服务众多。你需要更具集群的实际环境来评估需要为哪些环节实施动态加密。 这里介绍一种通过Cloudera Manager 的Auto-TLS功能来为整个…

信息安全、网络安全和数据安全的区别和联系

1. 前言 有次有朋友问我 信息安全、网络安全和数据安全,这三个词平时写文档时怎么用? 我想很多人都说不清。这次我查阅了资料,尽量讲清楚这三者之间的区别和联系。 2. 信息安全 2.1 定义 信息安全是指为数据处理系统建立和采用的技术和管…

vim 的基础使用

目录 一:vim 介绍二:vim 特点三:vim 配置四:vim 使用1、vim 语法格式2、vim 普通模式(1)保存退出(2)光标跳转(3)文本删除(4)文本查找&…

Unity2022接入Google广告与支付SDK、导出工程到Android Studio使用JDK17进行打包完整流程与过程中的相关错误及处理经验总结

注:因为本人也是第一次接入广告与支付SDK相关的操作,网上也查了很多教程,很多也都是只言片语或者缺少一些关键步骤的说明,导致本人也是花了很多时间与精力踩了很多的坑才搞定,发出来也是希望能帮助到其他人在遇到相似问…

【嵌入式硬件】直流电机驱动相关

项目场景: 驱动履带车(双直流电机)前进、后退、转弯 问题描述 电机驱动MOS管烧毁 电机驱动采用IR2104STRH1R403NL的H桥方案(这是修改之后的图) 原因分析: 1.主要原因是4路PWM没有限幅,修改…

数据库知识汇总1

一. 数据库系统概述 信息需要媒体(文本、图像视频等)表现出来才能被人类所获取,媒体可以转换成比特或者符号,这些称为数据; 数据/信息的特点:爆炸式增长、无限复制、派生; 数据库是指长期长期…

Dubbo扩展点加载机制

加载机制中已经存在的一些关键注解,如SPI、©Adaptive> ©Activateo然后介绍整个加载机制中最核心的ExtensionLoader的工作流程及实现原理。最后介绍扩展中使用的类动态编译的实 现原理。 Java SPI Java 5 中的服务提供商https://docs.oracle.com/jav…

Elasticsearch向量检索需要的数据集以及768维向量生成

Elasticsearch8.17.0在mac上的安装 Kibana8.17.0在mac上的安装 Elasticsearch检索方案之一:使用fromsize实现分页 快速掌握Elasticsearch检索之二:滚动查询(scrool)获取全量数据(golang) Elasticsearch检索之三:官方推荐方案search_after…

网关的主要作用

在网络安全领域,网关扮演着举足轻重的角色,它不仅是网络间的桥梁,更是安全防线的守护者。以下是网关在网络安全中的几个关键作用: 1. 防火墙功能:网关常常集成了防火墙技术,能够对进出网络的数据包进行严格…

【Cocos TypeScript 零基础 4.1】

目录 背景滚动 背景滚动 创建一个 空节点 背景丟进去 ( 复制一个,再丢一次都行) 新建TS脚本 并绑定到 空节点 上 再对TS脚本进行编辑 export class TS2bg extends Component {property (Node) // 通过属性面板去赋值bg1:Node nullproperty (Node) bg2:Node nullprope…

如何利用群晖NAS实现远程访问你的网页版Linux虚拟桌面环境

文章目录 前言1. 下载Docker-Webtop镜像2. 运行Docker-Webtop镜像3. 本地访问网页版Linux系统4. 群晖NAS安装Cpolar工具5. 配置异地访问Linux系统6. 异地远程访问Linux系统7. 固定异地访问的公网地址 前言 今天我要给大家介绍一下如何在群晖NAS设备上部署Docker-Webtop&#x…

MySQL 04 章——运算符

一、算数运算符 算数运算符主要用于数学运算,其可以连接运算符前后的两个数值或表达式 运算符名称作用示例加法运算符计算两个值或表达式的和SELECT AB-减法运算符计算两个值或表达式的差SELECT A-B*乘法运算符计算两个值或表达式的乘积SELECT A*B/或DIV除法运算符…

ES IK分词器插件

前言 ES中默认了许多分词器,但是对中文的支持并不友好,IK分词器是一个专门为中文文本设计的分词工具,它不是ES的内置组件,而是一个需要单独安装和配置的插件。 Ik分词器的下载安装(Winows 版本) 下载地址:…

Oracle DG备库数据文件损坏修复方法(ORA-01578/ORA-01110)

今天负责报表的同事反馈在DG库查询时出现如下报错 ORA-01578:ORACLE数据块损坏(文件号6,块号 2494856)ORA-01110:数据文件6: /oradata/PMSDG/o1 mf users_molczgmn_.dbfORA-26040:数据块是使用 NOLOGGING 选项加载的 可以看到报错是数据文件损坏,提示了file id和b…

idea无法安装插件

目录 修改工具配置 本地安装 无法下载很多时候就是延迟太高导致的,我们先打开插件官网看一下 Python - IntelliJ IDEs Plugin | Marketplace 修改工具配置 1、配置代理(点击 setting-点击 plugins-在点击 http proxy Settings) 输入&…

Linux部署web项目【保姆级别详解,Ubuntu,mysql8.0,tomcat9,jdk8 附有图文】

文章目录 部署项目一.安装jdk1.1 官网下载jdk81.2 上传到Linux1.3 解压1.4 配置环境变量1.5 查看是jdk是否安装成功 二.安装TomCat2.1 官网下载2.2 上传到Linux2.3 解压2.4配置2.5 启动Tomcat2.6 验证是否成功 三.安装mysql四.部署javaweb项目4.1 打包4.2 启动tomcat 部署项目 …

服务器等保测评日志策略配置

操作系统日志 /var/log/message 系统启动后的信息和错误日志,是Red Hat Linux中最常用的日志之一 /var/log/secure 与安全相关的日志信息 /var/log/maillog 与邮件相关的日志信息 /var/log/cron 与定时任务相关的日志信息 /var/log/spooler 与UUCP和news设备相关的…

初学stm32 --- FSMC驱动LCD屏

目录 FSMC简介 FSMC框图介绍 FSMC通信引脚介绍 FSMC_NWE 的作用 FSMC_NWE 的时序关系 FSMC_NOE 的含义 FSMC_NOE 的典型用途 FSMC_NOE 的时序关系 使用FSMC驱动LCD FSMC时序介绍 时序特性中的 OE ILI9341重点时序: FSMC地址映射 HADDR与FSMC_A关系 LCD的…