定时任务——xxl-job源码解析

摘要

本文深入解析了xxl-job的源码,xxl-job是一个分布式任务调度平台,其核心设计思想是将调度行为抽象成“调度中心”,而任务逻辑则由“执行器”处理,实现调度与任务的解耦。文章详细介绍了调度器和执行器的初始化流程、任务执行机制,并探讨了xxl-job的关键组件和线程池的设计,以及任务触发和执行的具体实现。

1. xxl-job设计核心思想

将调度行为抽象形成“调度中心”公共平台而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。 将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

1.1. 调度模块(调度中心):

负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;

支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

1.2. 执行模块(执行器):

负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效; 接收“调度中心”的执行请求、终止请求和日志请求等。

2. xxl-job调度器的初始化源码解读

2.1. xxl-job 调度器的整体初始化流程
 

2.2. XxlJobAdminConfig

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }
    // ---------------------- XxlJobScheduler ----------------------
    private XxlJobScheduler xxlJobScheduler;
    /**
    * 实现了该接口的bean会在Spring容器注入完所有的依赖之后自动调用afterPropertiesSet方法
    * 
    */
    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

    @Override
    public void destroy() throws Exception {
        xxlJobScheduler.destroy();
    }
    ......
}

2.3. xxlJobScheduler

public class XxlJobScheduler  {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);


    public void init() throws Exception {
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
	......
}

2.4. JobTriggerPoolHelper

start 方法中定义了一个快线程池和慢线程池,快线程池的最大线程数量最小为200,慢线程池的最大线程数量最小为100。 快慢线程池的设计思想是为了解决任务执行器并发度控制的问题,让任务执行器在任务量较大时能够快速响应,同时在任务量较小时能够节省资源。具体来说,快线程池和慢线程池的作用如下:

  1. 快线程池:用于执行任务量较大的任务,线程数较多,执行速度较快,能够快速响应任务。
  2. 慢线程池:用于执行任务量较小的任务,线程数较少,执行速度较慢,能够节省资源。
public void start(){
    fastTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(1000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
            }
        });

    slowTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
            }
        });
}

2.5. JobRegistryHelper

创建一个线程池 registryOrRemoveThreadPool 用于注册或删除任务,创建一个后台守护线程 registryMonitorThread ,使用了一个死循环每隔30秒执行一次,删除超时的注册表信息,更新xxl_job_group执行器地址列表。

public void start(){

  // for registry or remove
  registryOrRemoveThreadPool = new ThreadPoolExecutor(
      2,
      10,
      30L,
      TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(2000),
      new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
          return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
        }
      },
      new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          r.run();
          logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
        }
      });

  // 创建一个守护线程 registryMonitorThread
  registryMonitorThread = new Thread(new Runnable() {
    @Override
    public void run() {
      while (!toStop) {
        try {
          // 查询执行器地址类型是自动注册的执行器信息表
          List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
          if (groupList!=null && !groupList.isEmpty()) {

            // 查找xxl_job_registry表中超时了90秒的注册信息
            List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
            // 如果超时 ids 集合不为空,则直接删除这些数据
            if (ids!=null && ids.size()>0) {
              XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
            }

            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();

            // 查询所有未过期的注册信息,将注册类型为EXECUTOR的XxlJobRegistry集合改装成appname=>设置触发器的ip地址
            List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
            if (list != null) {
              for (XxlJobRegistry item: list) {
                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                  String appname = item.getRegistryKey();
                  List<String> registryList = appAddressMap.get(appname);
                  if (registryList == null) {
                    registryList = new ArrayList<String>();
                  }

                  if (!registryList.contains(item.getRegistryValue())) {
                    registryList.add(item.getRegistryValue());
                  }
                  appAddressMap.put(appname, registryList);
                }
              }
            }

            // 遍历 XxlJobGroup 列表,将 appname 对应的注册器 IP 地址转化为 IP1,IP2,IP3 的形式,
            // 并更新到xxl_job_group表中
            for (XxlJobGroup group: groupList) {
              List<String> registryList = appAddressMap.get(group.getAppname());
              String addressListStr = null;
              if (registryList!=null && !registryList.isEmpty()) {
                Collections.sort(registryList);
                StringBuilder addressListSB = new StringBuilder();
                for (String item:registryList) {
                  addressListSB.append(item).append(",");
                }
                addressListStr = addressListSB.toString();
                addressListStr = addressListStr.substring(0, addressListStr.length()-1);
              }
              group.setAddressList(addressListStr);
              group.setUpdateTime(new Date());

              XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
            }
          }
        } catch (Exception e) {
          if (!toStop) {
            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
          }
        }
        try {
          // 每 30 秒执行一次
          TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        } catch (InterruptedException e) {
          if (!toStop) {
            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
          }
        }
      }
      logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
    }
  });
  registryMonitorThread.setDaemon(true);
  registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
  registryMonitorThread.start();
}

2.6. JobFailMonitorHelper

创建一个守护线程monitorThread ,如果失败任务设置了重试机制,则触发重试流程,设置了告警策略,则会根据告警策略触发告警操作。

