CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

继上一篇 《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》 讨论了一种典型的 CDC 集成方案后,本文,我们改用 Flink CDC 完成同样的 CDC 数据入湖任务。与上一个方案有所不同的是:借助现有的 Flink 环境,我们可以直接使用 Flink CDC 从源头数据库接入数据,所以这是一个完整的端到端的解决方案,而上一篇文章我们省略了搭建 Kafka Connect + Debezium MySQL Connector 采集 CDC 数据的环节,因为这部分操作确实很复杂,很难在一篇文章中详细展开,这也说明了使用 Flink CDC 的一个优势,那就是:Flink CDC 在应用和架构上确实要比 Kafka Connect + Debezium MySQL Connector 的组合简单很多,如果你需要,甚至可以跳过 Kafka 直接将数据落到数据湖上。

1. 环境准备


  • 本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的 inventory 数据库;
    (https://docs.confluent.io/platform/7.4/installation/docker/config-reference.html#sr-long-configuration);

  • 我们需要安装多个 Flink Connector 和 Format 组件,包括:Flink CDC MySQL Connector、Flink Hudi Connector、Flink ‘debezium-avro-confluent’ Format Support,关于这些组件的安装,已经全部记录在了《Flink SQL Client 安装各类 Connector、Format 组件的方法汇总(持续更新中…)》一文中,请移步此文选择需要的组件酌情安装;

  • 安装好各种依赖组件后,执行如下脚本,清空 Hudi 表目标位置上的文件,停止正在运行中的 Yarn App,并启动一个新的 Flink Yarn Session:

    echo "clean kafak topic..."
    # run on a host installed kafka console client
    kafka-topics.sh --bootstrap-server 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092' --delete --topic 'orders_kafka_debezium_json'
    echo "clean hudi table target location..."
    aws s3 rm --recursive s3://glc-flink-hudi-test/sink_hudi_orders
    echo "Kill all running apps..."
    for appId in $(yarn application -list -appStates RUNNING 2>1 | awk 'NR > 2 { print $1 }'); do
        yarn application -kill $appId &> /dev/null
    done
    echo "start a flink yarn session..."
    flink-yarn-session -d
    
  • 清理 Kafka 中的 Topic (二次测试时会上次写入的消息会影响测试结果)

    export KAFKA_BOOTSTRAP_SERVERS='b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092'
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --delete --topic 'orders.*'
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
    
  • 启动 Flink SQL Client

    /usr/lib/flink/bin/sql-client.sh embedded shell
    

2. 创建 Flink CDC 源表


本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的 inventory 数据库。在一个既有的 Flink 环境上提前安装 Kafka 和 Flink CDC Connector 以及 Debezium Json Format,具体安装方法参考:Flink SQL Client 安装各类 Connector、组件的方法汇总(持续更新中…)。环境准备好后,打开 Flink SQL Client,执行如下建表语句:

SET 'sql-client.execution.result-mode' = 'TABLEAU';

DROP TABLE IF EXISTS orders_mysql_cdc;

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
    `order_number` INT NOT NULL,
    `order_date` DATE NOT NULL,
    `purchaser` INT NOT NULL,
    `quantity` INT NOT NULL,
    `product_id` INT NOT NULL,
    CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '10.0.13.30',
    'port' = '3307',
    'username' = 'root',
    'password' = 'Admin1234!',
    'database-name' = 'inventory',
    'table-name' = 'orders'
);

-- SELECT * FROM orders_mysql_cdc;

3. 创建 Kafka 中间表 ( debezium-json 格式)


Kafka 中间表使用 kafka connector + debezium-json 格式,也就是 《Flink CDC 与 Kafka 集成:State Snapshot 还是 Changelog?Kafka 还是 Upsert Kafka?》一文 第 《4. 测试组合:'connector'='kafka' + 'format'='debezium-json'》一节介绍的方式:

DROP TABLE IF EXISTS orders_kafka_debezium_json;

CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (
    order_number int,
    order_date   date,
    purchaser    int,
    quantity     int,
    product_id   int
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders_kafka_debezium_json',
    'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
    'properties.group.id' = 'orders_kafka_debezium_json',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
);

insert into orders_kafka_debezium_json select * from orders_mysql_cdc;

-- select * from orders_kafka_debezium_json;

4. 创建 Hudi 目标表


