Python运维之定时任务模块APScheduler

前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除

目录

定时任务模块APScheduler

一、安装及基本概念

1.1、APScheduler的安装

1.2、涉及概念

1.3、APScheduler的工作流程​编辑

二、配置调度器

三、启动调度器

四、调度事件监听


定时任务模块APScheduler

APScheduler提供了基于日期、固定时间间隔以及crontab类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业。如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。

一、安装及基本概念

1.1、APScheduler的安装

 pip install apscheduler

1.2、涉及概念

  • 触发器triggers:触发器包含调度逻辑描述一个任务何时被触发,有按日期按时间间隔按cronjob描述式三种触发方式。每个作业都有自己的触发器,除了初始配置之外,触发器是完全无状态的。
  • 作业存储器job stores:指定了作业被存放的位置,默认的作业存储器是内存,也可以将作业保存在各种数据库中。当作业被存放在数据库中时,它会被序列化;当重新被加载时,会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储
  • 执行器executors:执行器是将指定的作业调用函数提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件
  • 调度器schedulers:任务调度器,控制器角色,通过它配置作业存储器、执行器和触发器、添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员不需要直接处理作业存储器、执行器或触发器。配置作业存储器和执行器是通过调度器来完成的

1.3、APScheduler的工作流程

一个简单的间隔任务实例:

 import os
 from datetime import datetime
 from apscheduler.schedulers.blocking import BlockingScheduler
 ​
 # 打印当前的时间
 def tick():
     print('Tick! The time is: %s' % datetime.now())
 ​
 if __name__ == '__main__':
     scheduler = BlockingScheduler()
     # 添加一个作业rick,触发器为interval,每隔3秒执行一次
     scheduler.add_job(tick, 'interval', seconds=3)
     print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):
         pass

另外的触发器为date,cron。date按特定时间点触发cron则按固定的时间间隔触发

上述代码稍作修改可变为cron类的定时任务:

 import os
 from datetime import datetime
 from apscheduler.schedulers.blocking import BlockingScheduler
 ​
 def tick():
     print('Tick! The time is: %s' % datetime.now())
 ​
 if __name__ == '__main__':
     scheduler = BlockingScheduler()
     scheduler.add_job(tick, 'cron', hour=19,minute=23)
     print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):
         pass

定时cron任务也非常简单,直接给触发器trigger传入'cron'即可。hour=19,minute23,表示每天的19时23分执行任务

 hour=19,minute=23
 hour='19',minute='23'
 minute='*/3'  # 表示每3分钟执行一次
 hour='19-21',minute='23'  # 表示19:23、20:23、21:23各执行一次任务

二、配置调度器

调度器的主循环其实就是反复检查是否有到期需要执行的任务,具体分两步进行

  • 询问自己的每一个作业存储器,有没有到期需要执行的任务。如果有则计算这些作业中每个作业需要 运行的时间点;如果时间点有多个,就做coalesce检查
  • 提交给执行器按时间点运行

各调度器的适用场景

  • BlockingSchduler:适用于调度程序,是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回
  • BackgroundScheduler:适用于调度程序,在应用程序的后台运行调用start后主线程不会阻塞
  • AsyncIOScheduler:适用于使用了asyncio模块的应用程序
  • GeventScheduler:适用于使用了gevent模块的应用程序
  • TwistedScheduler:适用于构建Twisted的应用程序
  • QtSchuduler:适用于构建Qt的应用程序。

作业存储器的选择:一是内存( 默认),而是数据库

执行器的选择:默认的ThreadPoolExecutor足够OK,如果作业负载涉及CPU密集型操作,那么考虑使用ProcessPoolExecutor,甚至同时使用,将其作为二级执行器。

APScheduler可以使用字典,关键字参数传递配置调度器。首先实例化调度程序添加作业,然后配置调度器,获得最大的灵活性。

如果调度程序在应用程序的后台运行,则选择BackgroundScheduler,并使用默认的jobstore和executor

 from apscheduler.schedulers.blocking import BlockingScheduler
 scheduler = BlockingScheduler()

如果想配置更多的信息,就可设置两个执行器、两个作业存储器、调整新作业的默认值,并设置不同的时区。配置详情:

  • 配置名为mongo的MongoDBjobStore作业存储器
  • 配置名为default的SQLAlchemyJobStore(使用SQLite)
  • 配置名为default的ThreadPoolExecutor,最大进程数为5
  • UTC作为调度器的时区
  • coalesce默认情况下关闭
  • 作业的默认最大运行实例限制为3

