Flink实时数仓同步:切片表实战详解

一、背景

在大数据领域,初始阶段业务数据通常被存储于关系型数据库,如MySQL。然而,为满足日常分析和报表等需求,大数据平台采用多种同步方式,以适应这些业务数据的不同存储需求。

一项常见需求是,业务使用人员需要大数据分析平台中实时查看业务表中某一维度的相应数据数据,示例如下:

  1. [Mysql] 业务数据 - 假设我们有一个订单表(也称为事实表),记录了公司的销售订单信息。该表包含以下字段:订单ID、客户ID、产品ID、销售日期、销售数量和销售额等。:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
  1. [大数据平台] - 业务人员希望按照客户ID维度聚合销售数量和销售额,以便实时分析每个客户的销售情况,如下:
客户ID销售数量总计销售额总计
10018400
10026240
1003150
  1. [Mysql] 业务数据 - 新增了两条订单数据,如下:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
6100320012022-01-06150
7100420012022-01-06150

加粗为更新/新增数据

  1. [大数据平台] - 此时每个客户的销售情况,如下:
客户ID销售数量总计销售额总计
10018400
10026240
10032100
1004150

加粗为更新/新增数据

根据上述需求,我们可以得出需要构建实时切片表以满足业务数据的实时分析需求。

切片表也叫维度表,是根据基础表(事实表)某个维度或多个维度对事实数据进行汇总计算,并展示为一个交叉分析的表格。与事实表相比,切片表的数据更加聚合,只包含某些维度或者满足某些特定条件的数据。

二、技术架构

为了实现上述需求,我们可以利用实时同步任务将业务数据实时同步至下游的 MPP(Massively Parallel Processing)库,从而构建切片表。结合市场上常见的技术组件,本文选择了实时引擎 FlinkCDC 和 Doris(MPP)库作为实时同步技术架构。整体架构如下:

在这里插入图片描述

三、设计方案

从背景需求不难看出只需实现切片表即可满足需求,但是在flink + Mpp库中却可以有多种方案,可分为三种,具体如下:

3.1、FlinkCDC + FlinkSQL状态计算

该方案利用了FlinkCDC实时捕获业务数据,并在Flink内部进行有状态的计算,例如聚合查询等操作。这种方法依赖于Checkpoint分布式快照,确保精确一次性的处理。最终,计算得到的聚合结果会实时地下沉到下游MPP库中,使业务人员能够直接查询切片表数据。示例如下:

-- flink cdc 读取订单表
create table mysql_order( 
# ...
) WITH ( 
# ...
);

-- flink sql doris 
create table doris_order( 
# ...
) WITH ( 
# ...
);

-- flink sql 
insert into doris_order select 客户ID, sum(销售数量), sum(销售额) from mysql_order group by 客户ID;
  • 优点:
    • 实现了实时捕获和处理业务数据,保证了数据的准确性和实时性。
    • 利用了Flink的状态计算能力,使得处理逻辑更加灵活和高效。
  • 缺点:
    • 下游Doris表结构固定,无法灵活满足用户对不同维度的查询需求。

3.2、FlinkCDC + Doris Aggregate 模型

这种方法利用了Doris Aggregate聚合模型,实现了切片表的功能。在Doris Aggregate聚合模型中,数据会在每批次导入时进行内部聚合,从而无需上游有状态计算。只需将聚合后的数据下沉至Doris数据库即可。

以下是一个示例的Doris建表语句:

