seatunnel数据集成(三)多表同步

seatunnel数据集成(一)简介与安装
seatunnel数据集成(二)数据同步
seatunnel数据集成(三)多表同步
seatunnel数据集成(四)连接器使用


seatunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例。

1、单表 to 单表

一个source,一个sink

env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        query = "select * from base_region"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
  jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/dw"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
    query = "insert into base_region(id,region_name) values(?,?)"
  }
}
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

2、单表 to 多表

一个source,多个sink

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        result_table_name="t_user"
        query = "select * from t_user;"
    }
}

transform {
  Sql {
    source_table_name = "t_user"
    result_table_name = "t_user_nan"
    query = "select id,name,birth,gender from t_user where gender ='男';"
  }
  Sql {
    source_table_name = "t_user"
    result_table_name = "t_user_nv"
    query = "select id,name,birth,gender from t_user where gender ='女';"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_nan"
    query =  "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"
  }
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_nv"
    query =  "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"
  }
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf

3、多表 to 单表

多个source,一个sink

表结构:

-- dw 源表1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (
  `event_time` timestamp COMMENT '业务时间',
  `device_id` VARCHAR(32) COMMENT '设备id',
  `device_type` VARCHAR(32) COMMENT '设备类型',
  `device_name` VARCHAR(128) COMMENT '设备名称',
  `cpu_usage` INT COMMENT 'CPU使用率百分比'
) ;

INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);

-- dw 源表2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (
  `event_time` timestamp COMMENT '业务时间',
  `device_id` VARCHAR(32) COMMENT '设备id',
  `device_type` VARCHAR(32) COMMENT '设备类型',
  `device_name` VARCHAR(128) COMMENT '设备名称',
  `cpu_usage` INT COMMENT 'CPU使用率百分比'
);

INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);


-------------------------------------------------------------------------------
-- olap 目标表
CREATE TABLE `device_performance` (
  `id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',
  `event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',
  `device_id` VARCHAR(32) COMMENT '设备id',
  `device_type` VARCHAR(32) COMMENT '设备类型',
  `device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',
  `cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) COMMENT='设备状态';
env {
    job.mode="BATCH"
    job.name="device_performance"
}

source {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="switch_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
    }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="router_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
    }
}

transform {
  Sql {
    source_table_name = "switch_src"
    result_table_name = "switch_dst"
    query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"
  }
  Sql {
    source_table_name = "router_src"
    result_table_name = "router_dst"
    query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
  }
}

sink {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "switch_dst"
        query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
      }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "router_dst"
        query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
       }
}

./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf

4、多表 to 多表

多个source,多个sink

env {
    job.mode="BATCH"
    job.name="device_performance"
}

source {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="switch_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
    }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="router_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
    }
}

transform {
  Sql {
    source_table_name = "switch_src"
    result_table_name = "switch_dst"
    query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"
  }
  Sql {
    source_table_name = "router_src"
    result_table_name = "router_dst"
    query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
  }
}

sink {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "switch_dst"
        query="INSERT INTO device_performance_switch  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
      }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "router_dst"
        query="INSERT INTO device_performance_router  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
       }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/373463.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Elasticsearch:基本 CRUD 操作 - Python

