一、环境配置
python==3.8.10
包:
APScheduler==3.10.4
Django==3.2.7
djangorestframework==3.15.1
SQLAlchemy==2.0.29
PyMySQL==1.1.0
项目目录情况
gs_scheduler 应用
management/commands:存放自定义命令,通过 python manage.py crontab 启动定时器
management/schedulers:所有apscheduler定时器相关的代码都在里面
management/logs:存放定时器任务的日志信息
views.py和urls.py:对外开放的接口,获取定时任务的基本信息和运行情况
二、django基本配置
根settings.py
# Use PyMySQL as the MySQLdb driver for Django.
import os

import pymysql

# pymysql.version_info = (1, 4, 0, "final", 0)  # make sure the version info is set correctly
pymysql.install_as_MySQLdb()

INSTALLED_APPS = [
    'rest_framework',  # Django REST framework
    'gs_scheduler',  # register the scheduler application
]

# Database settings. Every value can be overridden with an environment
# variable of the same name so that credentials do not have to live in the
# source tree; the defaults reproduce the original local development setup.
MYSQL_HOST = os.environ.get('MYSQL_HOST', '127.0.0.1')
MYSQL_PORT = int(os.environ.get('MYSQL_PORT', 3306))
MYSQL_USER = os.environ.get('MYSQL_USER', 'root')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'ldc-root')
MYSQL_NAME = os.environ.get('MYSQL_NAME', 'study_scheduler')

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'HOST': MYSQL_HOST,
        'PORT': MYSQL_PORT,
        'USER': MYSQL_USER,
        'PASSWORD': MYSQL_PASSWORD,
        'NAME': MYSQL_NAME,
    }
}
根urls.py
注册路由
from django.contrib import admin
from django.urls import path,include
# Root URL table: the Django admin plus the scheduler API.
urlpatterns = [
path('admin/', admin.site.urls),
# Everything below api/scheduler/ is delegated to gs_scheduler/urls.py.
path('api/scheduler/',include('gs_scheduler.urls')),
]
三、配置apscheduler
3.1、schedulers/base.py
主要重写了SQLAlchemyJobStore类,添加上一些其他的字段和数据库表。
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore as _SQLAlchemyJobStore
from apscheduler.jobstores.base import JobLookupError, ConflictingIdError
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp
from datetime import datetime
try:
import cPickle as pickle
except ImportError: # pragma: nocover
import pickle
try:
from sqlalchemy import (
create_engine, Table, Column, MetaData,delete, Unicode, Float, LargeBinary,String, BigInteger,DATETIME,select,Boolean,text, and_)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.expression import null
except ImportError: # pragma: nocover
raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
Datetime_Format = '%Y-%m-%d %H:%M:%S'
#重写SQLAlchemyJobStore,用于自定义数据库表
class SQLAlchemyJobStore(_SQLAlchemyJobStore):
    """APScheduler SQLAlchemy job store with extra columns and a history table.

    The jobs table gains two columns, ``trigger`` (a readable description of
    the schedule) and ``desc`` (the job name), and a second table records the
    runs reported by the scheduler listener.
    """

    Jobs_Tablename = 'apscheduler_jobs'  # table with the basic job information
    Jobs_History_Tablename = 'apscheduler_history'  # table with the run history

    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, tableschema=None, engine_options=None):
        """Create the engine and declare both tables.

        :param url: SQLAlchemy connection URL; used when ``engine`` is not given
        :param engine: an existing engine (or a reference to one)
        :param tablename: kept for signature compatibility only; the table
            names come from ``Jobs_Tablename`` / ``Jobs_History_Tablename``
        :param metadata: optional ``MetaData`` to attach the tables to
        :param pickle_protocol: pickle protocol used to serialize job state
        :param tableschema: optional schema name for both tables
        :param engine_options: keyword arguments for ``create_engine``
        :raises ValueError: when neither ``engine`` nor ``url`` is given
        """
        # Call the initializer of the parent's parent class: the parent's own
        # __init__ is bypassed and its setup is repeated below with the
        # extended table definitions.
        super(_SQLAlchemyJobStore, self).__init__()
        self.pickle_protocol = pickle_protocol
        metadata = maybe_ref(metadata) or MetaData()

        if engine:
            self.engine = maybe_ref(engine)
        elif url:
            self.engine = create_engine(url, **(engine_options or {}))
        else:
            raise ValueError('Need either "engine" or "url" defined')

        # 191 = max key length in MySQL for InnoDB/utf8mb4 tables,
        # 25 = precision that translates to an 8-byte float
        self.jobs_t = Table(
            self.Jobs_Tablename, metadata,
            Column('id', Unicode(191), primary_key=True),
            Column('next_run_time', Float(25), index=True),
            Column('job_state', LargeBinary, nullable=False),
            Column('trigger', String(256), nullable=True),  # added: readable schedule rule
            Column('desc', String(256), nullable=True),  # added: job description
            schema=tableschema
        )
        # Added table: one row per reported job run.
        self.jobs_t_history = Table(
            self.Jobs_History_Tablename, metadata,
            Column('id', BigInteger(), primary_key=True),
            Column('job_id', Unicode(191)),
            Column('run_time', DATETIME(), index=True, nullable=False),
            Column('is_error', Boolean(), default=0),
            Column('error_msg', String(256), nullable=True),
            schema=tableschema
        )

    def start(self, scheduler, alias):
        """Start the store and create both tables when they do not exist yet."""
        super(SQLAlchemyJobStore, self).start(scheduler, alias)
        self.jobs_t.create(self.engine, checkfirst=True)
        self.jobs_t_history.create(self.engine, checkfirst=True)

    def _job_row_values(self, job):
        """Return the column values shared by ``add_job`` and ``update_job``."""
        trigger, desc = self.get_job_rule_and_desc(job)
        return {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol),
            'trigger': trigger,
            'desc': desc,
        }

    def add_job(self, job):
        """Insert ``job`` including the added ``trigger``/``desc`` columns.

        :raises ConflictingIdError: when a job with the same id already exists
        """
        values = {'id': job.id}
        values.update(self._job_row_values(job))
        insert = self.jobs_t.insert().values(**values)
        with self.engine.begin() as connection:
            try:
                connection.execute(insert)
            except IntegrityError:
                raise ConflictingIdError(job.id)

    def update_job(self, job):
        """Update the stored row of ``job`` including the added columns.

        :raises JobLookupError: when no row with this job id exists
        """
        update = (
            self.jobs_t.update()
            .values(**self._job_row_values(job))
            .where(self.jobs_t.c.id == job.id)
        )
        with self.engine.begin() as connection:
            result = connection.execute(update)
            if result.rowcount == 0:
                raise JobLookupError(job.id)

    def get_job_rule_and_desc(self, job):
        """Build a readable schedule rule and a description for ``job``.

        The rule is derived from ``str(job.trigger)``, which has the form
        ``kind[rule]``. Anything that is neither ``date`` nor ``interval``
        (including text without brackets) is reported through the cron
        message instead of raising.

        :return: tuple ``(trigger, desc)``
        """
        kind, _, remainder = str(job.trigger).partition('[')
        rule = remainder.split(']')[0]

        if kind == 'date':
            # rule = '2024-10-10 20:20:12 CST' -> drop the trailing time zone
            trigger = '在{} 时间点执行一次'.format(rule.rsplit(' ', 1)[0])
        elif kind == 'interval':
            trigger = '每隔'
            if 'day' in rule:
                # rule = '1 day, 0:00:00'
                days, hms = rule.split(',')
                trigger += '{}天'.format(int(days.split('day')[0]))
            else:
                # rule = '0:01:01'
                hms = rule
            for unit, value in zip(('小时', '分钟', '秒'), hms.split(':')):
                # Seconds may carry a fraction (e.g. '01.500000'), so parse
                # as float and only show the fraction when there is one.
                amount = float(value)
                if amount > 0:
                    shown = int(amount) if amount.is_integer() else amount
                    trigger += '{}{}'.format(shown, unit)
            trigger += '执行一次'
        else:
            # cron rules are too varied to describe; show the raw trigger
            # rule = "hour='0', minute='0', second='1'"
            trigger = '{},通过linux系统cron表达式'.format(job.trigger)

        desc = job.name or ''  # job description
        return (trigger, desc)

    def insert_job_history(self, data: dict):
        """Record one job run; used by the scheduler listener.

        :param data: {'job_id': 'x', 'run_time': ..., 'is_error': 1, 'error_msg': 'xxxx'}
        :return: None
        """
        error_msg = data.get('error_msg')
        if error_msg is not None:
            # The column is String(256); keep long messages from overflowing it.
            error_msg = str(error_msg)[:256]
        insert = self.jobs_t_history.insert().values(**{
            'job_id': data.get('job_id'),
            'run_time': data.get('run_time'),
            'is_error': data.get('is_error'),
            'error_msg': error_msg,
        })
        with self.engine.begin() as connection:
            connection.execute(insert)

    def api_get_run_next(self):
        """Return the next run time and basic information of every job.

        ``next_run`` is ``None`` for a job without a next run time (the
        column is nullable).

        :return: list of dicts with keys job_id, next_run, trigger, desc
        """
        with self.engine.begin() as connection:
            rows = connection.execute(self.jobs_t.select()).fetchall()

        ret_data = []
        # Column order: id, next_run_time, job_state, trigger, desc
        for job_id, next_run_time, _job_state, trigger, desc in rows:
            next_run = None
            if next_run_time is not None:
                next_run = datetime.fromtimestamp(next_run_time).strftime(Datetime_Format)
            ret_data.append({
                'job_id': job_id,
                'next_run': next_run,
                'trigger': trigger,
                'desc': desc,
            })
        return ret_data

    def _recent_history_rows(self, job_id, is_error, limit):
        """Fetch the newest ``limit`` history rows of a job for one outcome."""
        history = self.jobs_t_history
        search = (
            history.select()
            .where(and_(history.c.job_id == job_id, history.c.is_error == is_error))
            .order_by(history.c.id.desc())
            .limit(limit)
        )
        with self.engine.begin() as connection:
            return connection.execute(search).fetchall()

    def api_get_run_history(self):
        """Return the 10 most recent successful runs of every job.

        :return: list of {'job_id': ..., 'run_time': [formatted times]}
        """
        ret_data = []
        for job in self.api_get_run_next():
            job_id = job.get('job_id')
            # Column order: id, job_id, run_time, is_error, error_msg
            rows = self._recent_history_rows(job_id, 0, 10)
            ret_data.append({
                'job_id': job_id,
                'run_time': [row[2].strftime(Datetime_Format) for row in rows],
            })
        return ret_data

    def api_get_run_error(self):
        """Return the 5 most recent failed runs of every job.

        :return: list of {'job_id': ..., 'error_run': [{'run_time', 'error_msg'}]}
        """
        ret_data = []
        for job in self.api_get_run_next():
            job_id = job.get('job_id')
            # Column order: id, job_id, run_time, is_error, error_msg
            rows = self._recent_history_rows(job_id, 1, 5)
            ret_data.append({
                'job_id': job_id,
                'error_run': [
                    {
                        'run_time': row[2].strftime(Datetime_Format),
                        'error_msg': row[4],
                    }
                    for row in rows
                ],
            })
        return ret_data

    def delete_before_run_history(self, keep=20):
        """Trim the run history of every job.

        For each job in the jobs table the newest ``keep`` successful runs
        and the newest ``keep`` failed runs are kept; older rows are deleted.

        :param keep: number of rows to keep per job and outcome
        :return: None
        """
        history = self.jobs_t_history
        for job in self.api_get_run_next():
            job_id = job.get('job_id')
            for is_error in (0, 1):
                newest_first = (
                    select(history.c.id)
                    .where(and_(history.c.job_id == job_id, history.c.is_error == is_error))
                    .order_by(history.c.id.desc())
                )
                # One transaction per job/outcome; the context manager
                # commits on success and releases the connection on error.
                with self.engine.begin() as connection:
                    stale_ids = [row[0] for row in connection.execute(newest_first)][keep:]
                    if not stale_ids:
                        continue  # nothing beyond the kept rows
                    connection.execute(
                        delete(history).where(history.c.id.in_(stale_ids))
                    )
