基于SpringBoot实现轻量级的动态定时任务调度

在使用SpringBoot框架进行开发时,一般都是通过@Scheduled注解进行定时任务的开发:


@Component
public class TestTask
{
    @Scheduled(cron="0/5 * *  * * ? ")   //每5秒执行一次
    public void execute(){
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
        log.info("任务执行" + df.format(new Date()));
    }
}

但是这种方式存在一个问题,那就是任务的周期控制是死的,必须编写在代码中,如果遇到需要在系统运行过程中想中止、立即执行、修改执行周期等动态操作的需求时,使用注解的方式便不能满足了,当然为了满足此种需求可以额外再引入其他任务调度插件(例如XXL-Job等),但是引入其他组件是需要衡量成本的,额外的依赖成本、组件的维护成本、开发的复杂度等等,所以如果系统体量不是那么大,完全没必要通过增加组件来完成,可以基于SpringBoot框架实现一套内置轻量级的任务调度。

设计思路

整体设计

这里我们把定时任务以类作为基础单位,即一个类为一个任务,然后通过配置数据的方式,进行任务的读取,通过反射生成任务对象,使用SpringBoot本身的线程池任务调度,完成动态的定时任务驱动,同时通过接口支撑实现相应的REST API对外暴露接口

任务模型

首先基于模板模式,设计基础的任务执行流程抽象类,定义出一个定时任务需要执行的内容和步骤和一些通用的方法函数,后续具体的定时任务直接继承该父类,实现该父类的before、start、after三个抽象函数即可,所有公共操作均在抽象父类完成

特殊说明:

    基于此方法创建的类是不归Spring的容器管理的,所以自定义的任务子类中是无法使用SpringBoot中的任何注解,尤其在自定义任务类中如果需要依赖其他Bean时,需要借助抽象父类AbstractBaseCronTask中已经实现的<T> T getServer(Class<T> className)来完成,getServer的实现如下:

public <T> T getServer(Class<T> className){
       return applicationContext.getBean(className);
    }

是通过SpringBoot中的ApplicationContext接口来获取Spring的上下文,以此来满足可以获取Spring中其他Bean的诉求。

例如,有个定时任务TaskOne类,它需要使用UserService类中的 caculateMoney()的方法,势必这个定时任务需要依赖UserService类,而TaskOne并非是Spring创建的对象,而是我们人为干预生成的对象,所以它是不在Spring的Bean管理范围的,自然也就无法使用@Autowird等方式注入UserService类,此时就需要使用getServer方法来获取UserService对象

//自定义定时任务类
public class TaskOne extends AbstractBaseCronTask {
    
    private UserService userService;

  
    public TestTask(TaskEntity taskEntity) {
        super(taskEntity);
    }

    @Override
    public void beforeJob() {
        //任务运行第一步,先将userService进行变量注入
        userService = getServer(UserService.class);
        ……
    }

    @Override
    public void startJob() {
       if(XXXX){
          //直接调用getServer获取需要的bean
          User user = getServer(UserMapper.class).findUser("111223")
          userService.caluateMoney(user);
          //……其他代码
       }
    }

    @Override
    public void afterJob() {

    }
}

任务对象加载过程

 核心逻辑在于利用反射,在SpringBoot启动后动态创建相应的定时任务类,并将其放置到SpringBoot的定时线程池中进行维护,同时将该对象同步存放至内存中一份,便于可以实时调用,当进行修改任务相关配置时,需要重新加载一次内容。

public class TaskScheduleServerImpl implements TaskScheduleServer {

    //正在运行的任务
    private static ConcurrentHashMap<String, ScheduledFuture> runningTasks = new ConcurrentHashMap<>();

    //线程池任务调度
    private ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();

    public boolean addTaskToScheduling(TaskEntity task) {
        if(!runningTasks.containsKey(task.getTaskId())){
            try{
                Class<?> clazz = Class.forName(task.getTaskClass());
                Constructor c = clazz.getConstructor(TaskEntity.class);
                AbstractBaseCronTask runnable = (AbstractBaseCronTask) c.newInstance(task);
                //反射方式生成对象不属于Spring容器管控,对于Spring的bean使用需要手动注入
                runnable.setApplicationContext(context);
                CronTrigger cron = new CronTrigger(task.getTaskCron());
                //put到runTasks
                runningTasks.put(task.getTaskId(), Objects.requireNonNull(this.threadPoolTaskScheduler.schedule(runnable, cron)));
                //存入内存中,便于外部调用
                ramTasks.put(task.getTaskId(),runnable);
                task.setTaskRamStatus(1);
                taskInfoOpMapper.updateTaskInfo(task);
                return true;
            }catch (Exception e){
                log.error("定时任务加载失败..."+e);
            }
        }
        return false;
    }
}

