FlinkCDC的分析和应用代码

前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算引擎的开发操作。本文将从FlinkCDC应用场景开始,然后讲述其基于Flink的实现原理和代码应用,为下一篇介绍基于Flink开发定制化引擎做铺垫。

一、FlinkCDC应用场景

经常有同事或朋友问,Flink和FlinkCDC有什么区别?

Flink是一个流数据处理计算框架,FlinkCDC是数据采集工具:

Flink应用场景对比的是Storm、Spark;

FlinkCDC应用场景对比的是Sqoop、Canal、Maxwell和KafkaConnectSource、Debezium等;

FlinkCDC是Flink社区伙伴对数据采集需求,开发的一个SDK工具,让Flink在数据捕捉场景,使用起来更方便一些。

1.1 CDC的应用场景分析

       CDC的英文名是Change Data Capture (变化数据获取);解决的应用场景,是对存储中间件中数据的采集,比如Mysql、Orcle、PGSql、MongoDB等中间件;

采集的方式分为基于查询和基于BinLog两种;

以mysql的数据采集为例:可以通过jdbc批次查询,也可以通过Binlog解析增量数据采集;

两者的一些特性对比如下:

基于查询的CDC直接获取数据,基于binlog的采集需要开启binlog服务。

1.2 FlinkCDC的应用分析

       FlinkCDC和Canal实现的应用场景需求是差不多的,都是通过binlog采集增量数据;

但是可用性上的不同是:  

对于cannal类似的采集服务需要三步:

  • 1.开启mysql的binlog
  • 2.将数据写到kafka
  • 3.用flink订阅kafka中的数据进行业务需求处理

对于FlinkCDC:只需要在binlog开启后,直接在一个Flink任务内做业务处理(可以写到kafka处理也行);

       所有如cannal的采集功能服务,都需要单独维护一套服务,增加了运维负担,FlinkCDC可以当作任务部署到集群,大幅减轻了数据采集的应用难度;

用一个任务就完成这个应用功能:

二、FlinkCDC技术分析与本地操作

2.1 FlinkCDC的技术架构分析

       与Canal这些提供服务能力的服务不同,FlinkCdc只是一个任务,可以简单的开发和部署。

       Flink是借用了Debezium的功能,Debezium是一个可轻量级嵌入代码逻辑的服务,将Debezium的采集功能,用Flink的sourceFunction包装,然后打包成SDK提供给Flink开发使用;

       借用Flink自己的算子和sink能力,可以将采集到的数据以Flink的特性加工数据,并将数据写入Flink内置的connect组件,sink到服务里,如Kafka、Pulser、ES、RabbitMQ、MongoDB等。

2.2 本地操作
2.2.1准备mysql数据库表和数据
use flink_test;
#检测binlog是否开启
show variables like '%log_bin%'

#构建测试表
CREATE TABLE `event_info` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `category` varchar(512) DEFAULT NULL,
  `pv` int DEFAULT 0,
  `uv` int DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

#写入数据
insert  into `event_info`(`id`,`name`,`category`,`pv`,`uv`) values 
(1,'aaa','nfh',20,8),
(2,'bbb','dfgf',30,2),
(3,'ccc','fsd',40,4),
(4,'ddd','afs',50,7),
(5,'eee','asfa',60,3)
(6,'aaa','nfh',20,8),
(7,'bbb','dfgf',30,2),
(8,'ccc','fsd',40,4),
(9,'ddd','afs',50,7),
(10,'eee','asfa',60,3);
2.2.2 pom文件

       注意Flink和FlinkCDC的版本映射,很多显示的可以关联的版本之间是冲突的,这是一个很繁琐的工作,我调试各个版本之间的映射,花了一天左右的时间[求赞求收藏];

下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>changedateDoris</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12</scala.version>
        <java.version>1.8</java.version>
        <flink.version>1.14.5</flink.version>
        <fastjson.version>1.2.62</fastjson.version>
        <hadoop.version>2.8.3</hadoop.version>
        <scope.mode>compile</scope.mode>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!--        springboot 依赖-->
        <!-- flink  -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Add log dependencies when debugging locally -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- mysql-connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.2.1 </version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-shaded-guava</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

 <build>
<!--        <filters>-->
<!--            <filter>${project.basedir}/src/main/resources/env/application-${profileActive}.properties</filter>-->
<!--        </filters>-->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                                <excludes>
                                    <exclude>org.slf4j:slf4j-api:jar:</exclude>
                                </excludes>
                            </artifactSet>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>utf-8</encoding>
                    <useDefaultDelimiters>true</useDefaultDelimiters>
                    <delimiters>
                        <delimiter>$[*]</delimiter>
                    </delimiters>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2.2.3 java代码