方法一:

 from pytz import utc
 ​
 from apscheduler.schedulers.background import BlockingScheduler
 from apscheduler.jobstores.mongodb import MongoDBJobStore
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
 ​
 jobstores = {
     'mongo':MongoDBJobStore(),
     'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
 }
 executors = {
     'default':ThreadPoolExecutor(20),
     'processpool':ProcessPoolExecutor(5)
 }
 job_defaults = {
     'coalesce':False,
     'max_instances':3
 }
 scheduler = BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults,timezone=utc)

方法二:

 from apscheduler.schedulers.background import BlockingScheduler
 scheduler = BlockingScheduler({
     'apscheduler.jobstores.mongo':{
         'type':'mongodb'
     },
     'apscheduler.jobstores.default':{
         'type':'sqlalchemy',
         'url':'sqlite:///jobs.sqlite'
     },
     'apscheduler.executors.default':{
         'class':'apscheduler.executors.pool:ThreadPoolExecutor',
         'max_workers':'5'
     },
     'apscheduler.job_defaults.coalesce':'fasle',
     'apscheduler.job_defaults.max_instances':'3',
     'apscheduler.timezone':'UTC',
 })

方法三:

 from pytz import utc
 from apscheduler.schedulers.background import BlockingScheduler
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
 ​
 jobstores = {
     'mongo':{'type':'mongodb'},
     'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
 }
 executors = {
     'default':{'type':'threadpool','max_workers':20},
     'processpool':ProcessPoolExecutor(max_workers=5)
 }
 job_defaults = {
     'coalesce':False,
     'max_instances':3
 }
 scheduler = BlockingScheduler()
 scheduler.configure(jobstores=jobstores,executors=executors,job_defaults=job_defaults,timezone=utc)

三、启动调度器

启动调度器前需要先添加作业,有两种方法可以向调度器添加作业:一是通过接口add_job();二是通过使用函数装饰器,其中add_job()返回一个apscheduler.job.Job类的实例,用于后续修改或删除作业。

可以随时在调度器上调度作业。如果在添加作业时,调度器还没有启动,那么任务不会运行,并且它的第一次运行时间在调度器启动时计算。

调用调度器的start()方法启动调度器,下面用不同的作业存储器来举例:

 from apscheduler.schedulers.blocking import BlockingScheduler
 import datetime
 from apscheduler.jobstores.memory import MemoryJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
 f
 ​
 def my_job(id='my_job'):
     print(id,'-->',datetime.datetime.now())
 ​
 jobstores = {
     'default':MemoryJobStore()
 }
 executors = {
     'default':ThreadPoolExecutor(20),
     'processpool':ProcessPoolExecutor(10)
 }
 job_defaults = {
     'coalesce':False,
     'max_instance':3
 }
 scheduler =BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults)
 scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)
 scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',end_date='2024-06-06')
 scheduler.add_job(my_job,args=['job_once_now',],id='job_once_now')
 scheduler.add_job(my_job,args=['job_date_once',],id='job_date_once',trigger='date',run_date='2024-01-01 00:00:00')
 try:
     scheduler.start()
 except SystemExit:
     print('exit')
     exit()

方法二:使用数据库作为作业存储器(修改第5行和11行)

 from apscheduler.schedulers.blocking import BlockingScheduler
 import datetime
 from apscheduler.jobstores.memory import MemoryJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 ​
 def my_job(id='my_job'):
     print(id,'-->',datetime.datetime.now())
 ​
 jobstores = {
     'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
 }
 executors = {
     'default':ThreadPoolExecutor(20),
     'processpool':ProcessPoolExecutor(10)
 }
 job_defaults = {
     'coalesce':False,
     'max_instance':3
 }
 scheduler =BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults)
 scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)
 scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',end_date='2024-06-06')
 scheduler.add_job(my_job,args=['job_once_now',],id='job_once_now')
 scheduler.add_job(my_job,args=['job_date_once',],id='job_date_once',trigger='date',run_date='2024-01-01 00:00:00')
 try:
     scheduler.start()
 except SystemExit:
     print('exit')
     exit()

运行过之后,如果不注释添加作业的代码,则作业会重新添加到数据库中,这样就有了两个作业,为了避免这样的情况:设置(replace_existing=True)

 scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)

如果想运行错过运行的作业,则使用misfire_grace_time

 scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2024-06-06')

