工具篇--分布式定时任务springBoot 整合 elasticjob使用(3)

文章目录

  • 前言
  • 一、Springboot 整合:
    • 1.1 引入jar:
    • 1.2 配置zookeeper 注册中心:
    • 1.3 定义job 业务类:
    • 1.4 job 注册到zookeeper:
    • 1.5 项目启动:
      • 1.5.1 zookeeper 注册中心实例:
      • 1.5.2 任务执行日志输出:
  • 二、扩展:
    • 2.1 任务监听器:
    • 2. 2 DataflowJob 流工作:
    • 2.2.1 新建 DataflowJob:
    • 2.2.2 streaming.process 属性配置:
    • 2.2.3 执行效果:
  • 总结


前言

本文对springBoot 整合 elasticjob 进行介绍。 本文环境 jdk:1.8 ; zookeeper : 3.7


一、Springboot 整合:

1.1 引入jar:

<!-- https://mvnrepository.com/artifact/org.apache.shardingsphere.elasticjob/elasticjob-lite-core -->
 <dependency>
     <groupId>org.apache.shardingsphere.elasticjob</groupId>
     <artifactId>elasticjob-lite-core</artifactId>
    <!-- <version>3.0.4</version>-->
     <version>3.0.1</version>
 </dependency>
 <dependency>
     <groupId>org.yaml</groupId>
     <artifactId>snakeyaml</artifactId>
     <version>1.27</version>
 </dependency>
 <dependency>
     <groupId>ch.qos.logback</groupId>
     <artifactId>logback-classic</artifactId>
     <version>1.2.12</version>
 </dependency>

1.2 配置zookeeper 注册中心:

ElasticJobZookeeper:

import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticJobZookeeper {


    @Bean(initMethod = "init")
    public  CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "elastic-job");
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(10000);
        zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);
        zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000);

        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return regCenter;
    }
}

1.3 定义job 业务类:

MyJob:

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class MyJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {

        // 分片参数 0=text,1=image,2=radio,3=vedio
        String  shardingParameter= shardingContext.getShardingParameter();
        String  jobParameter= shardingContext.getJobParameter();

        log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{},jobParamer:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(), shardingParameter,jobParameter);
        if ("text".equals(jobParameter)) {
            // do something by sharding
        }
        switch (shardingContext.getShardingItem()) {
            case 0:
                // do something by sharding item 0
                break;
            case 1:
                // do something by sharding item 1
                break;
            case 2:
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

MyJob1:

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class MyJob1 implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.debug("job 执行 error,job名称:{},分片数量:{}",shardingContext.getJobName(),shardingContext.getShardingTotalCount());
        switch (shardingContext.getShardingItem()) {
            case 0:
                // do something by sharding item 0
                break;
            case 1:
                // do something by sharding item 1
                break;
            case 2:
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

1.4 job 注册到zookeeper:

ElasticJobConfigure

import com.example.springelasticjob.quickstart.MyJob;
import com.example.springelasticjob.quickstart.MyJob1;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticJobConfigure implements InitializingBean {
    @Autowired
    private MyJob myJob;

    @Autowired
    private MyJob1 myJob1;
    @Autowired
    private CoordinatorRegistryCenter coordinatorRegistryCenter;


    public JobConfiguration createJobConfiguration(Class<? extends SimpleJob> JobClass, int shardingTotalCount, String cron, String shardingItemParameters) {
        // 创建作业配置
        JobConfiguration jobConfiguration = JobConfiguration.newBuilder(JobClass.getName(), shardingTotalCount).cron(cron).overwrite(true)
                .shardingItemParameters(shardingItemParameters).jobListenerTypes().build();


        return jobConfiguration;

    }

    @Override
    public void afterPropertiesSet() throws Exception {
        JobConfiguration jobConfiguration = createJobConfiguration(myJob.getClass(), 1, "0/10 * * * * ?", null);
        new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob, jobConfiguration).schedule();
        JobConfiguration jobConfiguration1 = createJobConfiguration(myJob1.getClass(), 1, "0/1 * * * * ?", null);
        new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob1, jobConfiguration1).schedule();
    }
}

1.5 项目启动:

1.5.1 zookeeper 注册中心实例:

在这里插入图片描述

1.5.2 任务执行日志输出:

在这里插入图片描述

二、扩展:

2.1 任务监听器:

  1. 定义监听器:
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;

import java.util.Date;

@Slf4j
public class MyElasticJobListener implements ElasticJobListener {

    private long beginTime = 0;

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        beginTime = System.currentTimeMillis();
        log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(),  DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        long endTime = System.currentTimeMillis();
        log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime);
    }

    @Override
    public String getType() {
        return "myElasticJobListener";
    }


}

2) 在项目resources 新建文件夹: META-INF\services
在这里插入图片描述
3)新建文件,名称为:org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
文集内容:

# 监听器实现类的 类全路径
com.example.springelasticjob.config.MyElasticJobListener

