10 Flink CDC

10 Flink CDC

  • 1. CDC是什么
  • 2. CDC 的种类
  • 3. 传统CDC与Flink CDC对比
  • 4. Flink-CDC 案例
  • 5. Flink SQL 方式的案例

1. CDC是什么

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

2. CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
在这里插入图片描述

3. 传统CDC与Flink CDC对比

  1. 传统 CDC ETL 分析
    在这里插入图片描述

  2. 基于 Flink CDC 的 ETL 分析
    在这里插入图片描述

  3. 基于 Flink CDC 的聚合分析
    在这里插入图片描述

  4. 基于 Flink CDC 的数据打宽
    在这里插入图片描述

4. Flink-CDC 案例

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors。
示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;

public class FlinkCDC {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 //2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序
 //2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
 env.enableCheckpointing(5000L);
 //2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 //2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);
 //2.4 指定从 CK 自动重启策略
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
 //2.5 设置状态后端
 env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
 //2.6 设置访问 HDFS 的用户名
 System.setProperty("HADOOP_USER_NAME", "atguigu");
 //3.创建 Flink-MySQL-CDC 的 Source
 //initial (default): Performs an initial snapshot on the monitored database tables upon 
first startup, and continue to read the latest binlog.
 //latest-offset: Never to perform snapshot on the monitored database tables upon first 
startup, just read from the end of the binlog which means only have the changes since the 
connector was started.
 //timestamp: Never to perform snapshot on the monitored database tables upon first 
startup, and directly read binlog from the specified timestamp. The consumer will traverse the 
binlog from the beginning and ignore change events whose timestamp is smaller than the 
specified timestamp.
 //specific-offset: Never to perform snapshot on the monitored database tables upon 
first startup, and directly read binlog from the specified offset.
 DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
 .hostname("hadoop01")
 .port(3306)
 .username("root")
 .password("000000")
 .databaseList("gmall-flink")
 .tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会
读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
 .startupOptions(StartupOptions.initial())
 .deserializer(new StringDebeziumDeserializationSchema())
 .build();
 //4.使用 CDC Source 从 MySQL 读取数据
 DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
 //5.打印数据
 mysqlDS.print();
 //6.执行任务
 env.execute();
 } 
}

5. Flink SQL 方式的案例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 //2.创建 Flink-MySQL-CDC 的 Source
 tableEnv.executeSql("CREATE TABLE user_info (" +
 " id INT," +
 " name STRING," +
 " phone_num STRING" +
 ") WITH (" +
 " 'connector' = 'mysql-cdc'," +
 " 'hostname' = 'hadoop01'," +
 " 'port' = '3306'," +
 " 'username' = 'root'," +
 " 'password' = '000000'," +
 " 'database-name' = 'gmall-flink'," +
 " 'table-name' = 'z_user_info'" +
 ")");
 tableEnv.executeSql("select * from user_info").print();
 env.execute();
 }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/964143.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【25考研】南开软件考研复试复习重点!

一、复试内容 复试采取现场复试的方式。复试分为笔试、机试和面试三部分。三部分合计100分&#xff0c;其中笔试成绩占30%、机试成绩占30%、面试成绩占40%。 1.笔试&#xff1a;专业综合基础测试 考核方式&#xff1a;闭卷考试&#xff0c;时长为90分钟。 笔试考查内容范围…

【Cadence仿真技巧学习笔记】求解65nm库晶体管参数un, e0, Cox

在设计放大器的第一步就是确定好晶体管参数和直流工作点的选取。通过阅读文献&#xff0c;我了解到L波段低噪声放大器的mos器件最优宽度计算公式为 W o p t . p 3 2 1 ω L C o x R s Q s p W_{opt.p}\frac{3}{2}\frac{1}{\omega LC_{ox}R_{s}Q_{sp}} Wopt.p​23​ωLCox​Rs…

【leetcode练习·二叉树拓展】归并排序详解及应用