3.2、schedulers/logger.py
用于记录apscheduler定时器的一些日志信息。
import os
import logging
from datetime import datetime, date, timedelta
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
# Path of the management directory (the parent of this file's directory)
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# Directory in which the log files are stored
LOGS_DIR = os.path.join(BASE_DIR, 'logs')
# Create the logs directory; exist_ok avoids the check-then-create race when
# two processes import this module at the same time.
os.makedirs(LOGS_DIR, exist_ok=True)
def getLogHandlerConsole():
    """Build the console log handler (DEBUG level, short record format)."""
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            '[%(asctime)s][%(levelname)s][ %(funcName)s function: %(lineno)s line]:%(message)s'
        )
    )
    handler.setLevel(logging.DEBUG)
    return handler
def getLogHandlerFile():
    """Build the size-rotating file log handler (INFO level).

    The file lives in LOGS_DIR and is named after today's date. Once it
    grows beyond 10 MB the handler rolls over to a new file, keeping at most
    10 backups.
    """
    # Record format: time, level, file path, function name, line number, message
    record_format = logging.Formatter(
        '[%(asctime)s][%(levelname)s][%(pathname)s: %(funcName)s function: %(lineno)s line]: %(message)s')

    # Today's date is used as the file name
    log_path = os.path.join(LOGS_DIR, date.today().strftime('%Y-%m-%d.log'))

    handler = RotatingFileHandler(
        filename=log_path,
        maxBytes=1024 * 1024 * 10,  # roll over after 10 MB
        backupCount=10,  # maximum number of backup files
        encoding='UTF-8')
    handler.setLevel(logging.INFO)
    handler.setFormatter(record_format)
    return handler
