【Flink CDC(一)】实现mysql整表与增量读取

文章目录

  • 一. 运行前准备
    • 1. 依赖
      • 1.1. Maven dependency
      • 1.2. SQL Client JAR(推荐)
    • 2. 配置 MySQL 服务器(必须)
  • 二. 功能说明
    • 1. 启动模式
    • 2. 全量阶段支持 checkpoint
    • 3. 关于无主键表
    • Exactly-Once 处理
  • 三. 实战
    • 1. 实现mysql整表与增量表同步
  • FAQ

MySQL CDC 连接器允许从 MySQL 数据库读取快照数据(比如:flink任务消费时刻的整表数据)和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。

本篇只关注mysql整表与增量读取的实现,对于并发读取等能力后续再探索。

 

一. 运行前准备

1. 依赖

1.1. Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <!-- 请使用已发布的版本依赖,snapshot版本的依赖需要本地自行编译。 -->
  <version>2.4.0</version>
</dependency>

 

1.2. SQL Client JAR(推荐)

下载 flink-sql-connector-mysql-cdc-2.4.0.jar 到 <FLINK_HOME>/lib/ 目录下。

 

2. 配置 MySQL 服务器(必须)

你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。

# 创建用户
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

# 赋权
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

# 刷新权限
mysql> FLUSH PRIVILEGES;

注意:

scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

 

二. 功能说明

1. 启动模式

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog

  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取

  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog
    的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改

  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。

  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。



MySQLSource.builder()
    .startupOptions(StartupOptions.earliest()) // 从最早位点启动
    .startupOptions(StartupOptions.latest()) // 从最晚位点启动
    .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动
    .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动
    .startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动
    ...
    .build()




CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'scan.startup.mode' = 'earliest-offset', -- 从最早位点启动
    'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动
    'scan.startup.mode' = 'specific-offset', -- 从特定位点启动
 
    'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名
    'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置
    'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合

    'scan.startup.mode' = 'timestamp', -- 从特定位点启动
    'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳
    ...
)

 

2. 全量阶段支持 checkpoint

增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

 

3. 关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

  1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。

  2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:

  • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
  • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

 

Exactly-Once 处理

MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。

 

三. 实战

1. 实现mysql整表与增量表同步

-- 'scan.startup.mode'= 'initial' 
-- 
CREATE TABLE tjy_sql1  
(  
  `id` int,  
  `name` string,  
  `face` string  
 ,PRIMARY KEY(id) NOT ENFORCED  
) WITH (  
        'connector' = 'mysql-cdc',  
        'hostname' = 'xxx',  
        'port' = '3306',  
        'username' = 'middle_test',  
        'password' = '123456',  
        'database-name' = 'middle_test',  
        'table-name' = 'tjy_fortest1'  
       -- ,'scan.incremental.snapshot.enabled' = 'false'  
       --  initial: 默认值,全表同步,然后进行增量同步;
       --  'scan.startup.mode'= 'initial'  
       -- 'debezium.snapshot.mode' = 'initial'      );  
  
  
 CREATE TABLE tjy_sql1_sink  
 (  
  `id` int,  
  `name` string,  
  `face` string  
  ,PRIMARY KEY(id) NOT ENFORCED  
 ) WITH (  
           'connector' = 'mysql-x',  
           'url' = 'jdbc:mysql://xxx:3306/middle_test?useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true',  
           'username' = 'middle_test',  
           'password' = '123456',  
           'table-name' = 'flink_type',  
           'table-name' = 'tjy_fortest2'  
       );  
  
  
insert into tjy_sql1_sink select * from tjy_sql1;

 

FAQ

相关问题:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

可能涉及到的问题

在这里插入图片描述

 

参考:
官网:https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc%28ZH%29.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/411907.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

交叉编译qt到arm平台