部分源码

这里将配置内容放入数据库中,直接以数据库中的表作为任务配置的基础

/**
* 任务对象
**/
@Data
public class TaskEntity implements Serializable {

    //任务唯一ID
    private String taskId;
    
    //任务名称
    private String taskName;
    
    //任务描述
    private String taskDesc;

    //执行周期配置
    private String taskCron;

    //任务类的全路径
    private String taskClass;
    
    //任务的额外配置
    private String taskOutConfig;
    
    //任务创建时间
    private String taskCreateTime;

    //任务是否启动,1启用,0不启用
    private Integer taskIsUse;
    
    //是否随系统启动立即执行
    private Integer taskBootUp;
    
    //任务上次执行状态
    private Integer taskLastRun;
    
    //任务是否加载至内存中 
    private Integer taskRamStatus;
}

核心逻辑,加载定时任务接口及其实现类



public interface TaskScheduleServer {



    ConcurrentHashMap<String, AbstractBaseCronTask> getTaskSchedulingRam();


    /**
     * 初始化任务调度
     */
    void initScheduling();

    /**
     * 添加任务至内存及容器
     * @param taskEntity 任务实体
     * @return boolean
     */
    boolean addTaskToScheduling(TaskEntity taskEntity);

    /**
     * 从任务调度器中移除任务
     * @param id 任务id
     * @return Boolean
     */
    boolean removeTaskFromScheduling(String id);


    /**
     * 执行指定任务
     * @param id 任务id
     * @return double 耗时
     */
    double runTaskById(String id);


    /**
     * 清空任务
     */
    void claearAllTask();



    /**
     * 加载所有任务
     */
    void loadAllTask();

    /**
     * 运行开机自启任务
     */
    void runBootUpTask();

}





@Slf4j
@Component
public class TaskScheduleServerImpl implements TaskScheduleServer {

    
    …………

    @Override
    public double runTaskById(String id) {
        TaskEntity task = taskInfoOpMapper.queryTaskInfoById(id);
        if(null!=task) {
            if (runningTasks.containsKey(task.getTaskId())){
                ramTasks.get(task.getTaskId()).run();
                return ramTasks.get(task.getTaskId()).getRunTime();
            }
        }
        return 0d;
    }

    @Override
    public void claearAllTask() {
        ramTasks.clear();
        log.info("【定时任务控制器】清除内存任务 完成");
        runningTasks.clear();
        log.info("【定时任务控制器】清除线程任务 完成");
        threadPoolTaskScheduler.shutdown();
    }

    @Override
    public void loadAllTask() {
        List<TaskEntity> allTask = taskInfoOpMapper.queryTaskInfo(null);
        for (TaskEntity task : allTask) {
            if(addTaskToScheduling(task)){
                log.info("【定时任务初始化】装填任务:{} [ 任务执行周期:{} ] [ bootup:{}]",task.getTaskName(),task.getTaskCron(),task.getTaskBootUp());
            }
        }
    }


    @Override
    public void runBootUpTask() {
        TaskEntity entity = new TaskEntity().taskBootUp(1);
        List<TaskEntity> list = taskInfoOpMapper.queryTaskInfo(entity);
        for(TaskEntity task:list){
            runTaskById(task.getTaskId());
        }
    }
}

在SpringBoot中的加载类

@Order(3)
@Component
@Slf4j
public class AfterAppStarted implements ApplicationRunner {


    TaskScheduleServer taskScheduleServer;

    @Autowired
    public void setTaskScheduleServer(TaskScheduleServer taskScheduleServer) {
        this.taskScheduleServer = taskScheduleServer;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        //运行随系统启动的定时任务
        taskScheduleServer.runBootUpTask();
    }

}

对外暴露控制接口及其Service

@RestController
@RequestMapping("/taskScheduling/manage")
@Api(tags = "数据源管理服务")
public class TaskSchedulingController {


    TaskScheduleManagerService taskScheduleManagerService;

