集成xxljob项目如何迁移到K8S

10.png

前言

大家好,今天我们将基于XXL-Job,探讨任务调度迁移到云端的相关话题。

XXL-Job是一款功能强大、易用可靠的国产分布式任务调度平台,是目前国内使用比较广泛的分布式任务调度平台之一。它的主要特点包括:

  • 支持分布式、多线程任务调度;
  • 具有完整的管理后台,可以实现任务调度的创建、修改、启动和监控;
  • 提供了丰富的调度方式,包括cron表达式、API调用、消息队列等;
  • 支持任务执行过程的日志记录和错误处理,可以帮助用户快速定位问题。

随着云计算的全面普及和发展,越来越多企业开始认识到公共云平台的无限潜力。许多企业开始将自己的应用程序和业务迁移到云环境中,以获取更高的灵活性、弹性和可扩展性。然而,任务调度作为企业中的一个重要业务组件,对于软件开发和运营的质量都有着极大的影响。在云环境下部署和运行任务调度组件,需要考虑诸多因素,如安全性、可靠性、性能等。因此,企业需要认真思考如何在云平台上部署和运行任务调度组件,以保证运营效率、降低成本、提高应用程序的质量和性能。

云端迁移过程

由于历史原因,我们的 xxl-job-admin 端是部署在 k8s 集群外部的。在我们的项目中,我们是使用XML文件来集成xxl-job的,相关的集成配置如下所示:

<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.impl.XxlJobSpringExecutor">
    <property name="adminAddresses" value="${xxl.job.admin.addresses}"/>
    <property name="appname" value="${xxl.job.executor.appname}"/>
    <property name="ip" value="${xxl.job.executor.ip}"/>
    <property name="port" value="${xxl.job.executor.port}"/>
    <property name="accessToken" value="${xxl.job.accessToken}" />
    <property name="logPath" value="${xxl.job.executor.logpath}"/>
    <property name="logRetentionDays" value="${xxl.job.executor.logretentiondays}"/>
</bean>

其中,相关配置值如下:

xxl.job.admin.addresses = http://127.0.0.1/xxl-job-admin
xxl.job.executor.appname = xxl-job-executor-sample
xxl.job.executor.ip = 
xxl.job.executor.port = 30065
xxl.job.accessToken = mytoken
xxl.job.executor.logpath = /etc/logs
xxl.job.executor.logretentiondays = -1

解决注册IP错误问题

当我们使用了与其他普通 Spring 项目的 JAR 包相同的部署方式将任务调度组件部署到了 k8s 上后,虽然我们通过管理页面看到已经成功将服务注册到了 xxl-job-admin,但我们发现该服务的 IP 地址为 k8s 中 Pod 的私有 IP 地址。因为k8s 集群内部通信的私有 IP 地址在集群外不可访问,这导致了任务无法正常执行,系统提示 IP 地址无效。

15.png

那么该如果解决这个问题呢?

阅读XXL-Job源码可以深入了解XXL-Job框架的实现细节和内部机制。在XXL-Job源码中,可以找到一些关键方法,帮助我们了解IP和port的获取规则。

具体来说,这些方法位于com.xxl.job.core.executor.XxlJobExecuto类中的initEmbedServer方法。当执行器启动时,会优先使用配置文件中的IP和端口,如果配置文件未指定,则通过NetUtils获取本地主机地址和默认端口。在注册成功后,执行器就可以通过该IP和端口与注册中心进行正常通信。部分源码如下:

port = port > 0 ? port : NetUtil.findAvailablePort(9999);
ip = ip != null && ip.trim().length() > 0 ? ip : IpUtil.getIp();

由此可见,为了解决这个问题,我们有两种方法可以尝试。

  • 我们可以直接将配置文件中的 xxl.job.executor.ip 指定为正确的IP地址,这样XXL-Job就可以正确地找到执行器并与之通信了。
  • 在XXL-Job的管理页面上将执行器的注册方式改为手动录入,并直接填写正确的IP地址。

16.png