使用pkg-config命令查看xxx包是否存在&#xff1a; pkg-config --print-errors xxx pkg-config的搜索路径可以通过环境变量PKG_CONFIG_PATH指定。需要在运行./configure 之前指定。 ./configure -release -qt-libjpeg -qt-libpng -qt-zlib -qt-pcre -xplatform linux-aarch64-…

一文读懂 Python 值传递和引用传递

文章目录 版本前言形参和实参值传递和引用传递Python 变量存储值语义和引用语义值语义引用语义 探讨 Python 值传递和引用传递不可变&#xff08;immutable&#xff09;类型可变&#xff08;mutable&#xff09;类型案例一案例二 拓展&#xff1a;不可变类型真的不可变&#xf…

华为---RSTP(三)---P/A机制及RSTP的生成树形成过程

目录 1. P/A机制简介 1.1 P/A机制的作用 1.2 P/A协商的前提条件 1.3 RSTP选举思路 2. P/A协商过程 3. 举例说明RSTP的生成树形成过程 3.1 示例环境要求 3.2 RSTP的生成树形成过程 3.2.1 SW和SW1之间链路上抓包分析 3.2.2 SW和SW2之间链路上抓包分析 3.2.3 SW1和SW2之…

LeetCode_Java_转变日期格式、种花问题(题目+思路+代码)

目录 1507. 转变日期格式 605.种花问题 1507. 转变日期格式 给你一个字符串 date &#xff0c;它的格式为 Day Month Year &#xff0c;其中&#xff1a; Day 是集合 {"1st", "2nd", "3rd", "4th", ..., "30th", "3…

【计算机图形学】Where2Act: From Pixels to Actions for Articulated 3D Objects

文章目录 1.论文做了件什么事儿2. 论文为什么要做这件事3. 介绍Introduction4. 相关工作预测语义表达推理几何和物体属性从被动观察中学习Affordance从交互中学习感知 5. 问题陈述6. 方法6.1 网络模块主干特征提取器可运动性评分模块运动建议模块运动评分模块 6.2 训练数据收集…

路由器端口映射如何配置?

在网络通信中&#xff0c;路由器是一个重要的设备&#xff0c;它负责将数据包从一个网络传输到另一个网络。路由器的端口映射配置是一种重要的设置&#xff0c;可以使外部网络中的计算机通过访问路由器上的特定端口与内部网络中的计算机进行通信。本文将介绍什么是路由器端口映…

k8s pv与pvc理解与实践

参考文章&#xff1a; https://blog.csdn.net/qq_41337034/article/details/117220475 一、 pv/pvc简述 Pv是指PersistentVolume&#xff0c;中文含义是持久化存储卷是对底层的共享存储的一种抽象&#xff0c;Pv由管理员进行配置和创建&#xff0c;只要包含存储能力&#xff…

【DL】深度学习之语音识别

目录 1 核心概念 2 安装依赖库 3 实践 语音信号处理&#xff08;Speech Signal Processing&#xff09;简称语音处理。 语音识别&#xff08;ASR&#xff09;和自然语言处理&#xff08;NLP&#xff09;&#xff1a;语音识别就是将语音信号转化成文字文本&#xff0c;简单实…

Redis 服务集群、哨兵、缓存及持久化的实现原理和应用场景

Redis 是一种高性能的键值存储系统&#xff0c;已经成为了许多企业和互联网公司的核心技术之一。本文将介绍 Redis 的服务集群、哨兵以及缓存实现原理和应用场景&#xff0c;以帮助读者更好地理解和使用 Redis。 引言&#xff1a; 随着互联网应用规模不断扩大&#xff0c;Redi…

Laravel04 eloquent

eloquent 1. eloquent2. 创建eloquent model 以及 取数据 1. eloquent 文档地址&#xff1a; https://learnku.com/docs/laravel/8.x/eloquent/9406 下面是我们&#xff0c;通过laravel的DB类从数据库中获取了post记录&#xff0c;那么有没有可能我们直接获取一个post对象&am…

[算法沉淀记录]排序算法 —— 快速排序

