flink-connector-mysql-cdc:01 mysql-cdc础配置代码演示

flink-connector-mysql-cdc:

  • 01 mysql-cdc基础配置代码演示
  • 02 mysql-cdc高级扩展
  • 03 mysql-cdc常见问题汇总
  • 04 mysql-cdc-kafka生产级代码分享
  • 05 flink-kafka-doris生产级代码分享
  • 06 flink-kafka-hudi生产级代码分享

flink-cdc版本:3.2.0

flink版本:flink-1.18.0

mysql版本:8.0.26

java版本:1.8

maven版本:3.8.4

目录

1. Mysql数据库设置

1.1 开启binlog日志

1.2 创建用户

1.3 准备测试数据 

2. 编写测试代码

2.1 maven 依赖

2.2 测试代码

3. mysql-cdc扩展

3.1 时区设置

3.2 为每个读取器设置不同的 SERVER ID


1. Mysql数据库设置

1.1 开启binlog日志

编辑 MySQL 配置文件

  • 在 Unix/Linux 系统中,通常是 /etc/my.cnf 或 /etc/mysql/my.cnf

  • 在 Windows 上,可能位于 C:\ProgramData\MySQL\MySQL Server X.Y\my.ini

# 在 mysqld 部分下添加以下内容(如果已经存在,请确认其值):
[mysqld]
log-bin=mysql-bin  # 二进制日志文件前缀,MySQL将生成名为 mysql-bin.000001, mysql-bin.000002 等的文件。
binlog-format=row   # 设置二进制日志格式为行级(row),可选值为 STATEMENT、ROW 和 MIXED;这里推荐使用行级。
expire_logs_days=7  # 设置二进制日志的过期时间,单位为天,超过这个天数后的日志将被自动删除,这里以 7 天为例。
max-binlog-size=100M # 设置单个二进制日志文件的最大大小,超出后将自动创建一个新的日志文件(可以根据需要调整)。

1.2 创建用户

以 “flinkcdc”用户为例

# 创建 MySQL 用户:
CREATE USER 'flinkcdc'@'localhost' IDENTIFIED BY '123456';
# 向用户授予所需的权限:
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY '123456';
# 授权所有权限
GRANT ALL PRIVILEGES ON *.* TO 'flinkcdc'@'localhost';
GRANT ALL PRIVILEGES ON *.* TO 'flinkcdc'@'%';
# 完成用户的权限:
FLUSH PRIVILEGES;

1.3 准备测试数据 

# 使用flinkcdc用户登录数据库

# 创建测试数据库
create database cdc_demo;
# 创建测试表
CREATE TABLE cdc_demo.flink_cdc_test (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50) NOT NULL,
    description TEXT,
    age INT,
    balance DECIMAL(10, 2),
    is_active BOOLEAN DEFAULT TRUE,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    birth_date DATE,
    long_value BIGINT,
    last_login TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
# 插入测试数据
INSERT INTO cdc_demo.flink_cdc_test (name, description, age, balance, is_active, created_at, birth_date, long_value, last_login) VALUES
('Alice Smith', 'Alice is a software engineer with 5 years of experience.', 30, 2500.50, TRUE, '2023-01-01 10:00:00', '1992-05-15', 12345678901234, '2023-05-20 10:00:00'),
('Bob Johnson', 'Bob enjoys hiking and outdoor activities.', 25, 1500.00, TRUE, '2023-02-15 12:30:00', '1998-08-22', 987654321054321, '2023-05-18 14:00:00'),
('Charlie Brown', 'Charlie is an avid reader and coffee lover.', 35, 3200.75, FALSE, '2023-03-22 14:45:00', '1988-01-11', 135792468012345, '2023-05-19 09:20:00'),
('Daisy Miller', 'Daisy loves painting and traveling.', 28, 1800.25, TRUE, '2023-04-05 09:15:00', '1994-11-03', 24681357901234, '2023-05-21 12:30:00'),
('Ethan White', 'Ethan enjoys playing guitar and writing songs.', 40, 5000.00, TRUE, '2023-05-18 16:20:00', '1983-07-30', 98765432102468, '2023-05-22 15:00:00');

2. 编写测试代码

2.1 maven 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.toroidal</groupId>
    <artifactId>flink-connector-mysql-cdc-demo</artifactId>
    <name>flink-connector-mysql-cdc-demo</name>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <name>阿里云公共仓库</name>
            <url>https://maven.aliyun.com/repository/public/</url>
        </repository>
        <repository>
            <id>mirrorId</id>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://my.repository.com/repo/path</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.18.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flinkcdc.version>3.2.0</flinkcdc.version>
        <mysql.version>8.0.26</mysql.version>
        <log4j.version>2.17.1</log4j.version>
        <lombok.version>1.18.24</lombok.version>
        <fastjson.version>1.2.83</fastjson.version>
    </properties>

    <dependencies>
        <!-- flink Dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- mysql-cdc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- log -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.9.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- 打包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- 设置jar包的入口类(可选) -->
                                    <mainClass>com.toroidal.mysql.MysqlCdcStreamApp</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.2 测试代码