无论使用哪种方法,唯一的要求就是确保与执行器实际运行的IP地址匹配。这样就可以使XXL-Job正常工作了。

实现动态注册IP

无论采用前面提到的两种方式中的哪一种,均存在一个xxl-job配置写死IP地址的问题,而无法实现IP的动态获取,这对于后期的维护和动态扩缩容都是不利的。那么如何在保证获取到的IP正确的前提下实现自动获取呢?

为了实现xxl-job自动获取注册IP的目的,在获取IP的过程中,我们可以结合Dubbo框架的获取IP逻辑,改造获取IP的顺序。按照以下顺序获取IP:

  • 首先根据环境变量获取IP,如果环境变量中存在,则获取环境变量中的IP地址。

  • 如果环境变量中不存在,则根据配置文件获取IP,如果配置文件中存在,则获取配置文件中的IP地址。

  • 如果配置文件中不存在,则获取本地IP地址。

这样的优先级顺序可以确保我们始终能够获得一个可用的注册IP。通过这种方式会让获取IP更加智能化和可靠。以下是具体改造步骤:

  • deploy.yaml 文件中添加环境变量。
spec:
  template:
    spec:
      containers:
        - env:
            - name: XXLJOB_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
  • 使用Java代码中的注释@Configuration和@Bean注释来替代使用XML文件进行Bean的注册和配置。
@Slf4j
@Configuration
public class XxlJobConfiguration {

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init start...");

        // 获取ip规则优先级, 环境变量(此值为deploy.yaml中配置)>配置文件>默认(本地)
        String ip = System.getenv("XXLJOB_IP_TO_REGISTRY");
        ip = StringUtils.isBlank(ip) ? PropertiesCacheUtil.getConfigValue("xxl.job.executor.ip") : ip;