4)job 配置增加监听器:

// 创建作业配置
        JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-param", 1).cron("0/5 * * * * ?")
                .overwrite(true).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("0=a,1=b,2=c")
                .jobListenerTypes("myElasticJobListener")
                .build();

jobListenerTypes(“myElasticJobListener”) 中 “myElasticJobListener” 要和 MyElasticJobListener getType() 返回的保持一致,否则启动无法找到 监听器:
在这里插入图片描述

2. 2 DataflowJob 流工作:

2.2.1 新建 DataflowJob:


import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;

import java.util.ArrayList;
import java.util.List;

/**
 * 流任务
 */
@Slf4j
public class MyDataFlowJob implements DataflowJob {
    @Override
    public List fetchData(ShardingContext shardingContext) {
        // 抓取数据
        // 分片参数 0=text,1=image,2=radio,3=vedio
        String jobParameter = shardingContext.getJobParameter();

        log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), jobParameter);
        List list = new ArrayList(1);
        list.add("lgx");
        return list;
    }

    @Override
    public void processData(ShardingContext shardingContext, List list) {
        // 数据处理
        System.out.println("list.toString() = " + list.toString());
    }
}

2.2.2 streaming.process 属性配置:

 private static JobConfiguration createJobConfiguration() {
        JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-dataflow-param", 1).cron("0/30 * * * * ?")
            
                .overwrite(true).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("0=a,1=b,2=c")
              	//  streaming.process 流处理设置为true
                .setProperty("streaming.process","true")
                .build();


        return jobConfiguration;

    }

2.2.3 执行效果:

虽然任务是每隔30s 执行一次,但是因为 fetchData 可以一直获取到数据,使的 processData 方法可以一直被调用:
在这里插入图片描述


总结

本文对 springBoot 整合 elasticjob 整合,监听器的使用,任务的流式处理做介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/452017.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据挖掘】练习1:R入门

课后作业1&#xff1a;R入门 一&#xff1a;习题内容 1.要与R交互必须安装Rstudio&#xff0c;这种说法对不对&#xff1f; 不对。虽然RStudio是一个流行的R交互集成开发环境&#xff0c;但并不是与R交互的唯一方式。 与R交互可以采用以下几种方法&#xff1a; 使用R Conso…

AHU 汇编 实验六

一、实验名称&#xff1a;实验6 输入一个16进制数&#xff0c;把它转换为10进制数输出 实验目的&#xff1a; 培养汇编中设计子程序的能力 实验过程&#xff1a; 源代码&#xff1a; data segmentbuff1 db Please input a number(H):$buff2 db 30,?,30 dup(?),13,10buff3 …

社交革命的引领者:探索Facebook如何改变我们的生活方式

1.数字社交的兴起 随着互联网的普及&#xff0c;社交媒体成为我们日常生活的重要组成部分。Facebook作为其中的先驱&#xff0c;从最初的社交网络演变成了一个拥有数十亿用户的全球化平台。它不仅改变了我们与世界互动的方式&#xff0c;还深刻影响了我们的社交习惯、人际关系以…

数据结构:树和二叉树

树的概念 1.树是一种非线性的数据结构。它是由n个有限节点的集合。 2.树分为根节点和子树。根节点没有前驱节点。 3.树的子树是由一个个子树组成&#xff0c;它们可以看作一个个集合。每个集合下面又有集合。 因此&#xff0c;树是递归定义的。 树形结构中&#xff0c;子树…

搜索引擎SEO策略介绍

baidu搜索&#xff1a;如何联系八爪鱼SEO baidu搜索&#xff1a;如何联系八爪鱼SEO baidu搜索&#xff1a;如何联系八爪鱼SEO 第一、 关键词的选择策略&#xff1a; 1、门户类的网站关键词选择策略&#xff1a; 网站每个页面本身基本都包含有关键词&#xff1a;网站拥有上百…

嵌入式数据库SQlite3-进阶篇

嵌入式数据库sqlite3 - HQ 文章目录 嵌入式数据库sqlite3 - HQ[toc] 嵌入式数据库sqlite3【进阶篇】数据库准备order子句Where 子句与逻辑运算符语法实例 group by子句having子句举例 函数SQLite COUNT 函数SQLite MAX 函数SQLite MIN 函数SQLite AVG 函数SQLite SUM 函数SQLit…

Qt 使用RAW INPUT获取HID触摸屏,笔设备,鼠标的原始数据,最低受支持的客户端:Windows XP [仅限桌面应用]

在开发绘图应用程序时&#xff0c;经常会需要读取笔设备的数据&#xff0c;通过对笔数据的解析&#xff0c;来判断笔的坐标&#xff0c;粗细。如果仅仅只是读取鼠标的坐标&#xff0c;就需要人为在应用程序端去修改笔的粗细&#xff0c;并且使用体验不好&#xff0c;如果可以实…

【C++】STL(五) Stack Queue容器

