python 的APScheduler配置的定时任务会被Miss掉

背景

python 的APScheduler配置的定时任务会被Miss掉,经常在控制台收到一些Miss的告警信息,就觉得是任务太多导致的,为了定位到具体的原因,看了一些源码,了解到了定时任务的6大模块的协同工作模式。

异常信息及来源

异常信息
Run time of job “check_job_status (trigger: interval[0:00:05], next run at: 2023-04-11 12:05:01 CST)” was missed by 0:00:03.012525
异常信息解析
异常日志表示一个任务(名称为 “check_job_status”)的运行时间被错过了。任务使用了一个 interval 触发器,即每隔 5 秒触发一次。该任务的下一次运行时间应该是在 2023-04-11 12:05:01,CST 时区(中国标准时间)
异常日志来源
任务到了被触发执行的时候,执行器将该任务提交给子线程,执行方法run_job,run_job方法内部会再次校验当前时间和需要触发执行时间的时间差是否超过配置的超时时间(misfire_grace_time),如果超过了,则打印该告警信息。
在这里插入图片描述在这里插入图片描述

APScheduler 的六大模块

路径下的 executors、jobstores、schedulers、triggers、events、job 这几个模块是协同工作来实现调度任务的。下面是它们之间的关系和工作流程:
在这里插入图片描述

  1. jobstores 模块:任务存储器,用于存储任务信息和执行计划,包括 MemoryJobStore、SQLAlchemyJobStore、MongoDBJobStore 和 RedisJobStore 等多种实现。jobstores 模块将任务定义和计划信息存储在一个任务列表中,供 schedulers 模块调度任务。

  2. triggers 模块:触发器,用于确定任务的执行时间,包括 SimpleTrigger、IntervalTrigger、CronTrigger 等多种实现。triggers 模块提供了不同类型的触发器,用于根据不同的时间或事件条件触发任务的执行。

  3. job 模块:任务,用于封装要执行的任务内容,包括 Job 和 AsyncJob 两种实现。job 模块将任务的执行方法和参数封装在一个 Job 对象中,供 schedulers 模块调度。

  4. executors 模块:任务执行器,用于执行任务,包括 ThreadPoolExecutor、ProcessPoolExecutor、BlockingScheduler 等多种实现。executors 模块会根据 Job 对象中的执行方法和参数来执行任务,执行结果会被传递给 schedulers 模块。

  5. schedulers 模块:调度器,用于调度任务的执行,包括 BlockingScheduler、BackgroundScheduler、AsyncIOScheduler 和 GeventScheduler 等多种实现。schedulers 模块会从 jobstores 模块获取任务列表,并根据 triggers 模块提供的触发器信息,将需要执行的任务交给 executors 模块执行。

  6. events 模块:事件管理器,用于管理和触发各种事件,包括 JobExecutionEvent、JobSubmissionEvent 等多种实现。events 模块可以检测任务的执行结果,并触发相应的事件,供开发者进行监控和处理。

下面根据部分业务代码和源码来捋一下定时任务的全流程

一、初始化定时任务对象时,我这里配置ThreadPoolExecutoe, 最多10个线程的线程池(实际不配置的话,默认也是10)在这里插入图片描述

二、 在业务逻辑里添加定时任务:

#  最大实例数为1, interval触发器
cls.schedule.add_job(id=job_id, args=(func,), trigger="interval", max_instances=1, **params)

add_job方法的底层实现如下
/venv/lib/python3.8/site-packages/apscheduler/schedulers/base.py
在这里插入图片描述
这段源码的自我分析:将任务信息封装成Job对象,然后调用_real_add_job方法将任务添加到任务管理器中,添加时会携带任务信息(执行时间,执行次数,时间间隔等)以及触发器,调度器和下次期望被触发的时间。