package yto.com.net.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class mysqlBinlogRead {
    
    private static final Logger log = LoggerFactory.getLogger(mysqlBinlogRead.class);

    public static void main(String[] args) throws Exception {

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("ip")
                .port(3306)
                .databaseList("flink_test")
                .tableList("flink_test.event_info")
                .username("mysqlUser")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
//                .startupOptions(StartupOptions.earliest())
                .build();

        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8083);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(10000);

        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");

        SingleOutputStreamOperator<String> process = cdcSource.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String row, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {
                JSONObject rowJson = JSON.parseObject(row);

                String op = rowJson.getString("op");

                JSONObject source = rowJson.getJSONObject("source");
                String table = source.getString("table");

            }
        });
        
        cdcSource.print("message=:");
        env.execute("flinkCdc Read message");
    }


}
2.2.4 运行结果

如图所示,已经写入库的结果通过connect获取,增量数据通过binlog获取;

注意:表中的历史数据过多,全量读取的时候将会内存溢出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/315273.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【动态规划】LeetCode-42. 接雨水

42. 接雨水。 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 示例 1&#xff1a; 输入&#xff1a;height [0,1,0,2,1,0,1,3,2,1,2,1] 输出&#xff1a;6 解释&#xff1a;上面是由数组 [0,1,0,2,1…

OpenCV-22高斯滤波

一、高斯函数的基础 要理解高斯滤波首先要直到什么是高斯函数&#xff0c;高斯函数是符合高斯分布的&#xff08;也叫正态分布&#xff09;的数据的概率密度函数。 高斯函数的特点是以x轴某一点&#xff08;这一点称为均值&#xff09;为对称轴&#xff0c;越靠近中心数据发生…

css——文字实现渐变色的两种方案

&#xff08;一&#xff09;通过设置color、background-image及background-clip实现文字颜色渐变 <template><span class"title">文字实现渐变色的两种方案</span> </template><style> .title {color: transparent;background-image:…

Windows 项目从0到1的部署

目录 一. 安装jdk 1.1 安装jdk 1.2 配置jdk的环境配置jdk 1.3 配置成功 二. 配置tomcat 2.1 启动tomcat 2.2 防火墙设置 三. 安装MySQL 3.1 安装步骤 3.2 内部连接 3.3 外部连接 四. 部署项目 4.1 项目部署 4.2 修改mysql的用户密码 一. 安装jdk 这里给大家准备好了jdk和…

【Python】使用Opencv裁剪指定区域,再重构大小和保存示例

在Python中&#xff0c;使用OpenCV库可以很方便地截取图像的某一区域&#xff0c;然后尺寸重构&#xff0c;最后保存为新的图像文件。以下是一个示例代码&#xff0c;演示如何实现这一操作&#xff1a; import cv2# 读取图像 img cv2.imread(img.jpg)# 定义截取的区域&#x…

小学生练字神器,在线字帖生成器网站源码

源码介绍 帮助小学生规范汉字书写&#xff0c;提高汉字书写水平&#xff0c;在线小学生练字神器&#xff01;它不仅功能强大&#xff0c;而且完全免费&#xff0c;是每个小学生必备的练字工具。 海量字库&#xff1a;收录9574个常用汉字&#xff0c;满足小学生的学习需求。多…

Java面试之虚拟机

1、前言 本篇的面试题基于网络整理&#xff0c;和自己编辑。在不断的完善补充哦。 2、什么是虚拟机&#xff1f; Java 虚拟机&#xff0c;是一个可以执行 Java 字节码的虚拟机进程。Java 源文件被编译成能被 Java 虚拟机执行的字节码文件( .class )。 Java 被设计成允许应用程…

【模型评估 07】过拟合与欠拟合

在模型评估与调整的过程中&#xff0c;我们往往会遇到“过拟合”或“欠拟合”的情况。如何有效地识别“过拟合”和“欠拟合”现象&#xff0c;并有针对性地进行模型调整&#xff0c;是不断改进机器学习模型的关键。特别是在实际项目中&#xff0c;采用多种方法、从多个角度降低…

【自控实验】3. 带有饱和非线性环节控制系统相平面分析

本科课程实验报告&#xff0c;有太多公式和图片了&#xff0c;干脆直接转成图片了 仅分享和记录&#xff0c;不保证全对 实验内容&#xff1a; 有无非线性环节的相轨迹对比&#xff0c;并求超调量。 在输入单位阶跃信号Xsr时&#xff0c;用示波器观察和记录系统输入饱和非线…

