数据库表结构:
YyJobInfo:
public class YyJobInfo {
//定时任务id
private int id;
//该定时任务所属的执行器的id
private int jobGroup;
//定时任务描述
private String jobDesc;
//定时任务添加的时间
private Date addTime;
//定时任务的更新时间
private Date updateTime;
//负责人
private String author;
//报警邮件
private String alarmEmail;
//调度类型
private String scheduleType;
//一般为调度的cron表达式
private String scheduleConf;
//定时任务的失败策略
private String misfireStrategy;
//定时任务的路由策略
private String executorRouteStrategy;
//JobHandler的名称
private String executorHandler;
//执行器参数
private String executorParam;
//定时任务阻塞策略
private String executorBlockStrategy;
//执行超时时间
private int executorTimeout;
//失败重试次数
private int executorFailRetryCount;
//定时任务运行类型
private String glueType;
//glue的源码
private String glueSource;
//glue备注
private String glueRemark;
//glue更新时间
private Date glueUpdatetime;
//子任务id
private String childJobId;
//定时任务触发状态,0为停止,1为运行
private int triggerStatus;
//最近一次的触发时间
private long triggerLastTime;
//下一次的触发时间
private long triggerNextTime;
}
项目分层:
核心代码:
com.yy.yyjob.admin.core.thread.JobScheduleHelper
任务调度代码(有些逻辑采用模拟逻辑,后续会逐渐补全):
@Component
public class JobScheduleHelper {
@Resource
private YyJobInfoDao yyJobInfoDao;
// 调度定时任务的线程
private Thread scheduleThread;
// 创建当前类的对象
private static JobScheduleHelper instance = new JobScheduleHelper();
// 把当前类的对象暴露出去
public static JobScheduleHelper getInstance(){
return instance;
}
// 启动调度线程工作的方法
public void start(){
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
while (true){
// 从数据库中查询所有定时任务信息
List<YyJobInfo> yyJobInfoList = YyJobAdminConfig.getAdminConfig().getYyJobInfoDao().findAll();
// 得到当前时间
long time = System.currentTimeMillis();
// 遍历所有定时任务信息
for (YyJobInfo yyJobInfo : yyJobInfoList) {
if (time > yyJobInfo.getTriggerNextTime()){
// 如果大于就执行定时任务,在这里就选用集合的第一个地址
System.out.println("通知address服务器,去执行定时任务");
// 计算定时任务下一次的执行时间
Date nextTime = null;
try {
nextTime = new CronExpression(yyJobInfo.getScheduleConf()).getNextValidTimeAfter(new Date());
} catch (ParseException e) {
e.printStackTrace();
}
// 下面就是更新数据库中定时任务的操作
YyJobInfo job = new YyJobInfo();
job.setTriggerNextTime(nextTime.getTime());
System.out.println("保存job信息");
}
}
}
}
});
scheduleThread.start();
}
数据库配置代码:
/**
* @Description:
* 这个类可以说是服务端的启动入口,该类实现了Spring的InitializingBean接口,
* 所以该类中的 afterPropertiesSet方法会在容器中的bean初始化完毕后被回掉。回掉的过程中会创建xxl-job中最重要的块慢线程池
* 同时也会启动xxl-job中的时间轮
*/
@Component
public class YyJobAdminConfig implements InitializingBean, DisposableBean {
//当前类的引用,看到这里就应该想到单例模式了
//xxl-job中所有组件都是用了单例模式,通过public的静态方法把对象暴露出去
private static YyJobAdminConfig adminConfig = null;
//获得当前类对象的静态方法
public static YyJobAdminConfig getAdminConfig() {
return adminConfig;
}
@Override
public void afterPropertiesSet() throws Exception {
//为什么这里可以直接赋值呢?还是和spring的bean对象的初始化有关,XxlJobAdminConfig添加了@Component
//注解,所以会作为bean被反射创建,创建的时候会调用无参构造器
//而afterPropertiesSet方法是在容器所有的bean初始化完成式才会被回掉,所以这时候XxlJobAdminConfig对象已经
//创建完成了,直接赋值this就行
adminConfig = this;
}
@Override
public void destroy() throws Exception {
}
@Resource
private YyJobInfoDao yyJobInfoDao;
@Resource
private DataSource dataSource;
public YyJobInfoDao getYyJobInfoDao() {
return yyJobInfoDao;
}
public DataSource getDataSource() {
return dataSource;
}
总结:
目前调度中心单独部署在一台服务器中,两个定时任务程序各自部署在不同的服务器上。调度中心和定时任务程序启动的时候,会先启动调度中心,然后再启动定时任务程序。这样,定时任务程序才能把定时任务的信息和定时任务部署的服务器的ip地址发送给调度中心,然后调度中心把这些信息记录在数据库中。然后,调度中心的调度任务的线程就会开始在一个while循环中不断地扫描数据库,查找数据库能够执行的定时任务,如果有定时任务到了执行时间了,就远程通知执行定时任务的程序执行定时任务,接着在计算定时任务下一次的执行时间,记录时间到数据库中。