XXL-JOB中断信号感知

目录

背景

思路

实现逻辑

总结


背景

  在使用xxl-job框架时,由于系统是由线程池去做异步逻辑,然后主线程等待,在控制台手动停止时,会出现异步线程不感知信号中断的场景,如下场景

而此时如果人工在控制台停止xxl-job执行,异步任务并不会感知到调度线程被interrupt了,上面3个异步任务仍旧执行,而主线程却退出了,如果此时再次调度该任务,而代码逻辑没做幂等,可能出现预期外的异常

思路

  先看看xxl-job trigger的时序图

原图plantuml

    @startuml
'https://plantuml.com/sequence-diagram

!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用

actor "User" as user

box "xxl-job-admin" #LightGray
    participant "controller"  as controller
    participant "trigger" as trigger
    participant "executor-proxy" as proxy
    participant "adminBiz" as admin
end box

box "xxl-job-client" #LightGray
participant "executor"  as executor
participant "jobThread" as job
participant "callBackThread" as callback
participant "retryCallThread" as retryCallBack
end box


autonumber 1
user -> controller++:手动调度 /jobinfo/trigger
controller->trigger++: jobId/触发类型/参数
trigger->trigger:提交trigger任务
    group 异步流程
    trigger->trigger:根据jobId获取jobInfo
    trigger->trigger:获取执行器信息
    note left
    已注册的机器地址列表
    end note
    alt 分片广播
    loop
    trigger->trigger:遍历触发
    end loop
    else 其他
    trigger->trigger:单个触发
    end
    end group
    return 返回提交结果
    == 异步rpc触发==
    autonumber 1
        group 触发流程
        trigger->trigger:获取路由策略&阻塞策略
        trigger->trigger:根据路由策略获取需调度的机器地址
        trigger -> proxy++:获取执行器代理对象&缓存
        note left
        jdk代理+netty
        xxljob的log是客户端记录在本地文件
        admin调用时也通过代理调用远端接口
        end note
        proxy->executor:远程调用(传递触发信息)
        executor->executor:根据jobId获取执行线程
        executor->executor:获取job执行器
        alt 执行线程不为空
        executor->executor:根据阻塞策略处理
        end
        alt 执行线程为空
        executor->executor:新建job线程
        end
        executor->job++:把任务参数加入阻塞队列
        job->job:jobId去重
        return:返回结果
        return:返回结果
        end group
    == 异步jobThread ==
    autonumber 1
    job->job:执行handler init 方法
    loop toStop=false
    job->job:从阻塞队列中获取任务参数
    job->job:准备工作
    note left
    状态设置为运行中
    空闲次数=0
    去除jobId
    设置logFile&分片信息
    end note
   alt 超时时间>0
   job->job:新建线程处理handler信息
   else
   job->job:本线程处理handler信息
   end
   job->job:把执行结果or终止结果加入callback阻塞队列
    end loop
    job->job:清除阻塞队列里的待任务
    note left
    此时已经该线程已经被停止了
    end note
    == 异步callBackThread ==
  autonumber 1
  loop toStop=false
  callback->callback:从callback阻塞队列中获取callback参数
  alt 获取成功
  callback->callback:清空当前阻塞队列中的参数,并将其放到一个新的list
  loop 遍历admin列表
  callback->controller++:调用callback接口
  controller->admin:调用callback逻辑
  alt 任务处理成功
  admin->admin:获取job信息
  admin->admin:获取子任务信息
  loop 遍历子任务
  admin->trigger:提交trigger任务
  end loop
  admin->admin:更新job信息
  end
  return:返回回调结果
  callback->callback:记录日志到本地文件
  alt 回调失败
  callback->callback:记录序列化后的失败参数,用于重试
  end
  end loop
  end
  end loop
== 重试retryCallBack ==
autonumber 1
  loop toStop=false
    retryCallBack->retryCallBack:获取本地重试文件信息
    retryCallBack->retryCallBack:反序列化内容,重试callback请求
  end loop
@enduml

