FlinkCDC基础篇章2-数据源 SqlServerCDC写入到ES中

接着 上期FlinkCDC基础篇章1-安装使用  

  1. 下载 Flink 1.17.0 并将其解压至目录 flink-1.17.0

  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.17.0/lib/ 下:

    下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

    • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
    • flink-sql-connector-mysql-cdc-2.4.0.jar
    • flink-sql-connector-postgres-cdc-2.4.0.jar

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s
-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
CREATE TABLE t_source_sqlserver (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
) WITH (
    'connector' = 'sqlserver-cdc',  -- 使用SQL Server CDC连接器
    'hostname' = '10.194.183.120',  -- SQL Server主机名
    'port' = '30027',               -- SQL Server端口
    'username' = 'sa',              -- SQL Server用户名
    'password' = 'abc@123456',      -- SQL Server密码
    'database-name' = 'cdc_test',   -- 数据库名称
    'schema-name' = 'dbo',          -- 模式名称
    'table-name' = 'orders'         -- 要捕获更改的表名
);

-- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
CREATE TABLE table_sink_mysql (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED  -- 主键定义(可选)
)
WITH (
    'connector' = 'jdbc',                        -- 使用JDBC连接器
    'url' = 'jdbc:mysql://10.194.183.120:30025/test',  -- MySQL的JDBC URL
    'username' = 'root',                        -- MySQL用户名
    'password' = 'root',                        -- MySQL密码
    'table-name' = 'orders'                     -- 要写入的MySQL表名
);

-- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
CREATE TABLE income_distribution (

serviceCode STRING,

accountPeriod STRING,

subjectCode STRING,

subjectName STRING,

amt DECIMAL(13,2),

PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED

) WITH (

'connector' = 'elasticsearch-7',

'hosts' = 'http://xxxx:9200',

'index' = 'income_distribution',

'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'

);

可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

参考文献:

使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC

flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/556529.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

视频批量高效剪辑,轻松翻转视频画面,支持将视频画面进行逆时针90度翻转。

在视频编辑的海洋中,你是否曾遇到过需要批量翻转视频画面的情况?传统的视频编辑工具在面对这样的需求时,往往显得力不从心,效率低下。今天,我要为大家介绍一款全新的视频编辑神器,它将彻底改变你的视频编辑…

小试牛刀!

1.从双倍数组中还原原数组&#xff08;力扣&#xff0c;vector&#xff09; java式c解法。 class Solution { public:vector<int> findOriginalArray(vector<int>& changed) {int n changed.size();if(n % 2 1) return {};map<int, int> mp;for(int c…

【最新可用】Claude国内镜像,可上传图片,可用Claude3全系模型,包括Pro版本的Opus),亲测比GPT好用

Claude对话、上传图片的超详细教程来啦&#xff01; 近期&#xff0c;Claude 3 Opus的发布引发了网络上的广泛关注与热议&#xff0c;有观点认为其性能已经凌驾于GPT-4之上。虽然网络上已经出现了大量基于这两款先进AI技术的实际应用案例&#xff0c;但仍有许多人对在国内如何…

游戏生成式 AI:编织梦想,避开阴影

想象一下&#xff0c;一个沉浸式的游戏世界中玩家遇到的每个 NPC 都由 AI 驱动&#xff0c;他们能与玩家进行互动&#xff0c;从改变游戏体验。据 Inword 一项研究显示&#xff0c;绝大多数游戏玩家渴望这种互动&#xff0c;愿意投入更多的时间和金钱来玩这种由 AI 驱动的游戏。…

网络编程套接字(三)之TCP服务器简单实现

目录 一、服务端TcpServer 1、tcp_server.hpp 2、tcp_server.cc 二、客户端TcpClient tcp_client.cc 三、服务器和客户端进行通信 四、完整代码 一、服务端TcpServer 首先我们需要对服务端进行封装。我们需要的成员变量有IP地址&#xff0c;端口号port&#xff0c;以及监…

JMM与内存屏障

一、cpu多核并发缓存架构解析 JMM内存模型&#xff1a;java多线程内存模型跟cpu缓存模型类似&#xff0c;是基于cpu缓存模型来建立的&#xff0c;java线程内存模型是标准化的&#xff0c;屏蔽掉了底层不同计算机的区别 JMM数据原子操作 read(读取)&#xff1a;从主内存读取数据…

作为Boss,还在写代码。what?赶紧改掉这个坏毛病

有些创业中的老板&#xff0c;经常或者偶尔也要写代码&#xff0c;我听了很震惊呀&#xff0c;这叫创业吗&#xff1f;这不是给员工打工吗&#xff1f;其他重要的事情谁来干&#xff0c;这个毛病一定要改。 一、比起写代码&#xff0c;你还有更重要的事情要做 作为BOSS和创业…

【Node.js从基础到高级运用】二十五、Node.js中Cluster的作用

