环境:Windows 11、python 3.12.3、Django 4.2.11、 APScheduler 3.10.4
背景:工作需要使用且用法较为复杂,各种功能基本都使用了
事件:20240920
说明:记录,方便后期自己查找
1、搭建基础环境
文件结构图
蓝色代表文件,黑色代表目录,主要是django自动生成的文件以及apscheduler需要的文件
包括Django、APScheduler两个,代码如下:
新建scheduler文件
创建调度器,并配置启动函数
# scheduleJob\scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from django_apscheduler.jobstores import DjangoJobStore
from pytz import timezone, utc
# 第二种方式,内嵌
jobstores = {"default": DjangoJobStore()}
executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)}
job_defaults = { # 该参数既可以在创建scheduler对象时使用,也可以用在add_job中,对象范围广、优先级低
'coalesce': True, # 是否合并积压的任务。如果设置为 True,当任务运行时间落后时,会只运行一次,而不是运行多次。默认值为 False。
'max_instances': 2, # 允许的最大作业实例数。确保同一任务在同一时间不会有多个实例运行。默认值为 1。
'misfire_grace_time': 30, # 设置任务错过其执行时间的容忍时间(以秒为单位)。如果任务在这个时间内错过了执行时间,将立即执行。如果设置为 None,则没有时间限制。
'replace_existing': True # 如果添加的任务ID已存在,是否替换现有任务。默认值为 False。
}
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=timezone('Asia/Shanghai')
)
def start():
scheduler.start()
在settings文件中添加应用并配置数据库
# scheduleJob\settings.py
INSTALLED_APPS = [
# ...
'django_apscheduler',
'testapscheduler',
]
ROOT_URLCONF = 'scheduleJob.urls'
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': "scheduler",
'USER': "root",
'PASSWORD': "123456",
'HOST': "localhost",
'PORT': 3306,
}
}
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = False
在apps中实现启动调度器
# testapscheduler\apps.py
from django.apps import AppConfig
class TestapschedulerConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'testapscheduler'
print("启动")
def ready(self):
from scheduleJob import scheduler
scheduler.start()
在url中配置路由
# scheduleJob\urls.py
from django.contrib import admin
from django.urls import path
from testapscheduler.views import operate_task
urlpatterns = [
path('admin/', admin.site.urls),
path('operate_task/', operate_task),
]
在views文件中实现真正的逻辑处理
# testapscheduler\views.py
from scheduleJob.scheduler import scheduler
from django.http import HttpResponse
from datetime import datetime
from django.views.decorators.csrf import csrf_exempt
import time, json
# Create your views here.
@csrf_exempt
def operate_task(request):
# 动态任务id, 利用时间戳获取整数,如一秒内添加两个,则会出现bug
p = request.POST.get("id")
print(p, "============")
my_task_id = int(datetime.timestamp(datetime.now()))
# job参数
job_kwargs = {
"func":excute_task, # job的函数
"id":str(my_task_id), # 添加的jobid, 必须是字符串
"name":f"task_{my_task_id}", # 添加的job名称
"kwargs":{"info":"test"}, # job的函数参数,本例中excute_task需要的参数
"next_run_time": datetime.now(), # 添加任务成功后立即执行
"replace_existing": True,
"misfire_grace_time": 10,
"coalesce": True,
"max_instances": 10,
"trigger": "interval", # 任务执行类型,还有date、cron
"seconds": 10, # 任务执行间隔时间,代表每10s执行一次
}
scheduler.add_job(**job_kwargs)
# scheduler.remove_all_jobs()
return HttpResponse("add task success")
def excute_task(info):
time.sleep(3)
print(info, "--------------------------------", datetime.now())
数据库迁移
运行前,先执行数据库迁移
python manage.py makemigrations
python manage.py migrate
# 前者是将model层转为迁移文件migration
# 后者将新版本的迁移文件执行,更新数据库。
会在数据库生成两个表,引用方法如下
from django_apscheduler.models import DjangoJob, DjangoJobExecution
django_apscheduler_djangojob:对应Django中的DjangoJob,共计三个字段,分别为id、next_run_time、job_state,默认排序字段为next_run_time
django_apscheduler_djangojobexecution:对应Django中的DjangoJobExecution,共计八个字段,分别是id、status、run_time、duration、finished、exception、traceback、job_id。
postman请求测试
在postman中请求路由,代码如下:
# postman生成的请求代码
import requests
import json
url = "localhost:8000/operate_task/"
payload = json.dumps({
"action": "start",
"id": 1
})
headers = {
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
实现效果
test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:14:04.469140
test -------------------------------- 2024-09-20 15:14:04.469140
任务状态
#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
STATE_RUNNING = 1
#: constant indicating a scheduler's paused state (started but not processing jobs)
STATE_PAUSED = 2
2、调度器动态操作
1、查询所有任务
# testapscheduler\views.py
# ......
@csrf_exempt
def query_all_task(request):
# 查看所有任务
job_list = scheduler.get_jobs()
return JsonResponse([{x.name:x.id} for x in job_list], safe=False)
响应
[
{
"task_1726812545": "1726812545"
},
{
"task_1726816011": "1726816011"
}
]
2、查询某个任务
@csrf_exempt
def get_job(request):
# 查询任务是否存在
job_id = loads(request.body).get("id")
msg = scheduler.get_job(job_id=job_id)
print(msg)
return JsonResponse({"msg":"success"})
3、移除所有任务
@csrf_exempt
def remove_all_jobs(request):
# 移除所有任务, 事件代码是256
scheduler.remove_all_jobs()
return JsonResponse({"msg":"success"})
4、移除某个任务
@csrf_exempt
def remove_job(request):
# 移除某个任务, 事件代码是1024
job_id = loads(request.body).get("id")
scheduler.remove_job(job_id=job_id)
return JsonResponse({"msg":"success"})
5、暂停某个任务
@csrf_exempt
def pause_job(request):
# 暂停某个任务, 事件代码是2048
job_id = loads(request.body).get("id")
scheduler.pause_job(job_id=job_id)
return JsonResponse({"msg":"success"})
6、恢复某个任务
@csrf_exempt
def resume_job(request):
# 恢复某个任务,仅能恢复已暂停的任务, 事件代码是2048
job_id = loads(request.body).get("id")
scheduler.resume_job(job_id=job_id)
return JsonResponse({"msg":"success"})
7、添加某个任务
@csrf_exempt
def add_job(request):
# 添加任务, 事件代码是512
kwargs = loads(request.body)
scheduler.add_job(**kwargs)
return JsonResponse({"msg":"success"})
8、修改某个任务
@csrf_exempt
def modify_job(request):
# 恢复某个任务,仅能恢复已暂停的任务, 事件代码是2048
job_id = loads(request.body).get("id")
changes = loads(request.body).get("changes")
scheduler.modify_job(job_id=job_id, changes=changes)
return JsonResponse({"msg":"success"})
9、打印所有任务信息
@csrf_exempt
def print_jobs(request):
# 打印所有任务信息
scheduler.print_jobs()
return JsonResponse({"msg":"success"})
10、启动调度器
@csrf_exempt
def start(request):
# 调度程序启动, 事件代码是1
scheduler.start()
return JsonResponse({"msg":"success"})
11、关闭调度器
@csrf_exempt
def shutdown(request):
# 调度程序关闭, 事件代码是2
scheduler.shutdown()
return JsonResponse({"msg":"success"})
12、暂停调度器
@csrf_exempt
def pause(request):
# 调度程序暂停, 事件代码是4
scheduler.pause()
return JsonResponse({"msg":"success"})
13、恢复调度器
@csrf_exempt
def resume(request):
# 调度程序恢复, 事件代码是8
scheduler.resume()
return JsonResponse({"msg":"success"})
14、添加执行器
@csrf_exempt
def add_executor(request):
# 调度程序添加执行器, 事件代码是16
executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)}
scheduler.add_executor(executor=executors)
15、删除执行器
@csrf_exempt
def remove_executor(request):
# 调度程序删除执行器, 事件代码是32
alias = loads(request.body).get("alias")
scheduler.remove_executor(alias=alias)
return JsonResponse({"msg":"success"})
16、添加作业存储器
@csrf_exempt
def add_jobstore(request):
# 调度程序添加作业存储器, 事件代码是64
jobstores = {"default": DjangoJobStore()}
scheduler.add_jobstore(jobstores=jobstores)
17、删除作业存储器
@csrf_exempt
def remove_jobstore(request):
# 调度程序删除作业存储器, 事件代码是128
alias = loads(request.body).get("alias")
scheduler.remove_jobstore(alias=alias)
return JsonResponse({"msg":"success"})
18、修改触发器参数
@csrf_exempt
def reschedule_job(request):
# 调度程序修改触发器参数
job_id = loads(request.body).get("job_id")
trigger_args = loads(request.body).get("trigger_args")
scheduler.reschedule_job(trigger_args=trigger_args, job_id=job_id)
return JsonResponse({"msg":"success"})
未完待续 ······