主要关注异步JobThread部分,可以看出是有个toStop的flag去感知这个中断信号的,那怎么去获取toStop的信息呢?这里可以通过起另一个线程去检查这个信号,如果为stop,则透传到异步task中,设计流程如下

原图plantuml

@startuml
'https://plantuml.com/sequence-diagram

!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用

actor "xxl-job-admin" as user

box "xxl-job-client" #LightGray
    participant "xxl-client"  as client
    participant "xxl-main-thread"  as mainThread
    participant "check-interrupt-thread" as checkThread
    participant "async-task..." as asyncThread
end box



autonumber 1
user -> client++:手动调度 /jobinfo/trigger
client->client:加入任务队列
return
client-->mainThread:获取队列任务执行
mainThread->mainThread++:init
mainThread->checkThread++:定期检查mainThread的 stopFlag属性
loop
checkThread->checkThread:定期检查停止属性属性

end loop
mainThread->mainThread:初始化完毕
mainThread->asyncThread:分发任务
asyncThread-->asyncThread++:任务执行
mainThread->mainThread:等待子任务执行完成
user->client:手动中断任务
client->client:捞取jobId对应的线程
client->mainThread:调用暂停方法,interrupt,设置 stopFlag
mainThread-->client:返回暂停结果
mainThread->mainThread:等待执行中的子任务完成
checkThread->asyncThread:设置给子任务 stopFlag
asyncThread->asyncThread:业务逻辑判断 stopFlag
return:stop
mainThread->mainThread:等待检查线程完成
return:check-thread end
mainThread->mainThread:后置处理
return:stop
@enduml

即对于异步的任务,可以做一个封装,用于接受中断信号,而信息的传递则通过threadLocal复制的方式给到异步任务,主要是解决中断信号如何传递到异步任务的问题,异步任务可以通过某个方法来获取主线程是否中断

要点如下

  1. 感知xxl-job主线程的中断信号
  2. 传递中断信号到异步任务,异步任务执行的方法可以手动调用某个方法判断是否中断,进而更快地停止任务

实现逻辑

定义异步任务封装类,用于接受信息

public class TaskWrapper<T> implements Runnable {
    private Runnable runnable;
    private volatile boolean isInterrupt;
    private Supplier<T> supplier;
    private T result;
    private final String taskId;

    private Map<String, String> copyMdc = null;

    //有需要传递的变量可以通过context传递
    private Map<String, Object> executeContext = null;
    Throwable errorCause;

    TaskWrapper(Runnable runnable, String taskId) {
        this.runnable = runnable;
        this.isInterrupt = false;
        this.taskId = taskId;
        copyMdc = MDC.getCopyOfContextMap();
        executeContext = XxlShardingTask.getCopyOfContext();
    }

    TaskWrapper(Supplier<T> supplier, String taskId) {
        this.supplier = supplier;
        this.isInterrupt = false;
        this.taskId = taskId;
        copyMdc = MDC.getCopyOfContextMap();
        executeContext = XxlShardingTask.getCopyOfContext();
    }

    @Override
    public void run() {
        if (!CollectionUtils.isEmpty(copyMdc)) {
            MDC.setContextMap(copyMdc);
        }
        if (!CollectionUtils.isEmpty(executeContext)) {
            XxlShardingTask.setExecuteContext(executeContext);
        }
        XxlShardingTask.setWrapper(this);
        try {
            if (isInterrupt) {
                return;
            }
            if (runnable != null) {
                runnable.run();
            }
            if (supplier != null) {
                result = supplier.get();
            }
        } finally {
            MDC.clear();
            XxlShardingTask.removeContext();
        }
    }

    static boolean isInterrupt() {
        return Optional.ofNullable(XxlShardingTask.getFromContext(XxlShardingTask.EXECUTE_KEY)).map(e -> ((TaskWrapper<?>) e).interrupted()).orElse(Boolean.FALSE);
    }

    public T getResult() {
        return result;
    }

    public String getTaskId() {
        return taskId;
    }

