目录
背景
思路
实现逻辑
总结
背景
在使用xxl-job框架时,由于系统是由线程池去做异步逻辑,然后主线程等待,在控制台手动停止时,会出现异步线程不感知信号中断的场景,如下场景
而此时如果人工在控制台停止xxl-job执行,异步任务并不会感知到调度线程被interrupt了,上面3个异步任务仍旧执行,而主线程却退出了,如果此时再次调度该任务,而代码逻辑没做幂等,可能出现预期外的异常
思路
先看看xxl-job trigger的时序图
原图plantuml
@startuml
'https://plantuml.com/sequence-diagram
!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用
actor "User" as user
box "xxl-job-admin" #LightGray
participant "controller" as controller
participant "trigger" as trigger
participant "executor-proxy" as proxy
participant "adminBiz" as admin
end box
box "xxl-job-client" #LightGray
participant "executor" as executor
participant "jobThread" as job
participant "callBackThread" as callback
participant "retryCallThread" as retryCallBack
end box
autonumber 1
user -> controller++:手动调度 /jobinfo/trigger
controller->trigger++: jobId/触发类型/参数
trigger->trigger:提交trigger任务
group 异步流程
trigger->trigger:根据jobId获取jobInfo
trigger->trigger:获取执行器信息
note left
已注册的机器地址列表
end note
alt 分片广播
loop
trigger->trigger:遍历触发
end loop
else 其他
trigger->trigger:单个触发
end
end group
return 返回提交结果
== 异步rpc触发==
autonumber 1
group 触发流程
trigger->trigger:获取路由策略&阻塞策略
trigger->trigger:根据路由策略获取需调度的机器地址
trigger -> proxy++:获取执行器代理对象&缓存
note left
jdk代理+netty
xxljob的log是客户端记录在本地文件
admin调用时也通过代理调用远端接口
end note
proxy->executor:远程调用(传递触发信息)
executor->executor:根据jobId获取执行线程
executor->executor:获取job执行器
alt 执行线程不为空
executor->executor:根据阻塞策略处理
end
alt 执行线程为空
executor->executor:新建job线程
end
executor->job++:把任务参数加入阻塞队列
job->job:jobId去重
return:返回结果
return:返回结果
end group
== 异步jobThread ==
autonumber 1
job->job:执行handler init 方法
loop toStop=false
job->job:从阻塞队列中获取任务参数
job->job:准备工作
note left
状态设置为运行中
空闲次数=0
去除jobId
设置logFile&分片信息
end note
alt 超时时间>0
job->job:新建线程处理handler信息
else
job->job:本线程处理handler信息
end
job->job:把执行结果or终止结果加入callback阻塞队列
end loop
job->job:清除阻塞队列里的待任务
note left
此时已经该线程已经被停止了
end note
== 异步callBackThread ==
autonumber 1
loop toStop=false
callback->callback:从callback阻塞队列中获取callback参数
alt 获取成功
callback->callback:清空当前阻塞队列中的参数,并将其放到一个新的list
loop 遍历admin列表
callback->controller++:调用callback接口
controller->admin:调用callback逻辑
alt 任务处理成功
admin->admin:获取job信息
admin->admin:获取子任务信息
loop 遍历子任务
admin->trigger:提交trigger任务
end loop
admin->admin:更新job信息
end
return:返回回调结果
callback->callback:记录日志到本地文件
alt 回调失败
callback->callback:记录序列化后的失败参数,用于重试
end
end loop
end
end loop
== 重试retryCallBack ==
autonumber 1
loop toStop=false
retryCallBack->retryCallBack:获取本地重试文件信息
retryCallBack->retryCallBack:反序列化内容,重试callback请求
end loop
@enduml
主要关注异步JobThread部分,可以看出是有个toStop的flag去感知这个中断信号的,那怎么去获取toStop的信息呢?这里可以通过起另一个线程去检查这个信号,如果为stop,则透传到异步task中,设计流程如下
原图plantuml
@startuml
'https://plantuml.com/sequence-diagram
!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用
actor "xxl-job-admin" as user
box "xxl-job-client" #LightGray
participant "xxl-client" as client
participant "xxl-main-thread" as mainThread
participant "check-interrupt-thread" as checkThread
participant "async-task..." as asyncThread
end box
autonumber 1
user -> client++:手动调度 /jobinfo/trigger
client->client:加入任务队列
return
client-->mainThread:获取队列任务执行
mainThread->mainThread++:init
mainThread->checkThread++:定期检查mainThread的 stopFlag属性
loop
checkThread->checkThread:定期检查停止属性属性
end loop
mainThread->mainThread:初始化完毕
mainThread->asyncThread:分发任务
asyncThread-->asyncThread++:任务执行
mainThread->mainThread:等待子任务执行完成
user->client:手动中断任务
client->client:捞取jobId对应的线程
client->mainThread:调用暂停方法,interrupt,设置 stopFlag
mainThread-->client:返回暂停结果
mainThread->mainThread:等待执行中的子任务完成
checkThread->asyncThread:设置给子任务 stopFlag
asyncThread->asyncThread:业务逻辑判断 stopFlag
return:stop
mainThread->mainThread:等待检查线程完成
return:check-thread end
mainThread->mainThread:后置处理
return:stop
@enduml
即对于异步的任务,可以做一个封装,用于接受中断信号,而信息的传递则通过threadLocal复制的方式给到异步任务,主要是解决中断信号如何传递到异步任务的问题,异步任务可以通过某个方法来获取主线程是否中断
要点如下
- 感知xxl-job主线程的中断信号
- 传递中断信号到异步任务,异步任务执行的方法可以手动调用某个方法判断是否中断,进而更快地停止任务
实现逻辑
定义异步任务封装类,用于接受信息
public class TaskWrapper<T> implements Runnable {
private Runnable runnable;
private volatile boolean isInterrupt;
private Supplier<T> supplier;
private T result;
private final String taskId;
private Map<String, String> copyMdc = null;
//有需要传递的变量可以通过context传递
private Map<String, Object> executeContext = null;
Throwable errorCause;
TaskWrapper(Runnable runnable, String taskId) {
this.runnable = runnable;
this.isInterrupt = false;
this.taskId = taskId;
copyMdc = MDC.getCopyOfContextMap();
executeContext = XxlShardingTask.getCopyOfContext();
}
TaskWrapper(Supplier<T> supplier, String taskId) {
this.supplier = supplier;
this.isInterrupt = false;
this.taskId = taskId;
copyMdc = MDC.getCopyOfContextMap();
executeContext = XxlShardingTask.getCopyOfContext();
}
@Override
public void run() {
if (!CollectionUtils.isEmpty(copyMdc)) {
MDC.setContextMap(copyMdc);
}
if (!CollectionUtils.isEmpty(executeContext)) {
XxlShardingTask.setExecuteContext(executeContext);
}
XxlShardingTask.setWrapper(this);
try {
if (isInterrupt) {
return;
}
if (runnable != null) {
runnable.run();
}
if (supplier != null) {
result = supplier.get();
}
} finally {
MDC.clear();
XxlShardingTask.removeContext();
}
}
static boolean isInterrupt() {
return Optional.ofNullable(XxlShardingTask.getFromContext(XxlShardingTask.EXECUTE_KEY)).map(e -> ((TaskWrapper<?>) e).interrupted()).orElse(Boolean.FALSE);
}
public T getResult() {
return result;
}
public String getTaskId() {
return taskId;
}
public Throwable getErrorCause() {
return errorCause;
}
/**
* 是否成功
*
* @return
*/
public boolean isSuccess() {
return !isInterrupt && errorCause == null;
}
public boolean interrupted() {
return isInterrupt;
}
synchronized void setInterrupt() {
this.isInterrupt = true;
}
}
在xxljob的主线程初次调用时,会调用init方法,定一个handler继承xxljob的IJobHandler,并实现
他的init方法,新建检查线程用于check中断信号,执行过程中,会把当前在跑的任务丢到一个map中存储,而检查线程会调用异步任务,把对应的标志未置为停止
public abstract class XxlAsyncTaskHandler<T> extends IJobHandler {
...
public void init() throws InvocationTargetException, IllegalAccessException {
super.init();
JobThread thread = (JobThread) Thread.currentThread();
Field toStop = ReflectionUtils.findField(JobThread.class, "toStop");
if (toStop == null) {
throw new IllegalStateException("current thread don't have field [toStop],please check the xxl-job version");
}
mainThreadInterrupt.set(false);
ReflectionUtils.makeAccessible(toStop);
checkInterruptThread = new Thread(() -> {
try {
while (!mainThreadInterrupt.get()) {
TimeUnit.MILLISECONDS.sleep(getCheckInterruptMills());
if ((boolean) toStop.get(thread)) {
if (mainThreadInterrupt.compareAndSet(false, true)) {
currentRunTask.forEach((s, tTaskWrapper) -> {
tTaskWrapper.setInterrupt();
});
}
}
}
} catch (InterruptedException e) {
//ignore
} catch (Exception ex) {
LOGGER.error("check interrupt error", ex);
}
});
checkInterruptThread.start();
}
}
主流程(即xxl-job调度线程所执行的execute方法)通过获取待执行的任务,对其进行封装,并加入到当前在运行的任务map中,核心的代码如下,逻辑流程
- 从任务生成器中获取待执行的封装好的任务
- 并加入到异步线程池执行
- 主线程等待
while (currentTaskGenerator.hasNextTask()) {
List<TaskWrapper<T>> wrappers = new ArrayList<>();
for (int i = 0; i < parallelCount; i++) {
if (currentTaskGenerator.hasNextTask()) {
TaskWrapper<T> nextTask = currentTaskGenerator.getNextTask();
String taskId = nextTask.getTaskId();
//加入到当前执行中的任务
currentRunTask.put(taskId, nextTask);
CompletableFuture.runAsync(nextTask, executor).whenComplete((unused, throwable) -> {
if (throwable != null) {
currentRunTask.get(taskId).errorCause = throwable;
} else {
if (nextTask.isSuccess()) {
successCount.incrementAndGet();
}
}
//任务处理完,countDown一下
count.countDown();
currentRunTask.remove(taskId);
});
//代表任务分配完毕
} else {
count.countDown();
}
}
//主线程等待
count.await();
对于异步任务的逻辑
由于开始时设置当前执行的封装任务到本地线程,可以通过static方法进行获取标识,比如循环或者一些较重的耗时操作,可以在执行前进行判断,如果中断了就返回结果
protected static boolean isWorkerInterrupt() {
return TaskWrapper.isInterrupt();
}
比如继承该类,子类可以在业务逻辑进行判断
while (!isWorkerInterrupt()) {
...业务逻辑
}
由于整块优化的异步调度任务的代码比较多,而且涉及了公司信息,不在此展示,重点在于
- xxl-job异步线程如何感知主线程中断信息——了解xxljob trigger原理,封装runnable,管理当前封装的runnable任务,把中断信息透传异步任务
- 线程间的信息如何传递——这里通过封装runnable类作为一个信息载体,threadLocal用于接受信息,实现不同线程的信息传递