package com.toroidal.mysql;


import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;


/**
 * @Author Toroidal
 * @Date 2024/12/04 14:42
 * @Version 1.0
 */
public class MysqlCdcStreamApp {
    public static void main(String[] args) throws Exception {

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("flinkcdc")
                .password("123456")
                // 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02")
                .databaseList("cdc_demo")
                // 设置需要捕获日志的表名,注意需要配置库名,大小敏感
                .tableList("cdc_demo.flink_cdc_test")
                // 将 SourceRecord 转换为 JSON 字符串。
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

运行结果 

3. mysql-cdc扩展

3.1 时区设置

mysql-cdc读取出来的 timestamp 字段时区相差8小时,将时区和MySQL服务器时区设置一致即可:

查询当前数据库时区:

SELECT * FROM mysql.time_zone_name;

设置时区为东八时区

.serverTimeZone("Asia/Shanghai")

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("flinkcdc")
                .password("123456")
                // 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02")
                .databaseList("cdc_demo")
                // 设置需要捕获日志的表名,注意需要配置库名,大小敏感
                .tableList("cdc_demo.flink_cdc_test")
                .serverTimeZone("Asia/Shanghai")
                // 将 SourceRecord 转换为 JSON 字符串。
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

3.2 为每个读取器设置不同的 SERVER ID

每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 ID,称为 server id。MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。因此,如果不同的作业共享相同的服务器 ID,则可能会导致从错误的 binlog 位置读取。 

.serverId("flink-cdc-01")
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                // 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02")
                .databaseList("cdc")
                // 设置需要捕获日志的表名,注意需要配置库名,大小敏感
                .tableList("cdc.flink_cdc_test")
                .username("flinkcdc")
                .serverTimeZone("Asia/Shanghai")
                .serverId("flink-cdc-01")
                .password("123456")
                // 将 SourceRecord 转换为 JSON 字符串。
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/929903.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习】机器学习的基本分类-监督学习-支持向量机(Support Vector Machine, SVM)

支持向量机是一种强大的监督学习算法&#xff0c;主要用于分类问题&#xff0c;但也可以用于回归和异常检测。SVM 的核心思想是通过最大化分类边界的方式找到数据的最佳分离超平面。 1. 核心思想 目标 给定训练数据 &#xff0c;其中 是特征向量&#xff0c; 是标签&#xf…

[Redis#15] 持久化 | AOF | rewrite | aof_buf | 混合持久化

目录 1 使用AOF 流程 问题一&#xff1a;父进程在fork之后继续写旧AOF文件的意义 问题二&#xff1a;执行BGREWRITEAOF时的特殊情况处理 2 命令写入 3 文件同步 4 重写机制 工作流程&#xff1a; 触发条件 混合持久化 持久化 sum AOF&#xff08;Append Only File&a…

springai结合ollama

目录 ollama 介绍 使用 下载&#xff1a; 安装&#xff1a; 点击这个玩意next就行了。 运行 spring ai使用ollama调用本地部署的大模型 加依赖 配置yml 写代码 ollama 介绍 官网&#xff1a;Ollama Ollama是一个用于部署和运行各种开源大模型的工具&#xff1b; …

关于Chrome自动同步书签的解决办法

前言 并不一定适用所有用户&#xff0c; 目前我在网上搜集了一些资料&#xff0c;也做了一些尝试。 就我个人总结的经验来讲&#xff0c;分享大家以下几种办法&#xff1a; 1.书签同步插件 点击如下&#x1f517;&#xff1a; Chrome书签同步https://bm.famend.cn/ …

SpringMVC纯注解快速开发

此文章适合具有一定的java基础的同学看哦&#xff0c;如果有看不懂的基本代码还是先补补java基础哦。 此教程带您不使用xml文件而是纯注解开发&#xff0c;易懂、快捷、迅速&#xff0c;从0开始搭建&#xff0c;很快就能构建起一个SpringMVC项目&#xff0c;能学到两种使用tom…

16-02、JVM系列之:内存与垃圾回收篇(二)

JVM系列之&#xff1a;内存与垃圾回收篇(二) ##本篇内容概述&#xff1a; 1、堆Heap Area 2、方法区Method Area 3、运行时数据区总结 4、对象的实例化内存布局和访问定位一、堆 Heap Area 1、堆的核心概念 一个JVM实例只存在一个堆内存&#xff0c;堆也是Java内存管理的核心…

第二篇:k8s工作流程

我们来看通过deployment部署pod的常规流程&#xff1a; kubectl向apiserver发送部署请求&#xff08;例如使用 kubectl create -f deployment.yml&#xff09;apiserver将 Deployment 持久化到etcd&#xff1b;etcd与apiserver进行一次http通信。controller manager通过watch a…

「Mac畅玩鸿蒙与硬件39」UI互动应用篇16 - 倒计时环形进度条

本篇将带你实现一个倒计时环形进度条应用。用户可以设置倒计时的时间&#xff0c;启动倒计时后&#xff0c;应用会动态显示一个随着时间递减的环形进度条&#xff0c;同时伴有数字倒计时显示。这是结合动画效果和时间管理的实用示例。 关键词 UI互动应用倒计时器环形进度条动…

Qt Qtablewidget 标题 QHeaderView 增加可选框 QcheckBox

创建自定义QHeaderView #pragma once#include <QObject> #include <QHeaderView> #include <QPainter> #include <QMouseEvent>class SSHeaderView : public QHeaderView {Q_OBJECTprivate:bool isChecked;int m_checkColIdx; public:SSHeaderView(i…

Java 单元测试模拟框架-Mockito 的介绍

Mockito 是什么 Mockito 是一个用于单元测试的模拟框架&#xff0c;基于它可以使用简洁易用的API编写出色的测试。 Mockito 允许开发人员创建和管理模拟对象&#xff08;mock objects&#xff09;&#xff0c;以便在测试过程中替换那些不容易构造或获取的对象。 Mockito的基本…

斯坦福李飞飞《AI Agent:多模态交互前沿调查》论文

多模态AI系统很可能会在我们的日常生活中无处不在。将这些系统具身化为物理和虚拟环境中的代理是一种有前途的方式&#xff0c;以使其更加互动化。目前&#xff0c;这些系统利用现有的基础模型作为构建具身代理的基本构件。将代理嵌入这样的环境中&#xff0c;有助于模型处理和…

shell语法(1)bash

shell是我们通过命令行与操作系统沟通的语言&#xff0c;是一种解释型语言 shell脚本可以直接在命令行中执行&#xff0c;也可以将一套逻辑组织成一个文件&#xff0c;方便复用 Linux系统中一般默认使用bash为脚本解释器 在Linux中创建一个.sh文件&#xff0c;例如vim test.sh…

十六(AJAX3)、XMLHttpRequest、Promise、简易axios封装、案例天气预报、lodash-debounce防抖

1. XMLHttpRequest 1.1 XMLHttpRequest-基本使用 /* 定义&#xff1a;XMLHttpRequest&#xff08;XHR&#xff09;对象用于与服务器交互。通过 XMLHttpRequest 可以在不刷新页面的情况下请求特定 URL&#xff0c;获取数据。这允许网页在不影响用户操作的情况下&#xff0c;更…

AI效率手册学习笔记

目录 概要 主流的AI工具 提示词工程 概要 多一个工具&#xff0c;就意味着我们多一个助手&#xff0c;多一个信息源渠道&#xff0c;也就相当于多一份帮助。 学习书籍&#xff1a;《AI效率手册&#xff1a;从ChatGPT开启高效能》常青 著 出版时间&#xff1a;2024-10-01 主…

瑞芯微RK3566/RK3568开发板安卓11固件ROOT教程,Purple Pi OH演示

本文介绍RK3566/RK3568开发板Android11系统&#xff0c;编译ROOT权限固件的方法。触觉智能Purple Pi OH鸿蒙开发板演示&#xff0c;搭载了瑞芯微RK3566四核处理器&#xff0c;Laval鸿蒙社区推荐开发板&#xff0c;已适配全新OpenHarmony5.0 Release系统&#xff0c;SDK源码全开…

【Linux】线程概念 | 线程控制

文章目录 &#x1f449;知识补充&#x1f448;&#x1f449;Linux线程概念&#x1f448;什么是线程Makefile线程 VS 进程线程的优点线程的缺点线程异常线程用途 &#x1f449;线程控制&#x1f448;线程终止pthread_exit 函数pthread_cancel 函数线程 ID 的深入理解在多线程的场…

LongVU:用于长视频语言理解的空间时间自适应压缩

晚上闲暇时间看到一种用于长视频语言理解的空间时间自适应压缩机制的研究工作LongVU&#xff0c;主要内容包括&#xff1a; 背景与挑战&#xff1a;多模态大语言模型&#xff08;MLLMs&#xff09;在视频理解和分析方面取得了进展&#xff0c;但处理长视频仍受限于LLM的上下文长…

爬虫项目基础知识详解

文章目录 Python爬虫项目基础知识一、爬虫与数据分析1.1 Python中的requests库Requests 库的安装Requests 库的 get() 方法爬取网页的通用代码框架HTTP 协议及 Requests 库方法Requests 库主要方法解析 1.2 python中的json库1.3 xpath学习之python中lxml库html了解html结构html…

LeetCode - #152 乘积最大子数组(Top 100)

文章目录 前言1. 描述2. 示例3. 答案关于我们 前言 本题为 LeetCode 前 100 高频题 我们社区陆续会将顾毅&#xff08;Netflix 增长黑客&#xff0c;《iOS 面试之道》作者&#xff0c;ACE 职业健身教练。&#xff09;的 Swift 算法题题解整理为文字版以方便大家学习与阅读。 …

记一次跑前端老项目的问题

记一次跑前端老项目的问题 一、前言二、过程1、下载依赖2、启动项目3、打包 一、前言 在一次跑前端老项目的时候&#xff0c;遇到了一些坑&#xff0c;这里记录一下。 二、过程 1、下载依赖 使用 npm install下载很久&#xff0c;然后给我报了个错 core-js2.6.12: core-js…