引言 Node.js中的cluster模块允许您轻松创建共享服务器端口的子进程。这是一个核心模块&#xff0c;用于在Node.js应用程序中实现多进程架构&#xff0c;以充分利用多核CPU系统的计算能力。 cluster介绍 当您启动一个Node.js应用程序时&#xff0c;默认情况下它运行在单个进程…

怎么设置启用远程桌面? 如何让外网电脑远程本地内网?

如何远程控制电脑&#xff1f;最简单实用的方案是开启电脑系统自带的远程桌面功能&#xff0c;如果涉及跨网、内外网互通&#xff0c;可以同时用快解析内网映射外网。下面是方案的具体实施步骤&#xff0c;供大家参考。 怎么打开设置启用远程桌面&#xff1f; 1.在目标需要远…

idea 中导入的项目maven不自动下载依赖包

导入之后不会自动引入依赖包&#xff0c;如下图&#xff0c;external libraries 下没有依赖 解决方案&#xff1a;重新更新下maven的Local repository 即可

实测52.4MB/s!全志T3+FPGA的CSI通信案例分享!

CSI总线介绍与优势 CSI&#xff08;CMOS sensor parallel interfaces&#xff09;总线是一种用于连接图像传感器和处理器的并行通信接口&#xff0c;应用于工业自动化、能源电力、智慧医疗等领域&#xff0c;CSI总线接口示意图如下所示&#xff08;以全志科技T3处理器的CSI0为…

Qt实现Mysql数据库的连接,查询,修改,删除,增加功能

Qt实现Mysql数据库的连接&#xff0c;查询&#xff0c;修改&#xff0c;删除&#xff0c;增加功能 安装Mysql数据库&#xff0c;QtCreator Mysql选择Mysql Server 8.1版本安装。 Mysql Server 8.1安装过程 1.首先添加网络服务权限&#xff1a; WinR键输入compmgmt.msc进入…

密码学 | 数字签名 + 数字证书

&#x1f951;原文&#xff1a;数字签名和数字证书的原理解读 - 知乎 &#x1f951;声明&#xff1a;后文图中若未明确指明&#xff0c;默认是 Bob 的公钥或私钥。 Step1&#xff1a;Bob 有两把钥匙&#xff0c;一把是公钥&#xff0c;另一把是私钥。 Step2&#xff1a;Bob 把…

安全狗云眼的主要功能有哪些?

"安全狗云眼"是一款综合性的网络安全产品&#xff0c;主要用于实时监控和保护企业的网络安全。其核心功能包括威胁检测、漏洞扫描、日志管理和合规性检查等。 以下是安全狗云眼的主要功能详细介绍&#xff1a; 1、资产管理 定期获取并记录主机上的Web站点、Web容器、…

达梦数据库——异常崩溃(core)分析处理

CORE文件介绍 core文件是在程序异常崩溃时生成的文件&#xff0c;它包含了程序在崩溃瞬间的内存状态信息&#xff0c;主要是用来调试和分析问题。我们可以使用调试器工具&#xff08;如GDB&#xff09;来分析core文件&#xff0c;以便定位和解决问题。 CORE文件的配置 开启cor…

ActiveMQ主从架构和集群架构的介绍及搭建

目录 一、主从和集群架构的特点 1.1 主从架构的-Master/slave模式特点 1.2 集群架构-Cluster模式特点 二、ActiveMQ的主从架构 2.1 架构图 2.2 特点 2.3 实现方式&#xff08;3种&#xff09; 2.4 实现 &#xff08;基于LevelDB复制&#xff09; 2.4.1 准备环境 2.4.2…

2024化工制造企业数字化白皮书

来源&#xff1a;蓝凌研究院 中国石油和化学工业联合会发布2023年中国石油和化工行业经济运行情况。数据显示&#xff0c;2023年&#xff0c;我国石化行业实现营业收入15.95万亿元&#xff0c; 同比下降1.1%&#xff0c;利润总额8733.6亿元&#xff0c;行业经济运行总体呈现低…

vscode搭建C/C++环境

文章目录 一、安装vscode 二、下载安装g 三、安装VSCode插件 四、配置运行环境 一、安装vscode 直接官网免费下载&#xff1a;下载完成后进行安装&#xff0c;记得更换安装路径Visual Studio Code - Code Editing. RedefinedVisual Studio Code is a code editor redefine…

大数据------额外插件及技术------Git(完整知识点汇总)

Git 定义 它是分布式版本控制工具&#xff0c;主要用于管理开发过程中的源代码文件&#xff08;如&#xff1a;Java类、xml文件、html页面等&#xff09;&#xff0c;在软件开发过程中被广泛应用 作用 代码回溯&#xff1a;快速回到某一代码历史版本版本切换&#xff1a;同一个…

Python文件处理--进阶

Python标准库文件操作相关模块&#xff1a; 1.使用pickle序列化 序列化指的是&#xff1a;将对象转化成“串行化”数据形式&#xff0c;存储到硬盘或通过网络传输到其他地方。反序列化是指相反的过程&#xff0c;将读取到的“串行化数据”转化成对象。我们可以使用pickle模块…