使用Flink CDC实时监控MySQL数据库变更

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
        .serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海
        .hostname("localhost") // MySQL的IP地址
        .port(3306) // MySQL的端口
        .username("root") // MySQL的用户名
        .password("123456") // MySQL的密码
        .databaseList("my_db") // 监控的数据库
        .tableList("my_db.user") // 监控的数据库下的表
        .deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化
        .startupOptions(StartupOptions.initial()) // 启动选项
        .build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
*  自定义DeserializationSchema进行反序列化。
*/

public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
   @Override
   public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
       //创建JSON对象用于存储最终数据
       JSONObject result = new JSONObject();
       String topic = sourceRecord.topic();
       String[] fields = topic.split("\\.");
       String database = fields[1];
       String tableName = fields[2];
       Struct value  = (Struct)sourceRecord.value();
       //获取before数据
       Struct before = value.getStruct("before");
       JSONObject beforeJson = getJson(before);
       //获取after数据
       Struct after = value.getStruct("after");
       JSONObject afterJson = getJson(after);
       //获取操作类型
       Envelope.Operation operation = Envelope.operationFor(sourceRecord);
       //将字段写入JSON对象
       result.put("database",database);
       result.put("tableName",tableName);
       result.put("type",operation);
       result.put("before",beforeJson);
       result.put("after",afterJson);
       //输出数据
       collector.collect(result.toJSONString());
   }
   /**
    *  获取字段值并写入result对象
    * @param before
    * @return
    */
   private JSONObject getJson(Struct before) {
       JSONObject jsonObject = new JSONObject();
       if(before != null){
           Schema beforeSchema = before.schema();
           List<Field> beforeFields = beforeSchema.fields();
           for (Field field : beforeFields) {
               Object beforeValue = before.get(field);
               jsonObject.put(field.name(), beforeValue);
           }
       }
       return jsonObject;
   }
   @Override
   public TypeInformation getProducedType() {
       return BasicTypeInfo.STRING_TYPE_INFO;

   }
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/749621.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker部署常见应用之Oracle数据库

文章目录 安装部署参考文章 安装部署 使用Docker安装Oracle数据库是一个相对简便的过程&#xff0c;可以避免在本地环境中直接安装Oracle数据库的复杂性。 安装Docker环境&#xff1a;确保你的系统上已经安装了Docker&#xff0c;并且Docker服务正在运行。具体的安装方法可以根…

TikTok网页版使用指南:如何登录TikTok网页版?

海外版抖音TikTok&#xff0c;已成为连接全球观众的重要平台。据统计&#xff0c;在美国&#xff0c;TikTok的用户数量已达到近1.3亿&#xff0c;并且在国外的95后用户群体中很受欢迎。 TikTok网页版也提供了一个广阔的平台&#xff0c;让品牌和创作者在电脑端与全球观众互动&…

数据结构与算法基础(王卓)--学习笔记

1 数据结构分类 1.1 逻辑结构分类 集合结构线性结构&#xff1a;线性表、栈、队列、串树形结构图形结构 1.2 物理结构分类 逻辑结构在计算机中的真正表示方式&#xff08;又称为映射&#xff09;称为物理结构&#xff0c;也可叫做存储结构 顺序存储结构&#xff1a;数组链…

【Unity】Excel配置工具

1、功能介绍 通过Excel表配置表数据&#xff0c;一键生成对应Excel配置表的数据结构类、数据容器类、已经二进制数据文件&#xff0c;加载二进制数据文件获取所有表数据 需要使用Excel读取的dll包 2、关键代码 2.1 ExcelTool类 实现一键生成Excel配置表的数据结构类、数据…

Centos7源码方式安装sqle及开发相关

官方文档-源码安装 操作系统&#xff1a;centos:7.9,everything (DVD版应该也可以) (在ubuntu22.04装了两天之后乖乖开了一个新Centos7虚拟机) 镜像&#xff1a;清华大学开源软件镜像站 centos/7.9.2009 安装git sudo yum update -y sudo yum install -y git git --version安…

Sonia索尼娅:填补心理健康护理缺口的创新人工智能治疗师应用APP

聊天机器人可以取代人类治疗师吗&#xff1f;一些初创公司和患者声称他们可以。但这并不是完全确定的科学。 一项引人注目的研究发现&#xff0c;高达80%的使用OpenAI的ChatGPT寻求心理健康建议的人认为&#xff0c;这项技术可作为传统治疗的理想替代方案。与此同时&#xff0…

Android高级面试_2_IPC相关

Android 高级面试-3&#xff1a;语言相关 1、Java 相关 1.1 缓存相关 问题&#xff1a;LruCache 的原理&#xff1f; 问题&#xff1a;DiskLruCache 的原理&#xff1f; LruCache 用来实现基于内存的缓存&#xff0c;LRU 就是最近最少使用的意思&#xff0c;LruCache 基于L…

国外8年联培访学迎来逆袭|国家最高科学技术奖薛其坤成长史