三、 调度器会循环校验每个任务的执行时间,如果执行时间到,则获取到任务的触发器,触发器将任务转给执行器,执行器执行任务。具体逻辑在如下的源码中

    def _process_jobs(self):
        """
        Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
        to wait for the next round.

        If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least
        ``jobstore_retry_interval`` seconds.

        """
        if self.state == STATE_PAUSED:
            self._logger.debug('Scheduler is paused -- not processing jobs')
            return None

        self._logger.debug('Looking for jobs to run')
        now = datetime.now(self.timezone)
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            for jobstore_alias, jobstore in six.iteritems(self._jobstores):
                try:
                    due_jobs = jobstore.get_due_jobs(now)
                except Exception as e:
                    # Schedule a wakeup at least in jobstore_retry_interval seconds
                    self._logger.warning('Error getting due jobs from job store %r: %s',
                                         jobstore_alias, e)
                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                        next_wakeup_time = retry_wakeup_time
					# 如果任务还没到触发执行的时候,就pass
                    continue

                for job in due_jobs:
                    # Look up the job's executor
                    try:
                        executor = self._lookup_executor(job.executor)
                    except BaseException:
                        self._logger.error(
                            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
                            'job store', job.executor, job)
                        self.remove_job(job.id, jobstore_alias)
                        continue

                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            # 这里将任务和任务要触发的时间交给执行器,任务的实例值+1; 如果是线程池执行器,submit_job方法内部会将任务提交到线程池中执行,执行成功后,会将该任务的实例值-1。
                            executor.submit_job(job, run_times)
                        except MaxInstancesReachedError:
                            self._logger.warning(
                                'Execution of job "%s" skipped: maximum number of running '
                                'instances reached (%d)', job, job.max_instances)
                            # 将任务封装成事件,将事件添加到事件列表中
                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                       jobstore_alias, run_times)
                            events.append(event)
                        except BaseException:
                            self._logger.exception('Error submitting job "%s" to executor "%s"',
                                                   job, job.executor)
                        else:
                            # 将任务封装成事件,将事件添加到事件列表中
                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
                                                       run_times)
                            events.append(event)

                        # Update the job if it has a next execution time.
                        # Otherwise remove it from the job store.
                        # 触发器获取到该任务下次需要被触发的时间,并更新任务管理器
                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
                        if job_next_run:
                            job._modify(next_run_time=job_next_run)
                            jobstore.update_job(job)
                        else:
                            self.remove_job(job.id, jobstore_alias)

                # Set a new next wakeup time if there isn't one yet or
                # the jobstore has an even earlier one
                jobstore_next_run_time = jobstore.get_next_run_time()
                if jobstore_next_run_time and (next_wakeup_time is None or
                                               jobstore_next_run_time < next_wakeup_time):
                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

        # Dispatch collected events
        # 遍历事件列表 `events`,并分别调用 `_dispatch_event` 方法来处理每个事件。
        # 具体地说,`_dispatch_event` 方法会将事件分发给所有已注册的监听器进行处理。遍历监听器列表 `self._listeners`,
        # 并分别调用# 每个监听器的 `handle_event` 方法来处理事件。
        # 如果监听器处理事件时出现异常,会将异常记录到日志中。处理完所有监听器后,`_dispatch_event` 方法会返回。
        # 这样,整个事件分发过程就完成了。通过这个过程,我们可以实现在任务执行成功、失败、被取消等情况下触发相应的事件,并对这些事件进行监听和处理。
        #(配置事件管理器的是add_listener方法,可以全局配置的事件管理器,也可以为每个任务配置不同的管理器。)
        for event in events:
            self._dispatch_event(event)

        # Determine the delay until this method should be called again
        if self.state == STATE_PAUSED:
            wait_seconds = None
            self._logger.debug('Scheduler is paused; waiting until resume() is called')
        elif next_wakeup_time is None:
            wait_seconds = None
            self._logger.debug('No jobs; waiting until a job is added')
        else:
            wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                               wait_seconds)
		# 这里的 wait_seconds 就是再次触发_process_jobs方法被调用时,需要阻塞的时间,单位 秒。
        return wait_seconds

这段源码的自我分析:调度器会轮训任务管理器中的每一个任务,如果任务要被触发,会将任务交给执行器,同时将任务添加到事件管理列表中。然后获取任务的下一次触发时间,如果没有获取到,则将该任务移除。最后获取一个该方法被循环调用的阻塞时间。

四、 _process_jobs方法实际在调度器启动时就已经被注册调用了,具体的源码如下图:
在这里插入图片描述
wakeup 方法仅仅是调用了 _event.set() 方法,该方法会设置一个事件。当调度器处于休眠状态时,等待 _event 事件被设置,一旦事件被设置,调度器就会立即执行 _process_jobs 方法,检查所有的任务是否到了执行时间。

总结整个流程

  1. 调用start方法启动定时任务
  2. 调用add_job方法添加定时任务,定时任务被存入任务管理器,此时会调用wakeup方法触发_process_jobs方法执行
  3. _process_jobs方法做的事情比较多
    a. 会将任务和任务要触发的时间交给执行器,任务的实例值+1;
    b. 如果是线程池执行器,submit_job方法内部会将任务提交到线程池中执行,执行成功后,会将该任务的实例值-1.
    c. 最后将任务封装成事件,将事件添加到事件列表中
    d. 如果任务配置了事件管理器,则执行注册的事件
    e. 获取阻塞_process_jobs方法的时间
  4. 下次调用_process_jobs方法继续监控任务列表

