APScheduler、Django实现定时任务,以及任务动态操作

环境:Windows 11、python 3.12.3、Django 4.2.11、 APScheduler 3.10.4

背景:工作需要使用且用法较为复杂,各种功能基本都使用了

事件:20240920

说明:记录,方便后期自己查找

 1、搭建基础环境

文件结构图

蓝色代表文件,黑色代表目录,主要是django自动生成的文件以及apscheduler需要的文件

包括Django、APScheduler两个,代码如下:

新建scheduler文件

创建调度器,并配置启动函数

# scheduleJob\scheduler.py

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from django_apscheduler.jobstores import DjangoJobStore
from pytz import timezone, utc


# 第二种方式,内嵌
jobstores = {"default": DjangoJobStore()}
executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)}
job_defaults = {                    # 该参数既可以在创建scheduler对象时使用,也可以用在add_job中,对象范围广、优先级低
    'coalesce': True,               # 是否合并积压的任务。如果设置为 True,当任务运行时间落后时,会只运行一次,而不是运行多次。默认值为 False。
    'max_instances': 2,             # 允许的最大作业实例数。确保同一任务在同一时间不会有多个实例运行。默认值为 1。
    'misfire_grace_time': 30,       # 设置任务错过其执行时间的容忍时间(以秒为单位)。如果任务在这个时间内错过了执行时间,将立即执行。如果设置为 None,则没有时间限制。
    'replace_existing': True        # 如果添加的任务ID已存在,是否替换现有任务。默认值为 False。
}
scheduler = BackgroundScheduler(
    jobstores=jobstores, 
    executors=executors, 
    job_defaults=job_defaults, 
    timezone=timezone('Asia/Shanghai')
    )

def start():
    scheduler.start()

 在settings文件中添加应用并配置数据库

# scheduleJob\settings.py

INSTALLED_APPS = [
    # ...
    'django_apscheduler',
    'testapscheduler',
]
ROOT_URLCONF = 'scheduleJob.urls'
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': "scheduler",
        'USER': "root",
        'PASSWORD': "123456",
        'HOST': "localhost",
        'PORT': 3306,
    }
}
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = False

 在apps中实现启动调度器

# testapscheduler\apps.py

from django.apps import AppConfig


class TestapschedulerConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'testapscheduler'
    print("启动")

    def ready(self):
        from scheduleJob import scheduler
        scheduler.start()

 在url中配置路由

# scheduleJob\urls.py

from django.contrib import admin
from django.urls import path
from testapscheduler.views import operate_task


urlpatterns = [
    path('admin/', admin.site.urls),
    path('operate_task/', operate_task),
]

 在views文件中实现真正的逻辑处理

# testapscheduler\views.py

from scheduleJob.scheduler import scheduler
from django.http import HttpResponse
from datetime import datetime
from django.views.decorators.csrf import csrf_exempt

import time, json

# Create your views here.
@csrf_exempt

def operate_task(request):
    # 动态任务id, 利用时间戳获取整数,如一秒内添加两个,则会出现bug
    p = request.POST.get("id")
    print(p, "============")

    my_task_id = int(datetime.timestamp(datetime.now()))
    # job参数
    job_kwargs = {
        "func":excute_task,                     # job的函数
        "id":str(my_task_id),                   # 添加的jobid, 必须是字符串
        "name":f"task_{my_task_id}",            # 添加的job名称
        "kwargs":{"info":"test"},               # job的函数参数,本例中excute_task需要的参数
        "next_run_time": datetime.now(),        # 添加任务成功后立即执行
        "replace_existing": True,               
        "misfire_grace_time": 10,               
        "coalesce": True,                       
        "max_instances": 10,                    
        "trigger": "interval",                  # 任务执行类型,还有date、cron
        "seconds": 10,                          # 任务执行间隔时间,代表每10s执行一次
    }
    scheduler.add_job(**job_kwargs)
    # scheduler.remove_all_jobs()
    return HttpResponse("add task success")

def excute_task(info):
    time.sleep(3)

    print(info, "--------------------------------", datetime.now())

数据库迁移 

运行前,先执行数据库迁移