public void start(){
    monitorThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // monitor
            while (!toStop) {
                try {
                    // 查询前1000条执行器执行失败且告警状态是默认的 xxl_job_log 表的 id 集合
                    List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                    if (failLogIds!=null && !failLogIds.isEmpty()) {
                        for (long failLogId: failLogIds) {

                            // 以乐观锁的方式将日志的告警状态设置为 -1
                            int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                            if (lockRet < 1) {
                                continue;
                            }
                            XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                            XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

                            // 1、如果设置的失败重复次数大于0,则再次执行触发器,并更新日志信息
                            if (log.getExecutorFailRetryCount() > 0) {
                                JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                                String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                                log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                            }

                            int newAlarmStatus = 0;		// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
                            // 2、如果报警器信息不为空,则触发报警操作,并更新告警状态
                            if (info != null) {
                                boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                                newAlarmStatus = alarmResult?2:3;
                            } else {
                                newAlarmStatus = 1;
                            }

                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                        }
                    }

                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                    }
                }

                try {
                    // 每10ms需要执行一次
                    TimeUnit.SECONDS.sleep(10);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

        }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
    monitorThread.start();
}

2.7. JobCompleteHelper

创建一个回调线程池 callbackThreadPool 和一个守护线程monitorThread,守护线程负责将调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;

public void start(){

    // for callback
    callbackThreadPool = new ThreadPoolExecutor(
            2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(3000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
                }
            });


    // for monitor
    monitorThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // wait for JobTriggerPoolHelper-init
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }

            // monitor
            while (!toStop) {
                try {
                    // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
                    Date losedTime = DateUtil.addMinutes(new Date(), -10);
                    List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

                    if (losedJobIds!=null && losedJobIds.size()>0) {
                        for (Long logId: losedJobIds) {

                            XxlJobLog jobLog = new XxlJobLog();
                            jobLog.setId(logId);

                            jobLog.setHandleTime(new Date());
                            jobLog.setHandleCode(ReturnT.FAIL_CODE);
                            jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

                            XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                    }
                }

                try {
                    TimeUnit.SECONDS.sleep(60);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

        }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
    monitorThread.start();
}

2.8. JobLogReportHelper

每分钟刷新一次日志报告,包括当天的运行情况,包括运行次数、成功次数、失败次数等,并将这些信息保存到数据库中。 每天执行一次日志清理,删除指定天数前的日志数据。

public void start(){
    logrThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // 最近一次清理日志的时间
            long lastCleanLogTime = 0;


            while (!toStop) {

                // 1、log-report refresh: refresh log report in 3 days
                try {
                    // 分别统计今天、昨天、前天 0到24点的 总调用次数、成功调用次数、失败调用次数以及正在运行的次数
                    for (int i = 0; i < 3; i++) {

                        // today
                        Calendar itemDay = Calendar.getInstance();
                        itemDay.add(Calendar.DAY_OF_MONTH, -i);
                        itemDay.set(Calendar.HOUR_OF_DAY, 0);
                        itemDay.set(Calendar.MINUTE, 0);
                        itemDay.set(Calendar.SECOND, 0);
                        itemDay.set(Calendar.MILLISECOND, 0);

                        Date todayFrom = itemDay.getTime();

                        itemDay.set(Calendar.HOUR_OF_DAY, 23);
                        itemDay.set(Calendar.MINUTE, 59);
                        itemDay.set(Calendar.SECOND, 59);
                        itemDay.set(Calendar.MILLISECOND, 999);

                        Date todayTo = itemDay.getTime();

                        // refresh log-report every minute
                        XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                        xxlJobLogReport.setTriggerDay(todayFrom);
                        xxlJobLogReport.setRunningCount(0);
                        xxlJobLogReport.setSucCount(0);
                        xxlJobLogReport.setFailCount(0);

                        Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                        if (triggerCountMap!=null && triggerCountMap.size()>0) {
                            int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
                            int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
                            int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
                            int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

                            xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                            xxlJobLogReport.setSucCount(triggerDayCountSuc);
                            xxlJobLogReport.setFailCount(triggerDayCountFail);
                        }

                        // 数据当天数据存在就更新,不存在就新建
                        int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                        if (ret < 1) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                        }
                    }

                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                    }
                }

                // 如果设置了日志保留时间且最近24小时内没有清理过日志,则进入if语句内
                if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
                        && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

                    // 根据日志保留时间计算日志过期的时间
                    Calendar expiredDay = Calendar.getInstance();
                    expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                    expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                    expiredDay.set(Calendar.MINUTE, 0);
                    expiredDay.set(Calendar.SECOND, 0);
                    expiredDay.set(Calendar.MILLISECOND, 0);
                    Date clearBeforeTime = expiredDay.getTime();

                    // 清除过期的日志
                    List<Long> logIds = null;
                    do {
                        logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                        if (logIds!=null && logIds.size()>0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                        }
                    } while (logIds!=null && logIds.size()>0);

                    // 更新最近一次清除日志的时间,最近24小时不再执行清理操作
                    lastCleanLogTime = System.currentTimeMillis();
                }

                try {
                    // 每分钟执行一次
                    TimeUnit.MINUTES.sleep(1);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");

        }
    });
    logrThread.setDaemon(true);
    logrThread.setName("xxl-job, admin JobLogReportHelper");
    logrThread.start();
}

2.9. JobScheduleHelper

在这个类中,定义了两个线程,第一个线程是定时任务扫描处理线程,第二个线程则是一个时间轮线程。

