Flink CDC 的 debezium-json 格式和 debezium 原生格式是一回事吗?

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

这是一个很容易混淆和误解的问题,值得拿出来讨论对比一下。我们知道 Debezium 是专门用于捕获 CDC 数据的开源框架,它对接了多种数据库,同时也定义了自己的 CDC 数据交换格式,也就是常说的 debezium 格式。而Flink CDC 复用了 Debezium 的部分功能,也就是说:Debezium 是 Flink CDC 的底层采集工具,Flink CDC 的工程依赖会用使用到 Debezium 的 Jar 包,然后 Flink CDC 在 Debezium 基础之上,封装了额外的功能,例如:无锁读取,并发读取(全量数据的读取性能可以水平扩展),断点续传,这些功能是 Debezium 所不具备的,也是 Flink CDC 存在的意义。

同时,Flink 还有一种专门的数据格式 debezium-json,从名称上看,它似乎就是 debezium 格式的 json 表达形式,那 debezium-json 格式和 debezium 原生格式是一回事吗?

首先,我们要主要到这样一个细节:mysql-cdc 作为一个 source connector,并不要求指定 format,实际上,它的 format 是不可配置的,因为 Flink CDC 在内部实现依赖 debezium,获得的原始的数据格式就是 debezium 格式,对外,这不可配置,也不可见,只有向下游传导数据时,才会涉及到解析和转换的问题。

其次,我们还要先澄清一种误解:debezium-json 并不是跟 Flink CDC(例如mysql-cdc)绑定在一起的,作为一种独立的、可描述 changelog 的格式,实际上,它可以应用到任何动态表上,例如:如果上游表是:connector=upsert-kafka,format=json,下游依旧可以使用: connector=kafka,format=debezium-json,关于这一点,可以参考本文的实测 《Flink SQL:debezium-json 格式的表一定是数据库的 CDC 数据吗?》,这个测试给出了这样一个非常明确的结论:

使用 debezium-json 格式的表不一定是数据库的 CDC 数据,但一定是上游动态表的 changelog,然后使用 debezium-json 格式描述。

Flink CDC 从数据库 binlog 中提取数据时使用了 debezium,获得的原始的数据格式也是 debezium 格式,但是,这都是发生在 Flink CDC 内部的,对外是不可见的!当需要把 CDC 数据传给下游时,才会针对下游指定的格式进行转换,这种转换也是根据目标表 DDL 中定义的 Schema 自动地隐式地完成的。

我们还是靠举例和试验来说明这个问题吧。再次看一下 《Flink CDC 与 Kafka 集成:Snapshot 还是 Changelog?Upsert Kafka 还是 Kafka?》 一文的 ”测试组合(1):connector=kafka,format=debezium-json“ 一节给出的案例。

原生 Debezium 格式(样例)

使用如下 SQL 创建一个 mysql-cdc 的源表:

SET 'sql-client.execution.result-mode' = 'TABLEAU';

DROP TABLE IF EXISTS orders_mysql_cdc;

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
    `order_number` INT NOT NULL,
    `order_date` DATE NOT NULL,
    `purchaser` INT NOT NULL,
    `quantity` INT NOT NULL,
    `product_id` INT NOT NULL,
    CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '10.0.13.30',
    'port' = '3307',
    'username' = 'root',
    'password' = 'Admin1234!',
    'database-name' = 'inventory',
    'table-name' = 'orders'
);

那从 Flink CDC 源表提取出来的数据应该是什么样子呢?前面我们已经说过,这个动作发生在 Flink CDC 内部,提取的数据也是外部不可见的,那我们能不能从其他渠道确定实际的数据格式吗?能,如果说 Flink CDC 就是通过 Debezium 来采集数据,那么采集到的最原始的数据格式就是标准的 Debezium 格式,通常,这是这个样子的:

{
  "before": null,
  "after": {
    "osci.mysql-server-3.inventory.orders.Value": {
      "order_number": 10006,
      "order_date": 16852,
      "purchaser": 1003,
      "quantity": 1,
      "product_id": 107
    }
  },
  "source": {
    "version": "2.2.0.Final",
    "connector": "mysql",
    "name": "osci.mysql-server-3",
    "ts_ms": 1705645511000,
    "snapshot": {
      "string": "false"
    },
    "db": "inventory",
    "sequence": null,
    "table": {
      "string": "orders"
    },
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000004",
    "pos": 640,
    "row": 0,
    "thread": {
      "long": 10
    },
    "query": null
  },
  "op": "c",
  "ts_ms": {
    "long": 1705645511455
  },
  "transaction": null
}