python manage.py makemigrations

python manage.py migrate

# 前者是将model层转为迁移文件migration
# 后者将新版本的迁移文件执行,更新数据库。

 会在数据库生成两个表,引用方法如下

from django_apscheduler.models import DjangoJob, DjangoJobExecution

django_apscheduler_djangojob:对应Django中的DjangoJob,共计三个字段,分别为id、next_run_time、job_state,默认排序字段为next_run_time

django_apscheduler_djangojobexecution:对应Django中的DjangoJobExecution,共计八个字段,分别是id、status、run_time、duration、finished、exception、traceback、job_id。

postman请求测试

在postman中请求路由,代码如下:

# postman生成的请求代码

import requests
import json

url = "localhost:8000/operate_task/"

payload = json.dumps({
  "action": "start",
  "id": 1
})
headers = {
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

实现效果

test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:14:04.469140
test -------------------------------- 2024-09-20 15:14:04.469140

任务状态

#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
STATE_RUNNING = 1
#: constant indicating a scheduler's paused state (started but not processing jobs)
STATE_PAUSED = 2

2、调度器动态操作

1、查询所有任务

# testapscheduler\views.py

# ......

@csrf_exempt
def query_all_task(request):
    # 查看所有任务
    job_list = scheduler.get_jobs()
    return JsonResponse([{x.name:x.id} for x in job_list], safe=False)

响应

[
    {
        "task_1726812545": "1726812545"
    },
    {
        "task_1726816011": "1726816011"
    }
]

2、查询某个任务

@csrf_exempt
def get_job(request):
    # 查询任务是否存在
    job_id = loads(request.body).get("id")
    msg = scheduler.get_job(job_id=job_id)
    print(msg)
    return JsonResponse({"msg":"success"})

 3、移除所有任务

@csrf_exempt
def remove_all_jobs(request):
    # 移除所有任务, 事件代码是256
    scheduler.remove_all_jobs()
    return JsonResponse({"msg":"success"})

4、移除某个任务

@csrf_exempt
def remove_job(request):
    # 移除某个任务, 事件代码是1024
    job_id = loads(request.body).get("id")
    scheduler.remove_job(job_id=job_id)
    return JsonResponse({"msg":"success"})

5、暂停某个任务

@csrf_exempt
def pause_job(request):
    # 暂停某个任务, 事件代码是2048
    job_id = loads(request.body).get("id")
    scheduler.pause_job(job_id=job_id)
    return JsonResponse({"msg":"success"})

6、恢复某个任务

@csrf_exempt
def resume_job(request):
    # 恢复某个任务,仅能恢复已暂停的任务, 事件代码是2048
    job_id = loads(request.body).get("id")
    scheduler.resume_job(job_id=job_id)
    return JsonResponse({"msg":"success"})

7、添加某个任务

@csrf_exempt
def add_job(request):
    # 添加任务, 事件代码是512
    kwargs = loads(request.body)
    scheduler.add_job(**kwargs)
    return JsonResponse({"msg":"success"})

8、修改某个任务

@csrf_exempt
def modify_job(request):
    # 恢复某个任务,仅能恢复已暂停的任务, 事件代码是2048
    job_id = loads(request.body).get("id")
    changes = loads(request.body).get("changes")
    scheduler.modify_job(job_id=job_id, changes=changes)
    return JsonResponse({"msg":"success"})

9、打印所有任务信息

@csrf_exempt
def print_jobs(request):
    # 打印所有任务信息
    scheduler.print_jobs()
    return JsonResponse({"msg":"success"})

10、启动调度器

@csrf_exempt
def start(request):
    # 调度程序启动, 事件代码是1
    scheduler.start()
    return JsonResponse({"msg":"success"})

11、关闭调度器

@csrf_exempt
def shutdown(request):
    # 调度程序关闭, 事件代码是2
    scheduler.shutdown()
    return JsonResponse({"msg":"success"})

12、暂停调度器

@csrf_exempt
def pause(request):
    # 调度程序暂停, 事件代码是4
    scheduler.pause()
    return JsonResponse({"msg":"success"})

13、恢复调度器

@csrf_exempt
def resume(request):
    # 调度程序恢复, 事件代码是8
    scheduler.resume()
    return JsonResponse({"msg":"success"})

14、添加执行器

@csrf_exempt
def add_executor(request):
    # 调度程序添加执行器, 事件代码是16
    executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)}
    scheduler.add_executor(executor=executors)