在我之前的文章 “Elasticsearch:关于在 Python 中使用 Elasticsearch 你需要知道的一切 - 8.x”,我详细讲述了如何建立 Elasticsearch 的客户端连接。我们也详述了如何对数据的写入及一些基本操作。在今天的文章中,我们针对数据的 CRUD (cre…

Dockerfile文件参数配置和使用

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…

算法学习——LeetCode力扣链表篇2

算法学习——LeetCode力扣链表篇2 24. 两两交换链表中的节点 24. 两两交换链表中的节点 - 力扣(LeetCode) 描述 给你一个链表,两两交换其中相邻的节点,并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&…

零售新业态,让老牧区焕发新生命

敦煌老马一声魔性“浇给”勾起了无数人对羊肉的食欲,而当大家集体涌入餐厅或者在网上下单,都想要尝一尝网红同款的时候,可能并没有想过这样一个问题——为什么在今天,即便是远离牧区的现代大城市,草原羊肉却一样能触手…

12. UE5 RPG使用GameplayEffect修改角色属性(三)

书接 11. UE5 RPG使用GameplayEffect修改角色属性(二) 前面,介绍了GameplayEffect的Instant和Duration的使用,这一篇主要介绍一下无限制时间类型的infinite的使用方式。 无限时间限制模式下,如果你的周期时间&#xff…

tee漏洞学习-翻译-2:探索 Qualcomm TrustZone的实现

原文:http://bits-please.blogspot.com/2015/08/exploring-qualcomms-trustzone.html 获取 TrustZone image 从两个不同的位置提取image 从手机设备本身从google factory image 已经root的Nexus 5设备,image存储在eMMC芯片上,并且eMMC芯片…

[软件工具]文档页数统计工具软件pdf统计页数word统计页数ppt统计页数图文打印店快速报价工具

文档页数统计工具软件——打印方面好帮手 在信息化时代,文档已成为我们工作、学习、生活中不可或缺的一部分。无论是学术论文、商业报告,还是个人日记,都需要我们对其进行有效的管理。而在这个过程中,文档页数统计工具软件就显得…

读千脑智能笔记05_千脑智能理论

1. 现有的新皮质理论 1.1. 最普遍的看法是新皮质就像一个流程图 1.2. 特征层次理论 1.2.1. 该理论最大的弊端在于认为视觉是个静止的过程,就像拍一张照片一样,但事实并非如此 1.2.1.1. 眼睛每秒会快速转…

LoRA:语言模型微调的计算资源优化策略

编者按:随着数据量和计算能力的增加,大模型的参数量也在不断增加,同时进行大模型微调的成本也变得越来越高。全参数微调需要大量的计算资源和时间,且在进行切换下游任务时代价高昂。 本文作者介绍了一种新方法 LoRA,可…

docker程序镜像的制作

目录 一、每种资源的预安装(基础) 安装 nginx安装 redis 二、dockerfile文件制作(基础) 打包 redis 镜像 创建镜像制作空间制作dockerfile 打包 nginx 镜像 三、创建组合镜像(方式一) 生成centos容器并…

3.0 Zookeeper linux 服务端集群搭建步骤

本章节将示范三台 zookeeper 服务端集群搭建步骤。 所需准备工作,创建三台虚拟机环境并安装好 java 开发工具包 JDK,可以使用 VM 或者 vagrantvirtualbox 搭建 centos/ubuntu 环境,本案例基于宿主机 windows10 系统同时使用 vagrantvirtualb…

时序预测 | MATLAB实现基于CNN-GRU-AdaBoost卷积门控循环单元结合AdaBoost时间序列预测

时序预测 | MATLAB实现基于CNN-GRU-AdaBoost卷积门控循环单元结合AdaBoost时间序列预测 目录 时序预测 | MATLAB实现基于CNN-GRU-AdaBoost卷积门控循环单元结合AdaBoost时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现基于CNN-GRU-AdaBo…

Docker进阶篇-CIG重量级监控系统

一、简介 通过docker stats命令可以很方便的查看当前宿主机上所有容器的CPU、内存、网络流量等数 据,可以满足一些小型应用。 但是docker stats统计结果只能是当前宿主机的全部容器,数据资料是实时的,没有地方存储、 没有健康指标过线预警…

二叉树的详解

二叉树 【本节目标】 掌握树的基本概念掌握二叉树概念及特性掌握二叉树的基本操作完成二叉树相关的面试题练习 树型结构(了解) 概念 树是一种非线性的数据结构,它是由n(n>0)个有限结点组成一个具有层次关系的集合。…

Redis核心技术与实战【学习笔记】 - 24.Redis 数据分片方案选择:Codis 和 Redis Cluster

简述 Redis 的切片集群使用多个实例保存数据,能很好的应对大数据量的场景。在《4.Redis 切片集群》中,介绍了 Redis 官方提供的切片集群方法 Redis Cluster。本章,再来学习下,在 Redis Cluster 方案正式发布前,业界广…

CodeMeter强化了ETM WinCC 开放架构平台的许可与安全保护

在面对日益复杂的网络安全威胁时,ETM professional control采取了前瞻性的措施,选择了业界领先的威步CodeMeter技术,以保护其标志性的WinCC开放架构平台。这一选择不仅体现了ETM对安全的高度重视,也标志着其在保障关键基础设施运营…

Jmeter 01 -概述线程组

1、Jmeter:概述 1.1 是什么? Jmeter是Apache公司使用Java 开发的一款测试工具 1.2 为什么? 高效、功能强大 模拟一些高并发或多次循环等特殊场景 1.3 怎么用? 下载安装 1、下载jmeter,解压缩2、安装Java环境(jmet…

基于OpenCV灰度图像转GCode的螺旋扫描实现

基于OpenCV灰度图像转GCode的螺旋扫描实现 引言激光雕刻简介OpenCV简介实现步骤 1.导入必要的库2. 读取灰度图像3. 图像预处理4. 生成GCode5. 保存生成的GCode6. 灰度图像螺旋扫描代码示例 总结 系列文章 ⭐深入理解G0和G1指令:C中的实现与激光雕刻应用⭐基于二值…

5-3、S曲线生成器【51单片机+L298N步进电机系列教程】

↑↑↑点击上方【目录】,查看本系列全部文章 摘要:本节介绍步进电机S曲线生成器的计算以及使用 一.计算原理 根据上一节内容,已经计算了一条任意S曲线的函数。在步进电机S曲线加减速的控制中,需要的S曲线如图1所示,横…

React 实现表单组件

表单是html的基础元素,接下来我会用React实现一个表单组件。支持包括输入状态管理,表单验证,错误信息展示,表单提交,动态表单元素等功能。 数据状态 表单元素的输入状态管理,可以基于react state 实现。 …