再次强调,上述格式的数据在 Flink CDC 中是不可见的,发生于 Flink CDC 内部,以上格式是标准的 debezium 数据格式,Flink CDC一定是率先拿到了这种格式的数据然后再经处理转发给下游的,比如:如果 DDL 中提取了某些元数据,也是从上面这种原始的 Debezium 数据中获取的。

debezium-json 格式(样例)

如下的 SQL 在 Kafka 上创建了一个 debezium-json 格式的目标表,然后使用 INSERT INTO ... SELECT ... 把源表和目标表的数据流驱动起来:

DROP TABLE IF EXISTS orders_kafka_debezium_json;

CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (
    order_number int,
    order_date   date,
    purchaser    int,
    quantity     int,
    product_id   int
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders_kafka_debezium_json',
    'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
    'properties.group.id' = 'orders_kafka_debezium_json',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
);

-- 提交持续查询,驱动整个 Pipeline

insert into orders_kafka_debezium_json select * from orders_mysql_cdc;

这时,写入 Kafka 中的 debezium-json 格式的数据是这样的:

{
  "before": {
    "order_number": 10003,
    "order_date": "2016-02-19",
    "purchaser": 1002,
    "quantity": 2,
    "product_id": 106
  },
  "after": null,
  "op": "d"
}

结论

比较上述两种消息格式就能看出:

debezium-json 格式并不等于原生的 debezium 格式,两者有很多相似之处,都有 before,after,op,原生 debezium 格式仅发生并存在于 Flink CDC 内部,对外不可见,debezium-json 格式可用于表达任何动态表的 changelog,与数据库 CDC 数据已无必然的绑定关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/550014.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【介绍下负载均衡原理及算法】

🎥博主:程序员不想YY啊 💫CSDN优质创作者,CSDN实力新星,CSDN博客专家 🤗点赞🎈收藏⭐再看💫养成习惯 ✨希望本文对您有所裨益,如有不足之处,欢迎在评论区提出…

IP协议如何进行地址管理?

如今,IP协议有两个版本,分别是IPv4和IPv6,IPv4是目前主要应用的版本。IPv4的IP地址是以4个字节的数字来表示的,比如 127.0.0.1。因此,IPv4所能表示IP地址的个数是2^32次方,也就是42亿多个,看起来…

48.HarmonyOS鸿蒙系统 App(ArkUI)常用组件的使用

48.HarmonyOS鸿蒙系统 App(ArkUI)常用组件的使用 按钮触发事件 toast信息提示 单选按钮 复选框 切换按钮,开关按钮 进度条 textbox,textinput,TextArea文本输入框 气泡提示 import prompt from ohos.prompt; import promptAction from ohos.promptAction; …

Qt对象池,单例模式,对象池可以存储其他类的对象指针

代码描述: 写了一个类,命名为对象池(ObjectPool ),里面放个map容器。 3个功能:添加对象,删除对象,查找对象 该类只构建一次,故采用单例模式功能描述:对象池可…

【ARFoundation自学01】搭建AR框架,检测平面点击位置克隆物体

Unity开发ARFoundation相关应用首先安装ARFoundation包 然后设置XR 1.基础AR场景框架搭建 2.一个基本的点击克隆物体到识别的平面脚本 挂在XROrigin上 脚本AppController 脚本说明书 ## 业务逻辑 AppController 脚本旨在实现一个基本的 AR 应用程序功能:用户通过…

Spring Cloud+Uniapp 智慧工地云平台源码 智慧工地云平台AI视频分析应用

目录 AI应用与环境治理 设备管理与危大工程 塔吊安全监管 智慧工地APP端 智慧工地硬件设备 智慧工地主要功能模块 智慧工地可以通过以下几个方面为建筑行业赋能: 1.提高工程效率 2.提高工程安全性 3.提高工程质量 4.提高工程管理效率 绿色施工 质量管理…

Codeforces Round 924 (Div. 2) --- E. Modular Sequence ---- 题解