# Logger 1: the logger named 'apscheduler.scheduler', writing to the log
# file and to the console.
scheduler_logger = logging.getLogger('apscheduler.scheduler')
scheduler_logger.setLevel(logging.INFO)
for _make_handler in (getLogHandlerFile, getLogHandlerConsole):
    scheduler_logger.addHandler(_make_handler())

if __name__ == '__main__':
    scheduler_logger.info('hhhhh')
    print(os.path.dirname(os.path.dirname(__file__)))
3.3、schedulers/config.py
存放的是apscheduler调度器需要的配置信息。
import os
#from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储
#from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储
from .base import SQLAlchemyJobStore #mysql等做后端存储
from study_apscheduler import settings
from .logger import scheduler_logger
# Resulting URL shape: mysql://<user>:<password>@127.0.0.1:3306/<name>?charset=utf8mb4
from urllib.parse import quote_plus

MYSQL_CONFIG = settings.DATABASES.get('default')
MYSQL_USER = MYSQL_CONFIG.get('USER')
MYSQL_PASSWORD = MYSQL_CONFIG.get('PASSWORD')
MYSQL_HOST = MYSQL_CONFIG.get('HOST')
MYSQL_PORT = MYSQL_CONFIG.get('PORT')
MYSQL_NAME = MYSQL_CONFIG.get('NAME')
MYSQL_CHARSET = 'utf8mb4'
# The user name and password are percent-encoded so that characters such as
# '@', ':' or '/' in the credentials cannot break the URL.
URL = 'mysql://{}:{}@{}:{}/{}?charset={}'.format(
    quote_plus(str(MYSQL_USER)),
    quote_plus(str(MYSQL_PASSWORD)),
    MYSQL_HOST,
    MYSQL_PORT,
    MYSQL_NAME,
    MYSQL_CHARSET,
)