scheduleThread线程中做的事情是将JobInfo中即将要执行的任务取出来(5秒的预读时间),然后根据三种不同的情况分别进行处理:

  1. 如果当前时间大于任务触发时间+5秒,说明这个任务漏触发,根据触发漏发策略决定是否执行任务。
  2. 如果当前时间大于任务的触发时间且小于触发时间+5秒,触发任务,计算下一次任务执行时间,如果下一次任务执行时间在五秒内,则放入时间轮。
  3. 其他情况,任务还没到时间触发,则放入时间轮中。

ringThread线程将时间轮中的任务按秒触发。

   public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    // 使线程的执行时间对齐到整秒
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
                // 计算预读的任务数,这个数量由参数控制
                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {
                        // 获取连接,设置自动提交为False
                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        // 通过for update语句锁住行,如果集群部署情况下同一时间只有一个线程可以执行后续的操作
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();
                        // 将当前时间+5秒要触发的任务全部读取出来
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2、push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {
                                // 如果当前时间大于任务触发时间+5秒,则进入第一个IF,说明任务到期但未触发,判断MisfireStrategy是立刻执行(FIRE_ONCE_NOW)
                                // 还是不做任何事情(DO_NOTHING),如果是FIRE_ONCE_NOW则立刻触发一次
                                // 然后刷新下一次的触发时间
                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1、misfire match
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW 》 trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 如果当前时间大于任务的触发时间且小于触发时间+5秒,进入这个代码模块中
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 直接触发一次任务
                                    // 1、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 刷新下一次触发时间
                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                    // 如果下一次触发时间在5秒内,则把任务放入时间轮中
                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                } else {
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
                                    // 除了上面两种情况,其他情况都加入到时间轮中
                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }
                            // 更新任务触发情况
                            // 3、update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                // 将事务提交
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;

                    // 如果花费时间小于1s,则等待1s后
                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

        // 时间轮线程
        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // 每次从时间轮中取出当前秒和前一秒的数据进行处理
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

3. xxl-job执行器的初始化源码解读

在XxlJob源码的samples代码中,配置文件里可以看到这样一段配置

在这段配置里配置了Xxl所需要的配置信息,包括地址、IP、端口等信息,这里的XxlJobSpringExecutor是一个任务执行器。

xxl-job任务执行器涉及到的类主要三个:XxlJobExecutor、XxlJobSimpleExecutor和XxlJobSpringExecutor。其中XxlJobSimpleExecutor和XxlJobSpringExecutor继承自XxlJobExecutor,两个类分别用于处理普通任务和Spring任务。

3.1. XxlJobExecutor

// 调度中心配置的地址
private String adminAddresses;
// 访问控制的token
private String accessToken;
// 本地应用的名字
private String appname;
// 本地的地址,也就是注册地址,如果设置了值就忽略ip+端口的地址
private String address;
// 本地的ip
private String ip;
// 本地的端口
private int port;
// 日志路径
private String logPath;
// 保留日志的天数
private int logRetentionDays;

接着定义了一个start方法和destroy方法,来看看start方法

public void start() throws Exception {

    // init logpath
    // 初始化日志路径
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    // 初始化客户端地址信息
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    // 初始化日志文件清理线程
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    // 初始化触发回调接口
    TriggerCallbackThread.getInstance().start();

    // init executor-server
	// 初始化执行器服务器端
    initEmbedServer(address, ip, port, appname, accessToken);
}

start()方法中定义了五个方法的初始化流程,我们依次来看一下

3.1.1. initLogPath()

这个方法是初始化日志相关的信息,初始化了日志文件的路径和glue文件的路径。

private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
public static void initLogPath(String logPath){
    // init
    if (logPath!=null && logPath.trim().length()>0) {
        logBasePath = logPath;
    }
    // mk base dir
    File logPathDir = new File(logBasePath);
    if (!logPathDir.exists()) {
        logPathDir.mkdirs();
    }
    logBasePath = logPathDir.getPath();

    // mk glue dir
    File glueBaseDir = new File(logPathDir, "gluesource");
    if (!glueBaseDir.exists()) {
        glueBaseDir.mkdirs();
    }
    glueSrcPath = glueBaseDir.getPath();
}

3.1.2. initAdminBizList()

    private static List<AdminBiz> adminBizList;
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {

                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }

这个方法主要做了下面几件事情

  1. 通过解析传入的adminAddresses参数,将调度中心的地址拆分为多个地址字符串。
  2. 遍历每个地址字符串,创建一个AdminBizClient对象,该对象负责与对应的调度中心地址建立连接和进行通信。AdminBizClient是XxlJob提供的RPC调用客户端,用于与调度中心进行交互。
  3. 将创建的AdminBizClient对象添加到adminBizList列表中,以便后续使用。

总结起来,这段代码的作用是根据传入的调度中心地址和访问令牌,创建与调度中心的连接,并将连接对象存储在adminBizList列表中,以便后续使用。通过这个列表,可以实现与调度中心的交互,例如获取任务、上报任务执行结果等操作。

3.1.3. JobLogFileCleanThread

JobLogFileCleanThread 方法初始化了一个清理日志的线程,这个方法实现了一个定时清理过期日志文件的功能,通过启动一个后台线程,在每天固定时间执行清理操作,删除过期的日志文件。


/**
 * 1.获取日志文件所在的目录,并获取该目录下的所有子目录(即日期目录)。
 * 2.获取当前日期(不包含时间)。
 * 3.遍历每个子目录,进行以下判断和操作:
 *  - 如果子目录不是一个目录(即不是有效的日期目录),则跳过。
 *  - 如果子目录名称中不包含"-"字符,也跳过。
 *  - 解析子目录名称为日期对象,如果解析失败,则跳过。
 *  - 计算当前日期与子目录创建日期的时间差,如果超过logRetentionDays指定的天数,则删除该子目录及其下的所有文件。
 * 4.捕获异常并记录日志,如果不是停止状态,则记录异常信息。
 * 5.休眠1天的时间。
 * 6.重复执行上述步骤直到线程被停止。
 * @param logRetentionDays
 */
public void start(final long logRetentionDays){

    // limit min value
    if (logRetentionDays < 3 ) {
        return;
    }

    localThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                try {
                    // clean log dir, over logRetentionDays
                    File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                    if (childDirs!=null && childDirs.length>0) {

                        // today
                        Calendar todayCal = Calendar.getInstance();
                        todayCal.set(Calendar.HOUR_OF_DAY,0);
                        todayCal.set(Calendar.MINUTE,0);
                        todayCal.set(Calendar.SECOND,0);
                        todayCal.set(Calendar.MILLISECOND,0);

                        Date todayDate = todayCal.getTime();

                        for (File childFile: childDirs) {

                            // valid
                            if (!childFile.isDirectory()) {
                                continue;
                            }
                            if (childFile.getName().indexOf("-") == -1) {
                                continue;
                            }

                            // file create date
                            Date logFileCreateDate = null;
                            try {
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                                logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                            } catch (ParseException e) {
                                logger.error(e.getMessage(), e);
                            }
                            if (logFileCreateDate == null) {
                                continue;
                            }

                            if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                                FileUtil.deleteRecursively(childFile);
                            }

                        }
                    }

                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    TimeUnit.DAYS.sleep(1);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");

        }
    });
    localThread.setDaemon(true);
    localThread.setName("xxl-job, executor JobLogFileCleanThread");
    localThread.start();
}

