大数据Flink(一百二十二):阿里云Flink MySQL连接器介绍

文章目录

阿里云Flink MySQL连接器介绍

一、特色功能

二、​​​​​​​语法结构

三、​​​​​​​​​​​​​​WITH参数


阿里云Flink MySQL连接器介绍

阿里云提供了MySQL连接器,其作为源表时,扮演的就是flink cdc的角色。

一、特色功能

MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证不多读一条也不少读一条数据。即使发生故障,也能保证通过Exactly Once语义处理数据。MySQL CDC源表支持并发地读取全量数据,通过增量快照算法实现了全程无锁和断点续传。

作为源表,支持以下功能特性。

  • 流批一体,支持读取全量和增量数据,无需维护两套流程。
  • 支持并发读取全量数据,性能水平扩展。
  • 全量读取无缝切换增量读取,自动缩容,节省计算资源。
  • 全量阶段读取支持断点续传,更稳定。
  • 无锁读取全量数据,不影响在线业务。

二、​​​​​​​​​​​​​​语法结构

CREATE TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

 

三、​​​​​​​​​​​​​​WITH参数

  • 通用

参数

说明

是否必填

数据类型

默认值

备注

connector

表类型。

STRING

作为源表时,可以填写为mysql-cdc或者mysql,二者等价。作为维表或结果表时,固定值为mysql

hostname

MySQL数据库的IP地址或者Hostname。

STRING

建议填写专有网络VPC地址。

username

MySQL数据库服务的用户名。

STRING

无。

password

MySQL数据库服务的密码。

STRING

无。

database-name

MySQL数据库名称。

STRING

  1. 作为源表时,数据库名称支持正则表达式以读取多个数据库的数据。
  2. 使用正则表达式时,尽量不要使用^和$符号匹配开头和结尾。具体原因详见table-name备注的说明。

table-name

MySQL表名。

STRING

  1. 作为源表时,表名支持正则表达式以读取多个表的数据。
  2. 使用正则表达式时,尽量不要使用^和$符号匹配开头和结尾。具体原因详见以下说明。

说明:MySQL CDC源表在正则匹配表名时,会将您填写的 database-name,table-name 通过字符串 \\.(VVR 8.0.1前使用字符.)连接成为一个全路径的正则表达式,然后使用该正则表达式和MySQL数据库中表的全限定名进行正则匹配。例如:当配置'database-name'='db_.*'且'table-name'='tb_.+'时,连接器将会使用正则表达式db_.*\\.tb_.+(8.0.1版本前为db_.*.tb_.+)去匹配表的全限定名来确定需要读取的表。

port

MySQL数据库服务的端口号。

INTEGER

3306

无。

  • 源表独有

参数

说明

是否必填

数据类型

默认值

备注

server-id

数据库客户端的一个数字ID。

STRING

默认会随机生成一个5400~6400的值。

该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。

该参数也支持ID范围的格式,例如5400-5408。在开启增量读取模式时支持多并发读取,此时推荐设定为ID范围,使得每个并发使用不同的ID。

scan.incremental.snapshot.enabled

是否开启增量快照。

BOOLEAN

true

默认开启增量快照。增量快照是一种读取全量数据快照的新机制。与旧的快照读取相比,增量快照有很多优点,包括:

  1. 读取全量数据时,Source可以是并行读取。
  2. 读取全量数据时,Source支持chunk粒度的检查点。
  3. 读取全量数据时,Source不需要获取全局读锁(FLUSH TABLES WITH read lock)。

如果您希望Source支持并发读取,每个并发的Reader需要有一个唯一的服务器ID,因此server-id必须是5400-6400这样的范围,并且范围必须大于等于并发数。

scan.incremental.snapshot.chunk.size

表的chunk的大小(行数)。

INTEGER

8096

当开启增量快照读取时,表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中,因此chunk 太大,可能导致内存OOM。chunk越小,故障恢复的粒度也越小,但也会降低吞吐。

scan.snapshot.fetch.size

当读取表的全量数据时,每次最多拉取的记录数。