架构图

引用这位博主的一张架构图, 感谢~
在这里插入图片描述

任务被Miss的原因

通过整个流程的梳理,项目中任务被Miss掉的真正原因如下:

  1. 任务量太大,大概500个定时任务
  2. 任务触发间隔时间太短, 5秒钟触发一次
  3. 线程池配置比较小。感觉这里也不应该配置太多,太多的话,资源消耗肯定会增加
  4. 任务里有对第三方服务发起http请求,比较耗时。这里也许可以用

解决方案

批处理:下游服务提供批量查询的功能,我的服务就不用一个个查询了,一下子解决了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/23121.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2022年深圳杯数学建模B题基于用电可靠性的配电网规划解题全过程文档及程序

2022年深圳杯数学建模 B题 基于用电可靠性的配电网规划 原题再现&#xff1a; 如果一批用户变压器&#xff08;下面简称用户&#xff09;仅由一个电源变电站&#xff08;下面简称电源&#xff09;供电&#xff0c;称为单供。这时配电网由电线和开关联接成以电源为根节点的树状…

Transformer应用之构建聊天机器人(二)

四、模型训练解析 在PyTorch提供的“Chatbot Tutorial”中&#xff0c;关于训练提到了2个小技巧&#xff1a; 使用”teacher forcing”模式&#xff0c;通过设置参数“teacher_forcing_ratio”来决定是否需要使用当前标签词汇来作为decoder的下一个输入&#xff0c;而不是把d…

< ElementUi组件库: el-progress 进度条Bug及样式调整 >

ElementUi组件库&#xff1a; el-progress 进度条Bug及样式调整 &#x1f449; 前言&#x1f449; 一、实现原理> 修改 el-progress 进度条样式 及 渐变进度条样式 &#x1f449; 二、案例代码&#xff08;前言效果图案例&#xff09;> HTML代码> CSS代码 &#x1f44…

C++学习day--12 循环的应用,暴力破解密码和输出动图

第 1 节 职场修炼&#xff1a;程序员到底能干多久 现状&#xff1a; 很多程序员&#xff0c;过了 30 岁&#xff0c;纷纷转行。 原因&#xff1a; 1 &#xff09;薪资过万后&#xff0c;很难进一步提升 2 &#xff09;可替代性高&#xff0c;在新人面前&#xff0c;没有…

SolVES模型在生态系统服务社会价值评估中的运用

SolVES模型&#xff08;Social Values for Ecosystem Services&#xff09;全称为生态系统服务社会价值模型&#xff0c;是由美国地质勘探局和美国科罗拉多州立大学联合开发的一款地理信息系统应用程序&#xff0c;开发该模型的目的主要是对生态系统服务功能中的社会价值进行空…

全面了解Java连接MySQL的基础知识,快速实现数据交互

全面了解Java连接MySQL的基础知识&#xff0c;快速实现数据交互 1. 数据库的重要性2. MySQL数据库简介2.1 MySQL数据库的基本概念2.2 MySQL的基本组成部分包括服务器、客户端和存储引擎。2.3 安装MySQL数据库2.3.1安装MySQL数据库2.3.2 下载MySQL安装程序2.3.3 运行MySQL安装程…

帽子设计作品——蒸汽朋克的乌托邦,机械配件的幻想世界!

蒸汽朋克是由蒸汽steam和朋克punk两个词组成&#xff0c; 蒸汽代表着以蒸汽机作为动力的大型机械&#xff0c;而朋克则代表一种反抗、叛逆的精神。 蒸汽朋克的作品通常以蒸汽时代为背景&#xff0c;通过如新能源、新机械、新材料、新交通工具等新技术&#xff0c;使画面充满想…

理解深度可分离卷积

1、常规卷积 常规卷积中&#xff0c;连接的上一层一般具有多个通道&#xff08;这里假设为n个通道&#xff09;&#xff0c;因此在做卷积时&#xff0c;一个滤波器&#xff08;filter&#xff09;必须具有n个卷积核&#xff08;kernel&#xff09;来与之对应。一个滤波器完成一…

PMP课堂模拟题目及解析(第13期)

121. 项目经理、团队成员以及若干干系人共同参与一次风险研讨会。已经根据风险管理计划生成并提供一份风险报告。若要为各个项目风险进行优先级排序&#xff0c;现在必须执行哪一项分析&#xff1f; A. 定量风险分析 B. 根本原因分析 C. 偏差分析 D. 定性风险分析 122. …

带你手撕链式二叉树—【C语言】