国家最高科技奖花落薛其坤&#xff0c;他是该奖项史上最年轻得主。在追踪其成长史的过程中&#xff0c;知识人网小编注意到&#xff1a;薛其坤的学习研究开局并不顺利&#xff0c;直至到日本做联合培养博士研究生&#xff0c;他才真正迎来了自己学术生涯的重要转折点。后来到美…

面试相关-接口测试常问的问题

1.为什么要做接口测试 (1)现在大多系统都是前后端分离的项目,前端和后端的进度可能不一样,那为了尽早的进入测试,前端界面没有开发完成的情况下,只要后端的接口开发完了,就可以提前做接口测试了; (2)基于安全考虑,只依赖前端进行限制,已经完全不满足系统的安全性…

ELK日志集成

https://www.bilibili.com/video/BV1x94y1674x/?buvidXY705117E90F73A790429C9CFBD5F70F22168&vd_source939ea718db29535a3847d861e5fe37ef

Aigtek:为何要使用电压放大器

电压放大器在现代电子技术中起到了至关重要的作用。它是一种电子设备&#xff0c;用于将输入信号的电压增大到所需的输出电压水平。电压放大器的使用有以下几个方面的原因和优势。 电压放大器可以提高信号的强度和质量。许多实际应用中的输入信号往往很微弱&#xff0c;比如来自…

“管式加热炉简单控制系统和串级控制系统设计与Matlab仿真”,高分资源,匠心制作,下载可用。强烈推荐!!!

“管式加热炉简单控制系统和串级控制系统设计与Matlab仿真”毕业设计&#xff0c;高分资源&#xff0c;匠心制作&#xff0c;下载可用。强烈推荐&#xff01;&#xff01;&#xff01; 1.控制目标 加热炉的任务是把原油加热到一定温度&#xff0c;以保证下道工艺的顺利进行。…

windows安装mysql8.0.35保姆级教程

一、下载mysql安装包 点击mysql安装包下载链接&#xff1a;https://downloads.mysql.com/archives/community/ 选择window版本&#xff0c;点击下载按钮&#xff0c;如下所示&#xff1a; 二、解压安装包并新建my.ini文件 将下面内容复制到新建的my.ini文件里面 [mysqld] #…

阿里云oss存储

文章目录 准备阿里云的OSS控制台创建bucket获取AccessKey java使用oss导入依赖官网demo修改参数运行demo代码 封装工具类Oss下载如何保证指定时间段内可以访问私有权限的图片文件&#xff1f; 准备阿里云的OSS 控制台 访问阿里云官网&#xff0c;登录以后&#xff0c;右上角有…

大众点评根据关键词搜索采集店铺信息

大众点评根据关键词搜索采集店铺信息&#xff0c;包括店铺名称、大中小分类、省市区划分、人均价格、评价数量、团购数量、全部团购名称、全部团购链接&#xff08;团购信息还可解析出每个团购的价格&#xff09; ​​​

【代码安全】如何通过实现代码加密与魔改Python,防止代码泄露、恶意窃取

如何通过实现代码加密与魔改Python&#xff0c;防止代码泄露、恶意窃取 文章目录 如何通过实现代码加密与魔改Python&#xff0c;防止代码泄露、恶意窃取前言概述代码运行演示Step 0: 正常代码运行Step 1: 代码加密Step 2: 加密代码在魔改环境运行Step 3: 加密代码在正常环境运…

matlab编辑稀疏单位方阵

创建 10001000 稀疏单位方阵&#xff0c;并查看稀疏模式。 &#xff08;1&#xff09; I speye(1000); spy(I)&#xff08;2&#xff09; S speye(400,800); spy(S)此命令等同于 speye([400 800])。

【Python】易错题 [1]

目录 一、选择&#xff1a; 1.列表的复制​编辑 2.函数 二、填空 一、选择&#xff1a; 1.列表的复制 在Python中&#xff0c;列表是可变的数据类型。当将一个列表赋值给另一个变量时&#xff0c;实际上是将这个变量的引用指向原始列表。&#xff08;指针&#xff09;因此&…

直播怎么录制视频?直播视频,3种录制方法

“今晚我最喜欢的游戏博主要进行直播&#xff0c;但我可能还要加班。怎么办&#xff0c;不想错过直播的内容&#xff01;电脑怎么才能进行直播录制视频啊&#xff1f;谁能教教我&#xff1f;” 在数字化的今天&#xff0c;直播已经成为人们获取信息和娱乐的重要途径。有时&…

Adobe Acrobat编辑器最新版下载安装 Adobe Acrobat版本齐全!

功能强大&#xff0c;Adobe Acrobat无疑是PDF文档处理领域的翘楚。这款软件集多种PDF文档处理功能于一身&#xff0c;不仅使得用户可以轻松地编辑PDF文档&#xff0c;更能轻松应对转换和合并等多种需求。 在编辑功能上&#xff0c;Adobe Acrobat的表现尤为出色。无论是添加文字…