E. Modular Sequence: 题目描述: 思路解析: 这里第一个一定要需要填充x,然后后面每一位填充 ai-1 y 或者 ai-1 % y,那么其实相当于除了第一位固定,后面每一位都可以表现为 a ki * y;其中 a …

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之十二 简单人脸识别

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之十二 简单人脸识别 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之十二 简单人脸识别 一、简单介绍 二、简单人脸识别实现原理 三、简单人脸识别案例实现简…

RAID 磁盘阵列及RAID配置实战

目录 一.RAID磁盘阵列介绍 二.常用的RAID磁盘阵列的介绍 1.RAID 0 (条带化存储) 2.RAID 1(镜像存储) 3.RAID 5 4.RAID 6 5.RAID 10(先做镜像,再做条带) 6.RAID 01 (先做条带…

Java代码执行顺序

Java代码的执行顺序 后面大量的涉及到了static,我曾经写过一篇static的博客,可以看一眼 我上次写了static的加载顺序,没看过的可以进去看一眼 JavaSE:static关键字详解 ---------------------分割线-------------------------…

魔方网表 存在 mailupdate.jsp接口 任意文件上传漏洞

声明: 本文仅用于技术交流,请勿用于非法用途 由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,文章作者不为此承担任何责任。 简介 魔方网表mailupdate.jsp接口存在任意文件上传漏洞 …

Jenkins配置windows/linux从节点

背景: 环境:jenkins环境(Ubuntu) 节点机器:Linux、Windows 前置条件: 节点机器:安装java、allure、python 1 Linux节点管理机器添加 1.1 系统管理->节点列表->New Node 1.2 节点配置…

Python --- 在python中安装NumPy,SciPy和Matplotlib(Windows平台)

在python中安装NumPy,SciPy和Matplotlib(Windows平台) NumPy NumPy是Python的一个最常用最基本的扩展程序库之一,主要用于矩阵运算或数组计算。很多其他的python库都要依赖于NumPy才能跑。 NumPy的发展史: Matrix-sig 1995年,特殊…

RabbitMQ - Spring boot 整合 RabbitMQ

一、RabbitMQ 1、RabbitMQ 使用场景 1.1、服务解耦 假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可 但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下…

系统调优助手,PyTorch Profiler TensorBoard 插件教程

0x1. 前言 使用PyTorch Profiler进行性能分析已经一段时间了,毕竟是PyTorch提供的原生profile工具,个人感觉做系统性能分析时感觉比Nsys更方便一些,并且画的图也比较直观。这里翻译一下PyTorch Profiler TensorBoard Plugin的教程并分享一些…

SEO之搜索引擎的工作原理(三)

初创企业需要建站的朋友看这篇文章,谢谢支持:我给不会敲代码又想搭建网站的人建议 (接上一篇。。。) 排名 经过搜索引擎蜘蛛抓取页面,索引程序计算得到倒排索引后,搜索引擎就准备好可以随时处理用户搜索了…

基于Echarts的超市销售可视化分析系统(数据+程序+论文

本论文旨在研究Python技术和ECharts可视化技术在超市销售数据分析系统中的应用。本系统通过对超市销售数据进行分析和可视化展示,帮助决策层更好地了解销售情况和趋势,进而做出更有针对性的决策。本系统主要包括数据处理、数据可视化和系统测试三个模块。…

通义千问:官方开放API开发基础

目录 一、模型介绍 1.1主要模型 1.2 计费单价 二、前置条件 2.1 开通DashScope并创建API-KEY 2.2 设置API-KEY 三、基于DashScope SDK开发 3.1 Maven引入SDK 3.2 代码实现 3.3 运行代码 一、模型介绍 通义千问是由阿里云自主研发的大语言模型,用于理解和分…

JVM虚拟机(九)如何开启 GC 日志

目录 一、引言二、开启 GC 日志三、解析 GC 日志四、优化建议 一、引言 在 Java 应用程序的运行过程中,垃圾收集(Garbage Collection,简称 GC)是一个非常重要的环节。GC 负责自动管理内存,回收不再使用的对象所占用的…

贵阳市人民政府副市长刘岚调研珈和科技

4月9日,贵阳市人民政府副市长、党组成员刘岚一行到珈和科技走访调研,珈和科技总经理冷伟热情接待了考察团,就企业算力需求与合作,特色产业园区建设,科技成果转化落地等方面进行深入交流。 贵阳市教育局局长李波&#…