INTEGER

1024

无。

scan.startup.mode

消费数据时的启动模式。

STRING

initial

参数取值如下:

  1. initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。
  2. latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。
  3. earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。
  4. specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过配置scan.startup.specific-offset.filescan.startup.specific-offset.pos的方式来指定从特定Binlog文件名和偏移量启动,也可以通过配置scan.startup.specific-offset.gtid-set指定从某个GTID集合启动。
  5. timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。

scan.startup.specific-offset.file

使用指定位点模式启动时,启动位点的Binlog文件名。

STRING

使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如mysql-bin.000003

scan.startup.specific-offset.pos

使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

scan.startup.specific-offset.gtid-set

使用指定位点模式启动时,启动位点的GTID集合。

STRING

使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

使用指定时间模式启动时,启动位点的毫秒时间戳。

LONG

使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。

重要:在使用指定时间时,MySQL CDC会从最早Binlog开始读取,直至Binlog事件的时间戳大于等于指定的时间戳后开始向下游发送数据。因此请保证指定的时间戳对应的Binlog文件在数据库上没有被清理且可以被读取到。

server-time-zone

数据库在使用的会话时区。

VVR-6.0.2以下版本必填,其他版本选填

STRING

如果您没有指定该参数,则系统默认使用Flink作业运行时的环境时区作为数据库服务器时区,即您选择的可用区所在的时区。

例如Asia/Shanghai,该参数控制了MySQL中的TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型

debezium.min.row.count.to.stream.results

当表的条数大于该值时,会使用分批读取模式。

INTEGER

1000

Flink采用以下方式读取MySQL源表数据:

  1. 全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有OOM风险。
  2. 分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有OOM风险,缺点是读取速度相对较慢。

connect.timeout

连接MySQL数据库服务器超时时,重试连接之前等待超时的最长时间。

DURATION

30s

无。

connect.max-retries

连接MySQL数据库服务时,连接失败后重试的最大次数。

INTEGER

3

无。

connection.pool.size

数据库连接池大小。

INTEGER

20

数据库连接池用于复用连接,可以降低数据库连接数量。

jdbc.properties.*

JDBC URL中的自定义连接参数。

STRING

您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为'jdbc.properties.useSSL' = 'false'

支持的连接参数请参见Mysql Configuration Properties

heartbeat.interval

Source通过心跳事件推动Binlog位点前进的时间间隔。

DURATION

30s

心跳事件用于推动Source中的Binlog位点前进,这对MySQL中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。

scan.incremental.snapshot.chunk.key-column

可以指定某一列作为快照阶段切分分片的切分列。

见备注列。

STRING

  1. 无主键表必填,选择的列必须是非空类型(NOT NULL)。
  2. 有主键的表为选填,仅支持从主键中选择一列。

说明:仅Flink计算引擎VVR 6.0.7及以上版本支持。

rds.region-id

RDS实例所在的地域 ID。

使用读取OSS归档日志功能时必填。

STRING

仅Flink计算引擎VVR 6.0.7及以上版本支持。

地域ID请参见地域和可用区

rds.access-key-id

阿里云账号Access Key ID。

使用读取OSS归档日志功能时必填。

STRING

仅Flink计算引擎VVR 6.0.7及以上版本支持。

rds.access-key-secret

阿里云账号Access Key Secret。

使用读取OSS归档日志功能时必填。

STRING

仅Flink计算引擎VVR 6.0.7及以上版本支持。

rds.db-instance-id

RDS实例ID。

使用读取OSS归档日志功能时必填。

STRING

仅Flink计算引擎VVR 6.0.7及以上版本支持。

scan.incremental.close-idle-reader.enabled

是否在快照结束后关闭空闲的 Reader。

BOOLEAN

false

  1. 仅Flink计算引擎VVR 8.0.1及以上版本支持。
  2. 该配置生效需要设置execution.checkpointing.checkpoints-after-tasks-finish.enabled为true。

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/882489.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

操作系统 | 学习笔记 | | 王道 | 5.3 磁盘和固态硬盘