3.1.4. TriggerCallbackThread

这是一个触发回调的方法,在这个回调方法中创建了两个回调线程,第一个是回调线程


// callback
triggerCallbackThread = new Thread(new Runnable() {

    @Override
    public void run() {

        // normal callback
        while(!toStop){
            try {
                HandleCallbackParam callback = getInstance().callBackQueue.take();
                if (callback != null) {

                    // callback list param
                    // 将回调任务放到回调集合中
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    callbackParamList.add(callback);

                    // callback, will retry if error
                    // 如果集合不为空,就触发一次回调
                    if (callbackParamList!=null && callbackParamList.size()>0) {
                        doCallback(callbackParamList);
                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        // last callback
        // 在线程将要停止之后,将回调队列中剩下的回调任务全部执行
        try {
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
            if (callbackParamList!=null && callbackParamList.size()>0) {
                doCallback(callbackParamList);
            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");

    }
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
triggerCallbackThread.start();

在上面线程中,在正常运行情况下,一直会通过循环取出回调任务,然后通过doCallback进行回调。当线程将要被关闭的时候,会有一段代码将回调队列中剩下的回调任务全部执行。

3.1.5. doCallback()

执行回调调用了doCallback方法

private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean callbackRet = false;
    // callback, will retry if error
    // 遍历所有的调度器服务,通过rpc执行回调方法
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
            if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                callbackRet = true;
                break;
            } else {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
            }
        } catch (Exception e) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
        }
    }
    // 如果callback结果是失败的,则写入失败文件中
    if (!callbackRet) {
        appendFailCallbackFile(callbackParamList);
    }
}

在callback方法中,会遍历所有的调度器集合,然后每个调度器执行callback方法,这里的callback方法会通过rpc去调用调度器的方法。然后根据回调拿到的结果,执行不同的写日志的操作。

@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}

具体的执行过程,我们在后续的文章里聊。 doCallback方法的最后会将错误写入到失败文件中,这个失败文件会用作失败重试。

private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
    // valid
    if (callbackParamList==null || callbackParamList.size()==0) {
        return;
    }
    // append file
    // 将callback参数序列化成byte数组
    byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);
    File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
    if (callbackLogFile.exists()) {
        for (int i = 0; i < 100; i++) {
            callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));
            if (!callbackLogFile.exists()) {
                break;
            }
        }
    }
    // 将byte数组写入文件
    FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
}

第一个回调线程做的事情都讲清楚了,接下来介绍TriggerCallbackThread中的第二个线程,重试线程。

triggerRetryCallbackThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while(!toStop){
            try {
                retryFailCallbackFile();
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }

            }
            try {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
    }
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.start();

这个线程就做一件事情,每隔30秒执行一次retryFailCallbackFile()方法,在这个方法中,会扫描上面一步写入的失败文件,然后取出入参后再次调用doCallback方法,实现重试。

private void retryFailCallbackFile(){

    // valid
    // 对失败回调文件做一系列校验
    File callbackLogPath = new File(failCallbackFilePath);
    if (!callbackLogPath.exists()) {
        return;
    }
    if (callbackLogPath.isFile()) {
        callbackLogPath.delete();
    }
    if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {
        return;
    }

    // load and clear file, retry
    // 遍历回调文件,取出被序列化为byte数组的参数,执行doCallback
    for (File callbaclLogFile: callbackLogPath.listFiles()) {
        byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);

        // avoid empty file
        if(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){
            callbaclLogFile.delete();
            continue;
        }

        List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);

        callbaclLogFile.delete();
        doCallback(callbackParamList);
    }

