elastic -job和springboot集成实现分布式调度5

一   案例介绍说明

1.1 案例介绍

基于 Spring boot 集成方式的而产出的工程代码,完成对作业分片的实现,文件数据备份采取更接近真实项目的数 据库存取方式。
1.分片设置

2.每个线程获取给自的类型

1.2 作业配置

zk的配置 

 

二 操作说明

2.1 数据表的初始化

DROP TABLE IF EXISTS `t_file`;
CREATE TABLE `t_file` (
`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`backedUp` tinyint(1) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

如图所示:

2.2 初始化数据

2.3 pom文件的编写

   <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

2.4 编写注册配置类

1.截图如下

2.代码

package com.itheima.scheduler.elasticjob.springboot.config;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.itheima.scheduler.elasticjob.springboot.job.FileBackupJobDb;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;

/**
 * @author Administrator
 * @version 1.0
 **/
@Configuration
public class ElasticJobConfig {

    @Autowired
    private DataSource dataSource; //数据源已经存在,直接引入

//    @Autowired
//    SimpleJob fileBackupJob;

    @Autowired
    FileBackupJobDb fileBackupJob;

//    @Autowired
//    FileBackupJobDataFlow fileBackupJob;

    @Autowired
    CoordinatorRegistryCenter registryCenter;

    /**
     * 配置任务详细信息
     * @param jobClass 任务执行类
     * @param cron  执行策略
     * @param shardingTotalCount 分片数量
     * @param shardingItemParameters 分片个性化参数
     * @return
     */
    private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                        final String cron,
                                                        final int shardingTotalCount,
                                                        final String shardingItemParameters){
        //JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder JobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        //设置shardingItemParameters
        if(!StringUtils.isEmpty(shardingItemParameters)){
            JobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfigurationBuilder.build();
        //创建SimpleJobConfiguration
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
        //创建LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
                .monitorPort(9888)//设置dump端口
                .build();
        return liteJobConfiguration;
    }

    //创建支持dataFlow类型的作业的配置信息
    private LiteJobConfiguration createFlowJobConfiguration(final Class<? extends ElasticJob> jobClass,
                                                        final String cron,
                                                        final int shardingTotalCount,
                                                        final String shardingItemParameters){
        //JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder JobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        //设置shardingItemParameters
        if(!StringUtils.isEmpty(shardingItemParameters)){
            JobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfigurationBuilder.build();
        // 定义数据流类型任务配置
        DataflowJobConfiguration jobConfig = new DataflowJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName(),true);
        //创建LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true)
                .monitorPort(9888)//设置dump端口
                .build();
        return liteJobConfiguration;
    }
    @Bean(initMethod = "init")
    public SpringJobScheduler initSimpleElasticJob() {
        // 增加任务事件追踪配置
        JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);
        //创建SpringJobScheduler

        SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileBackupJob, registryCenter,
                createJobConfiguration(fileBackupJob.getClass(), "0/10 * * * * ?", 4, "0=text,1=image,2=radio,3=vedio")
                ,jobEventConfig);
        return springJobScheduler;
    }
}

3.代码

package com.itheima.scheduler.elasticjob.springboot.config;

import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Administrator
 * @version 1.0
 **/
@Configuration
public class ElasticJobRegistryCenterConfig {

    //zookeeper链接字符串 localhost:2181
    private  String ZOOKEEPER_CONNECTION_STRING = "localhost:2181" ;
    //定时任务命名空间
    private  String JOB_NAMESPACE = "elastic-job-example-java";

    //zk的配置及创建注册中心
    @Bean(initMethod = "init")
    public  CoordinatorRegistryCenter setUpRegistryCenter(){
        //zk的配置
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);

        //创建注册中心
        CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return zookeeperRegistryCenter;

    }
}

2.5 启动类

2.6 配置连接

2.7 启动zk

2.8 测试

1.启动application服务,查看打印log:

三 多实例测试

3.1 启动4个

2.执行情况

3.2  案例场景分析

1.设置4个分片,10秒执行一次。

分片弹性扩容缩容机制测试:

测试1:测试窗口1不关闭,再次运行main方法查看控制台日志,注意修改application.properties中的 server.port,保证端口不冲突

测试2:测试窗口1 和测试窗口2 不关闭,再次运行2次main方法,达到4个任务实例,查看控制台日志

 测试3:测试窗口1 和测试窗口2 不关闭,将测试窗口3和测试窗口4任务停止

测试4:测试窗口1不关闭 将测试窗口2 任务停止

结论:

1、任务运行期间,如果有新机器加入,则会立刻触发分片机制,将任务相对 平均的分配到每台机器上并行执行调度。

2、如果有机器退出集群,则经过短暂的一段时间(大约40秒)后又会重 新触发分片机制

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/190753.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【单片机学习笔记】STC8H1K08参考手册学习笔记

STC8H1K08参考手册学习笔记 STC8H系列芯片STC8H1K08开发环境串口烧录 STC8H系列芯片 STC8H 系列单片机是不需要外部晶振和外部复位的单片机&#xff0c;是以超强抗干扰/超低价/高速/低功耗为目标的 8051 单片机,在相同的工作频率下,STC8H 系列单片机比传统的 8051约快12 倍速度…

讯飞星火知识库文档问答Web API的使用(二)

上一篇提到过星火spark大模型&#xff0c;现在有更新到3.0&#xff1a; 给ChuanhuChatGPT 配上讯飞星火spark大模型V2.0&#xff08;一&#xff09; 同时又看到有知识库问答的web api&#xff0c;于是就测试了一下。 下一篇是在ChuanhuChatGPT 中单独写一个基于星火知识库的内容…

机器学习库:pandas

☁️主页 Nowl &#x1f525;专栏《机器学习实战》 《机器学习》 &#x1f4d1;君子坐而论道&#xff0c;少年起而行之 文章目录 写在开头 基本数据格式 DataFrame 数据选取 iloc 数据描述 head describe 数据合并 merge 数据删除 drop drop删除多列 处理缺失…

LeetCode Hot100 236.二叉树的最近公共祖先

题目&#xff1a; 给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个节点 p、q&#xff0c;最近公共祖先表示为一个节点 x&#xff0c;满足 x 是 p、q 的祖先且 x 的深度尽可能大&#xff08;一个节…

基于U-Net的视网膜血管分割(Pytorch完整版)

基于 U-Net 的视网膜血管分割是一种应用深度学习的方法&#xff0c;特别是 U-Net 结构&#xff0c;用于从眼底图像中分割出视网膜血管。U-Net 是一种全卷积神经网络&#xff08;FCN&#xff09;&#xff0c;通常用于图像分割任务。以下是基于 U-Net 的视网膜血管分割的内容&…

什么是分布式锁?Redis实现分布式锁详解

目录 前言&#xff1a; 分布式系统买票示例 引入redis做分布式锁 引入过期时间 引入校验id 引入lua脚本 过期时间续约问题 redlock算法 小结&#xff1a; 前言&#xff1a; 在分布式系统中&#xff0c;涉及多个主机访问同一块资源&#xff0c;此时就需要锁来做互斥控制…

[论文阅读]CBAM——代码实现和讲解

CBAM 论文网址&#xff1a;CBAM 论文代码&#xff1a;CBAM 本文提出了一种卷积块注意力模块&#xff08;CBAM&#xff09;&#xff0c;它是卷积神经网络&#xff08;CNN&#xff09;的一种轻量级、高效的注意力模块。该模块沿着通道和空间两个独立维度依次推导注意力图&#x…

ehr人力资源管理系统(实际项目源码)

eHR人力资源管理系统&#xff1a;功能强大的人力资源管理工具 随着企业规模的不断扩大和业务需求的多样化&#xff0c;传统的人力资源管理模式已无法满足现代企业的需求。eHR人力资源管理系统作为一种先进的管理工具&#xff0c;能够为企业提供高效、准确、实时的人力资源管理…

GPT实战系列-GPT训练的Pretraining,SFT,Reward Modeling,RLHF

GPT实战系列-GPT训练的Pretraining&#xff0c;SFT&#xff0c;Reward Modeling&#xff0c;RLHF 文章目录 GPT实战系列-GPT训练的Pretraining&#xff0c;SFT&#xff0c;Reward Modeling&#xff0c;RLHFPretraining 预训练阶段Supervised FineTuning &#xff08;SFT&#x…

基于51单片机交通灯夜间模式+紧急模式_易懂版_(仿真+代码_报告_讲解)

J029 51单片机交通灯_易懂版__夜间紧急(仿真代码_报告_讲解&#xff09; 51单片机交通灯_易懂版_ 1 **讲解视频&#xff1a;**2 **功能要求**3 **仿真图&#xff1a;**4 **程序设计&#xff1a;**5 **设计报告**6 **资料清单&&下载链接&#xff1a;****资料下载链接&am…

超好玩C++控制台打飞机小游戏,附源码

我终于决定还是把这个放出来。 视频在这&#xff1a;https://v.youku.com/v_show/id_XNDQxMTQwNDA3Mg.html 具体信息主界面上都有写。 按空格暂停&#xff0c;建议暂停后再升级属性。 记录最高分的文件进行了加密。 有boss&#xff08;上面视频2分47秒&#xff09;。 挺好…

2023年【N1叉车司机】新版试题及N1叉车司机作业考试题库

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 N1叉车司机新版试题参考答案及N1叉车司机考试试题解析是安全生产模拟考试一点通题库老师及N1叉车司机操作证已考过的学员汇总&#xff0c;相对有效帮助N1叉车司机作业考试题库学员顺利通过考试。 1、【多选题】《中华…

Linux系统编程 day05 进程控制

Linux系统编程 day05 进程控制 1. 进程相关概念2. 创建进程3. exec函数族4. 进程回收 1. 进程相关概念 程序就是编译好的二进制文件&#xff0c;在磁盘上&#xff0c;占用磁盘空间。程序是一个静态的概念。进程即使启动了的程序&#xff0c;进程会占用系统资源&#xff0c;如内…

【服务器能干什么】二十分钟搭建一个属于自己的 RSS 服务

如果大家不想自己捣鼓,只是想尝尝鲜,可以在下面留言,我后台帮大家开几个账号玩一玩。 哔哩哔哩【高清版本可以点击去吐槽到 B 站观看】:【VPS服务器到底能干啥】信息爆炸的年代,如何甄别出优质的内容?你可能需要自建一个RSS服务!_哔哩哔哩_bilibili 前言 RSS 服务 市…

MySQL表连接

文章目录 MySQL内外连接1.内连接2.外连接&#xff08;1&#xff09;左外连接&#xff08;2)右外连接 3.简单案例 MySQL内外连接 1.内连接 内连接的SQL如下&#xff1a; SELECT ... FROM t1 INNER JOIN t2 ON 连接条件 [INNER JOIN t3 ON 连接条件] ... AND 其他条件;说明一下…

2.5 逆矩阵

一、逆矩阵的注释 假设 A A A 是一个方阵&#xff0c;其逆矩阵 A − 1 A^{-1} A−1 与它的大小相同&#xff0c; A − 1 A I A^{-1}AI A−1AI。 A A A 与 A − 1 A^{-1} A−1 会做相反的事情。它们的乘积是单位矩阵 —— 对向量无影响&#xff0c;所以 A − 1 A x x A^{…

QT基础开发笔记

用VS 写QT &#xff0c;设置exe图标的方法&#xff1a; 选定工程--》右键--》添加---》资源--》 QString 字符串用法总结说明 Qt QString 增、删、改、查、格式化等常用方法总结_qstring 格式化-CSDN博客 总结来说&#xff1a; QString 的 remove有两种用法&#xff0c;&am…

【C++】类和对象(下篇)

这里是目录 构造函数&#xff08;续&#xff09;构造函数体赋值初始化列表 explicit关键字隐式类型转换 static成员友元友元函数友元类 内部类匿名对象匿名对象的作用const引用匿名对象 构造函数&#xff08;续&#xff09; 构造函数体赋值 在创建对象时&#xff0c;编译器通…

02、Tensorflow实现手写数字识别(数字0-9)

02、Tensorflow实现手写数字识别&#xff08;数字0-9&#xff09; 开始学习机器学习啦&#xff0c;已经把吴恩达的课全部刷完了&#xff0c;现在开始熟悉一下复现代码。对这个手写数字实部比较感兴趣&#xff0c;作为入门的素材非常合适。 基于Tensorflow 2.10.0与pycharm 1…

SASS的导入文件详细教程

文章目录 前言导入SASS文件使用SASS部分文件默认变量值嵌套导入原生的CSS导入后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;Sass和Less &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努…