elastic是一个定时任务库
https://shardingsphere.apache.org/elasticjob/index_zh.html
项目结构
依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
实现simplejob
simplejob是使用最多、最简单的定时任务
任务类
定时任务类需要实现相应的定时任务接口(idea快捷键 ctrl+i)
public class MySimpleJob implements SimpleJob
然后在实现的execute里写定时任务的逻辑
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("分片项: " + shardingContext.getShardingItem() +
",总分片项数: " + shardingContext.getShardingTotalCount());
}
}
定时任务配置
新建App.java
public class App {
}
添加配置信息(都写在App.java里)
1)zookeeper配置信息(zookeeper作为注册中心,elasticjob将服务注册到zookeeper)
zookeeper搭建可以看我的这一篇文章
在windows搭建zookeeper(单机/集群) - 知乎
/**
* 注册中心zookeeper
*/
public static CoordinatorRegistryCenter zkCenter() {
// 参数1: zk的地址(集群就写多个,中间用逗号隔开),参数2: 命名空间
var zc =
new ZookeeperConfiguration("localhost:2181", "java-simple-job");
var crc = new ZookeeperRegistryCenter(zc);
// 初始化注册中心
crc.init();
return crc;
}
2)simplejob任务配置
/**
* simple-job配置
*
* @return
*/
public static LiteJobConfiguration configurationSimple() {
// 1,job核心配置
var jcc = JobCoreConfiguration
// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
.newBuilder("mySimpleJob", "0/10 * * * * ?", 2)
.build();
// 2,job类型配置
// 参数1: 核心配置,参数2: 任务类的全类名
var jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName());
// 3,job根配置 (LiteJobConfiguration)
return LiteJobConfiguration.newBuilder(jtc)
// 有这个才能重新布置任务,否则修改不会生效
.overwrite(true)
.build();
}
3)启动定时任务
public static void main(String[] args) {
// 启动定时任务
// 参数1: 注册中心;参数2: 配置
new JobScheduler(zkCenter(), configurationSimple()).init();
}
启动
因为我们设置的分片数量是2,所以可以启动另一个定时任务,elasticjob会自动分配任务
复制运行配置
启动两个任务,可以看到自动分配任务,原本是一个服务执行分片1和0,现在是分别执行单个任务
dataflow任务
dataflow任务适合处理流式作业,和simplejob不同,分为数据抓取和处理,先获取数据然后进行处理
订单类(被处理的类)
public class Order {
private Integer orderId;
// 0 未处理; 1 已处理
private Integer status;
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", status=" + status +
'}';
}
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
}
任务类
实现接口,有两个方法,对应抓取和处理,抓取方法的返回值会交给处理方法
public class MyDataflowJob implements DataflowJob<Order> {
// 抓取数据
@Override
public List<Order> fetchData(ShardingContext shardingContext) {
return null;
}
// 处理数据
@Override
public void processData(ShardingContext shardingContext, List<Order> data) {
}
}
具体逻辑:初始化100个order,然后抓取指定数据(status为0 并且 订单号%分片总数 == 当前分片项)的订单进行处理,返回值交给处理方法,处理方法进行处理(将order的status设置为1)
public class MyDataflowJob implements DataflowJob<Order> {
private List<Order> orders = new ArrayList<Order>();
{
// 实例化该类时执行
for (int i = 0; i < 100; i++) {
Order order = new Order();
order.setOrderId(i + 1);
// 未处理
order.setStatus(0);
orders.add(order);
}
}
// 抓取数据
@Override
public List<Order> fetchData(ShardingContext shardingContext) {
// 将 订单号%分片总数 == 当前分片项 的订单进行处理
var orderList = orders.stream()
// 过滤状态为0的
.filter(o -> o.getStatus() == 0)
.filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount()
== shardingContext.getShardingItem())
// 放入集合
.collect(toList());
List<Order> subList = null;
if (orderList != null && orderList.size() > 0) {
// (抓)截取list
subList = orderList.subList(0, 10);
}
try {
// 休眠3秒
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LocalTime time = LocalTime.now();
System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",我抓取的数据是: " + subList);
return subList;
}
// 处理数据
@Override
public void processData(ShardingContext shardingContext, List<Order> data) {
// 设置为已处理,下次不会再抓取到
data.forEach(o -> o.setStatus(1));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LocalTime time = LocalTime.now();
System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",正在处理数据!");
}
}
App.java
1)dataflow任务配置
/**
* dataflow-job配置
*
* @return
*/
public static LiteJobConfiguration configurationDataflow() {
// 1,job核心配置
var jcc = JobCoreConfiguration
// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
.newBuilder("myDataflowJob", "0/10 * * * * ?", 2)
.build();
// 2,job类型配置
// 参数1: 核心配置,参数2: 任务类的全类名,参数3: 是否开启定时任务(不开则只执行1次)
var jtc =
new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), true);
// 3,job根配置 (LiteJobConfiguration)
return LiteJobConfiguration.newBuilder(jtc)
// 有这个才能重新布置任务,否则修改不会生效
.overwrite(true)
.build();
}
2)main方法
public static void main(String[] args) {
// 启动定时任务
// 参数1: 注册中心;参数2: 配置
new JobScheduler(zkCenter(), configurationDataflow()).init();
}
启动
script任务
可以运行脚本文件(cmd、python……)
d盘下新建test.txt,修改内容后重命名为.cmd
%1这些是用来接收elastic传递来的参数的
echo running cmd cript: %1,%2,%3,%4,%5
App.java
1)任务配置
/**
* script-job配置
*
* @return
*/
public static LiteJobConfiguration configurationScript() {
// 1,job核心配置
var jcc = JobCoreConfiguration
// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
.newBuilder("myScriptJob", "0/10 * * * * ?", 2)
.build();
// 2,job类型配置
// 参数1: 核心配置,参数2: 任务脚本所在目录
var jtc =
new ScriptJobConfiguration(jcc, "d:/test.cmd");
// 3,job根配置 (LiteJobConfiguration)
return LiteJobConfiguration.newBuilder(jtc)
// 有这个才能重新布置任务,否则修改不会生效
.overwrite(true)
.build();
}
2)main方法
public static void main(String[] args) {
// 启动定时任务
// 参数1: 注册中心;参数2: 配置
new JobScheduler(zkCenter(), configurationScript()).init();
}