3.1.6. initEmbedServer()

这个方法负责初始化 xxl-job 的 EmbedServer,并配置服务器的地址、端口、应用名称和访问令牌等参数,然后启动该服务器。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    // 如果传入的端口大于0,则使用传入的端口;否则,使用 NetUtil.findAvailablePort(9999) 方法查找一个可用的端口。
    port = port>0?port: NetUtil.findAvailablePort(9999);
    // 如果传入的 IP 不为空,则使用传入的 IP;否则,使用 IpUtil.getIp() 方法获取本机 IP。
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    // 如果传入的地址为空,则根据 IP 和端口生成地址,格式为 http://{ip_port}/,其中 {ip_port} 会被替换为 IP 和端口的组合。
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    // 如果传入的访问令牌为空,则输出警告日志,提醒用户设置访问令牌以确保系统安全。
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    // 创建一个 EmbedServer 实例,并调用 start() 方法启动嵌入式服务器。
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

这个EmbedServer服务是基于Netty实现的NIO服务,主要为了实现包括任务下发、执行结果上报、心跳等功能。同时,通过嵌入式的 Netty HTTP 服务器,可以方便地处理任务调度中心的 HTTP 请求,并执行相应的任务操作。

3.2. XxlJobSimpleExecutor

XxlJobExecutor有两个子类,其中一个是XxlJobSimpleExecutor,用于处理简单的Java程序,不依赖Spring容器。

@Override
public void start() {

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(xxlJobBeanList);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

首先执行Job任务的注册,然后执行父类的start方法。

private void initJobHandlerMethodRepository(List<Object> xxlJobBeanList) {
    if (xxlJobBeanList==null || xxlJobBeanList.size()==0) {
        return;
    }

    // init job handler from method
    for (Object bean: xxlJobBeanList) {
        // method
        Method[] methods = bean.getClass().getDeclaredMethods();
        if (methods.length == 0) {
            continue;
        }
        for (Method executeMethod : methods) {
            XxlJob xxlJob = executeMethod.getAnnotation(XxlJob.class);
            // registry
            registJobHandler(xxlJob, bean, executeMethod);
        }

    }
}

注册方法也比较简单,如果用@XxlJob注解了,就注册到JobHandler中。

3.3. XxlJobSpringExecutor

XxlJobSpringExecutor是XxlJobExecutor的另一个子类,是 xxl-job 框架提供的基于 Spring 的执行器实现类,它是一个 Spring Bean,在 Spring 容器中进行管理。

public void afterSingletonsInstantiated() {

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

XxlJobSpringExecutor的初始化也是先执行Job任务的注册,然后执行父类的start方法。中间有一个刷新GlueFactory实例的方法,Glue是xxljob支持的一种脚本执行模式。注册方法的注释已经放在代码内了。

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
// 检查传入的 ApplicationContext 是否为空,如果为空,则直接返回。
if (applicationContext == null) {
    return;
}
// init job handler from method
// 从 ApplicationContext 中获取所有的 bean 定义名称,并遍历每个 bean。
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
// 对于每个 bean,首先检查是否有 @Lazy 注解,如果有,则跳过该 bean 的处理。否则,继续下一步。
for (String beanDefinitionName : beanDefinitionNames) {

    // get bean
    Object bean = null;
    Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
    if (onBean!=null){
        logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
        continue;
    }else {
        bean = applicationContext.getBean(beanDefinitionName);
    }

    // filter method
    Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
    try {
        // 使用 Spring 的 MethodIntrospector 工具类,检查 bean 类中的方法,并找到被 @XxlJob 注解修饰的方法。
        annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                new MethodIntrospector.MetadataLookup<XxlJob>() {
                    @Override
                    public XxlJob inspect(Method method) {
                        return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                    }
                });
    } catch (Throwable ex) {
        logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
    }
    if (annotatedMethods==null || annotatedMethods.isEmpty()) {
        continue;
    }

    // generate and regist method job handler
    for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
        // 获取方法对象和 @XxlJob 注解的信息。
        Method executeMethod = methodXxlJobEntry.getKey();
        XxlJob xxlJob = methodXxlJobEntry.getValue();
        // regist
        // 调用 registJobHandler() 方法,注册任务处理器,将 @XxlJob 注解和方法信息传递给注册方法。
        registJobHandler(xxlJob, bean, executeMethod);
    }
}
}

4. xxl-job任务的执行源码解读

不管是主动触发执行还是被动触发执行,都会进入到JobInfoController 中的triggerJob 方法

4.1. JobInfoController

在这个接口中,首先会判断 executorParam 是否为 null,如果是 null 的话就设置executorParam参数为空,接着触发 JobTriggerPoolHelper 的 trigger 方法。

@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
    // force cover job param
    if (executorParam == null) {
        executorParam = "";
    }

    JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
    return ReturnT.SUCCESS;
}

4.2. JobTriggerPoolHelper

进入trigger方法后会进入到addTrigger方法内,这个方法做了两个事情,第一按执行耗时,将任务分到快线程池和或者慢线程池。第二件事情是触发 XxlJobTrigger.trigger 来执行任务。

