JDBC SQL Server Source Connector: 一览与实践

file

在快速发展的数据驱动业务环境中,确保数据在各个系统间高效、准确地同步至关重要。为了进一步的数据处理和分析,经常需要将这些数据同步到其他数据处理系统。Apache SeaTunnel 提供了一个强大而灵活的数据集成框架,使得从 SQL Server 到其他系统的数据同步变得简单且高效。

本文档将指导您如何配置 Apache SeaTunnel,使用 JDBC SQL Server Source Connector 来实现数据的有效同步。

SQL Server

JDBC SQL Server Source Connector

支持 SQL Server 版本

  • 服务器:2008(或更高版本,仅供信息参考)

支持以下引擎

Spark
Flink
Seatunnel Zeta

主要特点

  • 批处理
  • 流处理
  • 精准一次性
  • 列投影
  • 并行处理
  • 支持用户定义拆分

支持查询 SQL 并能够实现投影效果。

描述

通过 JDBC 读取外部数据源数据。

支持的数据源信息

数据源支持的版本驱动URLMaven
SQL Server支持版本 >= 2008com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433下载

数据库依赖

请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录
例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

SQL Server 数据类型Seatunnel 数据类型
BITBOOLEAN
TINYINT
SMALLINT
SHORT
INTEGERINT
BIGINTLONG
DECIMAL
NUMERIC
MONEY
SMALLMONEY
DECIMAL((指定列的指定列大小)+1,
(获取指定列的小数点右边的数字的数量。)))
REALFLOAT
FLOATDOUBLE
CHAR
NCHAR
VARCHAR
NTEXT
NVARCHAR
TEXT
STRING
DATELOCAL_DATE
TIMELOCAL_TIME
DATETIME
DATETIME2
SMALLDATETIME
DATETIMEOFFSET
LOCAL_DATE_TIME
TIMESTAMP
BINARY
VARBINARY
IMAGE
UNKNOWN
尚不支持

源选项

名称类型必需默认值描述
url字符串-JDBC 连接的 URL。例如:jdbc:sqlserver://127.0.0.1:1434;database=TestDB
driver字符串-用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
user字符串-连接实例的用户名
password字符串-连接实例的密码
query字符串-查询语句
connection_check_timeout_sec整数30等待用于验证连接的数据库操作完成的秒数
partition_column字符串-并行处理的分区列,仅支持数值类型。
partition_lower_bound长整数-用于扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_upper_bound长整数-用于扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_num整数作业并行度分区计数的数量,仅支持正整数。默认值为作业并行度。
fetch_size整数0对返回大量对象的查询,您可以配置查询中使用的行抓取大小,以减少满足选择条件所需的数据库命中次数,从而提高性能。
零表示使用 JDBC 默认值。
common-options-源插件的常见参数,请参阅 源常用选项 以获取详细信息。

提示

如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度进行并行执行。

任务示例

简单:

简单的单一任务以读取数据表

# 定义运行时环境

env {

# 您可以在此处设置 Flink 配置

execution.parallelism = 1
job.mode = "BATCH"
}
source{
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "select \* from full_types_jdbc"
}
}

transform { # 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息, # 请转到 [seatunnel.apache.org/docs/transform-v2/sql](https://seatunnel.apache.org/docs/transform-v2/sql)
}

sink {
Console {}
}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {
  # 您可以在此处设置 Flink 配置
  execution.parallelism = 10
  job.mode = "BATCH"
}

source {
    Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        # 根据需要定义查询逻辑
        query = "select * from full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
    }
}

transform {
    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {
  # 您可以在此处设置 Flink 配置
  execution.parallelism = 10
  job.mode = "BATCH"
}

source {
    Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        # 根据需要定义查询逻辑
        query = "select * from full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
    }
}

transform {
    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {
  # 您可以在此处设置 Flink 配置
  execution.parallelism = 10
  job.mode = "BATCH"
}

source {
    Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        # 根据需要定义查询逻辑
        query = "select * from full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
    }
}