    public Throwable getErrorCause() {
        return errorCause;
    }

    /**
     * 是否成功
     *
     * @return
     */
    public boolean isSuccess() {
        return !isInterrupt && errorCause == null;
    }

    public boolean interrupted() {
        return isInterrupt;
    }

    synchronized void setInterrupt() {
        this.isInterrupt = true;
    }
}

在xxljob的主线程初次调用时,会调用init方法,定一个handler继承xxljob的IJobHandler,并实现

他的init方法,新建检查线程用于check中断信号,执行过程中,会把当前在跑的任务丢到一个map中存储,而检查线程会调用异步任务,把对应的标志未置为停止

public abstract class XxlAsyncTaskHandler<T> extends IJobHandler {
...

public void init() throws InvocationTargetException, IllegalAccessException {
        super.init();
        JobThread thread = (JobThread) Thread.currentThread();
        Field toStop = ReflectionUtils.findField(JobThread.class, "toStop");
        if (toStop == null) {
            throw new IllegalStateException("current thread don't have field [toStop],please check the xxl-job version");
        }
        mainThreadInterrupt.set(false);
        ReflectionUtils.makeAccessible(toStop);
        checkInterruptThread = new Thread(() -> {
            try {
                while (!mainThreadInterrupt.get()) {
                    TimeUnit.MILLISECONDS.sleep(getCheckInterruptMills());
                    if ((boolean) toStop.get(thread)) {
                        if (mainThreadInterrupt.compareAndSet(false, true)) {
                            currentRunTask.forEach((s, tTaskWrapper) -> {
                                tTaskWrapper.setInterrupt();
                            });

                        }
                    }
                }
            } catch (InterruptedException e) {
                //ignore
            } catch (Exception ex) {
                LOGGER.error("check interrupt error", ex);
            }
        });
        checkInterruptThread.start();
    }

}