public void addTrigger(final int jobId,
                   final TriggerTypeEnum triggerType,
                   final int failRetryCount,
                   final String executorShardingParam,
                   final String executorParam,
                   final String addressList) {

// choose thread pool
// 刚开始所有任务都放到Fast线程池内,如果任务执行时间大于500ms的次数大于10次,就放入Slow线程池中
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
    triggerPool_ = slowTriggerPool;
}

// trigger
triggerPool_.execute(new Runnable() {
    @Override
    public void run() {

        long start = System.currentTimeMillis();

        try {
            // do trigger
            // 调用 XxlJobTrigger.trigger 方法来触发作业执行,传递相应的参数。
            XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {

            // check timeout-count-map
            // 获取当前分钟对应的时间戳,并与之前记录的分钟时间戳 minTim 进行比较。如果不一致,表示进入了新的一分钟,清空超时计数映射表。
            long minTim_now = System.currentTimeMillis()/60000;
            if (minTim != minTim_now) {
                minTim = minTim_now;
                jobTimeoutCountMap.clear();
            }

            // incr timeout-count-map
            // 对于单次执行时间超过500ms的任务,jobTimeoutCountMap 增加一次计数
            long cost = System.currentTimeMillis()-start;
            if (cost > 500) {       // ob-timeout threshold 500ms
                AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                if (timeoutCount != null) {
                    timeoutCount.incrementAndGet();
                }
            }

        }

    }
});

4.3. XxlJobTrigger

在trigger方法中,会根据传入的参数做一系列的初始化,然后执行processTrigger方法进行任务的触发。

public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load data
    // 根据 jobId 获取 XxlJobInfo 信息
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    // 如果传入的失败重试次数 failRetryCount 大于等于0,则将其作为最终的失败重试次数。否则使用作业信息中配置的执行失败重试次数。
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    // 如果传入的地址列表 addressList 不为null且不为空字符串,则将作业组的地址类型设置为1(手动录入),地址列表设置为传入的地址列表。
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    // 如果分片策略为分片广播(SHARDING_BROADCAST),并且任务组的注册地址列表不为空且不为空集合,并且 shardingParam 为null,则遍历注册地址列表触发任务
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}

processTrigger方法主要作用是处理触发作业执行的逻辑。它初始化触发参数,确定执行器地址,触发执行器执行作业,并记录触发信息和日志。

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    // param
    // 阻塞策略
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    // 路由策略
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    // 分片参数
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1、save log-id
    // 保存 XxlJobLog 日志信息
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2、init trigger-param
    // 设置触发参数
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3、init address
    // 根据不同的策略,确定执行器的地址
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4、trigger remote executor
    // 根据上一步确定的地址以及执行器参数,调用 runExecutor 方法,向远程执行器触发执行作业
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5、collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save log trigger-info
    // 更新作业日志的触发信息,包括执行器地址、执行器处理器、执行器参数、执行器分片参数、失败重试次数
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

在上面这个方法中,通过runExecutor(triggerParam, address)触发任务的执行。在这个runExecutor方法中,通过RPC的方式找到对应地址的执行器,调用执行器的run方法。并将调用的结果返回。

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        // 通过RPC的方式找到对应地址的执行器,调用执行器的run方法
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    // 拼接调度返回结果
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

4.4. ExecutorBizImpl

executorBiz.run(triggerParam),会根据传入的条件构建出IJobHandler和JobThread,最终将JobThread传入TriggerQueue等待触发。具体的细节流程我都在代码里加上中文注释了。

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    // 根据传入的JobId,加载旧的jobHandler和jobThread(如果一个任务已经执行过一次,会存入jobThreadRepository这个本地的Map里)
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    // 根据不同的GlueType,构建不同的IJobHandler
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    // 如果触发参数中的 Glue 类型为 BEAN
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        // 校验jobThread是否存在且jobHandler是否与新的jobHandler相同。如果不相同,将jobThread和jobHandler设置为 null
        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        // 如果jobHandler是null,就给jobHandler赋值为新的jobHandler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        // 校验jobThread是否存在且JobHandler是否是 GlueJobHandler 类型且 Glue 更新时间与触发参数中的 Glue 更新时间相同
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // 如果jobHandler是null,创建一个新的 GlueJobHandler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

        // valid old jobThread
        // 校验jobThread是否存在且JobHandler是否是 ScriptJobHandler 类型且 Glue 更新时间与触发参数中的 Glue 更新时间相同
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // 如果jobHandler是null,创建一个新的 ScriptJobHandler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    // 判断阻塞策略
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        // 如果阻塞策略为 DISCARD_LATER 并且jobThread正在运行或在触发队列中,则返回错误信息
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
            // 如果阻塞策略为 COVER_EARLY 并且任务线程正在运行或有队列,则终止旧的任务线程,并将任务线程设置为 null
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    // 如果任务线程为null,注册一个新的任务线程
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push data to queue
    // 将触发参数推送到任务线程的触发队列中,等待执行
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

4.5. JobThread

在上面这个方法中,最终会把triggerParam放入TriggerQueue中,那么真正的任务执行是在哪里呢?还是看上面这段代码,XxlJobExecutor.registJobThread这个方法中,注册了一个新的jobThread,并且通过start方法启动了这个线程。

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
    // 新建了一个 JobThread ,并且通过start方法启动
    JobThread newJobThread = new JobThread(jobId, handler);
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

    JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }

    return newJobThread;
}

调用了线程的start方法之后,JobThread的run方法就开始执行。