# Time zone
TIME_ZONE = 'Asia/Shanghai'

# Default job settings
JOB_DEFAULTS = {
    # When runs piled up while the system was down: True merges them into a
    # single run, False executes every missed run. Only meaningful with a
    # persistent job store.
    'coalesce': True,
    # At most 3 instances of the same job may run at the same time.
    'max_instances': 3,
}

# Job store backend
JOB_STORE = {
    'default': SQLAlchemyJobStore(url=URL)
}

# Logger used by the scheduler
Scheduler_Logger = {
    'logger': scheduler_logger
}
# Human-readable message for each APScheduler event code (the codes are
# powers of two). The text for 16384 was corrected from "错误了" to "错过了":
# the main.py listener handles 16384 separately from 8192 (job raised an
# exception), so its message should say the run was missed, not that it errored.
LISTENER = {
    1: '调度程序启动',
    2: '调度程序关闭',
    4: '调度程序中任务处理暂停',
    8: '调度程序中任务处理恢复',
    16: '将执行器添加到调度程序中',
    32: '执行器从调度程序中删除',
    64: '将任务存储添加到调度程序中',
    128: '任务存储从调度程序中删除',
    256: '所有任务从所有任务存储中删除或从一个特定的任务存储中删除 ',
    512: '添加新的定时任务',
    1024: '从任务存储中删除了任务',
    2048: '从调度程序外部修改了任务',
    4096: '任务执行成功',
    8192: '任务在执行期间引发异常',
    16384: '错过了任务执行',
    32768: '任务已经提交到执行器中执行',
    65536: '任务因为达到最大并发执行时,触发的事件',
}
3.4、schedulers/main.py
实例化好调度器,配置日志,监听器、添加定时任务、存储后端等。
# 导入所需的调度器类和触发器类
from apscheduler.schedulers.background import BackgroundScheduler #后台运行
from apscheduler.schedulers.blocking import BlockingScheduler #主进程运行,需要单独运行
from apscheduler.triggers.interval import IntervalTrigger #时间间隔
from apscheduler.triggers.cron import CronTrigger #复杂的定时任务
from apscheduler.triggers.date import DateTrigger #一次性定时任务
from apscheduler import events
from datetime import datetime
#定时任务
from .task import delete_apscheduler_history
from .task import send_to_big_data
from .task import crontab_task,date_task
#日志
from .config import LISTENER
from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE,Scheduler_Logger #调度器配置
class TheBlockScheduler(object):
    """Scheduler wrapper that runs a ``BlockingScheduler`` in the main thread.

    It configures the scheduler, registers the jobs, and installs a listener
    that writes run results to the job store's history table and logs
    scheduler events.
    """

    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        self.scheduler = self._scheduler_obj()
        self.logger = Scheduler_Logger.get('logger')

    # 1. Create and configure the scheduler
    def _scheduler_obj(self):
        """Return a configured ``BlockingScheduler``."""
        obj = BlockingScheduler()
        obj.configure(
            timezone=TIME_ZONE,  # time zone
            job_defaults=JOB_DEFAULTS,  # default job settings
            jobstores=JOB_STORE,  # job store backend
            # Passed as a direct option: keys inside ``gconfig`` are only
            # honoured when they carry the 'apscheduler.' prefix, so the
            # bare 'logger' key given there was ignored.
            logger=Scheduler_Logger.get('logger'),
        )
        return obj

    # 2. Register the jobs
    def _add_job(self):
        """Register all jobs; existing jobs with the same id are replaced."""
        # Push data to the big-data warehouse every minute
        self.scheduler.add_job(
            send_to_big_data,
            trigger=IntervalTrigger(minutes=1),
            id='send_to_big_data',
            replace_existing=True,
            coalesce=True,
            name='该定时,用于将数据推送到远端大数据系统'
        )
        # Clear old run-history records every day at 00:00:01
        self.scheduler.add_job(
            delete_apscheduler_history,
            trigger=CronTrigger(hour=0, minute=0, second=1),
            id='delete_apscheduler_history',
            replace_existing=True,
            coalesce=True,
            name='该定时器,是将历史定时任务的执行记录进行清除'
        )
        # Clear garbage records every 20 minutes
        self.scheduler.add_job(
            crontab_task,
            trigger=IntervalTrigger(minutes=20),
            id='crontab_task',
            replace_existing=True,
            coalesce=True,
            name='该定时任务,用于清除xxx表中的垃圾记录'
        )
        # Run once at one fixed moment
        self.scheduler.add_job(
            date_task,
            trigger=DateTrigger(run_date='2024-12-03 10:11:30'),
            id='date_task',
            replace_existing=True,
            coalesce=True,
            name='该定时任务,在指定时间,进行系统初始化任务',
        )

    # 3. Event listener
    def _listener(self, event: events):
        """Record job results in the history table and log scheduler events.

        Codes 4096 (success), 8192 (exception) and 16384 (missed) are written
        to the job store; the scheduler/job-store events listed below are
        logged exactly once, prefixed with the job id when the event has one.
        """
        code = event.code
        msg = LISTENER.get(code)

        if code in (4096, 8192, 16384):
            failed = code != 4096
            error_msg = None
            if failed:
                error_msg = msg
                # Keep the real exception when the event carries one, so the
                # stored record says why the job failed.
                exception = getattr(event, 'exception', None)
                if exception is not None:
                    error_msg = '{}: {!r}'.format(msg, exception)[:256]
            # The job store that owns the history table
            jobstore = self.scheduler._jobstores['default']
            jobstore.insert_job_history({
                'job_id': event.job_id,
                'run_time': datetime.now().strftime(self.TIME_FORMAT),
                'is_error': 1 if failed else 0,
                'error_msg': error_msg,
            })
        elif code in (1, 2, 4, 8, 32, 128, 1024, 2048):
            # Scheduler-level events have no job_id attribute; job-level
            # events (1024, 2048) do.
            job_id = getattr(event, 'job_id', None)
            if job_id is not None:
                msg = '任务id={},{}'.format(job_id, msg)
            self.logger.info(msg)

    # 4. Start the scheduler
    def start(self):
        """Register jobs and the listener, then start the scheduler.

        Jobs are added before the listener, so the listener is not notified
        of those registrations. On Ctrl-C the scheduler is shut down.
        """
        # 1. register the jobs
        self._add_job()
        # 2. install the listener
        self.scheduler.add_listener(self._listener)
        # 3. start the scheduler
        try:
            self.scheduler.start()
        except KeyboardInterrupt:
            self.scheduler.shutdown()
