Fink CDC数据同步(二)MySQL数据同步

1 开启binlog日志

2 数据准备

use bigdata;
drop table if exists user;

CREATE TABLE `user`(
      `id` INTEGER NOT NULL AUTO_INCREMENT,
      `name` VARCHAR(20) NOT NULL DEFAULT '',
      `birth` VARCHAR(20) NOT NULL DEFAULT '',
      `gender` VARCHAR(10) NOT NULL DEFAULT '',
      PRIMARY KEY(`id`)
);
ALTER TABLE user AUTO_INCREMENT = 1001;

insert into user values(default , '东契奇' , '1995-01-01' , '男');
insert into user values(default , '斯蒂芬' , '1996-12-21' , '男');
insert into user values(default , '里奥梅西' , '1993-05-10' , '男');
insert into user values(default , '凯里欧文' , '1994-08-06' , '男');
insert into user values(default , '张淋艳' , '1997-12-01' , '女');
insert into user values(default , '王珊珊' , '1995-03-01' , '女');
insert into user values(default , '唐佳丽' , '1994-07-01' , '女');
insert into user values(default , '杨力维' , '1995-10-20' , '女');

select * from user;

3 jar包依赖

在flink/lib目录下添加依赖:

flink-sql-connector-mysql-cdc-2.3.0.jar

下载地址:

Central Repository: com/ververica/flink-sql-connector-mysql-cdc

4 启动sql-client

# 启动服务
/opt/flink/flink-1.16.2/bin/start-cluster.sh 
# 启动sql-client
/opt/flink/flink-1.16.2/bin/sql-client.sh

设置模式

set sql-client.execution.result-mode = tableau;

设置checkpont

set execution.checkpointing.interval=30sec;

建mysql的映射表

CREATE TABLE if not exists mysql_user (
     id     STRING,
     name   STRING,
     birth  STRING,
     gender    STRING,
     PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'= 'mysql-cdc',
    'hostname'= '192.168.0.1',
    'port'= '3306',
    'username'= 'user',
    'password'='password',
    'server-time-zone'= 'Asia/Shanghai',
    'debezium.snapshot.mode'='initial',
    'database-name'= 'bigdata1',
    'table-name'= 'user'
); 

执行查询语句,会生成一个flink job任务

select * from mysql_user; 

5 常用参数表

参数名

必填

默认值

类型

参数描述

connector

String

指定connector,这里填 mysql-cdc

hostname

String

MySql server 的主机名或者 IP 地址

username

String

连接 MySQL 数据库的用户名

password

String

连接 MySQL 数据库的密码

database-name

String

需要监控的数据库名,支持正则表达式

table-name

String

需要监控的表名,支持正则表达式

port

3306

Integer

MySQL 服务的端口号

server-id

Integer

当开启scan.incremental.snapshot.enabled时,建议指定server-id;server-id 可以是单个值,如5400; 也可以提供数值范围,如5400-5408

scan.incremental.snapshot.enabled

TRUE

Boolean

增量快照是读取表快照的新机制;和旧的快照读相比有以下优点:1. 并行读取 2. 支持checkpoint 3. 不需要锁表;当需要并行读取时,server-id需要设置数值范围,如5400-5408

scan.incremental.snapshot.chunk.size

8096

Integer

表快照的块大小

scan.snapshot.fetch.size

1024

Integer

每次读表接受的最大值

scan.startup.mode

initial

String

MySQL CDC 启动模式,有效值:initial 和 latest-offset

connect.timeout

30s

Duration

connector 连接 MySQL 服务的最长等待超时时间

connect.max-retries

3

Integer

connector 创建 MySQL 连接的重试次数

connection.pool.size

20

Integer

连接池的大小

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/368387.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数学建模-退火算法和遗传算法

退火算法和遗传算法 一.退火算法 退火算法Matlab程序如下: [W]xlsread(D:100个目标经度纬度);>> x[W(:,1)];>> y[W(:,2)];>> w[x y];;d1[70, 40];>> w[d1;w;d1]ww*pi/180;%角度化成弧度dzeros(102);%距离矩阵初始化for i1:101…

STL——空间配置器

空间配置器是STL六大组件之一,它和其他五个组件相互配合,起着很关键的作用。 容器:各种数据结构、如vector、list、stack、deque、queue、set、map、unordered_map等等算法:各种算法,如sort、serach、copy、erase 提供…

【vue】报错 Duplicate keys detected 解决方案