@Override
public void run() {

    // init
    // 如果IJobHandler有初始化方法的话,就执行初始化方法
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // execute
    while(!toStop){
        running = false;
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
            // 从 triggerQueue 中获取 TriggerParam 对象,使用 poll 方法,最多等待 3 秒。如果获取到了 TriggerParam 对象,说明有任务需要执行
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                // 构建XxlJobContext上下文对象
                XxlJobContext xxlJobContext = new XxlJobContext(
                        triggerParam.getJobId(),
                        triggerParam.getExecutorParams(),
                        logFileName,
                        triggerParam.getBroadcastIndex(),
                        triggerParam.getBroadcastTotal());

                // init job context
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // execute
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
                // 如果任务设置了超时时间,则通过创建一个 FutureTask,在另一个线程中执行任务,并设置超时时间进行限制。
                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {

                                // init job context
                                XxlJobContext.setXxlJobContext(xxlJobContext);
                                // 通过该方法真正执行任务
                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {
                        // 如果超时了,就会进入到catch方法内
                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);

                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // just execute
                    // 如果没有设置超时时间,直接执行任务
                    handler.execute();
                }

                // valid execute handle data
                // 如果任务执行失败或结果丢失,调用 XxlJobHelper.handleFail() 方法进行处理。
                // 如果任务执行成功,记录执行结果信息。
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                            ?tempHandleMsg.substring(0, 50000).concat("...")
                            :tempHandleMsg;
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg()
                );

            } else {
                // 空闲次数计数器 idleTimes。如果 idleTimes 达到阈值(30 次),相当于有30次都没有执行对应的任务,就从执行器缓存中删除这个任务
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            // 如果进入catch代码块内,记录异常信息

            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // handle result
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();

            XxlJobHelper.handleFail(errorMsg);

            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if(triggerParam != null) {
                // callback handler info
                // 如果任务线程未被停止,将推送回调信息 push 到 TriggerCallbackThread
                if (!toStop) {
                    // commonm
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg() )
                    );
                } else {
                    // is killed
                    // 如果任务线程被停止,则推送线程停止信息到回调线程
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_CODE_FAIL,
                            stopReason + " [job running, killed]" )
                    );
                }
            }
        }
    }


    // callback trigger request in queue
    // 如果线程被停止后触发队列不为空,则推送线程停止信息到回调线程
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_CODE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // destroy
    // 如果执行器配置了destroyMethod方法,就执行对应的destroyMethod方法
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

真正执行是在handler.execute(); execute方法通过反射,执行目标方法。

@Override
public void execute() throws Exception {
    Class<?>[] paramTypes = method.getParameterTypes();
    if (paramTypes.length > 0) {
        method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types
    } else {
        method.invoke(target);
    }
}

博文参考

25图搞懂Xxl-Job

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/930855.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vxe-table 键盘操作,设置按键编辑方式,支持覆盖方式与追加方式

vxe-table 全键盘操作&#xff0c;按键编辑方式设置&#xff0c;覆盖方式与追加方式&#xff1b; 通过 keyboard-config.editMode 设置按键编辑方式&#xff1b;支持覆盖方式编辑和追加方式编辑 安装 npm install vxe-pc-ui4.3.15 vxe-table4.9.15// ... import VxeUI from v…

乾元通渠道商中标福州市人防信息化建设项目

乾元通渠道商中标福州市人防信息化建设项目&#xff0c;乾元通作为应急通讯设备厂家&#xff0c;为项目提供车载版多链路聚合通信保障设备 QYT-X1s。 青岛乾元通数码科技有限公司作为国家应急产业企业&#xff0c;深耕于数据调度算法研究&#xff0c;参与了多项国家及省部级信息…

【深度学习|地学应用-地震气溶胶异常解析3】气溶胶异常是地震的前兆现象之一!地震是如何影响气溶胶浓度和分布的异常变化的呢,我们该如何分析?

【深度学习|地学应用-地震气溶胶异常解析3】气溶胶异常是地震的前兆现象之一&#xff01;地震是如何影响气溶胶浓度和分布的异常变化的呢&#xff0c;我们该如何分析&#xff1f; 【深度学习|地学应用-地震气溶胶异常解析3】气溶胶异常是地震的前兆现象之一&#xff01;地震是…

C++析构函数和构造函数

一、构造函数 1.构造函数的基本概念 1.对构造函数的理解&#xff1a; 构造函数是类的一种特殊成员函数&#xff0c;其主要功能是在创建对象时进行初始化操作。它的名字与类名相同&#xff0c;并且没有返回值类型&#xff08;不能是void&#xff09;。例如&#xff0c;对于一个…

【python自动化四】日志打印

我们在进行自动化测试时&#xff0c;需要打印过程日志和结果日志等&#xff0c;这里记录下日志的相关配置。这里我们直接自己新建一个logger。 先贴上日志代码如下&#xff0c;可根据需要修改&#xff1a; import logging import os import timefrom logging.handlers import …

鸿蒙分享(一):添加模块,修改app名称图标

码仓库&#xff1a;https://gitee.com/linguanzhong/share_harmonyos 鸿蒙api:12 新建公共模块common 在entry的oh-package.json5添加dependencies&#xff0c;引入common模块 "dependencies": {"common": "file:../common" } 修改app名称&…

IDE如何安装插件实现Go to Definition