Kafka 中流式注入 debezium-json 格式的 CDC 消息后,就可以写入 Hudi 表了,Hudi 表使用 Hudi Connector 创建,必须使用流式读取(‘read.streaming.enabled’=‘true’),且必须开启 changelog 模式(‘changelog.enabled’ = ‘true’):

SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2s';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';
SET 'state.checkpoints.num-retained' = '10';

DROP TABLE IF EXISTS sink_hudi_orders;

CREATE TABLE IF NOT EXISTS sink_hudi_orders (
    order_number int PRIMARY KEY NOT ENFORCED,
    order_date   date,
    purchaser    int,
    quantity     int,
    product_id   int
) WITH (
    'connector' = 'hudi',
    'path' = 's3://glc-flink-hudi-test/sink_hudi_orders',
    'table.type' = 'MERGE_ON_READ',
    'changelog.enabled' = 'true',
    'read.streaming.enabled'='true',
    'read.streaming.check-interval' = '2',
    'read.streaming.skip_compaction' = 'true',
    'read.streaming.start-commit' = 'earliest'
);

insert into sink_hudi_orders select * from orders_kafka_debezium_json;

select * from sink_hudi_orders;

5. 完整演示


2024-02-06_12-06-25


关联阅读

《CDC 整合方案:MySQL > Flink CDC + Schema Registry + Avro > Kafka > Hudi》

《CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi》

《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/397127.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

力扣145 二叉树的后序遍历 Java版本

文章目录 题目描述递归解法代码 非递归解法思路代码 题目描述 给你一棵二叉树的根节点 root ,返回其节点值的 后序遍历 。 示例 1: 输入:root [1,null,2,3] 输出:[3,2,1] 示例 2: 输入:root [] 输出…

log4j2的使用

基础用法 1. pom文件导入依赖 junit用来做测试 <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.5</version></dependency><dependency><groupId>org.…

第五次作业(防御安全)

需求: 1.办公区设备可以通过电信链路和移动链路上网&#xff08;多对多的NAT&#xff0c;并且需要保留一个公网IP 不能用来转换&#xff09; 2.分公司设备可以通过总公司的移动链路和电信链路访问到DMZ区的http服务器 3.分公司内部的客户端可以通过公网地址访问到内部的服务…

两大公示 总结先行先试经验,提炼可复制推广成果

2024年1月18日&#xff0c;水利部官网发布《数字孪生水利建设典型案例名录&#xff08;2023年&#xff09;》&#xff08;共28项&#xff0c;排名不分先后&#xff09;、《数字孪生水利建设十大样板名单&#xff08;2023年&#xff09;》&#xff08;排名不分先后&#xff09;等…

从数据库中读取文件导出为Excel

使用的库&#xff08;org.apache.poi&#xff09; 在poi包中有Apache提供的各种分类文件&#xff0c;如下 结构功能HSSF读写Microsoft Excel XLS文件XSSF读写Microsoft Excel OOXML XLSX文件HWPF读写Microsoft Word DOC文件HSLF读写Microsoft PowerPoint文件 下面以XSSF为例&…

代码随想录算法训练营29期|day55 任务以及具体安排

