Flink CDC 提取记录变更时间作为事件时间和 Hudi 表的 precombine.field 以及1970-01-01 取值问题

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

CDC 数据中的记录变更时间标记着这条记录在数据库中执行对应操作(创建/更新/删除)的时间,可以说是天然的“事件时间”,特别是对于那些本身没有记录时间字段的表来说就更加合适了。Flink 官方文档 也建议在使用 CDC 的情况下,优先使用 CDC 中的这个时间字段,这个时间更加精准。

与此同时,在定义 Hudi 表时,precombine.field 也是一个非常重要的配置,显然 CDC 数据中的记录变更时间是最合适的,没有之一。

CDC 数据中的记录变更时间属于元数据范畴,以 Flink CDC 的 MySQL 数据库为例,它提供四种元数据的抽取:

KeyDataTypeDescription
table_nameSTRING NOT NULLName of the table that contain the row.
database_nameSTRING NOT NULLName of the database that contain the row.
op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0.
row_kindSTRING NOT NULLIt indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if the source operator chooses to output the ‘row_kind’ column for each record. It is recommended to use this metadata column only in simple synchronization jobs. ‘+I’ means INSERT message, ‘-D’ means DELETE message, ‘-U’ means UPDATE_BEFORE message and ‘+U’ means UPDATE_AFTER message.