#后台线程运行:随django项目一起运行
class TheBackRunScheduler(TheBlockScheduler):
    """Variant that runs a ``BackgroundScheduler`` alongside the Django project."""

    def _scheduler_obj(self):
        """Return a configured ``BackgroundScheduler``."""
        obj = BackgroundScheduler()
        obj.configure(
            timezone=TIME_ZONE,  # time zone
            job_defaults=JOB_DEFAULTS,  # default job settings
            jobstores=JOB_STORE,  # job store backend
            # Passed as a direct option: keys inside ``gconfig`` are only
            # honoured when they carry the 'apscheduler.' prefix, so the
            # bare 'logger' key given there was ignored.
            logger=Scheduler_Logger.get('logger'),
        )
        return obj
if __name__ == '__main__':
    # Run the scheduler in background mode
    backScheduler = TheBackRunScheduler()
    backScheduler.start()
3.5、schedulers/models.py
数据库查询方法,获取定时任务的状态和运行情况。
from .config import JOB_STORE
# The default job store; its query helpers are re-exported below under
# function-style names.
jobstore = JOB_STORE['default']
# Get the next run time of every scheduled job
get_job_run_next = jobstore.api_get_run_next
# Get the recent run history of every scheduled job
get_job_run_history = jobstore.api_get_run_history
# Get the recent run errors of every scheduled job
get_job_run_error = jobstore.api_get_run_error
# Clear old run-history records
delete_run_history = jobstore.delete_before_run_history
if __name__ == '__main__':
pass
3.6、schedulers/task.py
存放定时任务。
import os
from .logger import scheduler_logger as log
from .models import delete_run_history
# Job functions registered with the scheduler.
def send_to_big_data():
    """Job: push alarm information to the big-data warehouse (logs only)."""
    log.info('推送到大数据仓')


