xxl-job 整合 Seatunnel 实现定时任务

流处理

#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
SEATUNNEL_HOST=localhost
SEATUNNEL_PORT=5801

# 定义任务停止时执行的清理操作
exit_func() {
    # 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
	$SEATUNNEL_CMD -can "$JOB_ID"
    exit;
}

# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL

# 将配置内容写入变量
config_content=$(cat <<EOL
env {
    "job.mode"=STREAMING
    "job.name"="SeaTunnel_Job"
    "savemode.execute.location"=CLUSTER
}
source {
    MySQL-CDC {
        "snapshot.split.size"="8096"
        "snapshot.fetch.size"="1024"
        "incremental.parallelism"="1"
        "connect.timeout.ms"="30000"
        "connect.max-retries"="3"
        "connection.pool.size"="20"
        "chunk-key.even-distribution.factor.lower-bound"="0.05"
        "chunk-key.even-distribution.factor.upper-bound"="100.0"
        "sample-sharding.threshold"="1000"
        "inverse-sampling.rate"="1000"
        "startup.mode"=INITIAL
        "exactly_once"="false"
        "stop.mode"=NEVER
        parallelism="1"
        "result_table_name"=Table15381274549824
        catalog {
            factory=Mysql
        }
        database-names=[
            "test_source"
        ]
        table-names=[
            "test_source.user"
        ]
        format=DEFAULT
        password="123456"
        username=root
        base-url="jdbc:mysql://127.0.0.1:3306/test_cdc"
        server-time-zone=UTC
    }
}
transform {
}
sink {
    Jdbc {
        "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
        "data_save_mode"="APPEND_DATA"
        "create_index"="true"
        "connection_check_timeout_sec"="30"
        "batch_size"="1000"
        "is_exactly_once"="false"
        "max_commit_attempts"="3"
        "transaction_timeout_sec"="-1"
        "max_retries"="0"
        "auto_commit"="true"
        "support_upsert_by_query_primary_key_exist"="false"
        "multi_table_sink_replica"="1"
        "source_table_name"=Table15381274549824
        "generate_sink_sql"=true
        database="test_jdbc"
        table=user
        driver="com.mysql.cj.jdbc.Driver"
        url="jdbc:mysql://127.0.0.1:3306/test_jdbc"
        password="123456"
        user=root
    }
}
EOL
)

echo "开始执行任务"
echo "--------    配置信息    --------------"
echo "$config_content"
echo "--------    end    --------------"

# 将配置内容写入标准输入并传递给 SeaTunnel
SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')

echo "任务Id: $JOB_ID"