其中的 op_ts 就是我们想要的,也就是:CDC 数据中的记录变更时间。我们可以在定义数据表时声明这个列,Flink CDC 可以将其提取出来作为普通字段供下游使用,就像下表中这样:

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
    `order_number` INT NOT NULL,
    `order_date` DATE NOT NULL,
    `purchaser` INT NOT NULL,
    `quantity` INT NOT NULL,
    `product_id` INT NOT NULL,
    `op_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    ...
);

注意,在定义 Flink CDC 源表时,op_ts 的数据类型是 TIMESTAMP_LTZ(3),不是 TIMESTAMP(3),写入下游表时,可以是 TIMESTAMP(3)

当我们初次使用这个 op_ts 字段时,你会发现,写入到的数据库的数据全部都是 1970-01-01 00:00:00.000,就像下面这样:

在这里插入图片描述

你可能会认为是哪里出错了,实际上,这是 Flink CDC 特别设计的,也是合理的,Flink CDC 官方文档的解释是:

If the record is read from snapshot of the table instead of the binlog, the value is always 0.

我们知道,Flink CDC ( 2.0+ ) 的一个显著特征是:它是全量 + 增量的一体化读取!全量就是经常说的历史数据,增量就是实时的数据,控制 Flink CDC 是从全部历史数据开始同步整个数据库还是从只当下的 binlog 中同步近期增量数据的配置项是:scan.startup.mode ( 官方文档 ),该配置项支持 5 种配置,而默认配置(initial)就是以当前分界点,数据中的现有数据使用全量方式读取(也叫快照读取),此后的数据从 binlog 中读取,这样就和上面描述的 op_ts 字段的取值吻合上了:

当 Flink CDC 使用全量方式读取表中的历史数据时,op_ts 字段全部取值为 0,即 1970-01-01 00:00:00.000,当 Flink CDC 使用增量方式读取 binlog 数据时,op_ts 字段的取值为数据发生变更的实际时间

这种设计还是非常合理的,因为,Flink CDC 本身在使用快照方式读取时,就没有任何变更时间可以读取,这个时间只在 binlog 中才有,而这对下游也不会造成太大的影响,因为此时的数据都是 insert-only 的数据,同一主键也不会出现两条记录,至少对 Hudi 表是没有影响的。

此外,作为一个“额外收获”,你会发现:op_ts 这个字段本身恰好标记了一条记录是通过全量同步进来的,还是增量同步进来的!


补充:以下是 Flink CDC 官方文档对 scan.startup.mode 5 种同步模式的解释:

The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
  • earliest-offset: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.
  • latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
  • specific-offset: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.
  • timestamp: Skip snapshot phase and start reading binlog events from a specific timestamp.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/413993.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

windows系统使用Vscode在WSL调试golang本地进程

背景: windows10企业版 vscodegolang1.20 wsl编译运行。 vscode 使用本地wsl进行进程attach操作,发现:Access is denied. 本地进程启动,vscode调试进程。windows-Linux控制台: Starting: C:\Users\book\go\bin\dlv.exe dap --l…

node.js 用 xml2js.Parser 读 Freeplane.mm文件,生成测试用例.csv文件

Freeplane 是一款基于 Java 的开源软件,继承 Freemind 的思维导图工具软件,它扩展了知识管理功能,在 Freemind 上增加了一些额外的功能,比如数学公式、节点属性面板等。 编写 mm_xml2js_csv.js 如下 // 用 xml2js.Parser 读 F…

ROS开发基础-Linux基础第四部(开发板设置本地IP)

一 、网线连接设备 使用网线连接jetson NX与机械臂,如下图所示: 二、 修改上位机IPV4 IP ①测试是否可连接。网线连接机械臂之后,在桌面打开终端输入命令“ping 192.168.1.18”,如不可正常通信,可按照下述步骤进行设置。 ②在U…

Vue3前端实现一个本地消息队列(MQ), 让消息延迟消费或者做缓存

MQ功能实现的具体代码(TsMQ.ts): import { v4 as uuidx } from uuid;import emitter from /utils/mitt// 消息类 class Message {// 过期时间,0表示马上就消费exp: number;// 消费标识,避免重复消费tag : string;// 消息体body : any;constr…

备战蓝桥杯Day18 - 双链表

一、每日一题 蓝桥杯真题之工作时长 这个题写代码做的话很麻烦,而且我也不一定能写出来,所以我直接就是用的excel来计算的时间和。 使用excel的做法 1.先把文件中的时间复制到excel中。 2.把日期和时间分到两列。 分成两列的步骤: 选中要…

让两个电脑通信的方法(TCP连接,UDP连接,C/S架构)

目录 TCP-面向连接UDP-面向无连接C/S架构服务器和客户端的工作过程C/S架构例子 让两个电脑通信的方法是 在C/S的基础上,采用TCP和UDP的方式连接 TCP-面向连接 UDP-面向无连接 C/S架构 服务器和客户端的工作过程 C/S架构例子 服务器与客户端通信的过程类似公司与客户…

【亚马逊云】跨AWS账号创建复制规则同步S3存储桶中的数据

文章目录 注意事项一、创建存储桶【创建方&接收方完成操作】二、上传数据至bucket-transmit待同步测试三、创建复制规则【创建方完成操作】四、接收复制的对象【接收方完成操作】五、创建复制任务【创建方操作】六、运行批处理操作【创建方完成操作】七、检查是否完成跨账号…

React UI框架Antd 以及 如何按需引入css样式配置

一、react UI框架Antd使用 1.下载模块 npm install antd -S 2.引入antd的样式 import ../node_modules/antd/dist/reset.css; 3.局部使用antd组件 import {Button, Calendar} from antd; import {PieChartTwoTone} from ant-design/icons; {/* 组件汉化配置 */} import l…

StarRocks——Stream Load 事务接口实现原理

目录 前言 一、StarRocks 数据导入 二、StarRocks 事务写入原理 三、InLong 实时写入StarRocks原理 3.1 InLong概述 3.2 基本原理 3.3 详细流程 3.3.1 任务写入数据 3.3.2 任务保存检查点 3.3.3 任务如何确认保存点成功 3.3.4 任务如何初始化 3.4 Exactly Once 保证…

go test用法(获取单元测试覆盖率)

go test用法(获取ut覆盖率) 为了提升系统的稳定性,一般公司都会对代码的单元测试覆盖率有一定要求。下面针对golang自带的测试命令go test做讲解。 1 命令 1.1 go test ./… (运行当前目录及所有子目录下的测试用例) …

【Nginx笔记02】通过Nginx服务器转发客户端的WebSocket接口到后端服务

这篇文章,主要介绍如何通过Nginx服务器转发客户端的WebSocket接口到后端服务【知识星球】。 目录 一、Nginx配置WebSocket 1.1、Nginx配置内容 1.2、客户端请求地址 1.3、创建WebSocket测试工程 1.4、启动测试 1.5、WebSocket超时问题 1.5.1、设置超时时间 …

计算机网络——IPV4数字报

1. IPv4数据报的结构 本结构遵循的是RFC 791规范,介绍了一个IPv4数据包头部的不同字段。 1.1 IPv4头部 a. 版本(Version):指明了IP协议的版本,IPv4表示为4。 b. 头部长度(IHL, Internet Header Length&…

Adobe illustrator CEP插件调试

1.创建插件CEP面板,可以参考:http://blog.nullice.com/%E6%8A%80%E6%9C%AF/CEP-%E5%BC%80%E5%8F%91%E6%95%99%E7%A8%8B/%E6%8A%80%E6%9C%AF-CEP-%E5%BC%80%E5%8F%91%E6%95%99%E7%A8%8B-Adobe-CEP-%E6%89%A9%E5%B1%95%E5%BC%80%E5%8F%91%E6%95%99%E7%A8%8…

【Docker】安装及相关的命令

目录 一 Docker简介 1.1 是什么 1.2 优缺点 1.3 应用场景 1.4 安装 二 命令 2.1 Docker基本命令 2.2 Docker镜像命令 2.3 Docker容器命令 一 Docker简介 1.1 是什么 Docker是一个开源的应用容器引擎,它基于Go语言实现,并利用操作系统本身已有的…

LeetCode142. 环形链表 II刷题详解

今天力扣刷到了一个特别有意思的题目,于是就写了下面的题解来加深以下理解。 142. 环形链表 II - 力扣(LeetCode) 这个可以分为两大步去写,首先要判断链表是否有环,然后如果有环就去找到环的入口,没有环返…

【Vite】解决Vite http proxy error: Error: connect ECONNREFUSED

今天写bug,发现了这个问题 我经过我一晚上的搜索努力,在github上找到了解决办法,不得不说,交友网站还是很好用的。 参考 这一行是关键代码。 因为我连的是本地后台服务,所以最后配置成这样 server: {open: true,pro…

关于年化收益率的思考

近期,对于投资的年化收益率有一些思考,想着将这些思考整理一下,顺便也就记录在这里。 1. 计算方式 年化收益率常见的计算有三种:算数平均,几何平均,IRR。 1.1 算术平均 算数平均用于度量产品的回报率&a…

ChatGPT 正测试Android屏幕小组件;联想ThinkBook 推出透明笔记本电脑

▶ ChatGPT 测试屏幕小组件 近日 ChatGPT 正在测试 Android 平台上的屏幕小组件,类似于手机中的悬浮窗,按住 Android 手机主屏幕上的空白位置就可以调出 ChatGPT 的部件菜单。 菜单中提供了许多选项,包括文本、语音和视频查询的快捷方式&…

【Java设计模式】一、工厂模式、建造者模式、原型设计模式

文章目录 1、简单工厂模式2、工厂方法模式3、抽象工厂模式4、建造者模式5、原型设计模式 1、简单工厂模式 即由一个工厂决定创建哪一种产品类型的实例。 //抽象的课程类 public abstract class Course{//有一个制作课程的抽象方法public abstract void make(); }以上抽象类的…

袁庭新ES系列12节 | Elasticsearch高级查询操作

前言 上篇文章讲了关于Elasticsearch的基本查询操作。接下来袁老师为大家带来Elasticsearch高级查询部分相关的内容。Elasticsearch是基于JSON提供完整的查询DSL(Domain Specific Language:领域特定语言)来定义查询。因此,我们有…