错误描述:Duplicate keys detected. This may cause an update error.错误直译:检测到重复的键。这可能会导致错误。错误原因:有相同父元素的多个子元素的v-for有相同的key值。 解决方法: return:{dataList:[{name:张三&#xf…

坚持刷题|二叉树的前、中、后序遍历(递归迭代)

文章目录 题目思考递归实现迭代实现前序遍历后序遍历中序遍历 在前、中、后序的迭代遍历中,为什么都采用栈来模拟递归,而非队列? Hello,大家好,我是阿月。坚持刷题,老年痴呆追不上我,今天刷&…

excel给数据库初始化/旧数据处理(自动sql拼装)

思路: 首先导出数据到excel编写单条数据操作的sql利用excel CONCATENATE 函数自动生成,每一行数据的操作sql 小技巧:对于需要套娃的字段值,可以加一个临时列同样使用CONCATENATE函数进行sql拼装 案例: 1.临时列:CONCATENATE(C2, …

ROS方向第二次汇报(5)

文章目录 1.本方向内学习内容:1.1.自定义msg:1.1.1.定义msg文件:1.1.2.编辑配置文件: 1.2.自定义srv:1.2.1.定义srv文件:1.2.2.编辑配置文件: 1.3.服务通信案例实现:1.3.1.服务端实现…

回溯法:回溯法通用模版汇总以及模版应用

从一个问题开始 给定两个整数 n 和 k,返回 1 ... n 中所有可能的 k 个数的组合。 示例: 输入: n 4, k 2 输出: [ [2,4], [3,4], [2,3], [1,2], [1,3], [1,4] ] 很容易想到 用两个for循环就可以解决。 如果n为100,k为50呢,那就50层for循…

修改Vim编辑器的缩进和显示行数

一、Vim编辑器的缩进和显示行数 1.指令 sudo vi /etc/vim/vimrc2.插入内容 set tabstop4 set shiftwidth4 set nu 注意输入的格式,前后不要留空格 tabstop是输入按下tab缩进4个 shiftwidth是批量缩进4个 nu是显示行数

【blender烘焙】法线烘焙出现大面积结构丢失怎么办?blender烘焙vs八猴烘焙

用dcc烘焙法线是很常用的减面优化手段,很多建模的dcc自己也内置的烘焙的功能,像我自己在工作流中也偶尔用blender的烘焙做一下材质的整合优化,在质量要求不高的时候还算凑合可用。 问题描述 在前期的文章中飞燕2号建模,我就遇到…

Vue3+vite搭建基础架构(5)--- 使用vue-i18n

Vue3vite搭建基础架构(5)--- 使用vue-i18n 说明官方文档安装vue-i18n使用vue-i18n测试vue-i18n的国际化配置 说明 这里记录下自己在Vue3vite的项目使用vue-i18n做国际化语言的过程,不使用ts语法,方便以后直接使用。这里承接自己的…

Paper - 转角密度估计器 RDE (Rotamer Density Estimator) 算法

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/136002649 Paper: Rotamer density estimator is an unsupervised learner of the effect of mutations on protein-protein interaction 转角密…

公交最短距离-算法

题目 给定一个一维数组,其中每一个元素表示相邻公交站之间的距离,比如有四个公交站A,B,C,D,对应的距离数组为,1,2,3,4,如下图示 给定目标站X和Y,求他们之间最短的距离 解题 遍历一次整个数组,…

Docker搭建MySQL8主从复制

之前文章我们了解了面试官:说一说Binlog是怎么实现的,这里我们用Docker搭建主从复制环境。 docker安装主从MySQL 这里我们使用MySQL8.0.32版本: 主库配置 master.cnf //基础配置 [client] port3306 socket/var/run/mysqld/mysql.sock [m…

三分钟学懂C语言关键字——const

1,const修饰普通变量 const类型变量名常量; //类型:int char short 等等 类型const变量名常量; //举例:const int a5; int const a5;这两种写法表示a的值不能够改变 当我们直接改变const修饰的普通变量时,编译器会报…

Map和Set的封装

目录 一、底层原理 二、红黑树的节点 三、仿函数 四、迭代器 4.1、迭代器的定义: 4.2、*:解引用操作 4.3、->:成员访问操作符 4.4、!、 4.5、迭代器的: 4.6、迭代器的-- 五、Map 六、Set 七、红黑树源码 一、底层原理 我们要知道&#…

Docker 安装篇(CentOS)

Docker社区版 Docker从1.13版本之后采用时间线的方式作为版本号,分为社区版CE和企业版EE。 社区版是免费提供给个人开发者和小型团体使用的,企业版会提供额外的收费服务,比如经过官方测试认证过的基础设施、容器、插件等。 1、Docker 要求 C…

【Redis】整理

对于现代大型系统而言,缓存是一个绕不开的技术话题,一提到缓存我们很容易想到Redis。 Redis整理,供回顾参考

JVM系列——垃圾收集器Parrlel Scavenge、CMS、G1常用参数和使用场景

背景 当前在Java领域,JDK 8版本仍然享有广泛的使用,它支持了Parallel Scavenge、CMS和G1这几种垃圾收集器。因此,为了在业务应用中更加高效地进行开发和性能调优,我们需要对这些垃圾收集器的工作原理和特性有一个全面的理解和认识…

【Linux】vim的简单使用

我们知道在Windows下的VS2019是一个集成开发环境,也就是说,集编辑,编译,调试等功能都放在了一起;但是在Linux下,这些步骤都是分开的,我们这篇博客就来说一说vim这个编辑器,它只有编辑…

Android平台如何实现RTSP转GB28181

为什么要做GB28181设备接入侧? 实际上,在做Android平台GB28181设备接入模块的时候,我们已经有了非常好的技术积累,比如RTMP推送、轻量级RTSP服务、一对一互动模块、业内几乎最好的RTMP|RTSP低延迟播放器。 Android平台GB28181接…