# 监控任务状态
while true; do
    STATUS_OUTPUT=$(curl -s http://$SEATUNNEL_HOST:$SEATUNNEL_PORT/hazelcast/rest/maps/job-info/$JOB_ID)
    echo $(date "+%Y-%m-%d %H:%M:%S.%3N") "写入数量 : "$(echo "$STATUS_OUTPUT" | awk -F'"SinkWriteCount":"' '{print $2}' | awk -F '","' '{print $1}')", 读取数量 :"$(echo "$STATUS_OUTPUT" | awk -F'"SourceReceivedCount":"' '{print $2}' | awk -F '","' '{print $1}')
    
	TASK_STATE=$(echo "$STATUS_OUTPUT" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')

    if [[ "$TASK_STATE" == "FINISHED" ]]; then
        echo "任务完成, 状态: $TASK_STATE"
        exit 0
    fi
    
    if [[ "$TASK_STATE" != "RUNNING" ]]; then
        echo "任务已结束,状态:$TASK_STATE"
        exit 1
    else
        echo "任务运行中 ... 状态: $TASK_STATE"
        sleep 300
    fi
done



批处理

#!/bin/bash

SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"

# 定义任务停止时执行的清理操作
exit_func() {
    # 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
	$SEATUNNEL_CMD -can "$JOB_ID"
    exit;
}

# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL


# 将配置内容写入变量
config_content=$(cat <<EOL
env {
  # You can set SeaTunnel environment configuration here
  parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }

  # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/source
}

sink {
  Console {
  }

  # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/sink
}
EOL
)

echo "开始执行任务"
# 将配置内容写入标准输入并传递给 SeaTunnel
SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')

echo "任务Id: $JOB_ID"

# 监控任务状态
while true; do
    # 查询任务状态
    STATUS_OUTPUT=$($SEATUNNEL_CMD -j "$JOB_ID" 2>&1)
    TASK_STATE=$(echo "$STATUS_OUTPUT" | grep "$JOB_ID" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')

    if [[ "$TASK_STATE" == "FINISHED" ]]; then
        echo "任务完成, 状态: $TASK_STATE"
        exit 0
    fi
    # 检查任务是否已完成
    if [[ "$TASK_STATE" != "RUNNING" ]]; then
        echo "任务已结束,状态:$TASK_STATE"
        exit 1
    else
        echo "任务运行中 ... 状态: $TASK_STATE"
        # 等待 5 秒后再次查询
        sleep 5
    fi
done

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/938677.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

期权懂|期权新手入门知识:个股期权标的资产的作用

锦鲤三三每日分享期权知识&#xff0c;帮助期权新手及时有效地掌握即市趋势与新资讯&#xff01; 期权新手入门知识&#xff1a;个股期权标的资产的作用 个股期权标的资产的作用主要体现在以下几个方面‌&#xff1a; &#xff08;1&#xff09;基本面影响‌&#xff1a; 标的资…

Unity超优质动态天气插件(含一年四季各种天气变化,可用于单机局域网VR)

效果展示&#xff1a;https://www.bilibili.com/video/BV1CkkcYHENf/?spm_id_from333.1387.homepage.video_card.click 在你的项目中设置enviro真的很容易&#xff01;导入包裹并按照以下步骤操作开始的步骤&#xff01; 1. 拖拽“EnviroSky”预制件&#xff08;“environme…

【算法】【优选算法】链表

目录 一、链表常用技巧与操作总结二、2.两数相加三、24.两两交换链表中的节点3.1 迭代3.2 递归 四、143.重排链表五、23.合并K个升序链表5.1 堆5.2 分治5.3 暴力枚举 六、25.K个⼀组翻转链表 一、链表常用技巧与操作总结 技巧&#xff1a; 画图解题。使用虚拟头结点。像有插入…

【面试】Redis 常见面试题

一、介绍一下什么是 Redis&#xff0c;有什么特点? Redis 是一个高性能的 key-value 内存数据库。 不同于传统的 MySQL 这样的关系型数据库&#xff0c;Redis 主要使用内存存储数据&#xff08;当然也支持持久化存储到硬盘上&#xff09;&#xff0c;并非是使用 “表” 这样…

【Linux】NET9运行时移植到低版本GLIBC的Linux纯内核板卡上

背景介绍 自制了一块Linux板卡(基于全志T113i) 厂家给的SDK和根文件系统能够提供的GLIBC的版本比较低 V2.25/GCC 7.3.1 这个版本是无法运行dotnet以及dotnet生成的AOT应用的 我用另一块同Cortex-A7的板子运行dotnet的报错 版本不够&#xff0c;运行不了 而我的板子是根本就识…

MySQL Explain 分析SQL语句性能

一、EXPLAIN简介 使用EXPLAIN关键字可以模拟优化器执行SQL查询语句&#xff0c;从而知道MySQL是如何处理你的SQL语句的。分析你的查询语句或是表结构的性能瓶颈。 &#xff08;1&#xff09; 通过EXPLAIN&#xff0c;我们可以分析出以下结果&#xff1a; 表的读取顺序数据读取…

vue3实现商城系统详情页(前端实现)

目录 写在前面 预览 实现 图片部分 详情部分 代码 源码地址 总结 写在前面 笔者不是上一个月毕业了么&#xff1f;找工作没找到&#xff0c;准备在家躺平两个月。正好整理一下当时的毕业设计&#xff0c;是一个商城系统。还是写篇文章记录下吧 预览 商品图片切换显示…

uniapp 微信小程序 功能入口

单行单独展示 效果图 html <view class"shopchoose flex jsb ac" click"routerTo(要跳转的页面)"><view class"flex ac"><image src"/static/dyd.png" mode"aspectFit" class"shopchooseimg"&g…

6.1 初探MapReduce

MapReduce是一种分布式计算框架&#xff0c;用于处理大规模数据集。其核心思想是“分而治之”&#xff0c;通过Map阶段将任务分解为多个简单任务并行处理&#xff0c;然后在Reduce阶段汇总结果。MapReduce编程模型包括Map和Reduce两个阶段&#xff0c;数据来源和结果存储通常在…

聚观早报 | 百度回应进军短剧;iPad Air将升级OLED

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 12月18日消息 百度回应进军短剧 iPad Air将升级OLED 三星Galax S25 Ultra配色细节 一加Ace 5系列存储规格 小米…

CH582F BLE5.3 蓝牙核心板开发板 60MHz RAM:32KB ROM:448KB

CH582F BLE5.3 蓝牙核心板开发板 60MHz RAM:32KB ROM:448KB 是一款基于南京沁恒&#xff08;WCH&#xff09;推出的高性能、低功耗无线通信芯片CH582F的开发板。以下是该开发板的功能和参数详细介绍&#xff1a; 主要特性 双模蓝牙支持&#xff1a; 支持蓝牙5.0标准&#xff0…

【软件工程复习】

第1章 软件工程概述 1.2软件工程 ​ 1983年IEEE给出的定义&#xff1a;“软件工程是 开发、运行、维护和修复软件的系统方法 ” 1.4软件生存期 软件开发和运行维护由三个时期组成&#xff1a; 软件定义时期软件开发时期运行维护时期 里程碑指可以用来标识项目进程状态的事…

DuckDB: 从MySql导出数据至Parquet文件

在这篇文章中&#xff0c;介绍使用DuckDB将数据从MySQL数据库无缝传输到Parquet文件的过程。该方法比传统的基于pandas方法更高效、方便&#xff0c;我们可以从DuckDB cli实现&#xff0c;也可以结合Python编程方式实现&#xff0c;两者执行核心SQL及过程都一样。 Parquet格式…

safe area helper插件

概述 显示不同机型的必能显示的区域 实现步骤 引入safearea&#xff0c;引入其中的safearea的csharp 为cancas加入gameobject gameobject中加入safearea脚本 将UI作为这个gameobject的子物体&#xff0c;就可以完成显示

数据结构 ——二叉树转广义表

数据结构 ——二叉树转广义表 1、树转广义表 如下一棵树&#xff0c;转换为广义表 root(c(a()(b()()))(e(d()())(f()(j(h()())())))) (根&#xff08;左子树&#xff09;&#xff08;右子树&#xff09;) 代码实现 #include<stdio.h> #include<stdlib.h>//保存…

实现echart大屏动画效果及全屏布局错乱解决方式

如何实现echarts动画效果?如何实现表格或多个垂直布局的柱状图自动滚动效果?如何解决tooltip位置超出屏幕问题,如何解决legend文字过长,布局错乱问题?如何处理饼图的中心图片永远居中? 本文将主要解决以上问题,如有错漏,请指正. 一、大屏动画效果 这里的动画效果主要指&…

【YashanDB知识库】如何处理yasql输入交互模式下单行字符总量超过限制4000字节

现象 在yasql执行sql语句后报错&#xff1a;YASQL-00021 input line overflow (>4000 byte at line 4) 原因 yasql在交互模式模式下单行字符总量限制4000字节&#xff0c;超出该限制即报错。 交互式模式下&#xff0c;yasql会显示一个提示符&#xff0c;通常是 SQL>…

DALL·E 2(内含扩散模型介绍)-生成式模型【学习笔记】

视频链接&#xff1a;DALLE 2&#xff08;内含扩散模型介绍&#xff09;【论文精读】_哔哩哔哩_bilibili&#xff08;up主讲的非常好&#xff0c;通俗易懂&#xff0c;值得推荐&#xff09; 目录 1、GAN模型 2、VAE模型 2.1、AE&#xff08;Auto-Encoder&#xff09; 2.2、…

FPGA 16 ,Verilog中的位宽:深入理解与应用

目录 前言 一. 位宽的基本概念 二. 位宽的定义方法 1. 使用向量变量定义位宽 ① 向量类型及位宽指定 ② 位宽范围及位索引含义 ③ 存储数据与字节数据 2. 使用常量参数定义位宽 3. 使用宏定义位宽 4. 使用[:][-:]操作符定义位宽 1. 详细解释 : 操作符 -: 操作符 …

使用 Vue3 实现摄像头拍照功能

参考资料:MediaDevices.getUserMedia() - Web API | MDN 重要: navigator.mediaDevices.getUserMedia 需要在安全的上下文中运行。现代浏览器要求摄像头和麦克风的访问必须通过 HTTPS 或 localhost&#xff08;被视为安全的本地环境&#xff09;进行,如果上传服务器地址是http…