5、 stack容器 5.1 简介 ① stack是一种先进后出的容器&#xff0c;它只有一个出口。 ② 栈中只有顶端的元素才可以被外界使用&#xff0c;因此栈不允许有遍历行为。 ③ 栈中进入数据称为&#xff1a;入栈 push ④ 栈中弹出数据称为&#xff1a;出栈 pop 5.2 常用接口 …

Fair Data Exchange:区块链实现的原子式公平数据交换

1. 引言 2024年斯坦福大学和a16z crypto research团队 论文 Atomic and Fair Data Exchange via Blockchain 中&#xff0c;概述了一种构建&#xff08;包含过期EIP-4844 blobs的&#xff09;fair data-markets的协议。该论文源自a16z crypto的暑期实习计划&#xff0c;与四名…

R语言tidycmprsk包分析竞争风险模型

竞争风险模型就是指在临床事件中出现和它竞争的结局事件&#xff0c;这是事件会导致原有结局的改变&#xff0c;因此叫做竞争风险模型。比如我们想观察患者肿瘤的复发情况&#xff0c;但是患者在观察期突然车祸死亡&#xff0c;或者因其他疾病死亡&#xff0c;这样我们就观察不…

KAFKA入门教程

目录 1.安装kafka 2.安装kafkamanager可视化工具 3.springboot整合kafka 1.pom导包 2.启动类和yml配置 3.代码演示 编写生产者&#xff1a; 消费者&#xff1a; 1.安装kafka 进入kafka官网下载对应版本kafka kafka官网地址&#xff1a;Apache Kafka kafka是使用Scal…

Kotlin 数据解析(Gson)

一、添加依赖 build.gradle.kts(:app) // gson数据解析implementation("com.google.code.gson:gson:2.8.6") 对象类&#xff1a; // 对象类 class Account {var uid:String "00001"var userName:String "Freeman"var password:String &quo…

Midjourney能让角色保持一致了

Midjourney发布新功能&#xff0c;网友直呼“不可思议”&#xff01; 现在你可以让生成的图像几乎保持角色一致&#xff0c;belike&#xff1a; 所有超级英雄长一个模样盯着你。 甚至动漫风、写实风等跨风格生成也同样适用&#xff1a; 保持同一风格&#xff0c;感jio配上文字…

【python】自动化工具Selenium与playwright去除webdriver检测

对这个世界如果你有太多的抱怨 跌倒了就不敢继续往前走 为什么人要这么的脆弱 堕落 请你打开电视看看 多少人为生命在努力勇敢的走下去 我们是不是该知足 珍惜一切 就算没有拥有 &#x1f3b5; 周杰伦《稻香》 # -*- coding:utf-8 -*- import timefrom s…

【C语言】如何规避野指针

✨✨ 欢迎大家来到莉莉的博文✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 目录 一、概念&#xff1a; 二、野指针成因&#xff1a; 2.1. 指针未初始化 2.2 指针越界访问 3. 指针指向的空间释放 三、如何规避野指针 3.…

Manacher 算法——Leetcode 5.最长回文子串

在了解之前&#xff0c;我们先要了解什么是回文串&#xff0c;什么是回文子串。 回文串和回文子串&#xff1a; 回文串是指一个字符串正序遍历和反向遍历结果相同的字符串。如 ABBA&#xff0c;正着读反着读结果是一样的。 有了回文串的概念&#xff0c;回文子串的概念也就显…

STM32的DMA搬运串口数据

简介&#xff1a; 最近在学习stm32的外设初始化过程中&#xff0c;学到DMA这个外设的时候&#xff0c;还是花费了不少时间&#xff0c;特此记录一下。 实验&#xff1a;配置DMA搬运UART1的数据 &#xff0c;串口调试工具给单片机发送数据&#xff0c;然后单片机回发给串口调试…

Python实现企业微信自动打卡程序二:跳过节假日,随机打卡时间,定时任务,失败通知

实现打卡时间随机范围 既然我们程序写完后需要定时执行&#xff0c;那定时执行打卡就会导致每次上班或下班打卡时都是同一时间&#xff0c;这并不好&#xff0c;为了避免被发现&#xff0c;每次打卡时间都是同一时间&#xff0c;这里我们优化程序&#xff0c;增加随机等待时间来…

CSS元素显示模式

CSS元素显示模式 定义&#xff1a;元素显示模式是指元素&#xff08;即标签&#xff09;以什么方式进行显示。 HTML元素分为块元素和行内元素 块元素 常见块元素 &#xff08;下列仅举出部分&#xff09; <h1>~<h6>、<p>、<div>、<ul>、<…

进程间通信——IPC(Linux)

进程间通信 前言一、管道1. 管道原理2. 匿名管道①理解匿名管道②创建匿名管道——pipe③模拟实现进程池——管道 3. 命名管道①理解命名管道②使用命名管道——mkfifo拓展 —— 日志俩无关进程通信 3. 小结①管道总结②拓展命令和接口 二、System V1. 共享内存①原理②使用共享…