排序算法 —— 快速排序介绍 基本概念 快速排序&#xff08;Quicksort&#xff09;是一种排序算法&#xff0c;最早由东尼霍尔提出。在平均状况下&#xff0c;排序 n 个项目要 Ο(n log n) 次比较。在最坏状况下则需要 Ο(n2) 次比较&#xff0c;但这种状况并不常见。事实上&…

Arduino单片机基础介绍

&#xff08;本文为简单介绍&#xff0c;内容源于网络和AI&#xff09; Arduino单片机&#xff0c;自2005年诞生以来&#xff0c;已经成为全球爱好者和专业工程师们快速实现创意原型的重要工具。Arduino的普及不仅因其强大的功能和简易的操作&#xff0c;还在于其背后强大的社…

【数据结构】队列OJ题《用队列实现栈》(题库+解析+代码)

1.前言 通过前面队列的实现和详解大家对队列应该有一定熟悉了&#xff0c;现在上强度开始做题吧 队列详解&#xff1a;http://t.csdnimg.cn/dvTsW 2.OJ题目训练225. 用队列实现栈 题目分析 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff09;的栈&#xff0…

亿道丨三防平板丨手持平板丨加固平板丨助力地震救援

自土耳其发生7.8级大地震以来&#xff0c;一直都牵动着世人的心。2023年2月10日&#xff0c;据法新社最新消息&#xff0c;强震已造成土耳其和叙利亚两国超2万人遇难。报道称&#xff0c;相关官员和医护人员表示&#xff0c;地震造成土耳其17674人死亡&#xff0c;叙利亚则有33…

洛谷C++简单题小练习day22—小鱼记忆小程序!一题五解,高效学习

day22--小鱼记忆--2.26 习题概述 题目描述 小鱼最近被要求参加一个数字游戏&#xff0c;要求它把看到的一串数字 ai​&#xff08;长度不一定&#xff0c;以 0 结束&#xff09;&#xff0c;记住了然后反着念出来&#xff08;表示结束的数字 0 就不要念出来了&#xff09;。…

iOS App 上架指南及关键

引言 上架App Store是将iOS应用提交申请并上线的过程&#xff0c;旨在让应用在App Store上展示&#xff0c;吸引用户并获取流量。本文将介绍iOS上架的整体流程&#xff0c;并提供一些建议和注意事项。 一、iOS上架的整体流程 1. 申请开发者账号 首先&#xff0c;需要申请苹…

08_css

文章目录 CSS概念在HTML中引入CSS的三种方式CSS的选择器标签选择器类选择器id选择器后代选择器子类选择器并集选择器伪类选择器伪元素选择器属性选择器选择器的优先顺序 盒子模型边框的写法内外边距的写法外边距合并 标签的分类块级元素行级元素行内块 浮动 CSS 概念 css是层…

共同学习|Spring Cloud Alibaba一一服务网关Gateway

目录 服务网关-Gateway 环境搭建 负载均衡 Gateway Predicates Path After Before Cookie Header Weight GatewayFilter Factories StripPrefix AddResponseHeader 自定义全局Filter 网关(Gateway)又称网间连接器、协议转换器。网关在传输层上以实现网络互连&…

北斗卫星赋能,宠物定位新篇章—追踪宠物,不再是难题

北斗卫星赋能&#xff0c;宠物定位新篇章—追踪宠物&#xff0c;不再是难题 随着社会的快速发展与科技的不断进步&#xff0c;人们的生活方式也在不断改变。宠物已经成为越来越多家庭的重要成员&#xff0c;在这个宠爱宠物的时代&#xff0c;如何确保宠物的安全&#xff0c;特…

js之继承

js之继承 1、原型 prototype 和 __proto__2、原型链3、继承4、hasOwnProperty 1、原型 prototype 和 proto 每个对象都有一个__proto__属性&#xff0c;并且指向它的prototype原型对象每个构造函数都有一个prototype原型对象prototype原型对象里的constructor指向构造函数本身…