15、删除执行器

@csrf_exempt
def remove_executor(request):
    # 调度程序删除执行器, 事件代码是32
    alias = loads(request.body).get("alias")
    scheduler.remove_executor(alias=alias)
    return JsonResponse({"msg":"success"})

16、添加作业存储器

@csrf_exempt
def add_jobstore(request):
    # 调度程序添加作业存储器, 事件代码是64
    jobstores = {"default": DjangoJobStore()}
    scheduler.add_jobstore(jobstores=jobstores)

17、删除作业存储器

@csrf_exempt
def remove_jobstore(request):
    # 调度程序删除作业存储器, 事件代码是128
    alias = loads(request.body).get("alias")
    scheduler.remove_jobstore(alias=alias)
    return JsonResponse({"msg":"success"})

18、修改触发器参数

@csrf_exempt
def reschedule_job(request):
    # 调度程序修改触发器参数
    job_id = loads(request.body).get("job_id")
    trigger_args = loads(request.body).get("trigger_args")
    scheduler.reschedule_job(trigger_args=trigger_args, job_id=job_id)
    return JsonResponse({"msg":"success"})

 未完待续 ······

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/883856.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

node.js从入门到快速开发一个简易的web服务器

浏览器中JavaScript学习路径: JavaScript基础语法浏览器内置API(DOMBOM)第三方库(jQuery,art-template等) Node.js的学习路径 JavaScript基础语法Node.js内置API模块(fs、path、http等)第三方API模块(express、mysql等) Node.js安装 通过Node.js 来运行Javascript 代码&am…

ElasticSearch的安装与使用

ElasticSearch的安装与使用 docker安装 docker进行安装Elasticsearch 1.拉取镜像 docker pull elasticsearch:7.6.22.创建实例 mkdir -p /docker/elasticsearch/config mkdir -p /docker/elasticsearch/data echo "http.host: 0.0.0.0" >> /docker/elastic…

C语言课程设计题目四:实验设备管理系统设计

序号系统设计题目进度1职工信息管理系统设计已完成,在本专栏2图书信息管理系统设计已完成,在本专栏3图书管理系统设计已完成,在本专栏4实验设备管理系统设计已完成,在本专栏5西文下拉菜单的设计链接6学生信息管理系统设计链接7学生…

c++9月20日

1.思维导图 2.顺序表 头文件 #ifndef RECTANGLE_H #define RECTANGLE_H#include <iostream>using namespace std;using datatype int ;//类型重定义class Seqlist { private://私有权限datatype *ptr; //指向堆区申请空间的起始地址int size;//堆区空间的长度int len …

汽车一键启动开关

‌ 一键启动点火开关是汽车上的一个重要功能&#xff0c;它替代了传统的机械钥匙&#xff0c;实现了简约的打火和熄火操作‌。移动管家一键启动点火开关的详细介绍&#xff1a; 汽车一键启动按钮12V24V通用超薄型汽车一键启动按键发动机启动按钮点火开关。超薄&#xff0c;…

软件测试学习笔记丨curl命令发送请求

本文转自测试人社区&#xff0c;原文链接&#xff1a;https://ceshiren.com/t/topic/32332 一、简介 cURL是一个通过URL传输数据的&#xff0c;功能强大的命令行工具。cURL可以与Chrome Devtool工具配合使用&#xff0c;把浏览器发送的真实请求还原出来&#xff0c;附带认证信…

嵌入式项目:STM32平衡车详解 (基础知识篇) (基于STM32F103C8T6)

前言&#xff1a; 本文是基于B站草履虫编写的平衡车相关内容&#xff0c;包括模块和基础知识&#xff0c;结合代码进行讲解&#xff0c;将知识进行汇总 &#xff08;由于本篇内容较长&#xff0c;请结合目录使用) 注&#xff1a;基于开源精神&#xff0c;本文仅供学习参考 目…