5.3 磁盘和固态硬盘 5.3.1 磁盘 磁盘结构 磁盘&#xff1a;磁盘的表面由一些磁性物质组成&#xff0c;可以用这些磁性物质来记录二进制数据 磁道&#xff1a;磁盘的盘面被划分成一个个磁道。这样的一个“圈”就是一个磁道 扇区&#xff1a;一个磁道又被划分成一个个扇区&am…

大数据毕业设计选题推荐-网络电视剧收视率分析系统-Hive-Hadoop-Spark

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、PHP、.NET、Node.js、GO、微信小程序、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇…

通信工程学习:什么是NFVO网络功能虚拟化编排器

NFVO&#xff1a;网络功能虚拟化编排器 NFVO&#xff08;Network Functions Virtualization Orchestrator&#xff09;&#xff0c;即网络功能虚拟化编排器&#xff0c;是网络功能虚拟化&#xff08;NFV&#xff09;架构中的核心组件之一。NFV是一种将传统电信网络中的网络节点…

从零开始学习Python

目录 从零开始学习Python 引言 环境搭建 安装Python解释器 选择IDE 基础语法 注释 变量和数据类型 变量命名规则 数据类型 运算符 算术运算符 比较运算符 逻辑运算符 输入和输出 控制流 条件语句 循环语句 for循环 while循环 循环控制语句 函数和模块 定…

黑马智数Day3

渲染基础Table列表 封装接口&#xff1a; export function getCardListAPI(params) {return request({url: /parking/card/list,params}) } 具体实现&#xff1a; import { getCardListAPI } from /apis/cardexport default {data() {return {// 请求参数params: {page: 1,pa…

乌克兰因安全风险首次禁用Telegram

据BleepingComputer消息&#xff0c;乌克兰国家网络安全协调中心 &#xff08;NCCC&#xff09; 以国家安全为由&#xff0c;已下令限制在政府机构、军事单位和关键基础设施内使用 Telegram 消息应用程序。 这一消息通过NCCC的官方 Facebook 账号对外发布&#xff0c;在公告中乌…

【小程序】uniapp自定义图标组件可动态更换svg颜色

组件描述 通过图标名称加载对应svg&#xff0c;size参数调整图标大小&#xff0c;color参数调整图标颜色 解决思路&#xff1a; 存svg获svg&#xff0c;对象方式正则替换svg的fill值&#xff0c;不改变源文件&#xff0c;通过base64直接加载缓存svg源文件&#xff0c;避免重…

上传富文本插入文件时报错:JSON parse error: Unexpected character解决办法

方式一&#xff08;加密解密&#xff09;&#xff1a; 1.前端 &#xff08;1&#xff09;安装 crypto-js npm install crypto-js&#xff08;2&#xff09;util下创建asc.js asc.js import CryptoJS from crypto-js// 需要和后端一致 const KEY CryptoJS.enc.Utf8.parse(…

爬虫逆向学习(七):补环境动态生成某数四代后缀MmEwMD

声明&#xff1a;本篇文章内容是整理并分享在学习网上各位大佬的优秀知识后的实战与踩坑记录 前言 这篇文章主要是研究如何动态生成后缀参数MmEwMD的&#xff0c;它是在文章爬虫逆向学习(六)&#xff1a;补环境过某数四代的基础上进行研究的&#xff0c;代码也是在它基础上增…

C++之初识STL(概念)

STL&#xff08;标准模板库&#xff09; STL广义分类为&#xff1a;容器&#xff0c;算法&#xff0c;迭代器 * **容器**和**算法**之间通过**迭代器**进行无缝连接 意义&#xff1a;C的**面向对象**和**泛型编程**思想&#xff0c;目的就是**复用性的提升** STL六大组件 1. 容…

论文阅读:Omni-Kernel Network for Image Restoration

论文地址&#xff1a;https://ojs.aaai.org/index.php/AAAI/article/view/27907 项目地址&#xff1a;https://github.com/c-yn/OKNet 发表时间&#xff1a;2024 图像恢复的目的是从一个退化的低质量的观测中重建一个高质量的图像。最近&#xff0c;Transformer模型由于其强大…

