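Analyzing the Quartz (v2.3.2) startup flow in Spring Boot
Our company's scheduled tasks recently moved to the Quartz framework, and during development jobs would often simply stop firing without my being able to find the cause. That was frustrating enough that I decided to read how Quartz actually schedules jobs. (P.S. This article is for readers who have already used Quartz; if you have never used it, see my earlier article "Getting Started with Quartz" (【Quartz入门】) first.)
How to locate the key code
1. Start from the key log lines printed to the console
When the application starts, the console prints many Quartz-related log lines, and they point to the key initialization code of the framework. Below is the log from a local startup: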
2024-03-29T22:14:00.779+08:00 INFO 10044 --- [ main] org.quartz.core.QuartzScheduler : Scheduler meta-data: Quartz Scheduler (v2.3.2) 'quartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.springframework.scheduling.quartz.LocalDataSourceJobStore' - which supports persistence. and is not clustered.
2024-03-29T22:14:00.779+08:00 INFO 10044 --- [ main] org.quartz.impl.StdSchedulerFactory : Quartz scheduler 'quartzScheduler' initialized from an externally provided properties instance.
2024-03-29T22:14:00.779+08:00 INFO 10044 --- [ main] org.quartz.impl.StdSchedulerFactory : Quartz scheduler version: 2.3.2
2024-03-29T22:14:00.779+08:00 INFO 10044 --- [ main] org.quartz.core.QuartzScheduler : JobFactory set to: org.springframework.scheduling.quartz.SpringBeanJobFactory@70a898b0
2024-03-29T22:14:01.496+08:00 INFO 10044 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8087 (http) with context path ''
2024-03-29T22:14:01.497+08:00 INFO 10044 --- [ main] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now
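I start from the last line (o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now), locate the code in SchedulerFactoryBean that prints it, and set a breakpoint there.
- The call scheduler.start() is clearly an important entry point: as the name says, this is where Quartz is started.
- The call stack in the debugger shows that Quartz is started during the context.refresh() phase.
With the key method located from the log, the next step is to step into start() and see what its core logic does.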
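2. Debug inside a job and inspect the call stack
With a breakpoint inside a job, the stack starts in a thread of WorkerThread, an inner class of SimpleThreadPool. Following the trail from there leads to the key entry point, QuartzSchedulerThread.run.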
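The "look at the call stack from inside the job" trick can also be reproduced without a debugger. The following is a minimal plain-Java sketch, not Quartz code: the class StackProbe and the thread name demo-worker are made up for illustration. It runs a task on a named thread and records the thread name and the frames visible from inside the task.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Quartz: captures the call stack of the
// thread that executes a task, the same information a debugger shows.
class StackProbe {

    // Runs a task on a thread with the given name and returns the thread
    // name followed by the stack frames seen from inside the task.
    static List<String> captureFrom(String threadName) {
        List<String> frames = new ArrayList<>();
        Thread worker = new Thread(() -> {
            frames.add("thread=" + Thread.currentThread().getName());
            for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
                frames.add(e.getClassName() + "." + e.getMethodName());
            }
        }, threadName);
        worker.start();
        try {
            worker.join(); // join() makes the worker's writes visible here
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return frames;
    }

    public static void main(String[] args) {
        captureFrom("demo-worker").forEach(System.out::println);
    }
}
```

Inside a real Quartz job the same loop over Thread.currentThread().getStackTrace() would list the worker thread's frames, which is how the WorkerThread mentioned above shows up.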
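Analyzing the code
1. SchedulerFactoryBean.start
The log leads to the entry point SchedulerFactoryBean.start, which calls scheduler.start(). That call ends up in QuartzScheduler.start(), shown below: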
public void start() throws SchedulerException {
// First check the scheduler state: if it is shutting down or already closed, throw a SchedulerException, because a scheduler cannot be restarted after shutdown.
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
// Notify the scheduler listeners that the scheduler is about to start.
notifySchedulerListenersStarting();
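// If initialStart is null, this is the first start of the scheduler:
// record the current date and time in initialStart,
// call schedulerStarted() on the job store to tell it the scheduler has started,
// and call startPlugins() to start the plugins.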
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
// If initialStart is not null, the scheduler has been started before:
// call schedulerResumed() on the job store to tell it the scheduler has resumed.
resources.getJobStore().schedulerResumed();
}
// Set the paused flag of the scheduler thread to false so that the scheduler is no longer paused.
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
// Notify the scheduler listeners that the scheduler has fully started.
notifySchedulerListenersStarted();
}
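The branching in start() (first start, resume, and refusal after shutdown) can be modelled in a few lines. This is a simplified sketch under my own assumptions, not the Quartz class: MiniScheduler, its events list, and the use of IllegalStateException in place of SchedulerException are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the branching in start(); not the Quartz class itself.
class MiniScheduler {
    private boolean closed = false;
    private Long initialStart = null;  // null until the first start()
    private boolean paused = true;     // a new scheduler begins in standby
    final List<String> events = new ArrayList<>();

    void start() {
        if (closed) {
            // Stand-in for the SchedulerException thrown by Quartz.
            throw new IllegalStateException("cannot be restarted after shutdown()");
        }
        if (initialStart == null) {
            initialStart = System.currentTimeMillis();
            events.add("schedulerStarted");  // first start
        } else {
            events.add("schedulerResumed");  // later starts only resume
        }
        paused = false;                      // corresponds to togglePause(false)
    }

    void standby() { paused = true; }

    void shutdown() { closed = true; paused = true; }

    boolean isPaused() { return paused; }

    public static void main(String[] args) {
        MiniScheduler s = new MiniScheduler();
        s.start();
        s.standby();
        s.start();
        System.out.println(s.events);
    }
}
```

The point of the model is that the job store is only told "started" once; every later start() after a standby is reported as "resumed".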
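Here is the next piece of key code: this.resources.getJobStore().schedulerStarted(). With the JDBC-backed job store shown in the log above, this is JobStoreSupport.schedulerStarted().
Let's keep going: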
public void schedulerStarted() throws SchedulerException {
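// First check whether we are in clustered mode (isClustered()).
// If so, create and initialize the cluster management thread (ClusterManager):
// if an initializersLoader is specified, set it as the thread's context class loader,
// then call initialize() on the cluster management thread.
// Otherwise (not clustered), recover jobs directly via recoverJobs().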
if (isClustered()) {
clusterManagementThread = new ClusterManager();
if(initializersLoader != null)
clusterManagementThread.setContextClassLoader(initializersLoader);
clusterManagementThread.initialize();
} else {
try {
recoverJobs();
} catch (SchedulerException se) {
throw new SchedulerConfigException(
"Failure occured during job recovery.", se);
}
}
// Initialize the misfire handler thread
misfireHandler = new MisfireHandler();
if(initializersLoader != null)
misfireHandler.setContextClassLoader(initializersLoader);
misfireHandler.initialize();
schedulerRunning = true;
getLog().debug("JobStore background threads started (as scheduler was started).");
}
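- clusterManagementThread.initialize starts the cluster management thread. Its loop, shown below, periodically calls manage() to check the state of the cluster from this node's point of view, and when manage() returns true it sends a scheduling-change notification via signalSchedulingChangeImmediately.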
public void run() {
while (!shutdown) {
if (!shutdown) {
long timeToSleep = getClusterCheckinInterval();
long transpiredTime = (System.currentTimeMillis() - lastCheckin);
timeToSleep = timeToSleep - transpiredTime;
if (timeToSleep <= 0) {
timeToSleep = 100L;
}
if(numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {
}
}
if (!shutdown && this.manage()) {
signalSchedulingChangeImmediately(0L);
}
}//while !shutdown
}
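The sleep computation in this loop is easy to misread, so here it is restated as a pure function. This is only a sketch: the class name CheckinSleep and the numbers used in main are illustrative values, not Quartz defaults I am asserting.

```java
// Pure-function restatement of the sleep computation in the loop above.
class CheckinSleep {

    // checkinInterval:  value of getClusterCheckinInterval()
    // sinceLastCheckin: System.currentTimeMillis() - lastCheckin
    // numFails:         consecutive failures so far
    // dbRetryInterval:  value of getDbRetryInterval()
    static long timeToSleep(long checkinInterval, long sinceLastCheckin,
                            int numFails, long dbRetryInterval) {
        long t = checkinInterval - sinceLastCheckin;
        if (t <= 0) {
            t = 100L;                          // overdue: pause only briefly
        }
        if (numFails > 0) {
            t = Math.max(dbRetryInterval, t);  // back off while failing
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(timeToSleep(7500, 2000, 0, 15000));
        System.out.println(timeToSleep(7500, 9000, 0, 15000));
        System.out.println(timeToSleep(7500, 2000, 1, 15000));
    }
}
```

In other words, the thread sleeps for whatever is left of the check-in interval, never less than 100 ms, and at least the DB retry interval once failures have been seen.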
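- misfireHandler.initialize mainly starts a thread that looks for triggers that missed their fire time. When it has processed any, it immediately signals a scheduling change via signalSchedulingChangeImmediately, passing the earliest new fire time (earliestNewTime).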
@Override
public void run() {
while (!shutdown) {
long sTime = System.currentTimeMillis();
RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
}
if (!shutdown) {
long timeToSleep = 50l; // At least a short pause to help balance threads
if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
if (timeToSleep <= 0) {
timeToSleep = 50l;
}
if(numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
}
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {
}
}//while !shutdown
}
}
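The pause between two misfire scans follows a similar pattern. A sketch as a pure function, with a hypothetical class name MisfireSleep and illustrative numbers:

```java
// Pure-function restatement of the sleep computation in the misfire loop above.
class MisfireSleep {

    // hasMoreMisfired:  result of hasMoreMisfiredTriggers() for the last scan
    // misfireThreshold: value of getMisfireThreshold()
    // scanElapsed:      time the last scan took (now - sTime)
    static long timeToSleep(boolean hasMoreMisfired, long misfireThreshold,
                            long scanElapsed, int numFails, long dbRetryInterval) {
        long t = 50L;  // short pause when more misfired triggers are pending
        if (!hasMoreMisfired) {
            t = misfireThreshold - scanElapsed;
            if (t <= 0) {
                t = 50L;
            }
            if (numFails > 0) {
                t = Math.max(dbRetryInterval, t);
            }
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(timeToSleep(true, 60000, 10, 0, 15000));
        System.out.println(timeToSleep(false, 60000, 1000, 0, 15000));
    }
}
```

So while a backlog of misfired triggers remains, the handler rescans after about 50 ms; once the backlog is cleared it waits roughly one misfire threshold before scanning again.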
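signalSchedulingChangeImmediately is ultimately implemented by QuartzSchedulerThread.signalSchedulingChange, shown below.
This is as deep as the start() call chain goes: the method only updates fields of QuartzSchedulerThread and notifies the threads waiting on sigLock.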
public void signalSchedulingChange(long candidateNewNextFireTime) {
synchronized(sigLock) {
signaled = true;
signaledNextFireTime = candidateNewNextFireTime;
sigLock.notifyAll();
}
}
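What this method achieves is clearer with a waiting thread on the other side. The following plain-Java sketch pairs a method of the same shape with a waiter that blocks on sigLock with a timeout; SignalDemo, awaitSignal and demo are illustrative names, not Quartz APIs.

```java
// Minimal wait/notify pair modelled on signalSchedulingChange; illustrative only.
class SignalDemo {
    private final Object sigLock = new Object();
    private boolean signaled = false;
    private long signaledNextFireTime;

    // Same shape as the method above: set the flag, store the time, wake waiters.
    void signalSchedulingChange(long candidateNewNextFireTime) {
        synchronized (sigLock) {
            signaled = true;
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll();
        }
    }

    // Blocks until a signal arrives or maxWaitMillis has passed.
    // Returns true if a signal was seen, and clears the flag afterwards.
    boolean awaitSignal(long maxWaitMillis) {
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        synchronized (sigLock) {
            long remaining = maxWaitMillis;
            while (!signaled && remaining > 0) {  // loop guards against spurious wakeups
                try {
                    sigLock.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                remaining = deadline - System.currentTimeMillis();
            }
            boolean result = signaled;
            signaled = false;
            return result;
        }
    }

    // One thread signals while the calling thread waits for up to five seconds.
    static boolean demo() {
        SignalDemo d = new SignalDemo();
        new Thread(() -> d.signalSchedulingChange(0L)).start();
        return d.awaitSignal(5000L);
    }

    public static void main(String[] args) {
        System.out.println("signal received: " + demo());
    }
}
```

Because the flag is checked under the lock before waiting, the waiter still sees the signal when the signalling thread runs first, so a notification is not lost.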
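To summarize, the core logic underneath scheduler.start() is:
- In clustered mode, start the cluster management thread, which checks the state of the cluster and, when manage() reports that something changed, signals that scheduling should be re-evaluated immediately (JobStoreSupport.signalSchedulingChangeImmediately).
- Start the MisfireHandler thread, which watches for triggers that missed their fire time and, if there are any, sends a scheduling-change notification (JobStoreSupport.signalSchedulingChangeImmediately).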
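Neither of these two threads actually schedules our jobs. They maintain the cluster and signal whether scheduling needs to be re-evaluated by calling signalSchedulingChangeImmediately. That method only updates fields of QuartzSchedulerThread and calls notifyAll() on sigLock, which implies that some other thread waits on sigLock and does the real work, most likely the actual job scheduling.
The natural next step would be to find out who uses sigLock in QuartzSchedulerThread. I did not continue down that path, though. Instead I set a breakpoint inside a job and looked at the call stack to find the thread that schedules jobs.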
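2. QuartzSchedulerThread.run
Debugging inside a job leads to the core run method; the next step is to analyze what it does.
(SchedulerFactoryBean.afterPropertiesSet() initializes the QuartzScheduler, and an important member created during that initialization is the QuartzSchedulerThread. The run method of this thread is the core of the scheduler.)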
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
// reset failure counter when paused, so that we don't
// wait again after unpausing
acquiresFailed = 0;
}
if (halted.get()) {
break;
}
}
// wait a bit, if reading from job store is consistently
// failing (e.g. DB is down or restarting)..
if (acquiresFailed > 1) {
try {
long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
Thread.sleep(delay);
} catch (Exception ignore) {
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
} catch (RuntimeException e) {
if (acquiresFailed == 0) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
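The code above is the run() method of Quartz's QuartzSchedulerThread class. It is the main loop of the scheduler thread: it acquires triggers and hands their jobs to the thread pool.
The rough flow of run() is:
- Define a variable acquiresFailed that counts consecutive failures to acquire triggers.
- Enter a loop that keeps running as long as the halted flag is false.
- Check whether the scheduler is paused.
  - If it is paused, wait until togglePause(false) is called to resume the scheduler.
  - If the halted flag is true, break out of the loop.
- If acquiring triggers has failed more than once in a row, wait for a while.
  - The wait time is computed by computeDelayForRepeatedErrors().
- Get the number of available threads (blockForAvailableThreads() blocks until at least one is free).
- If threads are available, acquire the next batch of triggers and execute their jobs.
  - Acquisition is bounded by a maximum batch size and a time window.
  - If an exception occurs during acquisition, increment acquiresFailed and start the next iteration; the error is reported only on the first failure.
  - If triggers were acquired and the list is not empty, wait until the fire time of the first trigger.
    - If the scheduler is halted during the wait, leave the wait loop.
    - If the fire time arrives, or the schedule has changed significantly (in which case the acquired triggers are released), leave the wait loop.
  - If the trigger list is empty after such a release, skip to the next iteration.
  - Mark the triggers as "executing" via triggersFired().
  - Create a JobRunShell object and initialize it.
    - If that throws, complete the trigger with the instruction SET_ALL_JOB_TRIGGERS_ERROR.
  - Run the JobRunShell in the thread pool.
    - A return value of false means the scheduler is shutting down or there is a problem with the thread pool; the error is logged and the trigger is completed with SET_ALL_JOB_TRIGGERS_ERROR.
  - Continue with the next iteration to acquire and fire the next batch.
- If no thread is available, continue with the next iteration.
- Compute a randomized idle wait time and wait on sigLock.
  - If the schedule changes, the wait ends early.
- Catch and log RuntimeException inside the loop.
- When the halted flag becomes true, leave the loop.
- Drop the references to scheduler resources to aid garbage collection.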
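Summary
Starting from the startup log and from a breakpoint inside a job, we worked backwards to the startup flow of Quartz in Spring Boot and to the core logic Quartz uses to schedule jobs. Teaching someone to fish is better than giving them a fish: I hope this article not only helps you understand Quartz but also helps you learn how to read framework source code.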