django-celery-beat自动调度异步任务

        Celery是一个简单、灵活且可靠的分布式系统,专门用于处理大量消息的实时任务调度。它支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。Celery不仅支持异步任务(如发送邮件、文件上传、图像处理等耗时操作),还支持定时任务,即需要在特定时间执行的任务。Celery本身不提供消息服务,需要借助RabbitMQ、Redis等消息中间件,本案例使用的是Redis。

        Celery Beat则是Celery的一个组件,专门用于处理定时任务调度。它包含一个调度器,负责根据配置的时间表计划任务的执行。这些任务通常是Celery任务,即异步执行的函数或方法。Celery Beat将计划的任务发送到Celery任务队列,由Celery Worker处理并执行队列中的任务。此外,Celery Beat还支持任务的持久性,即使在系统重启后也能够保持已计划的周期性任务。

开发环境:Python3 + MySQL + Redis  + PyCharm专业版

一、创建Django项目

参考 Python框架Django入门教程-CSDN博客 前三步

二、安装celery、mysql、redis等依赖包

eventlet 是一个python协程模板,celery 4版本以上在windows环境进行测试需要安装此依赖

django_celery_results 是任务执行结果的依赖

mysqlclient
redis
celery
eventlet
django-celery-beat
django_celery_results

三、初始化Celery数据库

打开PyCharm的终端,执行以下命令

python manage.py makemigrations
python manage.py migrate

打开数据库查看,执行命令后自动创建了一些表

django_celery_beat_clockedschedule  # 以指定时间执行任务,例如:2024-05-22 09:22:10
django_celery_beat_crontabschedule  # 以crontab格式时间执行任务,某月某天星期几某时某分
django_celery_beat_intervalschedule  # 以间隔时间执行任务,例如:每5秒、每2小时
django_celery_beat_periodictask  # 存储要执行的任务。
django_celery_beat_periodictasks  # 索引和跟踪任务更改状态
django_celery_beat_solarschedule  # 以天文时间执行任务,例如:日出、日落
django_celery_results_chordcounter  # 存储Celery的chord任务的状态
django_celery_results_groupresult  # 存储Celery的group任务的结果
django_celery_results_taskresult  # 存储Celery任务的执行结果

四、配置Celery

修改(注意是修改,不是添加!!!)settings.py,找到 INSTALLED_APPS变量(约31行),将celery注册到django的应用管理中

在settings.py末尾添加celery配置:

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # 结果存储也使用Redis
# 配置 celery 定时任务使用的调度器,使用django_celery_beat插件用来动态配置任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 配置celery自动存储任务执行结果
CELERY_RESULT_BACKEND = 'django_celery_results.backends:DatabaseBackend'
CELERY_TIMEZONE = 'Asia/Shanghai'  # 设置时区
# 是否启用UTC
CELERY_ENABLE_UTC = False
# 是否开启时间感知
DJANGO_CELERY_BEAT_TZ_AWARE = False

在settings.py同级目录下创建celery.py,然后添加以下内容:

import os

from celery import Celery
from django.conf import settings

# djangoDemo是项目名,大家根据自己的情况进行替换!!!
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoDemo.settings')
# djangoDemo是项目名,大家根据自己的情况进行替换!!!
app = Celery('djangoDemo')
# 从django的设置中读取配置信息
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现app下的任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print(f"Request: {self.request!r}")

五、创建应用模块,进行测试

打开PyCharm终端执行命令,创建应用模块,这里命名为celeryapp,将新的应用模块注册到django的应用管理中

python manage.py startapp celeryapp

 

在新的应用模块中创建tasks.py,添加异步函数,文件名必须是tasks.py,否则后面启动Celery的时候监听不到

@shared_task
def task_one():
    print("------------------------- 000 <<<")
    # 业务逻辑...
    print("------------------------- 111 <<<")
    return "222"    

在新的应用模块中修改views.py,添加一个测试接口。这里说一个坑:在settings.py中配置了TIME_ZONE = 'Asia/Shanghai' 和  USE_TZ = True 之后,通过datetime.now()获取的是亚洲上海时间,但是把这个时间存到数据库,就会自动减少8小时,变成了UTC时间,就很无语,于是我把USE_TZ的值改为False,发现存到数据库中的时间变成正常的亚洲上海时间。然而Celery定时执行任务的时区是UTC,经过多次测试,配置了CELERY_TIMEZONE等时区相关的配置,发现好像并没什么用,Celery定时执行任务的时区依然是UTC,无奈只能把任务执行的时间减少8小时

