09 flink-sql 中基于 mysql-cdc 的 select * from test_user 的具体实现

前言

这也是最近帮一个朋友看问题 遇到的一个问题 

然后 引发了一下 对于 flink-sql 里面的一些 常规处理的思考, 理解 

原始问题主要是 在测试库可以使用 flink-sql 可以正常同步, 但是 在生产环境 无法正常同步数据 

这个问题 我们后面单独 记录一篇文章 

87fe04f3239e4e768da72132e7774269.png

 

 

测试用例

下载 flink-1.13.6, 首先启动一个 standalone 的集群 

master:flink-1.13.6 jerry$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host master.

 

启动 flink sql-client 

master:flink-1.13.6 jerry$ ./bin/sql-client.sh 
Listening for transport dt_socket at address: 5007
No default environment specified.
Searching for '/Users/jerry/Downloads/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
Command history file path: /Users/jerry/.flink-sql-history

                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

 

创建 flink-sql 的表结构 

CREATE TABLE test_user (
`name` string,
`age` string,
PRIMARY KEY (`name`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'postgres',
'database-name' = 'test',
'table-name' = 'test_user'
);

 

源表数据如下 

d53a95f8b01a4676981c365726074779.png

 

然后 flink-sql 这边查询 结果如下 

1fbd5138ad994a5383fbcb31bd47d9f4.png

 

然后 我们来看一下 这里的整个处理流程 

 

 

flink-sql 中 select * from test_user 获取全量数据的调试

首先这里交互的角色抽象的可以理解为两个, 一个是 flink 集群, 一个是 flink sql-client 

然后 flink sql-client 这边组合查询, 相关业务, 然后创建一个 flink 任务, 抛给 flink 集群 

然后 两者进行交互, 首先是拿到 test_user 的快照全量数据, 然后 flink sql-client 这边做业务展示 

然后 test_user 的之后的增删改查, 的处理是基于 mysql binlog 这边来做增量处理 

我们这里 仅仅演示 test_user 的快照数据获取 以及 在 test_user 中增加一条记录, 然后 flink sql-client 这边是 怎么获取到的这个整个流程 

 

如下这里是 flink sql-client 这边将用户输入的 " select * from test_user " 转换为 flink sql 上下文的 operations, 然后封装成 pipeline, 提交给 flink 集群 

11a2df70ea0a4b589f0a9e7c2cfaef2a.png

 

 

看一下 flink 集群这边任务的执行, 首先是 第一次的全量数据的快照 

从 CollectSinkFunction 这边从 buffer 中获取到两条记录, 大致可以看出是 第一条记录 和 第二条记录, 然后 sendBackResults 通过 tcp交互, 将这两条数据对应的 StreamRecord 传输回 flink sql-client 

0609d767ecca4bce9d4605071734d1f9.png

 

往前回溯, 看一下 真正执行查询的地方, 执行的是 "select * from test_user;" 

然后这里迭代会将 查询的记录封装成为 SourceRecord, 然后添加到 recorderMarker 的 bufferedRecordQueue 中 

92e1ba3608324a0b847d9289c3c34dc1.png

 

然后这个 bufferedRecordQueue 是一个队列, 会将消耗的元素调用 enqueueRecord 将数据放入到 records 中 

6b48eaeadc2549758874c0f7b5bfcf25.png

 

这里是更细节的 enqueueRecord 的执行流程, 比如这里 迭代的事 第一条记录 

1c3973e8afe84d86af26ef340e11bb32.png

 

然后接着是 更上一层 Engine 的业务流程, 他会将 SnapshotReader 这边读取的记录更新到 batch 中 

351bda6f26bf4455b4d2f1adfb10a528.png

 

然后就是 Engine 这边的任务的执行, 将数据经过 map, filter, NotNull, 等等相关处理 

最终到达 CollectSinkFunction 这边 

1cf7e36ff8344e1f9391efc607f06186.png

 

然后 CollectSinkFunction 这边将数据封装成 GenericRowData, 然后序列化, 放入 buffer 队列 

然后 最终就是 CollectSinkFunction 上面的流程, 将序列化之后的数据通过 CollectCoordinationResponse 回传给 flink sql-client 

b0fa1585ee8c4f99b1aff634cf651099.png

 

 

然后 flink sql-client 这边的处理如下  

将拿到的数据, 添加到 buffer  队列 

3a6a2ea3c8344bd1be5fa3723ecb62f7.png

dc17460a9c7e409b8701bcf524ac4b56.png

 

然后就是 flink sql-client 这边的主线程的处理了, 从 buffer 中迭代 记录出来, 然后 放到 materializedTable, 然后 之后 cli 这边获取表格数据的时候, 将其传输到 snapshot 中 

494c6dd347c84e6a9548d88ba2d01e01.png

c677e808922f43cb83bc423ff641252b.png

 

 

flink sql-client 这边的展示流程如下 

1b2cf8d16f3b4cf097c9df6b07ef4f66.png

 

然后 做具体的展示, 展示结果如下, 然后 随着之后的迭代 能够获取完整当前页的数据, 展示在 cli 中 

f90e68011bb843cb9196cf2ca7002bfa.png

eae2ea9b78e54ea9a484632a45c7ab34.png

 

 

 

 

flink-sql 中 select * from test_user 获取增量数据的调试

增量数据的获取, 来自于 BinlogClient 这边的获取, 连接 mysql 的服务 

发送获取 binlog 的命令, 然后 之后 mysql 这边有 binlog 的事件之后, 会将相关 事件传递到 BinlogClient 这边 

比如这里 执行了一个 ”insert into test_user (`name`, age) select max(age)+1, max(age+1) from test_user;”, 增加了一条记录 (3, 3) 

然后这边 反序列化之后, 读取到 WriteRowsEvent 数据为 (3, 3) 

5ef9dc67350d489b91903f2abb0547c0.png

 

然后就是 BinlogClient 的后续流程, 将数据使用 recordMarker 记录 

和上面 SnapshotReader 这边处理一样, recorderMarker 会将 SourceRecord记录 添加到 records 列表, 由外层 Engine 层轮询 records 将其进行任务的执行, 到后面的 CollectSinkFunction 传输给 flink sql-client 这边做数据增删改查, 以及展示 

e596147a59714192b4bafcfa5f73d28a.png

 

为记录 (3, 3) 生成 SourceRecord 并放到 records 队列 

34bbed10370743b695fd3af2c99174be.png

 

Engine 层的处理, 其他的这里就不细化了 

f08ed652eda44ad39fc545c7290248f3.png

 

 

 

 

flink mysql-cdc MysqlConnectorTask 的处理 

我们可以看到上面 全量读取使用了 快照读, 然后增量的部分使用基于 binlog 来进行处理 

那么这个 处理流程是在这里呢? 

9da3b393683b4988be75f76ab669e9ef.png

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/519485.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

设计模式总结-外观模式(门面模式)

外观模式 模式动机模式定义模式结构外观模式实例与解析实例一&#xff1a;电源总开关实例二&#xff1a;文件加密 模式动机 引入外观角色之后&#xff0c;用户只需要直接与外观角色交互&#xff0c;用户与子系统之间的复杂关系由外观角色来实现&#xff0c;从而降低了系统的耦…

携程旅行 abtest

声明: 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01;wx a15018601872 本文章…

WindowsPowerShell安装配置Vim的折腾记录

说明 vim一直以来都被称为编辑器之神一样的存在。但用不用vim完全取决于你自己&#xff0c;但是作为一个学计算机的同学来说&#xff0c;免不了会和Linux打交道&#xff0c;而大部分的Linux操作系统都预装了vim作为编辑器&#xff0c;如果是简单的任务&#xff0c;其实vim只要会…

c/c++之编译链接

了解我们写的代码是如何转变成可执行文件.exe的是很有必要的&#xff0c;我们将这些底层的东西掌握清楚才能打好基础&#xff0c;筑高楼。 编译链接的全流程 我们平时写代码的文件是.c或者.cpp文件。这里面包括我们的代码&#xff0c;还有宏定义&#xff0c;引用头文件以及注…

齐护机器人方位传感器指南针罗盘陀螺仪

一、方位传感器原理及功能说明 齐护方位传感器是一款集成了三轴磁传感器芯片的方位传感器模块。适用于无人机、机器人、移动和个人手持设备中的罗盘&#xff08;指南针&#xff09;、导航和游戏等高精度应用。模块可以感应XYZ平面角度外&#xff0c;还可实现1至2的水平面角度罗…

Python--Django--说明

Django 是基于python 的 Web 开发框架. &nsbp;   Web开发指的是开发基于B/S 架构, 通过前后端的配合, 将后台服务器上的数据在浏览器上展现给前台用户的应用. &nsbp;   在早期, 没有Web框架的时候, 使用 Python CGI 脚本显示数据库中的数据. Web框架致力于解决一些…

考古:IT架构演进之IOE架构

考古&#xff1a;IT架构演进之IOE架构 IOE架构&#xff08;IBM, Oracle, EMC&#xff09;出现在20世纪末至21世纪初&#xff0c;是一种典型的集中式架构体系。在这个阶段&#xff0c;企业的关键业务系统往往依赖于IBM的小型机&#xff08;后来还包括大型机&#xff09;、Oracle…

后端灰度发布

在软件开发中&#xff0c;"灰度"通常指的是渐进式地将新功能、更新或改进引入到生产环境中&#xff0c;但只对一小部分用户或流量进行部署和测试的过程。这种方法允许开发团队在生产环境中逐步测试新功能&#xff0c;以确保其稳定性、可靠性和用户体验&#xff0c;同…

vscode+anaconda 环境python环境

环境说明&#xff1a; windows 10 vscodeanaconda anaconda 安装&#xff1a; 1、官网下载地址:Free Download | Anaconda 2、安装 接受协议&#xff0c;选择安装位置&#xff0c;一直next&#xff0c;到下面这一步&#xff0c;上面是将Anaconda 添加至环境变量&#xff0…

非关系型数据库--------------------Redis 群集模式

目录 一、集群原理 二、集群的作用 &#xff08;1&#xff09;数据分区 &#xff08;2&#xff09;高可用 Redis集群的作用和优势 三、Redis集群的数据分片 四、Redis集群的工作原理 五、搭建redis群集模式 5.1启用脚本配置集群 5.2修改集群配置 5.3启动redis节点 5…

自动驾驶涉及相关的技术

当科幻走进现实&#xff0c;当影视照进生活&#xff0c;无数次憧憬的自动驾驶&#xff0c;正在慢慢的梦想成真。小时候天马星空的想象&#xff0c;现在正悄无声息的改变着我们的生活。随着汽车电动化进程的加快&#xff0c;自动驾驶技术映入眼帘&#xff0c;很多人可能感觉遥不…

非关系型数据库------------Redis的安装和部署

目录 一、关系型数据库与非关系型数据库 1.1关系型数据库 1.2非关系型数据库 1.2.1非关系型数据库产生背景 1.3关系型非关系型区别 1.4客户访问时&#xff0c;关系型数据库与redis的工作过程 二、Redis 2.1redis简介 2.2Redis命中机制和淘汰机制 2.3Redis 具有以下优…

每天五分钟深度学习:深度学习中数据样本和标签的符号化表示

本文重点 在深度学习的研究与应用中&#xff0c;数据样本和标签的符号化表示是至关重要的一环。通过合理的符号化表示&#xff0c;我们可以将现实世界中的数据转化为计算机能够理解和处理的形式&#xff0c;从而为后续的模型训练和推理提供基础。本文将对深度学习中数据样本和…

基于SpringBoot和Vue的校园周边美食探索以及分享系统

今天要和大家聊的是基于SpringBoot和Vue的校园周边美食探索以及分享系统 &#xff01;&#xff01;&#xff01; 有需要的小伙伴可以通过文章末尾名片咨询我哦&#xff01;&#xff01;&#xff01; &#x1f495;&#x1f495;作者&#xff1a;李同学 &#x1f495;&#x1f…

Hadoop-入门

资料来源&#xff1a;尚硅谷-Hadoop 一、Hadoop 概述 1.1 Hadoop 是什么 1&#xff09;Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 2&#xff09;主要解决&#xff1a;海量数据的存储和海量数据的分析计算问题。 3&#xff09;广义上来说&#xff0c;Hadoop…

文件服务器之二:SAMBA服务器

文章目录 什么是SAMBASAMBA的发展历史与名称的由来SAMBA常见的应用 SAMBA服务器基础配置配置共享资源Windows挂载共享Linux挂载共享 什么是SAMBA 下图来自百度百科 SAMBA的发展历史与名称的由来 Samba是一款开源的文件共享软件&#xff0c;它基于SMB&#xff08;Server Messa…

wordpress全站开发指南-面向开发者及深度用户(全中文实操)--php函数

php函数 wordpress会封装一部分函数&#xff0c;比如bloginfo该函数的作用是直接调用你设置的你的网站的名称 示例 This is our amazing custom theme <?php echo 22; function myfirstfunction(){ echo 33; echo "<p>Hello ,this is my first function</…

移动开发技术历史演化简介h5,跨平台,原生的各种技术实现方案的简单介绍

移动端的开发技术是指针对移动设备如智能手机和平板电脑等便携终端进行应用程序和服务创建的过程。本文将主要介绍一下移动端的开发技术的历史进化历程。讲述h5&#xff0c;跨平台&#xff0c;原生的各种技术实现方案和他们各自的优势与不足。 移动开发&#xff0c;不仅是编程技…

微电网优化:基于巨型犰狳优化算法(Giant Armadillo Optimization,GAO)的微电网优化(提供MATLAB代码)

一、微电网优化模型 微电网是一个相对独立的本地化电力单元&#xff0c;用户现场的分布式发电可以支持用电需求。为此&#xff0c;您的微电网将接入、监控、预测和控制您本地的分布式能源系统&#xff0c;同时强化供电系统的弹性&#xff0c;保障您的用电更经济。您可以在连接…

jvm基础三——类加载器

类加载器 在Java中&#xff0c;类加载器&#xff08;Class Loader&#xff09;是Java虚拟机&#xff08;JVM&#xff09;的一部分&#xff0c;负责将类文件&#xff08;.class文件&#xff09;加载到JVM中&#xff0c;使得程序能够使用这些类。类加载器在Java中具有重要的作用&…