    @Autowired
    public void setTaskScheduleManagerService(TaskScheduleManagerService taskScheduleManagerService) {
        this.taskScheduleManagerService = taskScheduleManagerService;
    }

    @PostMapping("/search")
    @Operation(summary = "分页查询任务")
    public Response searchData(@RequestBody SearchTaskDto param){
        return Response.success(taskScheduleManagerService.searchTaskForPage(param));
    }


    @GetMapping("/detail")
    @Operation(summary = "具体任务对象")
    public Response searchDetail(String taskId){
        return Response.success(taskScheduleManagerService.searchTaskDetail(taskId));
    }

    @GetMapping("/shutdown")
    @Operation(summary = "关闭指定任务")
    public Response shutdownTask(String taskId){
        return Response.success(taskScheduleManagerService.shutdownTask(taskId));
    }


    @GetMapping("/open")
    @Operation(summary = "开启指定任务")
    public Response openTask(String taskId){
        return Response.success(taskScheduleManagerService.openTask(taskId));
    }


    @GetMapping("/run")
    @Operation(summary = "运行指定任务")
    public  Response runTask(String taskId){
        return Response.success(taskScheduleManagerService.runTask(taskId));
    }


    @PostMapping("/update")
    @Operation(summary = "更新指定任务")
    public Response updateTask(@RequestBody TaskEntity taskEntity){
        return Response.success(taskScheduleManagerService.updateTaskBusinessInfo(taskEntity));
    }


}

相关接口实现类

@Service
public class TaskScheduleManagerServiceImpl implements TaskScheduleManagerService {


    private TaskInfoOpMapper taskInfoOpMapper;


    private TaskScheduleServer taskScheduleServer;

    @Autowired
    public void setTaskInfoOpMapper(TaskInfoOpMapper taskInfoOpMapper) {
        this.taskInfoOpMapper = taskInfoOpMapper;
    }

    @Autowired
    public void setTaskScheduleServer(TaskScheduleServer taskScheduleServer) {
        this.taskScheduleServer = taskScheduleServer;
    }

    @Override
    public IPage<TaskEntity> searchTaskForPage(SearchTaskDto dto) {
        Page<TaskEntity> pageParam = new Page<>(1,10);
        pageParam.setAsc("task_id");
        return taskInfoOpMapper.queryTaskInfoPage(pageParam,dto.getFilterKey(),dto.getBootUp(),dto.getLastRunStatus());
    }


    @Override
    public TaskEntity searchTaskDetail(String taskId) {
        if(!StringUtils.isEmpty(taskId)){
            return taskInfoOpMapper.queryTaskInfoById(taskId);
        }
        return null;
    }


    @Override
    public TaskRunRetDto runTask(String taskId) {
        AbstractBaseCronTask task = taskScheduleServer.getTaskSchedulingRam().get(taskId);
        TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.run, 0);
        if(null != task) {
                double time = taskScheduleServer.runTaskById(taskId);
                result.setResult(1);
                return result.extend(time).taskInfo(task.getThisTaskInfo());
        } else {
            return result.extend("任务未启用");
        }
    }

    @Override
    public TaskRunRetDto shutdownTask(String taskId) {
        AbstractBaseCronTask task = taskScheduleServer.getTaskSchedulingRam().get(taskId);
        TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.shutdown, 0);
        if(null != task) {
            boolean flag = taskScheduleServer.removeTaskFromScheduling(taskId);
            if(flag) {
                result.setResult(1);
            }
            return result.extend("任务成功关闭").taskInfo(task.getThisTaskInfo());
        } else {
            return result.extend("任务未启用");
        }
    }

    @Override
    public TaskRunRetDto openTask(String taskId) {
        TaskEntity task = taskInfoOpMapper.queryTaskInfoById(taskId);
        TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.open, 0);
        if(null != task) {
            if (!taskScheduleServer.getTaskSchedulingRam().containsKey(taskId)) {
              boolean flag = taskScheduleServer.addTaskToScheduling(task);
                if(flag) {
                    result.setResult(1);
                }
                return result.extend("任务开启成功").taskInfo(task);
            } else {
                return result.extend("任务处于启动状态").taskInfo(task);
            }
        }else {
            return result.extend("任务不存在!");
        }
    }

    @Override
    public TaskRunRetDto updateTaskBusinessInfo(TaskEntity entity) {
        TaskEntity task = searchTaskDetail(entity.getTaskId());
        TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.update, 0).taskInfo(entity);
        String config = entity.getTaskOutConfig();
        if(null != config && !JSONUtil.isJson(config) && !JSONUtil.isJsonArray(config)){
            result.setResult(0);
            result.extend("更新任务失败,任务配置必须为JSON或空");
            result.taskInfo(entity);
            return result;
        }
        task.setTaskCron(entity.getTaskCron());
        task.setTaskOutConfig(entity.getTaskOutConfig());
        task.setTaskName(entity.getTaskName());
        task.setTaskDesc(entity.getTaskDesc());
        int num = taskInfoOpMapper.updateTaskInfo(task);
        if (num == 1) {
            result.setResult(1);
            result.extend("成功更新任务");
            result.taskInfo(entity);
            //重新刷新任务
            taskScheduleServer.removeTaskFromScheduling(entity.getTaskId());
            taskScheduleServer.addTaskToScheduling(task);
        }

        return result;
    }