        String port = PropertiesCacheUtil.getConfigValue("xxl.job.executor.port");
        String logRetentionDays = PropertiesCacheUtil.getConfigValue("xxl.job.executor.logretentiondays");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.admin.addresses")));
        xxlJobSpringExecutor.setAppname(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.appname")));
        xxlJobSpringExecutor.setIp(ip);

        if (StringUtils.isNotBlank(port)) {
            xxlJobSpringExecutor.setPort(Integer.parseInt(port));
        }
        xxlJobSpringExecutor.setAccessToken(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.accessToken")));
        xxlJobSpringExecutor.setLogPath(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.logpath")));
        if (StringUtils.isNotBlank(logRetentionDays)) {
            xxlJobSpringExecutor.setLogRetentionDays(Integer.parseInt(logRetentionDays));
        }
        
        log.info(">>>>>>>>>>> xxl-job config init end...");
        return xxlJobSpringExecutor;
    }
}

通过这样的改造,我们可以更加智能可靠地获取注册IP,实现了xxl-job自动获取IP地址的目的。

解决分片问题

无论使用上面提到的写死配置方式还是实现动态注册IP,都是仅适用于单机的情况,如果需要部署多台任务调度组件,那么又该如何配置才能保证每个服务都可以被调度,以达到实现分片处理的目的呢?

方法1:

我们可以通过在deploy.yaml文件中配置Pod的反亲和性,使得单台宿主机上仅能部署一个服务,并且配置在service.yaml中配置代理策略为Local的方式来达到上述目的。具体配置如下:

deploy.yaml改造如下:

spec:
  template:
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - topologyKey: kubernetes.io/hostname
              labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - 你的APP名称

service.yaml 改造如下:

spec:
  ## 代理策略:默认Cluster。Cluster表示:流量可以转发到其他节点上的Pod。Local表示:流量只发给本机的Pod
  externalTrafficPolicy: Local

经过上面的改造,我们成功的解决了分片问题,但是又带来了新的问题,如下图所示:
在这里插入图片描述

上面的方法都是使用Deployment方式部署的,那么,我们是否可以换下思路使用StatefulSet方式部署呢?这就衍生出了下面的方法。

方法2:
  • 改造配置:
## 注册到xxljob的端口,多个使用英文逗号分隔
xxl.job.executor.port = 30065,30066,30067
  • 改造代码
@Slf4j
@Configuration
public class XxlJobConfiguration {

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");

        // 获取ip规则优先级, 配置中心>环境变量(此值为deploy.yml中配置)
        String ip = PropertiesCacheUtil.getConfigValue("xxl.job.executor.ip");
        ip = StringUtils.isBlank(ip) ? System.getenv("XXLJOB_IP_TO_REGISTRY") : ip;
        log.info("==>ip:{}", ip);

        String podName = StringUtils.trimToEmpty(System.getenv("POD_NAME"));
        log.info("==>POD_NAME:{}", podName);

        String[] split = StringUtils.split(podName, "-");
        String index = split[split.length - 1];
        log.info("==>index:{}", index);

        String allPort = PropertiesCacheUtil.getConfigValue("xxl.job.executor.port");
        String[] portSplit = StringUtils.split(allPort, ",");
        String port = portSplit[Integer.parseInt(index)];
        log.info("==>port:{}", port);

        String logRetentionDays = PropertiesCacheUtil.getConfigValue("xxl.job.executor.logretentiondays");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.admin.addresses")));
        xxlJobSpringExecutor.setAppname(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.appname")));
        xxlJobSpringExecutor.setIp(ip);

        if (StringUtils.isNotBlank(port)) {
            xxlJobSpringExecutor.setPort(Integer.parseInt(port));
        }
        xxlJobSpringExecutor.setAccessToken(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.accessToken")));
        xxlJobSpringExecutor.setLogPath(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.logpath")));
        if (StringUtils.isNotBlank(logRetentionDays)) {
            xxlJobSpringExecutor.setLogRetentionDays(Integer.parseInt(logRetentionDays));
        }
        return xxlJobSpringExecutor;
    }
}
@Slf4j
@Component
public class InitNotifyDataFromDBHandler {

    @XxlJob("initNotifyDataFromDBHandler")
    public void initNotifyDataFromDBHandler(String params) {
            // XxlJobHelper.getShardIndex():当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
            // XxlJobHelper.getShardTotal():总分片数,执行器集群的总机器数量;

            String podName = StringUtils.trimToEmpty(System.getenv("POD_NAME"));
            log.info("==>POD_NAME:{}", podName);
            XxlJobHelper.log("==>POD_NAME:{}", podName);

            String[] split = StringUtils.split(podName, "-");
            String index = split[split.length - 1];
            log.info("==>index:{}", index);
            XxlJobHelper.log("==>index:{}", index);

            // 下标0:机器总数目,下标1:当前机器在总机器中的位置下标
            String[] args = {XxlJobHelper.getShardTotal() + "", index};
            
            // 其他业务逻辑
            }
     }
  • 重写K8S中yaml部署文件
## 创建StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: your-app
  namespace: your-namespace
spec:
  serviceName: your-app
  replicas: 3
  selector:
    matchLabels:
      app: your-app
  template:
    metadata:
      annotations:
        statefulset.kubernetes.io/pod-name: $(POD_NAME)
      labels:
        app: your-app
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: project.node
                    operator: In
                    values:
                      - your-project-node
      volumes:
        - name: timezone
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
      containers:
        - env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: DUBBO_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: XXLJOB_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
          image: your-image
          imagePullPolicy: Always
          name: your-app
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
## 创建service
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-0
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-0
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30065
      targetPort: 30065
      nodePort: 30065
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-1
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-1
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30066
      targetPort: 30066
      nodePort: 30066
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-2
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-2
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30067
      targetPort: 30067
      nodePort: 30067

经过上面的改造,我们成功的解决了使用第一种方法带来的问题。但是这个方法同样以下缺点,但是这种缺点相对来说是可以忽略的,因为生产环境不会随便增减副本数量。

  • 在K8S的dashboard页面直接新增副本数量无效,需要先新增配置文件中的端口,再新增部署yaml中对应的Service,才能真正实现副本数量的增加。

小结

以上就是今天分享的任务调度上云的相关内容,我们的目标不仅仅是将任务调度程序迁移到云端,更是要通过实现自动注册功能,使任务调度程序能自动加入云端调度集群,从而更方便地进行任务调度,提升运行效率和可扩展性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/324486.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java中的异常处理

目录 前言&#xff1a; 异常简介&#xff1a; Error类&#xff1a; Exception类&#xff1a; Exception异常&#xff1a; 运行异常&#xff1a; 编译异常&#xff1a; throw和throws关键字&#xff1a; throw: throws: try-catch关键字&#xff1a; finally: 为…

nvcc -V显示command not found

出现这个问题&#xff0c;不仅是 nvcc -V会显示command not found,nvidia-smi同样也会显示 解决方法如下&#xff1a; 1&#xff09;这里首先转换到CUDA所在位置&#xff0c;一般是在这个位置 cd /usr/local 2&#xff09;打开、编辑环境变量的配置文件 vim ~/.bashrc …

NLP论文阅读记录 - 2021 | WOS 利用 ParsBERT 和预训练 mT5 进行波斯语抽象文本摘要

文章目录 前言0、论文摘要一、Introduction1.1目标问题1.2相关的尝试1.3本文贡献 二.前提三.本文方法A. 序列到序列 ParsBERTB、mT5 四 实验效果4.1数据集4.2 对比模型4.3实施细节4.4评估指标4.5 实验结果4.6 细粒度分析 五 总结思考 前言 Leveraging ParsBERT and Pretrained …

【JupyterLab】在 conda 虚拟环境中 JupyterLab 的安装与使用

【JupyterLab】在 conda 虚拟环境中 JupyterLab 的安装与使用 1 JupyterLab 介绍2 安装2.1 Jupyter Kernel 与 conda 虚拟环境 3 使用3.1 安装中文语言包(Optional)3.2 启动3.3 常用快捷键3.3.1 命令模式下 3.4 远程访问个人计算机3.4.1 局域网下 1 JupyterLab 介绍 官方文档: …

分布式搜索——Elasticsearch

Elasticsearch 文章目录 Elasticsearch简介ELK技术栈Elasticsearch和Lucene 倒排索引正向索引倒排索引正向和倒排 ES概念文档和字段索引和映射Mysql与Elasticsearch 安装ES、Kibana安装单点ES创建网络拉取镜像运行 部署kibana拉取镜像部署 安装Ik插件扩展词词典停用词词典 索引…

政采网调试要求及常见问题解决方法

登录平台软件环境要求&#xff1a; 操作系统&#xff1a;建议Win10及以上&#xff08;Win10-64位专业版 版本号17134纯净安装版本&#xff09; 浏 览 器&#xff1a;IE11浏览器、谷歌120.0.6099.217&#xff08;64位正式版&#xff09;浏览器 必要软件&#xff1a;CA互联互通…

python高校舆情分析系统+可视化+情感分析 舆情分析+Flask框架(源码+文档)✅

毕业设计&#xff1a;2023-2024年计算机专业毕业设计选题汇总&#xff08;建议收藏&#xff09; 毕业设计&#xff1a;2023-2024年最新最全计算机专业毕设选题推荐汇总 &#x1f345;感兴趣的可以先收藏起来&#xff0c;点赞、关注不迷路&#xff0c;大家在毕设选题&#xff…

蓝桥杯省赛无忧 STL 课件19 第2次学长带练

01 讲解例题 02 复习和拓展课程知识

HDFS和MapReduce综合实训

文章目录 第1关&#xff1a;WordCount词频统计第2关&#xff1a;HDFS文件读写第3关&#xff1a;倒排索引第4关&#xff1a; 网页排序——PageRank算法 第1关&#xff1a;WordCount词频统计 测试说明 以下是测试样例&#xff1a; 测试输入样例数据集&#xff1a;文本文档test1…

上下左右视频转场模板PR项目工程文件 Vol. 05

pr转场模板&#xff0c;视频画面上下左右转场后带有一点点回弹效果的PR项目工程模板 Vol. 05 项目特点&#xff1a; 回弹效果视频转场&#xff1b; Premiere Pro 2020及以上&#xff1b; 适用于照片和视频转场&#xff1b; 适用于任何FPS和分辨率&#xff1b; 视频教程。 PR转场…

从0开始学Git指令(3)

从0开始学Git指令 因为网上的git文章优劣难评&#xff0c;大部分没有实操展示&#xff0c;所以打算自己从头整理一份完整的git实战教程&#xff0c;希望对大家能够起到帮助&#xff01; 远程仓库 Git是分布式版本控制系统&#xff0c;同一个Git仓库&#xff0c;可以分布到不…

训练官方源码RT-DETR(血泪的教训!严格按照官方流程!)

文章目录 参考链接1 配置环境2 配置数据路径3 配置训练参数4 可能的报错AttributeError: module torchvision has no attribute disable_beta_transforms_warning 参考链接 源码&#xff1a;https://github.com/lyuwenyu/RT-DETR详解RT-DETR网络结构/数据集获取/环境搭建/训练…

22.实战演练--记住密码和登录状态

在登录注册案例的基础上&#xff0c;实现一个相对完整的登录注册模块 (1).记住密码 (2).记住登录状态&#xff08;自动登录&#xff09; (3).注册成功&#xff0c;登录成功&#xff0c;退出登录时的页面跳转

【JavaScript】多种实现文件下载的工具类

【JavaScript】多种实现文件下载的工具类 方法一方法二方法三整体调用代码异常处理 示例以下载txt文件为例&#xff0c;代码已封装上传&#xff0c;可直接下载资源在服务器中使用。如有异常&#xff0c;可查看“异常处理”小节或评论区指出。 方法一 在html中&#xff0c;可以…

java中String的两种创建方法、字符串常量池

java中String的两种创建方法 字符串常量池 字符串常量池 String的两种创建方式: 第一种方式是在常量池中获取字符串对象。第二种方式是直接在堆空间创建一个新的字符串对象。 //先检查字符串常量池中有没有“apesource”,如果字符产常量池中没有&#xff0c;则创建一个&#x…

测绘资质工程测量乙级资质办理条件

新测绘资质分为10个专业&#xff1a; 1.大地测量 2.测绘航空摄影 3.摄影测量与遥感 4.工程测量 5.海洋测绘 6.界线与不动产测绘 7.地理信息系统工程 8.地图编制 9.导航电子地图制作 10.互联网地图服务。 新《测绘资质管理办法》和《测绘资质分类分级标准》&#xff…

【linux】查看Debian应用程序图标对应的可执行命令

在Debian系统中&#xff0c;应用程序图标通常与.desktop文件关联。您可以通过查看.desktop文件来找到对应的可执行命令。这些文件通常位于/usr/share/applications/或~/.local/share/applications/目录下。这里是如何查找的步骤&#xff1a; 1. 打开文件管理器或终端。 2. 导…

Windows下python用ctypes调用C++程序的动态链接库方法(vs2019)

Windows下python用ctypes调用C程序的动态链接库方法&#xff08;vs2019&#xff09; https://blog.csdn.net/qq_34288751/article/details/121939189 https://blog.csdn.net/iambinglan1/article/details/133790822

​HDD回暖于2024,与SSD决战于2028--part1

去年小编曾表达过类似观点&#xff0c;市场留给HDD的时间已经不多了&#xff0c;未来5年的发展决定了HDD的最终命运。 扩展阅读&#xff1a;HDD最后的冲刺&#xff1a;大容量硬盘的奋力一搏 SSD以其高速度和低延迟等优点&#xff0c;尤其在容量增长和每GB成本降低方面&#x…

基于Ubuntu22.04部署生产级K8S集群v1.27(规划和核心组件部署篇)

本文档主要根据k8s官网文档和其插件的官网文档&#xff0c;参考部分他人优秀经验&#xff0c;在实际操作中逐渐完成&#xff0c;比较详尽&#xff0c;适合在境内学习者和实践者参考。 实操环境基于VMware Workstation 17 pro&#xff0c;采用ubuntu22.04操作系统&#xff08;有…