主流程(即xxl-job调度线程所执行的execute方法)通过获取待执行的任务,对其进行封装,并加入到当前在运行的任务map中,核心的代码如下,逻辑流程

  1. 从任务生成器中获取待执行的封装好的任务
  2. 并加入到异步线程池执行
  3. 主线程等待
 while (currentTaskGenerator.hasNextTask()) {
                List<TaskWrapper<T>> wrappers = new ArrayList<>();
                for (int i = 0; i < parallelCount; i++) {
                    if (currentTaskGenerator.hasNextTask()) {
                        TaskWrapper<T> nextTask = currentTaskGenerator.getNextTask();
                        String taskId = nextTask.getTaskId();
//加入到当前执行中的任务
                        currentRunTask.put(taskId, nextTask);
                        CompletableFuture.runAsync(nextTask, executor).whenComplete((unused, throwable) -> {
                            if (throwable != null) {
                                currentRunTask.get(taskId).errorCause = throwable;
                            } else {
                                if (nextTask.isSuccess()) {
                                    successCount.incrementAndGet();
                                }
                            }
//任务处理完,countDown一下
                            count.countDown();
                            currentRunTask.remove(taskId);
                        });
                        //代表任务分配完毕
                    } else {
                        count.countDown();
                    }
                }
//主线程等待
                count.await();

对于异步任务的逻辑

由于开始时设置当前执行的封装任务到本地线程,可以通过static方法进行获取标识,比如循环或者一些较重的耗时操作,可以在执行前进行判断,如果中断了就返回结果

  protected static boolean isWorkerInterrupt() {
        return TaskWrapper.isInterrupt();
    }

比如继承该类,子类可以在业务逻辑进行判断

            while (!isWorkerInterrupt()) {
...业务逻辑
}

由于整块优化的异步调度任务的代码比较多,而且涉及了公司信息,不在此展示,重点在于

  1. xxl-job异步线程如何感知主线程中断信息——了解xxljob trigger原理,封装runnable,管理当前封装的runnable任务,把中断信息透传异步任务
  2. 线程间的信息如何传递——这里通过封装runnable类作为一个信息载体,threadLocal用于接受信息,实现不同线程的信息传递

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/777895.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

insert阻塞了insert?

一、发现问题 在arms监控页面看到某条insert语句的执行时长达到了431毫秒。 数据库中存在&#xff0c;insert语句受到了行锁阻塞&#xff0c;而阻塞的源头也在执行同样的insert语句&#xff0c;同样都是对表USERSYS_TASK_USER_LOG_TEMP01的插入操作&#xff0c;很是费解。 二…

idea创建的maven项目pom文件引入的坐标报红原因

如下所示 我们在引入某些依赖坐标的时候&#xff0c;即使点击了右上角的mavne刷新之后还是报红。 其实这是正常现象&#xff0c;实际上是我们的本地仓库当中没有这些依赖坐标&#xff0c;而idea就会通过报红来标记这些依赖来说明在我们的本地仓库是不存在的。 那有的同学就会…

ODOO17的邮件机制-系统自动推送修改密码的邮件

用户收到被要求重置密码的邮件&#xff1a; 我们来分析一下ODOO此邮件的工作机制&#xff1a; 1、邮件模板定义 2、渲染模板的函数&#xff1a; 3、调用此函数的机制&#xff1a; 当用户移除或增加了信任的设备&#xff08;如电脑、手机端等&#xff09;&#xff0c;系统会自…

农业气象站:现代农业的守护者与引领者

随着科技的飞速发展&#xff0c;农业领域也在经历着前所未有的变革。在这一变革中&#xff0c;农业气象站以其独特的功能和作用&#xff0c;逐渐成为了现代农业的守护者与引领者。 农业气象站&#xff0c;顾名思义&#xff0c;是专门用于观测和记录农田气象要素的设施。这些气象…

轻松设置:服务器域名配置全攻略

目录 前置条件 在阅读本篇内容之前&#xff0c;请先确保以下物料已准备好&#xff1a; 一台公网服务器&#xff0c;服务正常运行申请完成的域名&#xff0c;在对应域名服务商后台正常DNS解析域名备案完成可选条件&#xff1a;有https访问请求时&#xff0c;需要申请SSL证书 …

Android在framework层添加自定义服务的流程

环境说明 ubuntu16.04android4.1java version “1.6.0_45”GNU Make 3.81gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.12) 可能有人会问&#xff0c;现在都2024了怎么还在用android4版本&#xff0c;早都过时了。确实&#xff0c;现在最新的都是Android13、And…

在Linux环境下搭建Redis服务结合内网穿透实现通过GUI工具远程管理数据库

文章目录 前言1. 安装Docker步骤2. 使用docker拉取redis镜像3. 启动redis容器4. 本地连接测试4.1 安装redis图形化界面工具4.2 使用RDM连接测试 5. 公网远程访问本地redis5.1 内网穿透工具安装5.2 创建远程连接公网地址5.3 使用固定TCP地址远程访问 前言 本文主要介绍如何在Li…

Python处理表格数据常用的 N+个操作

Python作为一种强大且易用的编程语言&#xff0c;其在数据处理方面表现尤为出色。特别是当我们面对大量的表格数据时&#xff0c;Python的各类库和工具可以极大地提高我们的工作效率。以下&#xff0c;我将详细介绍Python处理表格数据常用的操作。 首先&#xff0c;我们需要安…

【算法笔记自学】第 3 章 入门篇(1)——入门模拟

3.1简单模拟 自己写的题解 #include <stdio.h> #include <stdlib.h> int main() {int N;int num0;scanf("%d",&N);while(N!1){if(N%20){NN/2;}else{N(3*N1)/2;}num;}printf("%d",num);system("pause"); // 防止运行后自动退出&…

SpringBoot+OSS实现文件上传

创建spring boot项目 pom依赖 <dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId><version>3.17.4</version></dependency><dependency><groupId>javax.xml.bind</groupI…

Transformer前置知识:Seq2Seq模型