前言&#xff1a; 普通二叉树的增删查改没有意义&#xff1f;那我们为什么要先学习普通二叉树呢&#xff1f; 给出以下两点理由&#xff1a; 1.为后面学习更加复杂的二叉树打基础。&#xff08;搜索二叉树、ALV树、红黑树、B树系列—多叉平衡搜索树&#xff09; 2.有很多二叉树…

Linux安装MongoDB数据库并内网穿透在外远程访问

文章目录 前言1.配置Mongodb源2.安装MongoDB数据库3.局域网连接测试4.安装cpolar内网穿透5.配置公网访问地址6.公网远程连接7.固定连接公网地址8.使用固定公网地址连接 转发自CSDN cpolarlisa的文章&#xff1a;Linux服务器安装部署MongoDB数据库 - 无公网IP远程连接「内网穿透…

亚马逊开放个人卖家验证入口?亚马逊卖家验证到底怎么搞?

亚马逊卖家账户的安全对于所有卖家来说都非常重要。如果卖家想要在亚马逊上长期稳定地发展&#xff0c;赚取更多的钱并推出更多热卖产品&#xff0c;就必须确保他们的亚马逊卖家账户安全&#xff0c;特别是一直存在的亚马逊账户验证问题。 近期&#xff0c;根据亚马逊官方披露的…

开发敏捷高效 | 云原生应用开发与运维新范式

5 月 18 日&#xff0c;腾讯云举办了 Techo Day 腾讯技术开放日&#xff0c;以「开箱吧&#xff01;腾讯云」为栏目&#xff0c;对外发布和升级了腾讯自研的一系列云原生产品和工具。其中&#xff0c;腾讯云开发者产品中心总经理刘毅围绕“开发敏捷高效”这一话题&#xff0c;分…

单体项目偶遇并发漏洞!短短一夜时间竟让老板蒸发197.83元

事先声明&#xff1a;以下故事基于真实事件而改编&#xff0c;如有雷同&#xff0c;纯属巧合~ 眼下这位正襟危坐的男子&#xff0c;名为小竹&#xff0c;他正是本次事件的主人公&#xff0c;也即将成为熊猫集团的被告&#xff0c;嗯&#xff1f;这究竟怎么一回事&#xff1f;欲…

手写简单的RPC框架(一)

一、RPC简介 1、什么是RPC RPC&#xff08;Remote Procedure Call&#xff09;远程过程调用协议&#xff0c;一种通过网络从远程计算机上请求服务&#xff0c;而不需要了解底层网络技术的协议。RPC它假定某些协议的存在&#xff0c;例如TPC/UDP等&#xff0c;为通信程序之间携…

PMP考试应该要如何备考?如何短期通过PMP?

我从新考纲考完下来&#xff0c;3A通过了考试&#xff0c;最开始也被折磨过一段时间&#xff0c;但是后面还是找到了方法&#xff0c;也算有点经验&#xff0c;给大家分享一下吧。 程序猿应该是考PMP里面人最多的&#xff0c;毕竟有一个30大坎&#xff0c;大部分人还是考虑转型…

什么是网络编程

目录 一、什么是网络编程&#xff1f; 二、协议 1.用户数据报协议(User Datagram Protocol) 2.TCP协议 TCP三次握手过程 三、实例 1.UDP通信程序 实现步骤 TCP接收数据 四、TCP协议和UDP协议的区别和联系 一、什么是网络编程&#xff1f; 1.在网络通信协议下&#xf…

一图看懂!RK3568与RK3399怎么选?

▎简介 RK3568和RK3399都是Rockchip公司的处理器&#xff0c;具有不同的特点和适用场景。以下是它们的主要区别和应用场景。 ▎RK3568 RK3568是新一代的高性能处理器&#xff0c;采用了22nm工艺&#xff0c;具有更高的性能和更低的功耗。它支持4K视频解码和编码&#xff0c;支持…

电脑如何查找重复文件?轻松揪出它!

电脑如何查找重复文件&#xff1f;小编每天要接触各种文档、图片等资料&#xff0c;很多时候下载了一些图片后&#xff0c;我根本记不住&#xff0c;下次看到不错的图片&#xff0c;我又会下载下来&#xff0c;结果就是和之前下载的图片是一样的内容。下载的重复文件多了&#…

人员定位及轨迹管理技术原理及应用领域

人员定位及轨迹管理的实现涉及多种技术和设备。例如&#xff0c;在GPS定位方面&#xff0c;使用卫星系统可以提供全球范围内的准确定位信息。然而&#xff0c;GPS在室内环境下的信号覆盖可能存在限制&#xff0c;因此在室内定位应用中&#xff0c;常常采用无线传感器网络&#…