第九章 动态规划part12 309.最佳买卖股票时机含冷冻期 class Solution {public int maxProfit(int[] prices) {//0代表持股票&#xff0c;1代表保持卖出状态&#xff0c;2代表卖出股票。3代表冷冻int[][] dp new int[prices.length][4];dp[0][0] -prices[0];for(int i 1 ; …

Redis事务长什么样?一文带你全面了解

一、概述 1.1、Redis事务简介 在 Redis 中&#xff0c;事务是一组命令的有序队列&#xff0c;Redis 使用 MULTI、EXEC、WATCH 和 DISCARD 等命令来实现事务。事务的执行是原子的&#xff0c;要么所有命令都执行成功&#xff0c;要么所有命令都不执行。事务中的命令在 EXEC 执行…

Avalonia学习(二十五)-系统界面

目前项目式练习&#xff0c;界面内容偏多&#xff0c;所以不给大家贴代码了&#xff0c;可以留言交流。此次为大家展示的是物联项目的例子&#xff0c;仅仅是学习&#xff0c;我把一些重点列举一下。 界面无边框 同前面 treeview控件 通过treevie控件导航 tabcontrol控件 …

AUTOSAR CP--chapter7从CAN网络学习Autosar通信

从CAN网络学习Autosar通信 前言缩写词CAN通信在AUTOSAR架构中的传输上位机配置 第六章总结&#xff1a;学习了如何使用工具的自动配置功能&#xff0c;位我们生成系统描述中部分ecu的BSW模块配置&#xff0c;但是自动配置的功能虽然为我们提供了极大的便利&#xff0c;我们仍然…

css3的var()函数

css3的var()函数 变量要以两个连字符--(横杆)(减号)为开头 变量可以在:root{}中定义, :root可以在css中创建全局样式变量。通过 :root本身写的样式&#xff0c;相当于 html&#xff0c;但优先级比后者高。 在CSS3中&#xff0c;var()函数是一个用于插入CSS自定义属性&#xff…

Active Directory 的密码管理策略

员工使用的密码可以决定或破坏组织中的数据安全性&#xff0c;但是&#xff0c;知道员工通常不遵循良好的密码卫生习惯也就不足为奇了。从在本机工具&#xff08;如 Windows Active Directory 组策略&#xff09;中设置弱密码和通用密码到宽松的密码策略规则&#xff0c;有几个…

Eclipse - Text Editors (文本编辑器)

Eclipse - Text Editors [文本编辑器] References Window -> Preferences -> General -> Editors -> Text Editors Displayed tab witdth: 4 勾选 Insert spaces for tabs 勾选 Show line number References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.n…

学习总结19

# 奶牛的耳语 ## 题目描述 在你的养牛场&#xff0c;所有的奶牛都养在一排呈直线的牛栏中。一共有 n 头奶牛&#xff0c;其中第 i 头牛在直线上所处的位置可以用一个整数坐标 pi(0< pi < 10^8) 来表示。在无聊的日子里&#xff0c;奶牛们常常在自己的牛栏里与其它奶牛交…

【Azure 架构师学习笔记】- Azure Databricks (7) --Unity Catalog(UC) 基本概念和组件

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Databricks】系列。 接上文 【Azure 架构师学习笔记】- Azure Databricks (6) - 配置Unity Catalog 前言 在以前的Databricks中&#xff0c;主要由Workspace和集群、SQL Warehouse组成&#xff0c; 这两年Databricks公…

NLP_BERT与GPT争锋

文章目录 介绍小结 介绍 在开始训练GPT之前&#xff0c;我们先比较一下BERT和 GPT 这两种基于 Transformer 的预训练模型结构&#xff0c;找出它们的异同。 Transformer架构被提出后不久&#xff0c;一大批基于这个架构的预训练模型就如雨后春笋般地出现了。其中最重要、影响…

2024全年放假日历表及调休安排 用手机便签设置放假倒计时

对于绝大多数的上班族来说&#xff0c;春节长假已经结束&#xff0c;现在要回归到正常的工作和生活中。为了给生活增加一些“盼头”&#xff0c;很多小伙伴不约而同打开手机日历&#xff0c;查看下个法定节假日是什么时候。下面给大家具体讲一下2024全年放假日历表及调休安排&a…

EasySass: could not generate CSS file. See Output panel for details.微信小程序报错及解决

解决微信小程序导入vscode的easysass包报错 问题发现问题来源和解决制作不易&#xff0c;感谢三联&#xff0c;谢谢大家啦 问题发现 当我喜滋滋的在vscode中导入easysass包之后&#xff0c;又在微信小程序中添加vscode扩展&#xff0c;又去文件中改好了配置文件后却直接弹出了…

5G——物理层仿真

1.前置条件 2.仿真流程 1.填写搜索过程 解&#xff1a; 2.填写每一步细节 2.2.1 准备 解&#xff1a; &#xff08;1&#xff09;BCH &#xff08;2&#xff09;BCCH 解析&#xff1a;因为PBCH是物理广播信道&#xff0c;BCCH是用于广播系统控制信息的下行信道&#…

生成对抗网络----GAN

系列文章目录 文章目录 系列文章目录前言一、基本构成二、应用领域三、基本原理四、如何训练GAN 前言 一、基本构成 GAN (Generative Adversarial Network) : 通过两个神经网络&#xff0c;即生成器&#xff08;Generator&#xff09;和判别器&#xff08;Discriminator&#…

【Linux 内核源码分析】虚拟内存地址空间

在现代操作系统中&#xff0c;每个进程被分配了独享的虚拟内存地址空间。这个地址空间可以视为一维线性空间&#xff0c;由多个连续的内存页组成。初始时&#xff0c;操作系统会将整个虚拟地址空间分成几个不同的区域&#xff0c;每个区域用于特定的目的。以下是一个常见的布局…