效果

数据库中配置任务

任务代码

public class TestTask extends AbstractBaseCronTask {

    public TestTask(TaskEntity taskEntity) {
        super(taskEntity);
    }

    @Override
    public void beforeJob() {
        log.info("测试任务开始");
    }

    @Override
    public void startJob() {
    }

    @Override
    public void afterJob() {
    }
}

任务查看

执行效果

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/786557.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

视频监控管理平台智能边缘分析一体机视频监控系统客流统计检测算法

在当今数据驱动的时代&#xff0c;客流统计作为商业分析的重要手段&#xff0c;其准确性和实时性对于商家决策具有至关重要的影响。随着技术的发展&#xff0c;智能边缘分析一体机结合了边缘计算与深度学习技术&#xff0c;为客流统计提供了更为高效、精准的解决方案。 首先&am…

湖南源点咨询 正确定义问题是企业市场调研至关重要的第一步

湖南&#xff08;市场调研公司&#xff09;源点咨询认为:正确地定义问题是市场调研过程中至关重要的第一步。 如果没有正确的定义所调研的问题&#xff0c;那么调研目标也会是错误的&#xff0c;并且整个市场调研过程都将会浪费时间和金钱。一家大型的消费品包装企业想要在品牌…

代码随想录算法训练营第二天|【数组】59.螺旋矩阵II

这两天工作的事情有点多&#xff0c;周末又比较懒&#xff0c;所以没有跟上进度。这两天开始补进度。 题目 给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&a…

Java代码实现elasticSearch的DSL复合查询

elasticsearch提供DSL&#xff08;domain specific language&#xff09;查询&#xff0c;就是以json格式定义查询条件实现复杂条件查询。 DSL查询分为俩大类&#xff1a; 叶子查询&#xff1a;一般是在特定的字段里查询特定值&#xff0c;属于简单查询&#xff0c;很少单独使…

峟思雨水情智能监测与预警系统核心运作机制解析

雨水情智能监测与预警系统&#xff0c;作为现代水文观测领域的尖端技术集成体&#xff0c;其运作机制深度融合了信息采集的精准性、数据传输的高效性、数据分析的智能化以及预警响应的及时性&#xff0c;构建了全方位、多层次的水文安全防线。以下是对该系统核心运作机制的深入…

C++的入门基础

目录 C的简单介绍命名空间命名空间的使用C的输入与输出缺省参数函数重载 C的简单介绍 本贾尼斯特劳斯特卢普博士在C的基础上增加了面向对象的特性&#xff0c;这时又增加了继承和、类、封装的概念&#xff0c;为后来的面向对象的编程奠定了基础&#xff0c;这被命名为C 命名空…

进度条提示-在python程序中使用避免我误以为挂掉了

使用库tqdm 你还可以手写一点&#xff0c;反正只要是输出点什么东西都可以&#xff1b; Demo from chatgpt import time from tqdm import tqdm# 示例函数&#xff0c;模拟长时间运行的任务 def long_running_task():total_steps 100for step in tqdm(range(total_steps), …

用python生成词频云图(python实例二十一)

目录 1.认识Python 2.环境与工具 2.1 python环境 2.2 Visual Studio Code编译 3.词频云图 3.1 代码构思 3.2 代码实例 3.3 运行结果 4.总结 1.认识Python Python 是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。 Python 的设计具有很强的可读性&a…

unity-记录位置的坐标系