本文参考labuladong算法笔记[拓展&#xff1a;归并排序详解及应用 | labuladong 的算法笔记] “归并排序就是二叉树的后序遍历”——labuladong 就说归并排序吧&#xff0c;如果给你看代码&#xff0c;让你脑补一下归并排序的过程&#xff0c;你脑子里会出现什么场景&#xff…

[ESP32:Vscode+PlatformIO]新建工程 常用配置与设置

2025-1-29 一、新建工程 选择一个要创建工程文件夹的地方&#xff0c;在空白处鼠标右键选择通过Code打开 打开Vscode&#xff0c;点击platformIO图标&#xff0c;选择PIO Home下的open&#xff0c;最后点击new project 按照下图进行设置 第一个是工程文件夹的名称 第二个是…

Docker 部署教程jenkins

Docker 部署 jenkins 教程 Jenkins 官方网站 Jenkins 是一个开源的自动化服务器&#xff0c;主要用于持续集成&#xff08;CI&#xff09;和持续交付&#xff08;CD&#xff09;过程。它帮助开发人员自动化构建、测试和部署应用程序&#xff0c;显著提高软件开发的效率和质量…

MySQL锁类型(详解)

锁的分类图&#xff0c;如下&#xff1a; 锁操作类型划分 读锁 : 也称为共享锁 、英文用S表示。针对同一份数据&#xff0c;多个事务的读操作可以同时进行而不会互相影响&#xff0c;相互不阻塞的。 写锁 : 也称为排他锁 、英文用X表示。当前写操作没有完成前&#xff0c;它会…

《苍穹外卖》项目学习记录-Day11销量排名统计

销量排名需要查两张表&#xff0c;一张是order_detail&#xff0c;它里面有number字段&#xff0c;这个字段体现了商品的销售的份数。我们仅仅查这一张表是不够的&#xff0c;因为用户下单了&#xff0c;下单了之后就会产生订单数据和对应的订单详情数据&#xff0c;假设他下完…

走向基于大语言模型的新一代推荐系统:综述与展望

HightLight 论文题目&#xff1a;Towards Next-Generation LLM-based Recommender Systems: A Survey and Beyond作者机构&#xff1a;吉林大学、香港理工大学、悉尼科技大学、Meta AI论文地址&#xff1a; https://arxiv.org/abs/2410.1974 基于大语言模型的下一代推荐系统&…

Vue3学习笔记-模板语法和属性绑定-2

一、文本插值 使用{ {val}}放入变量&#xff0c;在JS代码中可以设置变量的值 <template><p>{{msg}}</p> </template> <script> export default {data(){return {msg: 文本插值}} } </script> 文本值可以是字符串&#xff0c;可以是布尔…

python算法和数据结构刷题[5]:动态规划

动态规划&#xff08;Dynamic Programming, DP&#xff09;是一种算法思想&#xff0c;用于解决具有最优子结构的问题。它通过将大问题分解为小问题&#xff0c;并找到这些小问题的最优解&#xff0c;从而得到整个问题的最优解。动态规划与分治法相似&#xff0c;但区别在于动态…

【阅读笔记】New Edge Diected Interpolation,NEDI算法,待续

一、概述 由Li等提出的新的边缘指导插值(New Edge—Di-ected Interpolation&#xff0c;NEDI)算法是一种具有良好边缘保持效果的新算法&#xff0c;它利用低分辨率图像与高分辨率图像的局部协方差问的几何对偶性来对高分辨率图像进行自适应插值。 2001年Xin Li和M.T. Orchard…

C++ Primer 迭代器

欢迎阅读我的 【CPrimer】专栏 专栏简介&#xff1a;本专栏主要面向C初学者&#xff0c;解释C的一些基本概念和基础语言特性&#xff0c;涉及C标准库的用法&#xff0c;面向对象特性&#xff0c;泛型特性高级用法。通过使用标准库中定义的抽象设施&#xff0c;使你更加适应高级…

DeepSeek R1 简易指南:架构、本地部署和硬件要求

DeepSeek 团队近期发布的DeepSeek-R1技术论文展示了其在增强大语言模型推理能力方面的创新实践。该研究突破性地采用强化学习&#xff08;Reinforcement Learning&#xff09;作为核心训练范式&#xff0c;在不依赖大规模监督微调的前提下显著提升了模型的复杂问题求解能力。 技…

直方图:摄影中的视觉数据指南

目录 一、直方图基础&#xff1a;揭开它的神秘面纱 二、解读直方图类型&#xff1a;亮度与色彩的密码 &#xff08;一&#xff09;亮度直方图 &#xff08;二&#xff09;RGB 直方图 三、拍摄中巧用直方图&#xff1a;优化曝光与效果 &#xff08;一&#xff09;精准判断曝…

力扣动态规划-19【算法学习day.113】

前言 ###我做这类文章一个重要的目的还是记录自己的学习过程&#xff0c;我的解析也不会做的非常详细&#xff0c;只会提供思路和一些关键点&#xff0c;力扣上的大佬们的题解质量是非常非常高滴&#xff01;&#xff01;&#xff01; 习题 1.矩形中移动的最大次数 题目链接…

树莓派pico入坑笔记,睡眠

关于树莓派pico和circuitpython的更多玩法&#xff0c;请看树莓派pico专栏 关于在 CircuitPython 中使用警报和浅/深度睡眠的更多信息&#xff0c;请参阅此学习指南。 树莓派pico支持浅睡眠和深度睡眠&#xff0c;其中深度睡眠唤醒后将从boot.py开始运行 支持按时间唤醒和引…

【蓝桥杯】日志统计

日志统计&#xff08;编程题&#xff09;https://dashoj.com/d/lqbproblem/p/53https://dashoj.com/d/lqbproblem/p/53https://dashoj.com/d/lqbproblem/p/53 题目 日志统计(编程题) 讲解 这个讲解感觉比较通俗易懂。 蓝桥杯2018年省赛B组08&#xff08;c/c&#xff09;日…

实验十一 Servlet(二)

实验十一 Servlet(二) 【实验目的】 1&#xff0e;了解Servlet运行原理 2&#xff0e;掌握Servlet实现方式 【实验内容】 改造实验10&#xff0c;引入数据库&#xff0c;创建用户表&#xff0c;包括用户名和密码&#xff1a;客户端通过login.jsp发出登录请求&#xff0c;请求…

Weevely代码分析

亲测php5和php8都无效&#xff0c;只有php7有效 ailx10 1949 次咨询 4.9 网络安全优秀回答者 互联网行业 安全攻防员 去咨询 上一次做weevely实验可以追溯到2020年&#xff0c;当时还是weevely3.7&#xff0c;现在的是weevely4 生成php网页木马依然差不多…… php菜刀we…

vue.js学习笔记

一、Vue概述 通过我们学习的htmlcssjs已经能够开发美观的页面了&#xff0c;但是开发的效率还有待提高&#xff0c;那么如何提高呢&#xff1f;我们先来分析下页面的组成。一个完整的html页面包括了视图和数据&#xff0c;数据是通过请求 从后台获取的&#xff0c;那么意味着我…