import json
from datetime import datetime, timedelta

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django_celery_beat.models import ClockedSchedule, PeriodicTask

def celery_test(request):
    # 任务名
    task_name = 'celery_test'
    # 任务执行的时间,设置为下一分钟的10秒,celery执行时间的时区是UTC,要保证数据库中的时间是UTC时间,这里获取的是亚洲上海的时间,所以减了8小时
    time = (datetime.now() - timedelta(hours=8) + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:10")
    # 创建任务的执行时间
    clock = ClockedSchedule.objects.get_or_create(clocked_time=time)
    # 创建指定时间执行的celery任务
    PeriodicTask.objects.update_or_create(
        name=task_name,  # 任务名,尽量保证唯一性,若该任务已存在,则更新该任务
        task='celeryapp.tasks.task_one',  # 要执行的异步函数的全路径
        defaults={
            'clocked': clock[0],  # 使用clocked在指定时间执行该任务
            'one_off': True,  # 在任务执行完一次后关闭该任务
            'enabled': True,  # 开启任务
            'args': json.dumps([]),  # 参数列表,必须是json格式的数组
        }
    )
    return JsonResponse({'message': '200'})

修改urls.py,在urlpatterns中添加路由

from celeryapp import views

urlpatterns = [
    # ......
    path('celery/test/', views.celery_test),
]

六、启动项目进行测试

先启动Django项目,然后在分别两个终端中执行命令启动Celery和Celery Beat,命令中的djangoDemo是项目名,大家根据自己的情况进行替换

celery -A djangoDemo worker -P eventlet -l info  # 启动worker监听异步任务
celery -A djangoDemo beat -l info  # 启动beat任务调度器

这里的woker已经监听到我们创建的celeryapp.tasks.task_one异步函数了

这里说明beat启动成功了:

浏览器输入请求地址:http://127.0.0.1:8000/celery/test/ ,创建Celery任务

接口请求成功后查看数据库,django_celery_beat_periodictask表新增了一条异步任务,其中的clocked_id字段指向django_celery_beat_clockedschedule表,该表新增了一条任务的执行时间。celery.backend_cleanup是Celery自动创建的,不用管

在到达任务执行时间后观察woker和beat的终端日志

查看beat终端日志,红框第一行是我们发送请求成功创建异步任务之后,CeleryBeat已经检测到数据库中有任务发生变化(CeleryBeat每5秒检测一次,使用debug级日志可查看到);第二行CeleryBeat将一个名为celery_test的任务发送给worker,让woker执行celeryapp.tasks.task_one异步函数,消费该任务

在woker终端的日志中可以看到任务执行的结果:

七、 Celery Beat常用的三种时间控制器

clockedSchedule:指定某个时间执行任务,例如:2024-05-22 09:22:10,对应的表是django_celery_beat_clockedschedule,该表仅有id和clocked_time两个字段

crontabSchedule:指定crontab格式的时间执行任务,某月某天星期几某时某分,与linux的定时任务规则一致,可参考Linux定时任务-CSDN博客,对应的表是django_celery_beat_crontabschedule,该表有7个字段

  `id` 
  `minute` 分钟
  `hour` 小时
  `day_of_week`  星期几
  `day_of_month`  每月的哪些天
  `month_of_year`  每年的哪些月份
  `timezone`  时区

intervalSchedule:间隔指定时间执行任务,对应的表是django_celery_beat_intervalschedule,该表有3个字段id、every(间隔时长)、period(时间单位,可选时、分、秒、微秒、天)

代码示例,clockedSchedule上面已经演示过了,这里只演示另外两种:

# crontabSchedule
def celery_test2(request):
    task_name = 'celery_test2'
    crontab = CrontabSchedule.objects.get_or_create(
        minute='*/1',  # 每1分钟
        hour='*',  # 每小时
        day_of_week='*',  # 一周中的哪几天,*表示每天
        day_of_month='*',  # 月份中的哪一天,*表示每一天
        month_of_year='*',  # 年中的哪一月,*表示每个月
        timezone='Asia/Shanghai'
    )

    PeriodicTask.objects.update_or_create(
        name=task_name,
        task='celeryapp.tasks.task_one',  # Celery任务的全路径
        defaults={
            'crontab': crontab[0],  # 使用crontab格式时间执行该任务
            'one_off': False,  # 在任务执行完一次后关闭该任务
            'enabled': True,  # 开启任务
            'args': json.dumps([]),
        }
    )

    return JsonResponse({'message': '200'})


# intervalSchedule
def celery_test3(request):
    task_name = 'celery_test3'

    interval = IntervalSchedule.objects.get_or_create(
        every=1,  # 间隔时间
        period=IntervalSchedule.MINUTES,  # 周期单位,这里是分钟
    )

    PeriodicTask.objects.update_or_create(
        name=task_name,
        task='celeryapp.tasks.task_one',  # Celery任务的全路径
        defaults={
            'interval': interval[0],  # 使用interval间隔指定时间执行该任务
            'one_off': False,  # 在任务执行完一次后关闭该任务
            'enabled': True,  # 开启任务
            'args': json.dumps([]),
        }
    )

    return JsonResponse({'message': '200'})

其实只要创建不同的时间控制器,然后在创建任务的时候作为参数放进去即可,注意一个任务只能使用一种时间控制器

参考文献:

https://blog.csdn.net/wuwei_201/article/details/129650089

https://blog.51cto.com/u_15703497/6252757

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/651311.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

拥有这几个3dMax插件,科研绘图让我省时又省力!

DNAChain&#xff08;一键生成DNA链&#xff09; 3DMAX一键生成DNA链插件DNAChain&#xff0c;沿着线条路径一键生成DNA链条&#xff0c;你可以用它创建非常有趣的图案和效果。 3dMax不仅在影视动画、建筑室内、环境艺术等领域应用广泛&#xff0c;同样&#xff0c;它在科研绘图…

C++ AI 编程助手

这两年 AI 发展迅猛&#xff0c;作为开发人员&#xff0c;我们总是追求更快、更高效的工作方式&#xff0c;AI 的出现可以说改变了很多人的编程方式。 AI 对我们来说就是一个可靠的编程助手&#xff0c;给我们提供了实时的建议和解决方案&#xff0c;无论是快速修复错误、提升…

解决:java.util.concurrent.RejectedExecutionException

一 发现RejectedExecutionException错误 今天查看服务器的时候发现了一些java.util.concurrent.RejectedExecutionException错误&#xff0c;这个是由于线程池里的线程忙不过来报出的。如下图&#xff1a; 像这种 RejectedExecutionException 错误&#xff0c;表明在Java应…

vue中封装组件实例

本篇是一篇组件封装。因为要经常使用&#xff0c;特此封装并且记录下来&#xff0c;以供参考。 封装组件&#xff1a;封装组件是指将一段具有特定功能的Vue代码&#xff08;包括模板、脚本和样式&#xff09;封装成一个可复用的组件。这个组件可以作为一个独立的单元&#xff…

Java毕业设计 基于springboot vue考勤管理系统

Java毕业设计 基于springboot vue考勤管理系统 SpringBoot 考勤管理系统 功能介绍 员工 登录 个人中心 修改密码 个人信息 员工请假管理 员工出差管理 薪资管理 员工签到管理 公告管理 管理员 登录 个人中心 修改密码 个人信息 员工管理 员工请假管理 员工出差管理 薪资管理…

AIGC002-LoRA让大模型微调更加轻盈方便!

AIGC002-LoRA让大模型微调更加轻盈方便&#xff01; 文章目录 0 论文工作1 论文方法2 效果 0 论文工作 这篇论文名为 LORA: LOW-RANK ADAPTATION OF LARGE LANGUAGE MODELS&#xff0c;作者是 Edward Hu 等人。它提出了一种名为 低秩自适应 (Low-Rank Adaptation, LoRA) 的新方…

离线安装kubernetes

我们很多时候在开发或测试环境中使用的Kubernetes集群基本都是云厂商提供或者说基于有网环境快速搭建的&#xff0c;但是到了客户的生产环境&#xff0c;往往基于安全考虑他们是不允许服务器连接外部网络的&#xff0c;这时我们就不得不在离线环境下完成部署工作。 1、前言 1…

DDR、LPDDR和GDDR的区别

1、概况 以DDR开头的内存适用于服务器、云计算、网络、笔记本电脑、台式机和消费类应用&#xff0c;支持更宽的通道宽度、更高的密度和不同的形状尺寸。 以LPDDR开头的内存适合面向移动和汽车这些对规格和功耗非常敏感的领域&#xff0c;提供更窄的通道宽度和多种低功耗运行状态…

Revit的特性 - 族类型和族实例、联动更新

Revit 模型的表示方式 Revit 是 Autodesk 推出的一款建筑建模软件&#xff0c;主要应用于建筑信息模型&#xff08;Building Information Modeling&#xff0c;简称BIM&#xff09;领域。Revit发布至今已经超过20年&#xff0c;他的核心理念是以族的概念来表达建筑模型。 在Re…

su模型转3d模型不够平滑怎么办?---模大狮

当将SU模型转换为3D模型时&#xff0c;可能会遇到模型不够平滑的情况&#xff0c;这会影响到最终的渲染效果和视觉体验。本文将探讨在此情况下应该如何解决&#xff0c;帮助读者更好地处理这一常见的问题。 一、检查SU模型细分程度 首先要检查的是原始的SU模型的细分程度。在S…

【vue-2】v-on、v-show、v-if及按键修饰符

目录 1、v-on事件 2、按键修饰符 3、显示和隐藏v-show 4、条件渲染v-if 1、v-on事件 创建button按钮有以下两种方式&#xff1a; <button v-on:click"edit">修改</button> <button click"edit">修改</button> 完整示例代码…

阿里云物联网平台python ADK 发布/订阅

基础知识学习参考&#xff1a; 1、使用消息通讯Topic 2、python link SDK 一、环境变量配置 1、python3.6&#xff1a;下载安装 2、安装paho-mqtt 1.4.0版本 pip install paho-mqtt1.4.03、安装安装Link SDK最新版本 pip install aliyun-iot-linkkit 4、下载python ADK…

自定义函数python:深入解析与实操

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、引言&#xff1a;函数的命名与规范 二、函数命名&#xff1a;遵循规范&#xff0c;易于…

Linux 批量网络远程PXE

一、搭建PXE远程安装服务器 1、yum -y install tftp-server xinetd #安装tftp服务 2、修改vim /etc/xinetd.d/tftpTFTP服务的配置文件 systemctl start tftp systemctl start xinetd 3、yum -y install dhcp #---安装服务 cp /usr/share/doc/dhc…

springboot 集成 es--未完结

基于es7.10.x版本 一、前提知识 常见的两种方式&#xff1a;spring boot提供的API 和 ES 官方提供的API ES官方&#xff1a; RestHighLevelClient&#xff1a; 适用于复杂、更细粒度控制的Elasticsearch 操作 spring boot&#xff1a; ElasticsearchRestTemplate&#xff1a…

arXiv AI 综述列表(2024.05.20~2024.05.24)

公众号&#xff1a;EDPJ&#xff08;进 Q 交流群&#xff1a;922230617 或加 VX&#xff1a;CV_EDPJ 进 V 交流群&#xff09; 每周末更新&#xff0c;完整版进群获取。 Q 群在群文件&#xff0c;VX 群每周末更新。 目录 1. Beyond Traditional Single Object Tracking: A …

OSPF减少LSA更新量1

OSPF的LSA优化 一、汇总——优化骨干区域 (1)域间汇总ABR设备基于某个区域的1/2类LSA计算所得的最佳路由&#xff0c;共享给其他区域时&#xff0c;进行汇总传递。 [r2]ospf 1 [r2-ospf-1]area 1——明细路由所在区域&#xff0c;该ABR设备必须和明细路由在同一区域 [r2-ospf…

flink程序本地运行报: A JNI error has occurred和java.lang.NoClassDefFoundError

1.问题描述 在idea中运行flink job程序出现如下错误&#xff1a; Error: A JNI error has occurred, please check your installation and try again Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/io/FileInputFormat …

再见PS,Canva Create正式上线

再见&#xff0c;Photoshop&#xff01; Canva Create 正式上线&#xff0c;太疯狂了&#xff01;&#xff01; Canva是一款著名的免费在线AI图像生成器 构想你的创意&#xff0c;然后将其添加到你的设计中。使用最佳的AI图像生成器&#xff0c;观察你的文字和短语变换成美丽…

关于搜索引擎链路

一、搜索引擎的的链路 简单流程如下&#xff0c;一般都包括query理解&#xff0c;召回&#xff0c;粗排&#xff0c;精排&#xff0c;重排。 二、query理解&#xff0c;查询词处理 对于进来的query需要有很多道工序做处理。才能让搜索引擎的效果更好、更智能。 2.1 分词 分词…