-- Doris aggregate 建表语句
CREATE TABLE IF NOT EXISTS example_db.example_order_agg
(
    `客户ID` LARGEINT NOT NULL COMMENT "客户ID",
    `销售数量总计` BIGINT SUM DEFAULT "0" COMMENT "销售数量总计",
    `销售额总计` BIGINT SUM DEFAULT "0" COMMENT "销售额总计"
)
AGGREGATE KEY(`客户ID`)
DISTRIBUTED BY HASH(`客户ID`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

更多信息:Doris Aggregate 模型

  • 优点:
    • 通过FlinkCDC实现了实时捕获和处理业务数据,确保了数据的准确性和实时性。
    • 利用了Doris aggregate模型进行聚合查询,将聚合压力下沉至下游。
  • 缺点:
    • 下游Doris表结构固定,无法灵活满足用户对不同维度的查询需求。

3.3、FlinkCDC + 实时表 + OLAP查询

这种方案充分利用了Doris的OLAP能力,只需建立一个实时表,业务人员便可根据需要自定义查询语句进行查询。

以下是一个示例的实现:

-- flink cdc 读取订单表
create table mysql_order( 
# ...
) WITH ( 
# ...
);

-- flink sql doris 
create table doris_order( 
# ...
) WITH ( 
# ...
);

-- flink sql 实时同步
insert into doris_order select * from mysql_order;

-- 业务人员查询
select 客户ID, sum(销售数量), sum(销售额) from doris_order group by 客户ID;

对于实时表的具体实现,可参考笔者另一篇文章:Flink实时数仓同步:实时表实战详解

  • 优点:
    • 利用 FlinkCDC 实现了实时捕获和处理业务数据,确保了数据的准确性和实时性。
    • 借助 Doris 的 OLAP查询能力,将聚合压力下沉至下游,提高了系统的性能和稳定性。
    • 无需固定 Doris 表结构,可以灵活满足用户对不同维度的查询需求。
  • 缺点:
    • 当数据量巨大时可能存在一定查询延迟问题。
    • 可能存在并发查询效率降低问题,需要合理规划和调整查询策略。

3.4、总结

针对不同的需求场景,我们需要选择最合适的实现方案。通常情况下,对于固定的聚合查询需求,比如定期汇总统计,FlinkCDC + Doris Aggregate 模型FlinkCDC + FlinkSQL状态计算 是更为合适的选择。而对于需要更灵活查询的情况,FlinkCDC + 实时表 则更加适用。

然而,最终的选择取决于具体的业务需求和场景特点。结合以上几种实现设计,笔者更倾向于 FlinkCDC + 实时表 这种方式。我已经在另一篇博客中详细描述了该实现方式:Flink实时数仓同步:实时表实战详解。

故本文将采用FlinkCDC + FlinkSQL有状态计算实现设计,旨在给读者带来不同的体验。

四、实现方式

设计方案确定后我们还需要考虑实现方式,FlinkCDC 提供了三种实现方式,具体如下:

  1. Flink run jar 模式: 这种模式适用于处理复杂的流数据。当使用简单的 Flink SQL 无法满足复杂业务需求时(例如拉链表等),可以通过编写自定义逻辑的方式,将其打包成 Jar 包并运行。以下是一个示例:
// 示例代码
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    // 配置数据源和处理逻辑
    ...
    
    // 实时任务启动
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

更多信息:MysqlCDC connector

  1. sql脚本模式: bin/sql-client -f file ,这种模式适用于简单的流水任务,例如实时表同步等简单的 ETL 任务。你可以通过编写 SQL 文件并使用 Flink SQL 客户端执行,而无需编写额外的 Java 代码。以下是一个示例:
-- 示例 mysql2doris SQL 文件
set 'execution.checkpointing.interval'='30000';

create table mysql_order(
# ...
) WITH ( 
# ...
);

create table doris_order( 
# ...
) WITH ( 
# ...
);

insert into doris_order select 客户ID, sum(销售数量), sum(销售额) from mysql_order group by 客户ID;

执行如下:

$> bin/sql-client.sh --file /usr/local/flinksql/mysql2doris

更多信息:FlinkSQL 客户端

  1. FlinkCDC Pipeline: 这是 FlinkCDC 3.0 版本引入的全新功能,旨在通过简单的配置即可实现数据同步,无需编写复杂的 Flink SQL。缺点是需要使用 Flink 版本 1.16 或更高版本。以下是一个示例:
# 示例配置文件
source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4

执行如下:

$> bin/flink-cdc.sh mysql-to-doris.yaml

更多信息:FlinkCDC Pipeline

这三种方式各有优劣,可以根据具体需求和场景选择合适的实现方式。考虑到前几篇 Flink 实时数仓同步相关博客都采用了 Jar 包形式,为了给读者带来不同的体验,本文采用 sql脚本模式 模式来实现背景需求。

五、FlinkCDC + FlinkSQL状态计算实现

5.1、Doris切片表设计

由于FlinkSQL完成聚合计算,因此在Doris中设计表结构时采用了Unique数据模型。建表语句如下:

CREATE TABLE `example_order_slice`
(
    `user_id` INT NOT NULL COMMENT '客户id',
    `sale_count` BIGINT NULL COMMENT '销售数量总计',
    `sale_total` BIGINT NULL COMMENT '销售金额总计'
) ENGINE=OLAP
UNIQUE KEY(`user_id`)
COMMENT '订单切片表'
DISTRIBUTED BY HASH(user_id) BUCKETS AUTO;

关于mysql type 转换 doris type 可参考 Doris 源码内置转换工具

5.2、实时同步逻辑

  1. 首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  2. 本文采用initial模式同步任务

  3. 编写mysql2doris SQL文件,这里需要注意的是类型转换:由于 mysql2doris 是 Flink SQL 文件,故需要将 mysql type -> flink type 以及 doris type -> flink type,示例如下:

set 'execution.checkpointing.interval'='30000';
set 'state.checkpoints.dir'='file:///home/finloan/flink-1.16.1/checkpoint/mysql2doris';

create table mysql_order(
                            `id` INT,
                            `user_id` INT,
                            `sale_id` INT,
                            `sale_time` TIMESTAMP(0),
                            `sale_quantity` BIGINT,
                            `sales_volume` BIGINT,
                            PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector'='mysql-cdc',
    'hostname'='10.185.163.177',
    'port' = '80',
    'username'='rouser',
    'password'='123456',
    'database-name' = 'database',
    'table-name'='table'
);

create table doris_order(
        `user_id` INT,
        `sale_count` BIGINT,
        `sale_total` BIGINT
) WITH (
    'password'='password',
    'connector'='doris',
    'fenodes'='11.113.208.103:8030',
    'table.identifier'='database.table',
    'sink.label-prefix'='任务唯一标识,每次启动都要更换',
    'username'='username'
);

insert into doris_order select user_id, sum(sale_quantity), sum(sales_volume) from mysql_order group by user_id;

类型转换参考:

Doris & Flink Column Type Mapping

Mysql CDC Data Type Mapping

  1. 执行命令如下:此时任务已经提交到flink 集群,本文中使用的是Flink-Cluster 模式而非yarn模式
$> ./sql-client.sh -f  ~/mysql2doris

Flink SQL> [INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5c683fba8567e65509870a6db4e99fa5
  1. 登录flinkUi界面查看任务,如下所示:

在这里插入图片描述

  1. 此时Doris 切片表数据如下:
user_idsale_countsale_total
10018400
10026240
1003150
  1. [Mysql]-业务数据新增了两条订单数据,如下:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
6100320012022-01-06150
7100420012022-01-06150
  1. 此时Doris 切片表数据如下:
user_idsale_countsale_total
10018400
10026240
10032100
1004150

六、总结

本文详细介绍了实时数仓同步中切片表的设计与实现。首先,分析了业务背景和需求,说明了切片表的作用和必要性。然后,介绍了基于 FlinkCDC 和 Doris 的技术架构,并比较了不同的设计方案。针对不同的需求场景,提出了三种具体的实现方案:FlinkCDC + FlinkSQL状态计算、FlinkCDC + Doris Aggregate 模型以及 FlinkCDC + 实时表,并分析了它们的优缺点。最后,为了给读者带来不同体验选择了 FlinkCDC + FlinkSQL状态计算 方案进行实现,并详细介绍了实时同步逻辑和相关的技术细节。

通过本文的阅读,读者可以了解到实时数仓同步中切片表的设计与实现方法,以及不同方案的选择和比较。同时,本文还提供了相关资料和参考链接,方便读者进一步深入学习和研究。

七、相关资料

  • Flink实时数仓同步:实时表实战详解
  • Doris Aggregate 模型
  • Flink Doris Connector
  • FlinkCDC Pipeline
  • FlinkSQL 客户端
  • Flink Run jar 模式
  • Doris 源码内置转换工具
  • Doris & Flink Column Type Mapping
  • Mysql CDC Data Type Mapping

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/437239.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式事务-Seata

分布式事务:在分布式系统下,一个业务跨越多个服务或者数据源,每个服务都是一个分支事务,要保证所有分支事务最终一致,这样的事务就是分布式事务、 事务ACID原则 原子性:事务中的所有操作,要么全部成功,要么全部失败 一致性:要保…

go语言基础 -- 单元测试

go语言testing框架说明 go语言有自己的测试框架,封装在testing包中。 我们编写的测试案例通常都写在xxx_test.go文件中,比如我们写了个calc.go,对里面的函数进行测试,通常会写一个calc_test.go;testing框架会将_test.go结尾的文件引入;testing框架会在自己的main方法中执…

太阳能光伏电池的simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 光伏电池的基本结构 4.2 光伏电池的工作原理 5.完整工程文件 1.课题概述 太阳能光伏电池的simulink建模与仿真.分析不同光照温度,光照强度下的光伏电池的U-I特性曲线以及P-V特性曲线。 …

车辆伤害VR安全教育培训复用性强

VR工地伤害虚拟体验是一种新兴的培训方式,它利用虚拟现实技术为参与者提供身临其境的体验。与传统的培训方式相比,VR工地伤害虚拟体验具有许多优势。 首先,VR工地伤害虚拟体验能够模拟真实的工作环境和事故场景,让参与者在安全的环…

C++ 路径问题

目录 例1 例2 例3 例4 例5 例6 例1 62. 不同路径 1.初始化 2.当前位置的条数,就是上面位置的条数 ,加上其左边位置的条数,dp[i][j] dp[i - 1][j] dp[i][j - 1]; 参考代码 class Solution { public:int uniquePaths(int m, int n) …

静态时序分析:典型与非典型时序路径的约束详解(一)

相关阅读 静态时序分析https://blog.csdn.net/weixin_45791458/category_12567571.html?spm1001.2014.3001.5482 时序路径是静态时序分析中的一个重要概念,了解时序路径能帮助设计者更好地编写SDC脚本,本文旨在详细介绍时序路径相关内容。 首先给出时序…

Git误操作补救错失:恢复误删的本地分支、将某个提交从一个分支复制到另一个分支

一、恢复误删的本地分支 作为一枚强迫症,没用的分支总是喜欢及时删删删删掉删掉统统删掉,结果今天发现有些分支还是应该保留。 比如,①前段时间切了个分支用来专门做图表,但因为需求还没有最终确定,已经上线了测试服而…

计网《一》|互联网结构发展史|标准化工作|互联网组成|性能指标|计算机网络体系结构

计网《一》| 概述 计算机网络在信息时代的作用什么是互联网呢?互联网有什么用呢?为什么互联网能为用户提供许多服务 互联网基础结构发展的三个阶段第一个阶段:第二阶段:第三个阶段: 互联网标准化的工作互联网的组成边缘…

Observer 模式

文章目录 💡问题引入💡概念💡例子💡总结 💡问题引入 假设有一个在线商店系统,用户可以订阅商品的库存通知。当某个商品的库存数量发生变化时,系统会自动发送通知给所有订阅了该商品的用户。设计…

Android 13 WMS-动画流程

动画的类型如下 IntDef(flag true, prefix { "ANIMATION_TYPE_" }, value {ANIMATION_TYPE_NONE,ANIMATION_TYPE_APP_TRANSITION,ANIMATION_TYPE_SCREEN_ROTATION,ANIMATION_TYPE_DIMMER,ANIMATION_TYPE_RECENTS,ANIMATION_TYPE_WINDOW_ANIMATION,ANIMATION_TYPE_…

CentOS7.9基于Apache2.4+Php7.4+Mysql8.0架构部署Zabbix6.0LTS 亲测验证完美通过方案

前言: Zabbix 由 Alexei Vladishev 创建,目前由 Zabbix SIA 主导开发和支持。 Zabbix 是一个企业级的开源分布式监控解决方案。 Zabbix 是一款监控网络的众多参数以及服务器、虚拟机、应用程序、服务、数据库、网站、云等的健康和完整性的软件。 Zabbix 使用灵活的通知机制,…

云计算项目八:Harbor

部署企业私有镜像仓库Harbor 私有镜像仓库有许多优点: 节省网络带宽,针对于每个镜像不用每个人都去中央仓库上面去下载,只需要从私有仓库中下载即可提供镜像资源利用,针对于公司内部使用的镜像,推送到本地私有仓库中…

华硕AMD主板开启TPM2.0支持

目录 配置问题设置开启 Firmware TPM开启 Security Device Support保存设置 检查 配置 主板:TUF Gaming B550m-e Wifi   BIOS: 3402 问题 今天更新Win11,告诉我不支持 TPM 2.0,导致更新失败。   网上搜这个问题,基本只提供了…

selenium中ChromeDriver配置,一把过,并且教你伪装

最近正值毕业季,我之前不是写了个问卷星代码嘛,昨晚上有人凌晨1点加我,问我相关内容。 由于我之前C盘重装了一下,导致我很多东西空有其表,实际不能用,借此机会,向大家编写ChromeDriver配置&…

江苏某机场多座智慧公厕上线,黑科技满满打造标杆性机场智慧卫生间

在现代社会,智慧科技正在各个领域中得到广泛应用,机场也不例外。智慧机场是信息化程度、建设标准、功能要求最高的领域,智慧卫生间的建设要求同样是业界的最高标准。智慧公厕源头厂家广州中期科技有限公司,已经建设了浙江某机场、…

SICP解读指南:深度阅读 “计算机领域三巨头” 之一(文末送书)

🌈个人主页:聆风吟_ 🔥系列专栏:Linux实践室、网络奇遇记 🔖少年有梦不应止于心动,更要付诸行动。 文章目录 📋前言一. 书籍介绍1.1 SICP侧重点1.2 SICP章节介绍 二. 书籍推荐2.1 书籍介绍2.2 推…

边缘计算基础知识

目录 边缘计算简介任务卸载简介边缘存储系统 边缘计算简介 边缘计算是指利用靠近数据生成的网络边缘侧的设备(如移动设备、基站、边缘服务器、边缘云等)的计算能力和存储能力,使得数据和任务能够就近得到处理和执行。 一个典型的边缘计算系…

java集合(泛型数据结构)

1.泛型 1.1泛型概述 泛型的介绍 泛型是JDK5中引入的特性&#xff0c;它提供了编译时类型安全检测机制 泛型的好处 把运行时期的问题提前到了编译期间 避免了强制类型转换 泛型的定义格式 <类型>: 指定一种类型的格式.尖括号里面可以任意书写,一般只写一个字母.例如: …

这可能是最全的Web测试各个测试点,有这一篇就够了

前言 什么是Web测试&#xff1f; Web测试测试Web或Web应用程序的潜在错误。它是在上线前对基于网络的应用程序进行完整的测试。 Web测试检查 功能测试 易用性测试 接口测试 性能测试 安全测试 兼容性测试 1、功能测试 测试网页中的所有链接、数据库连接、网页中用于提交或从…

通过Dockerfile创建镜像

通过Dockerfile创建镜像 Docker 提供了一种更便捷的方式&#xff0c;叫作 Dockerfile docker build命令用于根据给定的Dockerfile构建Docker镜像。 docker build语法&#xff1a; # docker build [OPTIONS] <PATH | URL | -> 1. 常用选项说明--build-arg&#xff0c;设置…