如何用ChatGPT制作一款手机游戏应用

有没有想过自己做一款手机游戏&#xff0c;并生成apk手机应用呢&#xff1f;有了人工智能&#xff0c;这一切就成为可能。今天&#xff0c;我们就使用ChatGPT来创建一个简单的井字棋游戏&#xff08;Tic-Tac-Toe&#xff09;&#xff0c;其实这个过程非常轻松且高效。 通过Cha…

windows 使用PortAudio 对电脑进行录音

PortAudio 采用回调方式&#xff0c;这样可以一帧一帧的处理 头文件&#xff1a; #ifndef __CAPTURE_AUDIO__ #define __CAPTURE_AUDIO__#include <functional> #include <windows.h> #include "portaudio.h"#define SAMPLE_RATE 44100class CaptureAu…

大数据毕业设计选题推荐-国潮男装微博评论数据分析系统-Hive-Hadoop-Spark

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、PHP、.NET、Node.js、GO、微信小程序、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇…

JavaSE——lombok、juint单元测试、断言

一、lombok的使用 默认jvm不解析第三方注解&#xff0c;需要手动开启 链式调用 二、juint单元测试 下载juint包 public class TestDemo {// 在每一个单元测试方法执行之前执行Beforepublic void before() {// 例如可以在before部分创建IO流System.out.println("befor…

89个H5小游戏源码

下载地址&#xff1a;https://download.csdn.net/download/w2sft/89791650 亲测可用&#xff0c;代码完整&#xff0c;都是htmljs&#xff0c;保存到本地即可。 游戏截图&#xff1a;

【AI创作组】工程方向的硕士研究生学习Matlab的路径

1. MATLAB软件概述 1.1 MATLAB发展历程 MATLAB自20世纪70年代诞生以来,已经经历了多次重要的版本更新和功能扩展。 初始版本:MATLAB的前身只是一个简单的交互式矩阵计算器,由Cleve B. Moler博士在1970年代初期开发,目的是为了方便学生和研究人员使用线性代数软件包LINPAC…

游戏如何对抗改包

游戏改包是指通过逆向分析手段及修改工具&#xff0c;来篡改游戏包内正常的设定和规则的行为&#xff0c;游戏包被篡改后&#xff0c;会被植入/剔除模块进行重打包。 本期图文我们将通过实际案例分析游戏改包的原理&#xff0c;并分享游戏如何应对改包问题。 安卓平台常见的改…

车载软件调试工具系列---Trace32简介UI界面简介

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明自己,无利益不试图说服别人,是精神上的节…

基于nodejs+vue的旅游管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目…

想要高音质的开放式耳机?看看开放式蓝牙耳机排行榜前列的这些品牌!

​开放式蓝牙耳机现在超流行&#xff0c;不仅年轻人爱用&#xff0c;连不少上了年纪的人也喜欢在公园里散步时戴上。这些耳机无论是听歌、学习、健身还是办公&#xff0c;都能派上用场。到了2024年&#xff0c;想要挑到一款既好用又好听的开放式蓝牙耳机&#xff0c;得好好比较…

springboot+大数据基于数据挖掘的招聘信息可视化大屏系统【内含源码+文档+部署教程】

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…

基于gorm.io/sharding分表中间件使用案例

项目背景 项目中需要用到mysql的分表场景&#xff0c;调研了一些常用的分库分表中间件&#xff0c;比如&#xff0c;mycat&#xff0c;小米的Gaea&#xff0c;这两个中间件太重了&#xff0c;学习成本较大&#xff0c;另外mycat不是go写的。我们需要一个轻量级的go版本的分表中…

Android使用OpenCV 4.5.0实现扑克牌识别(源码分享)

一、显示效果展示 二、OpenCV 4.5.0 OpenCV 4.5.0是OpenCV&#xff08;Open Source Computer Vision Library&#xff0c;开源计算机视觉库&#xff09;的一个重要更新版本&#xff0c;该版本在多个方面进行了优化和新增了多项功能。 三、ONNX模型 ONNX&#xff08;Open Neu…