transform {
    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {
  # 您可以在此处设置 Flink 配置
  execution.parallelism = 10
  job.mode = "BATCH"
}

source {
    Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        # 根据需要定义查询逻辑
        query = "select * from full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
    }
}

transform {
    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

分段并行读取示例:

这是一个快速并行读取数据的分片示例

env {
  # 您可以在此处设置引擎配置
  execution.parallelism = 10
}

source {
  # 这是一个示例源插件,仅用于测试和展示源插件的功能
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "select * from column_type_test.dbo.full_types_jdbc"
    # 并行分片读取字段
    partition_column = "id"
    # 片段数量
    partition_num = 10
  }
  # 如果您想要获取有关如何配置 Seatunnel 和查看源插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}

transform {
  # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Console {}
  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/120787.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

「随笔」浅谈2023年云计算的发展趋势

在2023年,云计算的发展趋势将受到政治、经济、社会和科技四个维度的影响。以下是对这些维度的具体分析: 1.1 政治维度: 全球政策推动: 随着全球各国政策对云计算的重视程度不断提高,云计算服务将获得更广泛的市场准入…

MES管理系统中常规的生产建模有哪些

随着制造业的快速发展,MES生产管理系统已经成为了现代制造业不可或缺的核心系统。MES通过对生产过程进行建模,实现了生产过程的可视化、可控制和可优化,为企业提供了全方位的生产管理解决方案。本文将深化对MES管理系统及其主要生产模型的理解…

『MySQL快速上手』-⑥-表的约束

文章目录 1.空属性2.默认值3.列描述4.zerofill5.主键6.自增长7.唯一键8.外键9.综合案例 真正约束字段的是数据类型,但是数据类型约束很单一,需要有一些额外的约束,更好的保证数据的合法性,从业务逻辑角度保证数据的正确性。 1.空…

第一个ARM程序裸板点灯

硬件知识LED原理图 如何点亮一个LED灯? 看原理图,确定控制LED的引脚。看主芯片的芯片手册,确定如何设置控制这个引脚。写程序。 LED有插脚封装的、贴片封装的。 它们长得完全不一样,因此我们在原理图中把它们抽象出来。 点亮…

[SSD综述 1.5] SSD 主控和固件核心功能详解(万字)

依公知及经验整理,原创保护,禁止转载。 1. 主控概述1.1 主控作用 2. 主控的硬件功能和实现2.1 主控处理器2.2 闪存、主机接口2.3 主控纠错2.4 断电保护 3 固件功能3.1 FTL3.2 预留空间(Over-provisioning)3.3 Trim3.4 写入放大(Write amplification)3.5 …

Django初窥门径-自定义附件存储模型

前言 Django自带了一个名为FileField的字段,用于处理文件上传。然而,有时我们需要更多的控制权,例如定义文件的存储路径、文件名以及文件类型。在本篇文章中,我们将探讨如何自定义Django附件存储模型。 创建attachment应用 pyt…

【计算机网络】物理层知识

目录 1、物理层的基本概念 2、数据通信的基础知识 2.1、数据通信系统模型 2.2、信道的几个基本概念 3、物理层下面的传输媒体 4、信道复用技术 1、物理层的基本概念 物理层考虑的是怎样才能在连接各种计算机的传输媒体上传输数据比特流,而不是指具体的 传输媒…

数据结构:Map和Set(2):相关OJ题目

目录 136. 只出现一次的数字 - 力扣(LeetCode) 771. 宝石与石头 - 力扣(LeetCode) 旧键盘 (20)__牛客网 (nowcoder.com) 138. 随机链表的复制 - 力扣(LeetCode) 692. 前K个高频单词 - 力扣&#xff08…

CDN加速技术:降低企业云服务成本的有效利用

在当今数字化时代,云服务已经成为企业运营的不可或缺的一部分。然而,与此同时,云服务的需求也在不断增长,使企业不得不应对更大的数据传输和负载。这就引出了一个关键问题:如何有效降低企业云服务成本,同时…

【管理工具】CMAK安装和使用(kafka-manager)

文章目录 前言一、安装和启动1.1 安装CMAK1.2 启动cmak 二、使用CMAK2.1 添加kafka集群2.2 topic 概述2.3 broker概述2.4 其他操作 前言 一、安装和启动 1.1 安装CMAK Java环境:需要jdk11 $ java -version java version "11.0.5" 2019-10-15 LTS Java…

Maven3.9.1安装及环境变量配置

一、Maven的下载与安装 maven各版本下载地址 打开链接后自行选择对应版本 下载完成后解压安装,最好别选择c盘,安装目录路径等使用英文,避免产生其他问题 我这里选择的是D盘 二、Maven的环境变量配置 2.1、右键点击此电脑选择属性,点击高级系统设置,点…

Python之字符串、正则表达式练习

目录 1、输出随机字符串2、货币的转换(字符串 crr107)3、凯撒加密(book 实验 19)4、字符替换5、检测字母或数字6、纠正字母7、输出英文中所有长度为3个字母的单词 1、输出随机字符串 编写程序,输出由英文字母大小写或…

现一个智能的SQL编辑器

补给资料 管注公众号:码农补给站 前言 目前我司的多个产品中都支持在线编辑 SQL 来生成对应的任务。为了优化用户体验,在使用 MonacoEditor 为编辑器的基础上,我们还支持了如下几个重要功能: 多种 SQL 的语法高亮多种 S…

订水商城实战教程08-轮播图

首页我们已经开发了店铺信息展示以及搜索功能,接着需要展示轮播图的功能。轮播图需要存放在数据源中,点击图片的时候要访问公众号的文章。 1 创建数据源 先创建数据源用来存放轮播图,打开控制台,点击数据模型,点击号…

sqlite3.NotSupportedError: deterministic=True requires SQLite 3.8.3 or higher

问题描述 sqlite3.NotSupportedError: deterministicTrue requires SQLite 3.8.3 or higher 解决方法 A kind of solution is changing the database from sqlite3 to pysqlite3. After acticate the virtualenv, install pysqlite. pip3 install pysqlite3 pip3 install …

网工内推 | 上市公司,云平台运维,IP认证优先,13薪

01 上海新炬网络信息技术股份有限公司 招聘岗位:云平台运维工程师 职责描述: 1、负责云平台运维,包括例行巡检、版本发布、问题及故障处理、平台重保等,保障平台全年稳定运行; 2、参与制定运维标准规范与流程&#x…

Ansible 自动化运维工具 --- playbook 剧本

文章目录 1. Host inventory ---- 主机清单1.1 简介1.2 inventory文件1.3 Inventory 文件的构成1.3.1 主机与组1.3.2 变量 1.4 inventory 中的常用变量 2. Ansible-playbook剧本2.1 简介2.2 Playbook的结构组成2.3 编写playbook的基本格式与写法2.3.1 基本格式2.3.2 语句的横向…

【T690 之十二】基于方寸EVB2开发板(T690芯片)构建基于GMSSL的文件系统的方式

备注: 1,假设您已对方寸微电子的T690系列芯片的使用方式都有了一定的了解,然后需要构建基于GMSSL的文件系统,此文才对您有意义; 2,若您对方寸微电子的T690芯片不了解,但想进一步了解它&#xff…

[C++ 中]:6.类和对象下(static成员 + explicit +友元函数 + 内部类 + 编译器优化)

(static成员 explicit 友元函数 内部类 编译器优化) 一.static 成员:1.概念引入:1-1:定义全局变量记录个数? 2.如果有多个类需要分开去记录类对象的个数?2-1:可不可以声明成员变量解决&#…

LangChain之关于RetrievalQA input_variables 的定义与使用

最近在使用LangChain来做一个LLMs和KBs结合的小Demo玩玩,也就是RAG(Retrieval Augmented Generation)。 这部分的内容其实在LangChain的官网已经给出了流程图。 我这里就直接偷懒了,准备对Webui的项目进行复刻练习,那么…