def delete_apscheduler_history():
    """Job: clear old scheduler run-history records via delete_run_history."""
    delete_run_history()


def crontab_task():
    """Job: log a marker line for the crontab example."""
    log.info('crontab定时器')


def date_task():
    """Job: log a marker line for the date example."""
    log.info('date定时器')


if __name__ == '__main__':
    # Print the directory three levels above this file
    target_dir = __file__
    for _ in range(3):
        target_dir = os.path.dirname(target_dir)
    print(target_dir)
四、自定义命令,启动定时器
4.1、commands/crontab.py
from django.core.management.base import BaseCommand
#导入block运行的代码
from gs_scheduler.management.schedulers.main import TheBlockScheduler
#自定义脚本命令:使用,python manage.py crontab 启动
class Command(BaseCommand):
    """Custom management command, started with ``python manage.py crontab``."""

    def handle(self, *args, **options):
        """Entry point of the command: create the blocking scheduler and start it."""
        TheBlockScheduler().start()
五、启动定时器
5.1、以主进程方式启动
python manage.py crontab
5.2、后台线程方式启动(随django项目一起运行)
1、修改settings.py
# Start the scheduler together with Django by creating it when settings.py is loaded.
# NOTE(review): schedulers/config.py reads settings.DATABASES at import time, so
# these lines presumably have to come after DATABASES is defined in settings.py — confirm.
from gs_scheduler.management.schedulers.main import TheBackRunScheduler
scheduler = TheBackRunScheduler()
scheduler.start()
2、启动django项目时,后台运行定时器
六、启动django项目
python manage.py runserver 8800
七、测试
1、获取任务下次运行情况
获取每个任务,下次执行的时间和基本信息。
http://127.0.0.1:8800/api/scheduler/run_next/
2、获取任务运行历史记录
获取到每个任务的最近10次运行成功的记录。
http://127.0.0.1:8800/api/scheduler/run_history/
3、获取任务运行失败的记录
获取每个任务最近5次运行失败的记录。
http://127.0.0.1:8800/api/scheduler/run_error/
八、码云下载源码
码云地址:
django应用定时器(django下使用定时器的方法): https://gitee.com/liuhaizhang/django-application-timer