最强联网Chat GPT 火爆全网高速 永久免费

&#x1f534;高速联网 秒响应支持语音通话&#x1f388; 首先介绍一下她的功能吧&#x1f601; 女友消息代回机&#x1f44c;&#x1f3fb; 朋友圈文案&#x1f44c;&#x1f3fb; 聊天话术&#x1f44c;&#x1f3fb; 高情商回复&#x1f44c;&#x1f3fb; 脱单助…

redis源码之:事件驱动epoll

一、aeEventLoop初始化 从server.c的main方法中进入initServer&#xff0c;在initServer方法中&#xff0c;server.el aeCreateEventLoop(server.maxclientsCONFIG_FDSET_INCR);创建eventloop&#xff1a;&#xff08;注意fileevent与epollevent的区分fileEvent是标识往epoll…

直播带货2024:洗牌、阵痛和暗流涌动

文 | 螳螂观察 作者 | 青月 一天前&#xff0c;大学生齐夏根本不会在直播间购买《额尔古纳河右岸》这种书籍。 她是喜欢看小说&#xff0c;但只钟爱悬疑无限流题材&#xff0c;至于《额尔古纳河右岸》这种讲述一个弱小民族顽强的抗争和优美的爱情的长篇小说&#xff0c;用齐…

mysql表的约束问题

目录 1. 表的约束问题: 主键约束: 案例&#xff1a; 非空约束 唯一约束: 默认值约束: 案例&#xff1a; 2.表的结构 前提:接上一张博客内容经行操作 1. 表的约束问题: 用来保证数据插入的安全性、完整性、正确性 主健,外键,唯一,默认值,非空,检查 1.1…

Docker的基础概念及命令

这篇主要介绍一下Docker比较重要的两个概念&#xff0c;镜像和容器&#xff0c;以及操作它们的一些常用命令。 文章目录 一、基础命令二、镜像三、容器 一、基础命令 docker -v&#xff1a;查看 Docker 的版本 systemctl start docker&#xff1a;启动docker systemctl stat…

Surface mesh结构学习

CGAL 5.6 - Surface Mesh: User Manual Surface_mesh 类是半边数据结构的实现&#xff0c;可用来表示多面体表面。它是半边数据结构&#xff08;Halfedge Data Structures&#xff09;和三维多面体表面&#xff08;3D Polyhedral Surface&#xff09;这两个 CGAL 软件包的替代品…

如何关闭iPhone 14或14 Pro Max,这里有详细步骤

你刚买了新的iphone 14或iphone 14 pro max&#xff0c;迫不及待地想开始使用它。但如果你需要关闭它怎么办&#xff1f;有几种方法可以用来关闭这两种设备。 如何关闭iPhone 14 你可以通过每个人都熟悉的老式侧按钮轻松关闭iPhone 14&#xff0c;也可以通过面部识别关闭它。 …

Unity之角色控制器

PS:公司终于给我派任务了&#xff0c;最近几天都没学Unity&#x1f927;。 一、角色控制器的实现方式 目前小编知道的角色控制器实现方式有三种&#xff1a; 应用商店的角色控制系统Unity自己的角色控制器通过物理系统去做角色控制器 本篇介绍的是第二种Unity自己的角色控制…

电商新趋势:解析养号的必要性及海外云手机运用攻略

在电商领域&#xff0c;什么最为关键&#xff1f;答案无疑是流量&#xff01;然而&#xff0c;如何以较低成本获取大量流量成为了许多电商从业者头疼的问题。虽然直接投放广告是一种方式&#xff0c;但在内卷的情况下效果越来越难以令人满意&#xff0c;高昂的广告费用也原来越…

Java零基础教学文档第三篇:JDBC

今日新篇章 【JDBC】 【主要内容】 JDBC概述 使用JDBC完成添加操作 使用JDBC完成更新和删除 DBUtils的简单封装 使用JDBC完成查询 使用JDBC完成分页查询 常用接口详解 JDBC批处理 SQL注入问题 事务处理解决转账问题 连接池 使用反射对DBUtils再次的封装 BaseDAO的封…

供水管网动态模型分类及应用分析

当供水管网中发生启停泵、快速关阀等事件时, 延时时段模拟 (即准稳态模型) 不能准确预测系统的瞬时动态变化, 而需要采用更为准确复杂的瞬变流动态模型。为明确多种动态模型之间的差异, 探讨和分析了供水管网动态模型的分类、模型理论以及在管网运行管理中的应用。结果表明, 准…