Seq2Seq model Seq2Seq&#xff08;Sequence to Sequence&#xff09;模型是一类用于将一个序列转换为另一个序列的深度学习模型&#xff0c;广泛应用于自然语言处理&#xff08;NLP&#xff09;任务&#xff0c;如机器翻译、文本摘要、对话生成等。Seq2Seq模型由编码器&#…

直播预告|飞思实验室暑期公益培训7月10日正式开启,报名从速!

01 培训背景 很荣幸地向大家宣布&#xff1a;卓翼飞思实验室将于7月10日正式开启为期两个月的暑期公益培训&#xff01;本次培训为线上直播&#xff0c;由中南大学计算机学院特聘副教授&#xff0c;RflySim平台总研发负责人戴训华副教授主讲。 培训将基于“RflySim—智能无人…

数据可视化之智慧农业的窗口与引擎

在科技日新月异的今天,农业作为国民经济的基础产业,正逐步向智能化、数字化转型。农业为主题的数据可视化大屏看板,作为这一转型过程中的重要工具,不仅为农业管理者提供了全面、实时的农田信息,还促进了农业资源的优化配置和农业生产效率的提升。本文将深入探讨农业数据可…

Git 运用小知识

1.Git添加未完善代码的解决方法 1.1 Git只是提交未推送 把未完善的代码提交到本地仓库 只需点击撤销提交&#xff0c;提交的未完善代码会被撤回 代码显示未提交状态 1.2 Git提交并推送 把未完善的代码提交并推送到远程仓库 点击【未完善提交并推送】的结点选择还原提交&#x…

最佳 iPhone 解锁软件工具,可免费下载用于电脑操作的

业内专业人士表示&#xff0c;如果您拥有 iPhone&#xff0c;您一定知道忘记锁屏密码会多么令人沮丧。由于 Apple 的安全功能强大&#xff0c;几乎不可能在没有密码或 Apple ID 的情况下访问锁定的 iPhone。 “当我忘记密码时&#xff0c;如何在没有密码的情况下解锁iPhone&am…

银河麒麟V10 SP1 审计工具 auditd更新

前言 银河麒麟V10 SP1 审计工具 auditd 引发的内存占用过高&#xff0c; 内存使用率一直在 60% 以上&#xff0c; 内存一直不释放 排查 可以使用ps或者top查看系统进程使用情况 ps -aux|sort -k4nr|head -n 5 发现银河麒麟审计工具 auditd 一直占用内存不释放 解决 办法一…

Java视频点播网站

作者介绍&#xff1a;计算机专业研究生&#xff0c;现企业打工人&#xff0c;从事Java全栈开发 主要内容&#xff1a;技术学习笔记、Java实战项目、项目问题解决记录、AI、简历模板、简历指导、技术交流、论文交流&#xff08;SCI论文两篇&#xff09; 上点关注下点赞 生活越过…

基于OpenCv的快速图片颜色交换,轻松实现图片背景更换

图片颜色更换 图片颜色转换 当我们有2张图片,很喜欢第一张图片的颜色,第2张图片的前景照片,很多时候我们需要PS进行图片的颜色转换,这当然需要我们有强大的PS功底,当然小编这里不是介绍PS的,我们使用代码完全可以代替PS 进行图片的颜色转换 图片颜色转换步骤: 步骤…

智慧校园-教职工管理系统总体概述

在当今信息化时代&#xff0c;智慧校园教职工管理系统成为了提升教育机构管理效能的重要工具。该系统巧妙融合了先进的信息技术&#xff0c;为教职工的日常管理带来了一场静悄悄的革命。它不仅是一个信息存储库&#xff0c;记录着每位教职工从加入到离开的完整职业轨迹&#xf…

AI 与数据的智能融合丨大模型时代下的存储系统

WOT 全球技术创新大会2024北京站于 6 月 22 日圆满落幕。本届大会以“智启新纪&#xff0c;慧创万物”为主题&#xff0c;邀请到 60 位不同行业的专家&#xff0c;聚焦 AIGC、领导力、研发效能、架构演进、大数据等热门技术话题进行分享。 近年来&#xff0c;数据和人工智能已…