在使用SpringBoot框架进行开发时,一般都是通过@Scheduled注解进行定时任务的开发:
@Component
public class TestTask
{
@Scheduled(cron="0/5 * * * * ? ") //每5秒执行一次
public void execute(){
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("任务执行" + df.format(new Date()));
}
}
但是这种方式存在一个问题,那就是任务的周期控制是死的,必须编写在代码中,如果遇到需要在系统运行过程中想中止、立即执行、修改执行周期等动态操作的需求时,使用注解的方式便不能满足了,当然为了满足此种需求可以额外再引入其他任务调度插件(例如XXL-Job等),但是引入其他组件是需要衡量成本的,额外的依赖成本、组件的维护成本、开发的复杂度等等,所以如果系统体量不是那么大,完全没必要通过增加组件来完成,可以基于SpringBoot框架实现一套内置轻量级的任务调度。
设计思路
整体设计
这里我们把定时任务以类作为基础单位,即一个类为一个任务,然后通过配置数据的方式,进行任务的读取,通过反射生成任务对象,使用SpringBoot本身的线程池任务调度,完成动态的定时任务驱动,同时通过接口支撑实现相应的REST API对外暴露接口
任务模型
首先基于模板模式,设计基础的任务执行流程抽象类,定义出一个定时任务需要执行的内容和步骤和一些通用的方法函数,后续具体的定时任务直接继承该父类,实现该父类的before、start、after三个抽象函数即可,所有公共操作均在抽象父类完成
特殊说明:
基于此方法创建的类是不归Spring的容器管理的,所以自定义的任务子类中是无法使用SpringBoot中的任何注解,尤其在自定义任务类中如果需要依赖其他Bean时,需要借助抽象父类AbstractBaseCronTask中已经实现的<T> T getServer(Class<T> className)来完成,getServer的实现如下:
public <T> T getServer(Class<T> className){
return applicationContext.getBean(className);
}
是通过SpringBoot中的ApplicationContext接口来获取Spring的上下文,以此来满足可以获取Spring中其他Bean的诉求。
例如,有个定时任务TaskOne类,它需要使用UserService类中的 caculateMoney()的方法,势必这个定时任务需要依赖UserService类,而TaskOne并非是Spring创建的对象,而是我们人为干预生成的对象,所以它是不在Spring的Bean管理范围的,自然也就无法使用@Autowird等方式注入UserService类,此时就需要使用getServer方法来获取UserService对象
//自定义定时任务类
public class TaskOne extends AbstractBaseCronTask {
private UserService userService;
public TestTask(TaskEntity taskEntity) {
super(taskEntity);
}
@Override
public void beforeJob() {
//任务运行第一步,先将userService进行变量注入
userService = getServer(UserService.class);
……
}
@Override
public void startJob() {
if(XXXX){
//直接调用getServer获取需要的bean
User user = getServer(UserMapper.class).findUser("111223")
userService.caluateMoney(user);
//……其他代码
}
}
@Override
public void afterJob() {
}
}
任务对象加载过程
核心逻辑在于利用反射,在SpringBoot启动后动态创建相应的定时任务类,并将其放置到SpringBoot的定时线程池中进行维护,同时将该对象同步存放至内存中一份,便于可以实时调用,当进行修改任务相关配置时,需要重新加载一次内容。
public class TaskScheduleServerImpl implements TaskScheduleServer {
//正在运行的任务
private static ConcurrentHashMap<String, ScheduledFuture> runningTasks = new ConcurrentHashMap<>();
//线程池任务调度
private ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
public boolean addTaskToScheduling(TaskEntity task) {
if(!runningTasks.containsKey(task.getTaskId())){
try{
Class<?> clazz = Class.forName(task.getTaskClass());
Constructor c = clazz.getConstructor(TaskEntity.class);
AbstractBaseCronTask runnable = (AbstractBaseCronTask) c.newInstance(task);
//反射方式生成对象不属于Spring容器管控,对于Spring的bean使用需要手动注入
runnable.setApplicationContext(context);
CronTrigger cron = new CronTrigger(task.getTaskCron());
//put到runTasks
runningTasks.put(task.getTaskId(), Objects.requireNonNull(this.threadPoolTaskScheduler.schedule(runnable, cron)));
//存入内存中,便于外部调用
ramTasks.put(task.getTaskId(),runnable);
task.setTaskRamStatus(1);
taskInfoOpMapper.updateTaskInfo(task);
return true;
}catch (Exception e){
log.error("定时任务加载失败..."+e);
}
}
return false;
}
}
部分源码
这里将配置内容放入数据库中,直接以数据库中的表作为任务配置的基础
/**
* 任务对象
**/
@Data
public class TaskEntity implements Serializable {
//任务唯一ID
private String taskId;
//任务名称
private String taskName;
//任务描述
private String taskDesc;
//执行周期配置
private String taskCron;
//任务类的全路径
private String taskClass;
//任务的额外配置
private String taskOutConfig;
//任务创建时间
private String taskCreateTime;
//任务是否启动,1启用,0不启用
private Integer taskIsUse;
//是否随系统启动立即执行
private Integer taskBootUp;
//任务上次执行状态
private Integer taskLastRun;
//任务是否加载至内存中
private Integer taskRamStatus;
}
核心逻辑,加载定时任务接口及其实现类
public interface TaskScheduleServer {
ConcurrentHashMap<String, AbstractBaseCronTask> getTaskSchedulingRam();
/**
* 初始化任务调度
*/
void initScheduling();
/**
* 添加任务至内存及容器
* @param taskEntity 任务实体
* @return boolean
*/
boolean addTaskToScheduling(TaskEntity taskEntity);
/**
* 从任务调度器中移除任务
* @param id 任务id
* @return Boolean
*/
boolean removeTaskFromScheduling(String id);
/**
* 执行指定任务
* @param id 任务id
* @return double 耗时
*/
double runTaskById(String id);
/**
* 清空任务
*/
void claearAllTask();
/**
* 加载所有任务
*/
void loadAllTask();
/**
* 运行开机自启任务
*/
void runBootUpTask();
}
@Slf4j
@Component
public class TaskScheduleServerImpl implements TaskScheduleServer {
…………
@Override
public double runTaskById(String id) {
TaskEntity task = taskInfoOpMapper.queryTaskInfoById(id);
if(null!=task) {
if (runningTasks.containsKey(task.getTaskId())){
ramTasks.get(task.getTaskId()).run();
return ramTasks.get(task.getTaskId()).getRunTime();
}
}
return 0d;
}
@Override
public void claearAllTask() {
ramTasks.clear();
log.info("【定时任务控制器】清除内存任务 完成");
runningTasks.clear();
log.info("【定时任务控制器】清除线程任务 完成");
threadPoolTaskScheduler.shutdown();
}
@Override
public void loadAllTask() {
List<TaskEntity> allTask = taskInfoOpMapper.queryTaskInfo(null);
for (TaskEntity task : allTask) {
if(addTaskToScheduling(task)){
log.info("【定时任务初始化】装填任务:{} [ 任务执行周期:{} ] [ bootup:{}]",task.getTaskName(),task.getTaskCron(),task.getTaskBootUp());
}
}
}
@Override
public void runBootUpTask() {
TaskEntity entity = new TaskEntity().taskBootUp(1);
List<TaskEntity> list = taskInfoOpMapper.queryTaskInfo(entity);
for(TaskEntity task:list){
runTaskById(task.getTaskId());
}
}
}
在SpringBoot中的加载类
@Order(3)
@Component
@Slf4j
public class AfterAppStarted implements ApplicationRunner {
TaskScheduleServer taskScheduleServer;
@Autowired
public void setTaskScheduleServer(TaskScheduleServer taskScheduleServer) {
this.taskScheduleServer = taskScheduleServer;
}
@Override
public void run(ApplicationArguments args) throws Exception {
//运行随系统启动的定时任务
taskScheduleServer.runBootUpTask();
}
}
对外暴露控制接口及其Service
@RestController
@RequestMapping("/taskScheduling/manage")
@Api(tags = "数据源管理服务")
public class TaskSchedulingController {
TaskScheduleManagerService taskScheduleManagerService;
@Autowired
public void setTaskScheduleManagerService(TaskScheduleManagerService taskScheduleManagerService) {
this.taskScheduleManagerService = taskScheduleManagerService;
}
@PostMapping("/search")
@Operation(summary = "分页查询任务")
public Response searchData(@RequestBody SearchTaskDto param){
return Response.success(taskScheduleManagerService.searchTaskForPage(param));
}
@GetMapping("/detail")
@Operation(summary = "具体任务对象")
public Response searchDetail(String taskId){
return Response.success(taskScheduleManagerService.searchTaskDetail(taskId));
}
@GetMapping("/shutdown")
@Operation(summary = "关闭指定任务")
public Response shutdownTask(String taskId){
return Response.success(taskScheduleManagerService.shutdownTask(taskId));
}
@GetMapping("/open")
@Operation(summary = "开启指定任务")
public Response openTask(String taskId){
return Response.success(taskScheduleManagerService.openTask(taskId));
}
@GetMapping("/run")
@Operation(summary = "运行指定任务")
public Response runTask(String taskId){
return Response.success(taskScheduleManagerService.runTask(taskId));
}
@PostMapping("/update")
@Operation(summary = "更新指定任务")
public Response updateTask(@RequestBody TaskEntity taskEntity){
return Response.success(taskScheduleManagerService.updateTaskBusinessInfo(taskEntity));
}
}
相关接口实现类
@Service
public class TaskScheduleManagerServiceImpl implements TaskScheduleManagerService {
private TaskInfoOpMapper taskInfoOpMapper;
private TaskScheduleServer taskScheduleServer;
@Autowired
public void setTaskInfoOpMapper(TaskInfoOpMapper taskInfoOpMapper) {
this.taskInfoOpMapper = taskInfoOpMapper;
}
@Autowired
public void setTaskScheduleServer(TaskScheduleServer taskScheduleServer) {
this.taskScheduleServer = taskScheduleServer;
}
@Override
public IPage<TaskEntity> searchTaskForPage(SearchTaskDto dto) {
Page<TaskEntity> pageParam = new Page<>(1,10);
pageParam.setAsc("task_id");
return taskInfoOpMapper.queryTaskInfoPage(pageParam,dto.getFilterKey(),dto.getBootUp(),dto.getLastRunStatus());
}
@Override
public TaskEntity searchTaskDetail(String taskId) {
if(!StringUtils.isEmpty(taskId)){
return taskInfoOpMapper.queryTaskInfoById(taskId);
}
return null;
}
@Override
public TaskRunRetDto runTask(String taskId) {
AbstractBaseCronTask task = taskScheduleServer.getTaskSchedulingRam().get(taskId);
TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.run, 0);
if(null != task) {
double time = taskScheduleServer.runTaskById(taskId);
result.setResult(1);
return result.extend(time).taskInfo(task.getThisTaskInfo());
} else {
return result.extend("任务未启用");
}
}
@Override
public TaskRunRetDto shutdownTask(String taskId) {
AbstractBaseCronTask task = taskScheduleServer.getTaskSchedulingRam().get(taskId);
TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.shutdown, 0);
if(null != task) {
boolean flag = taskScheduleServer.removeTaskFromScheduling(taskId);
if(flag) {
result.setResult(1);
}
return result.extend("任务成功关闭").taskInfo(task.getThisTaskInfo());
} else {
return result.extend("任务未启用");
}
}
@Override
public TaskRunRetDto openTask(String taskId) {
TaskEntity task = taskInfoOpMapper.queryTaskInfoById(taskId);
TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.open, 0);
if(null != task) {
if (!taskScheduleServer.getTaskSchedulingRam().containsKey(taskId)) {
boolean flag = taskScheduleServer.addTaskToScheduling(task);
if(flag) {
result.setResult(1);
}
return result.extend("任务开启成功").taskInfo(task);
} else {
return result.extend("任务处于启动状态").taskInfo(task);
}
}else {
return result.extend("任务不存在!");
}
}
@Override
public TaskRunRetDto updateTaskBusinessInfo(TaskEntity entity) {
TaskEntity task = searchTaskDetail(entity.getTaskId());
TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.update, 0).taskInfo(entity);
String config = entity.getTaskOutConfig();
if(null != config && !JSONUtil.isJson(config) && !JSONUtil.isJsonArray(config)){
result.setResult(0);
result.extend("更新任务失败,任务配置必须为JSON或空");
result.taskInfo(entity);
return result;
}
task.setTaskCron(entity.getTaskCron());
task.setTaskOutConfig(entity.getTaskOutConfig());
task.setTaskName(entity.getTaskName());
task.setTaskDesc(entity.getTaskDesc());
int num = taskInfoOpMapper.updateTaskInfo(task);
if (num == 1) {
result.setResult(1);
result.extend("成功更新任务");
result.taskInfo(entity);
//重新刷新任务
taskScheduleServer.removeTaskFromScheduling(entity.getTaskId());
taskScheduleServer.addTaskToScheduling(task);
}
return result;
}
效果
数据库中配置任务
任务代码
public class TestTask extends AbstractBaseCronTask {
public TestTask(TaskEntity taskEntity) {
super(taskEntity);
}
@Override
public void beforeJob() {
log.info("测试任务开始");
}
@Override
public void startJob() {
}
@Override
public void afterJob() {
}
}
任务查看
执行效果