其他操作如下:

 scheduler.remove_job(job_id,jobstore=None)      # 删除作业
 scheduler.remove_all_jobs(jobstore=None)        # 删除所有作业
 scheduler.pause_job(job_id,jobstore=None)       # 暂停作业
 scheduler.resume_job(job_id,jobstore=None)      # 恢复作业
 scheduler.modify_job(job_id,jobstore=None,**changes)    # 修改单个作业属性配置
 scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args) # 修改单个作业的触发器并更新下次运行时间
 scheduler.print_jobs(jobstore=None,out=sys.stdout)      # 输出作业信息

四、调度事件监听

日志记录和事件监听:

 from apscheduler.schedulers.blocking import BlockingScheduler
 from apscheduler.events import EVENT_JOB_EXECUTED,EVENT_JOB_ERROR
 import datetime
 import logging
 ​
 # 配置日志记录信息 
 logging.basicConfig(level=logging.INFO,
                     format='%(asctime)s %(filename)s[line:%(lineno)d %(levelname)s %(message)s',
                     datefmt='%Y-%m-%d %H:%M:%S',
                     filename='log1.txt',
                     filemode='a'
                     )
 ​
 def aps_test(x):
     print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x)
 ​
 def date_test(x):
     print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x)
     print(1/0)
 ​
 def my_listener(event):
     if event.exception:
         print('任务出错了!!!!')
     else:
         print('任务照常运行...')
 ​
 scheduler = BlockingScheduler()
 scheduler.add_job(func=date_test,args=('一次性任务,会出错',),next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15),id='date_task')
 scheduler.add_job(func=aps_test,args=('循环任务',),trigger='interval',seconds=3,id='interval_task')
 scheduler.add_listener(my_listener,EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
 scheduler._logger = logging
 ​
 scheduler.start()

致谢

在此,我要对所有为知识共享做出贡献的个人和机构表示最深切的感谢。同时也感谢每一位花时间阅读这篇文章的读者,如果文章中有任何错误,欢迎留言批评指正。

学习永无止境,让我们共同进步

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/611772.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

26、Qt使用QFontDatabase类加载ttf文件更改图标颜色

一、图标下载 iconfont-阿里巴巴矢量图标库 点击上面的链接,在打开的网页中搜索自己要使用的图标,如:最大化 找到一个自己想用图标,选择“添加入库” 点击“购物车”图标 能看到刚才添加的图标,点击“下载代码”(需要…

Android 屏幕适配全攻略(中)-从九宫格到矢量图,揭秘Android多屏幕适配的正确打开方式

在移动互联网时代,无论是小小的手机屏幕,还是大大的平板显示器,Android 应用都必须做到完美适配,给用户以极佳的体验。本文将剖析 Android 多屏幕适配背后的种种技术细节,为您揭开最佳实践的正确打开方式,让…

教你解决PUBG绝地求生打完一把游戏无法返回大厅的问题

《绝地求生》(PUBG)作为风靡全球的战术竞技大作,凭借其高度还原的战场氛围和扣人心弦的生存挑战吸引了大量游戏玩家。不过,部分玩家在经历了一场紧张激烈的比赛后,遭遇了一个小困扰:游戏未能顺畅过渡到结算…

人大金仓报The connection attempt failed.Reason:Connection reset解决办法

在连接人大京仓数据库 的时候报下面的错误 解决办法: 更换这里的IP地址就行,不要用127.0.0.1,然后就可以了

keil5软件安装教程(MDKv5.39)

keil5软件安装分为三部分: 目录 1.安装mdk 2.激活mdk 3.安装STM32芯片包 1.安装mdk 安装包链接:https://pan.baidu.com/s/1StkkTQ5lmOz_99Qop4l8Gw?pwdrlmc 提取码:rlmc 1、下载keil5的压缩包并解压,鼠标右击【Setup】选…

利用函数视图实现精细化管控:DolphinDB 非标权限管理指南

1. 前言 DolphinDB 提供的用户权限管理功能管控的最小粒度是表级别,无法设置小于表粒度的数据访问权限管控,如限制用户仅能访问表中某些行或某些列的数据。为了满足客户更精细的权限管控需求,我们编写了本教程。 2. 概述 函数视图是封装了…

安卓通信方式简介

目录 一、Binder二、Socket三、Binder与Socket四、Handler 一、Binder Binder作为Android系统提供的一种IPC机制,无论从系统开发还是应用开发,都是Android系统中最重要的组成。 二、Socket Socket通信方式也是C/S架构,比Binder简单很多。在…

数据库开启远程连接

服务器端添加一个允许远程连接的root用户: mysql -u root -p create user root192.168.10.20 identified by admin; //创建一个192.168.10.20地址远程连接的root用户 grant all privileges on *.* to root192.168.10.20; //赋予远程root用户所有的权…

Linux文本处理工具【tr、cut、sort、uniq】

1. tr 命令——替换、压缩、删除 tr (Text Replacer) 命令常用来对来自标准输入的字符进行替换、压缩和删除。 命令格式 :tr [选项]... SET1 [SET2] (SET 是一组字符串,一般都可按照字面含义理解) 选项: -d 删除 -s 压…

01面向类的讲解

指针指向类成员使用 代码&#xff1a; #include<iostream> using namespace std;class Test { public:void func() { cout << "call Test::func" << endl; }static void static_func();int ma;static int mb; //不依赖对象 }; void Test::static…

DDS Blockset Shapes Demo

此示例演示DDS模块集Blockset形状演示应用程序。Shapes Demo是一个常见的数据分发服务&#xff08;DDS&#xff09;应用程序&#xff0c;用于介绍DDS概念&#xff0c;你可以使用它发布和订阅以简单形状&#xff08;圆形、方形和三角形&#xff09;表示的主题&#xff0c;并观察…

如何设计测试用例

一、介绍 测试用例就是一个文档&#xff0c;描述输入、动作、或者时间和一个期望的结果&#xff0c;其目的是确定应用程序的某个特性是否正常的工作。 二、基本格式 用例的基本要素包括测试用例编号、测试标题、重要级别、测试输入、操作步骤、预期结果等。 用例编号&#…

适合年轻人的恋爱交友脱单软件有哪些?中国十大社交软件排行榜分享

交友始祖&#xff1a;Tinder 一直很受欢迎&#xff0c;可以向上扫给 super like (每日有一次免费机会)。如果双方互相 like&#xff0c;代表配对成功&#xff0c;就可以开始聊天。另外&#xff0c;每日有 10 个 top picks 供选择&#xff0c;你可以免费选一位 主力编外&#xf…

添加一个索引要投产,需要哪些步骤?

编程一生 致力于写大家都能看懂的、有深度的 技术文章 05/2024 01 开场白 亚马逊有个bar raiser文化。就是说新招来的人一定要超过之前入职人员的平均水平&#xff0c;宁缺毋滥。越来越多的公司在推行这种文化。在这种氛围下&#xff1a;“虽然我不懂&#xff0c;但是活儿是能出…

一文了解webpack和vite中Tree-Shaking

1、什么是Tree-Shaking 1.1 摇树优化&#xff08;Tree Shaking&#xff09;是Webpack中一种用于优化JavaScript代码的技术。它的目标是通过静态分析&#xff0c;从代码中剔除未被使用的模块&#xff0c;从而减少最终打包文件的大小。 1.2 Tree-shaking 它的名字来源于通过摇晃…

纯血鸿蒙APP实战开发——数字滚动动效实现

介绍 本示例主要介绍了数字滚动动效的实现方案。 该方案多用于数字刷新&#xff0c;例如页面刷新抢票数量等场景。 效果图预览 使用说明&#xff1a; 下拉页面刷新&#xff0c;数字进行刷新。 实现思路 通过双重ForEach循环分别横向、纵向渲染数字。 Row() {ForEach(this…

基于SSM的文化遗产的保护与旅游开发系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的文化遗产的保护与旅游开发系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;…

Java中包的概念package

Package Package:包 指明方法、类所处的包&#xff1b; 将类分配到不同的包中&#xff0c;方便管理&#xff1b; 用于指明文件中定义的类、接口等结构所在的包&#xff1b; 一个源文件只要一个包的声明语句&#xff0c;必须放到开头&#xff1b; 属于标识符&#xff0c;满足命…

Java类加载器介绍

在Java中&#xff0c;类加载器是一种动态加载类的机制&#xff0c;它负责在运行时查找、加载和链接类文件。当Java应用程序需要创建某个类的对象时&#xff0c;类加载器会在运行时查找该类对应的.class文件&#xff0c;并将其加载到Java虚拟机中。Java类加载器通常分为三层&…

《ESP8266通信指南》15-MQTT连接、订阅MQTT主题并打印消息(基于Lua|适合新手|非常简单)

往期 《ESP8266通信指南》14-连接WIFI&#xff08;基于Lua&#xff09;-CSDN博客 《ESP8266通信指南》13-Lua 简单入门&#xff08;打印数据&#xff09;-CSDN博客 《ESP8266通信指南》12-Lua 固件烧录-CSDN博客 《ESP8266通信指南》11-Lua开发环境配置-CSDN博客 《ESP826…
最新文章