项目背景 框架&#xff1a;Cucumber Cypress 语言&#xff1a;Javascript IDE&#xff1a;vscode 需求 项目根目录cypress-automation的cypress/integration是测试用例的存放路径&#xff0c;按照不同模块不同功能创建了很多子目录&#xff0c;cucumber测试用例.feature文…

【机器学习】CatBoost 模型实践:回归与分类的全流程解析

一. 引言 本篇博客首发于掘金 https://juejin.cn/post/7441027173430018067。 PS&#xff1a;转载自己的文章也算原创吧。 在机器学习领域&#xff0c;CatBoost 是一款强大的梯度提升框架&#xff0c;特别适合处理带有类别特征的数据。本篇博客以脱敏后的保险数据集为例&#x…

HTML Input 文件上传功能全解析:从基础到优化

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

Unity在运行状态下,当物体Mesh网格发生变化时,如何让MeshCollider碰撞体也随之实时同步变化?

旧版源代码地址&#xff1a;https://download.csdn.net/download/qq_41603955/90087225?spm1001.2014.3001.5501 旧版效果展示&#xff1a; 新版加上MeshCollider后的效果&#xff1a; 注意&#xff1a;在Unity中&#xff0c;当你动态地更改物体的Mesh时&#xff0c;通常期望…

conda create -n name python=x.x 执行失败问题解决方法

今天想在anaconda环境下创建一个指定python版本为3.9的虚拟环境&#xff0c;执行命令 conda create -n DeepLearning python3.9 但是系统竟然报错 看报错信息是在镜像源里找不到下载包&#xff0c;于是对镜像源文件处理 首先删除之前的镜像通道 conda config --remove-key …

第一个 JSP 程序

一个简单的 JSP 程序&#xff1a; 使用 IDEA 开发工具新建一个 maven 项目&#xff0c;具体操作如图所示&#xff1a; 配置 Tomcat 服务器 项目结构如下图所示&#xff1a; 3. 修改 index.jsp 页面的代码&#xff1a; <% page language"java" contentType&q…

Altium Designer学习笔记 32 DRC检查_丝印调整

基于Altium Designer 23学习版&#xff0c;四层板智能小车PCB 更多AD学习笔记&#xff1a;Altium Designer学习笔记 1-5 工程创建_元件库创建Altium Designer学习笔记 6-10 异性元件库创建_原理图绘制Altium Designer学习笔记 11-15 原理图的封装 编译 检查 _PCB封装库的创建Al…

docker学习笔记(五)--docker-compose

文章目录 常用命令docker-compose是什么yml配置指令详解versionservicesimagebuildcommandportsvolumesdepends_on docker-compose.yml文件编写 常用命令 命令说明docker-compose up启动所有docker-compose服务&#xff0c;通常加上-d选项&#xff0c;让其运行在后台docker-co…

不同类型的集成技术——Bagging、Boosting、Stacking、Voting、Blending简述

目录 一、说明 二、堆叠 2.1 堆叠的工作原理&#xff1a; 2.2 例子&#xff1a; 2.3 堆叠的优点&#xff1a; 三、投票&#xff08;简单投票&#xff09; 3.1 例子&#xff1a; 3.2 投票的优点&#xff1a; 四、装袋和投票之间的区别 五、混合 6.1 混合的主要特征&#xff1a; …

ONES 功能上新|ONES Project 甘特图再度升级

ONES Project 甘特图支持展示工作项标题、进度百分比、依赖关系延迟时间等信息。 应用场景&#xff1a; 在使用甘特图规划项目任务、编排项目计划时&#xff0c;可以对甘特图区域进行配置&#xff0c;展示工作项的工作项标题、进度百分比以及依赖关系延迟时间等维度&#xff0c…

【目标检测】【反无人机目标检测】使用SEB-YOLOv8s实时检测未经授权的无人机

Real-Time Detection of Unauthorized Unmanned Aerial Vehicles Using SEB-YOLOv8s 使用SEB-YOLOv8s实时检测未经授权的无人机 论文链接 0.论文摘要 摘要&#xff1a;针对无人机的实时检测&#xff0c;复杂背景下无人机小目标容易漏检、难以检测的问题。为了在降低内存和计算…

Elasticsearch:使用 Elastic APM 监控 Android 应用程序

一、前言 人们通过私人和专业的移动应用程序在智能手机上处理越来越多的事情。 拥有成千上万甚至数百万的用户&#xff0c;确保出色的性能和可靠性是移动应用程序和相关后端服务的提供商和运营商面临的主要挑战。 了解移动应用程序的行为、崩溃的发生和类型、响应时间慢的根本…

DataSophon集成CMAK KafkaManager

本次集成基于DDP1.2.1 集成CMAK-3.0.0.6 设计的json和tar包我放网盘了. 通过网盘分享的文件&#xff1a;DDP集成CMAK 链接: https://pan.baidu.com/s/1BR70Ajj9FxvjBlsOX4Ivhw?pwdcpmc 提取码: cpmc CMAK github上提供了zip压缩包.将压缩包解压之后 在根目录下加入启动脚本…

Java——异常机制(上)

1 异常机制本质 (异常在Java里面是对象) (抛出异常&#xff1a;执行一个方法时&#xff0c;如果发生异常&#xff0c;则这个方法生成代表该异常的一个对象&#xff0c;停止当前执行路径&#xff0c;并把异常对象提交给JRE) 工作中&#xff0c;程序遇到的情况不可能完美。比如…