JavaScript 安装库npm报错

今天在编写JavaScript代码时&#xff0c;缺少了包express。 const express require(express); const app express();app.get(/, (req, res) > {res.send(Hello, world!); });app.listen(3000, () > {console.log(Server is running on port 3000); });npm install exp…

【Redis技能熟练掌握之十年内功】

Redis技能熟练掌握之十年内功 1.redis是什么&#xff1f;为什么要使用redis&#xff1f;2.redis一般应用于什么场景&#xff08;四个场景&#xff09;&#xff1f;3. Redis持久化机制是什么&#xff1f;各自的优缺点&#xff1f;一般咋么用&#xff1f;4. redis五个基础类型支持…

速通汇编(七)BX、SI、DI寄存器,BP寄存器,直接寻址和间接寻址

下文中出现的"idata"&#xff0c;指的都是任意常量 一&#xff0c;基于BX、SI、DI等寄存器的寻址形式 在第五篇中曾介绍过DS寄存器的作用&#xff0c;简要复习一下->速通汇编&#xff08;五&#xff09;认识段地址与偏移地址&#xff0c;CS、IP寄存器和jmp指令&a…

百度飞浆Paddle OCR检测和识别【OCR数据收集、标注、数据集划分、检测识别模型训练、导出模型】

文章目录 前言一、OCR数据集采集二、OCR数据标注三、划分数据集四、数据训练五、导出模型 前言 1、我的电脑没有GPU&#xff0c;如果不使用AI Studio训练的话&#xff0c;第一遍我是按照CPU进行环境配置和训练的&#xff0c;可以参考这篇文章&#xff0c;我按着弄了一遍&#…

Kafka技术详解[1]:简介与基础概念

目录 1. Kafka入门 1.1 概述 1.1.1 初识Kafka 1.1.2 消息队列 1.1.3 生产者-消费者模式 1.1.4 消息中间件对比 1.1.5 ZooKeeper 1. Kafka入门 1.1 概述 1.1.1 初识Kafka Kafka是由Scala和Java语言开发的高吞吐量分布式消息发布和订阅系统&#xff0c;也是大数据技术领…

10月23-27日六西格玛绿带公开课即将在雄安新区开课

在金秋送爽、硕果累累的季节里&#xff0c;天行健管理咨询公司宣布了一项重要决定——定于10月23日至27日&#xff0c;在充满未来气息的河北雄安新区&#xff0c;举办一场旨在提升企业质量管理水平、培养精英人才的六西格玛绿带公开课。此次课程的举办&#xff0c;不仅是对当前…

Spring6梳理9—— 依赖注入之注入对象类型属性

目录 9.1 依赖注入之外部注入对象类型属性 9.1.1 创建dept与emp类 9.1.2 创建配置文件 9.1.3 创建测试类 9.1.4 运行结果 9.2 依赖注入之内部注入对象类型 9.3 依赖注入之级联注入对象类型 9.1 依赖注入之外部注入对象类型属性 9.1.1 创建dept与emp类 1.dept…

Python在AI中的应用--使用决策树进行文本分类

Python在AI中的应用--使用决策树进行文本分类 文本分类决策树什么是决策树 scikit算法 使用scikit的决策树进行文章分类一个文本分类的Python代码使用的scikit APIs说明装入数据集决策树算法类类构造器&#xff1a; 构造决策树分类器产生输出评估输出结果分类准确度分类文字评估…

达梦-华为鲲鹏ARM架构下性能测试最佳实践

一、测试综述 1.1 测试目的 本次测试的目的是验证达梦数据库&#xff0c;在鲲鹏服务器下&#xff0c;不同服务器参数基于sysbench性能压力测试的表现。本次参数是根据为华为鲲鹏arm服务器调优十板斧内建议值调整 成长地图-鲲鹏开发套件开发文档-鲲鹏社区 1.2 通用指标 指标…