目录 确定世界坐标系原点的方法 1.创建一个物体 2.在检查器中&#xff0c;将该物体的位置设置为0&#xff0c;0&#xff0c;0 3.观察 父子物体的位置关系 调整坐标轴位置 坐标轴的局部与全局旋转 全局 ​局部 unity使用的是左手坐标系 世界坐标系&#xff1a;是整个游…

从数字化营销与运营视角:看流量效果的数据分析

基于数据打通的“全链路”营销是当下的“时髦”&#xff0c;应用它的前提是什么&#xff1f;深度营销和运营的关键数据如何获得&#xff1f;如何利用数据进行更精准的营销投放&#xff1f;如何利用数据优化投放的效果&#xff1f;如何促进消费者的转化&#xff0c;以及激活留存…

Java语言程序设计——篇二(2)

Java语言基础 运算符与表达式运算符1、算术运算符2、关系运算符3、逻辑运算符&#xff08; &&、||、 !、&、| 、^&#xff09;4、位运算符&#xff08; >>、<<、>>>、&、|、^、~&#xff09;5、赋值运算符6、条件运算符7、字符串运算符8、…

我们公司落地大模型的路径、方法和坑

最近一年&#xff0c;LLM&#xff08;大型语言模型&#xff09;已经成熟到可以投入实际应用中了。预计到 2025 年&#xff0c;AI 领域的投资会飙升到 2000 亿美元。现在&#xff0c;不只是机器学习专家&#xff0c;任何人都能轻松地把 AI 技术融入自己的产品里。 我们整理了一…

AI与智能的差异

在讨论AI&#xff08;人工智能&#xff09;与智能的差异时&#xff0c;可以从以下几个角度来理解&#xff1a; 人工智能&#xff08;AI&#xff09;是指计算机系统执行人类通常需要使用智力才能完成的任务的能力。这包括感知、推理、学习、解决问题等。AI可以通过算法和大数据进…

【C++】开源:格式化库fmt配置与使用

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍格式化库fmt配置与使用。 无专精则不能成&#xff0c;无涉猎则不能通。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;下…

Aop切面编程(1)

1、aop的使用思想&#xff1a;面向切面的编程&#xff0c;不改变原有代码的基础上&#xff0c;进行拓展&#xff0c;减少代码的冗余&#xff0c;降低耦合性&#xff1b; 2、使用注解进行aop编程&#xff0c;使用自定义注解 2.1导入aop的依赖 <dependency><groupId&…

人话学Python-基础篇-数字计算

一&#xff1a;数字类型 对于最常见的数据类型,数字在Python中分为三类&#xff1a; 整型(int) 表示的是整数类型的所有数字&#xff0c;包括正整数&#xff0c;负整数和0。和C语言不同的是&#xff0c;Python中的int型没有范围的限制&#xff0c;理论上可以从无限小的整数取到…

文献阅读:基于测序的空间转录组方法的系统比较

文献介绍 文献题目&#xff1a; Systematic comparison of sequencing-based spatial transcriptomic methods 研究团队&#xff1a; 田鲁亦&#xff08;广州实验室&#xff09;、刘晓东&#xff08;西湖大学&#xff09; 发表时间&#xff1a; 2024-07-04 发表期刊&#xff…

【YOLO格式的数据标签,目标检测】

标签为 YOLO 格式&#xff0c;每幅图像一个 *.txt 文件&#xff08;如果图像中没有对象&#xff0c;则不需要 *.txt 文件&#xff09;。*.txt 文件规格如下: 每个对象一行 每一行都是 class x_center y_center width height 格式。 边框坐标必须是 归一化的 xywh 格式&#x…

上传图片,base64改为文件流,并转给后端

需求&#xff1a; html代码&#xff1a; <el-dialog v-model"dialogPicVisible" title"新增图片" width"500"><el-form :model"picForm"><el-form-item label"图片名称&#xff1a;" :label-width"10…

开放式耳机哪个品牌比较好?2024最值得推荐的火爆机型!!

在这个快节奏的时代&#xff0c;我们都在寻找那些既能让我们享受音乐&#xff0c;又能保持对外界感知的音频设备。开放式耳机以其独特的设计&#xff0c;满足了这一需求&#xff0c;它们让你在享受音乐的同时&#xff0c;还能听到周围环境的声音&#xff0c;无论是安全出行还是…