顺丰基于 Flink CDC + Hudi 推进实时业务落地

摘要:本文整理自大数据研发高级工程师唐尚文,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为三个部分:

  1. 应用场景

  2. 实践与优化

  3. 未来规划

点击查看原文视频 & 演讲PPT

一、应用场景

1.1 顺丰集团业务概览

1

顺丰除了大家所熟悉的个人快递服务之外,它还支持同城即时配送、冷链、仓配一体、供应链综合物流服务等等,它是一个能够为客户提供涵盖多行业、多场景、智能化的综合物流服务商。

在这些行业场景背后,主要是由三网进行支撑,分别为天网、地网、信息网。天网指我们所理解的货机和无人机的航空资源;地网指服务网络、网点、陆运、铁运等资源;信息网主要是由顺丰科技支撑。

1.2 顺丰科技业务全景

2

顺丰科技的主要业务可以分成两部分,数字化全流程和科技服务。

数字化全流程:通过提升数字化的科技能力,助力我们内部经营的智能化升级和管理效率的提升。

科技服务:顺丰也非常关注生态上下游的用户需求,希望通过多年累积的科技能力和数字化实战经验,去沉淀输出科技化的产品和服务。比如一些成熟的行业解决方案、标准的科技产品、数字化物流开放平台等等,助力用户产业的升级与转型。

1.3 实时计算在顺丰的应用

3

在顺丰的业务场景中,哪些场景使用到实时计算的能力了呢?简单列举了如下四个场景。

  • 第一个场景,可视化监控及资源动态调度。我们通过经营热力图对件量、客户、产品、收派比进行可视化,对资源进行动态调度。
  • 第二个场景,路径规划。依据收派标准、路线距离等进行路径规划、为小哥输出动态时效及最优路径。
  • 第三个场景,运单/运力异常监控。对于异常晚到的快件,现场调度人员就可以联系司机确认车辆情况。对于时效性较高的快件,比如生鲜如果出现运力异常,就会导致货物损坏和赔付。这时我们就可以通过 Flink 做实时和准实时的监控,让一线人员能够快速干预异常情况,降低公司损失。
  • 第四个场景,IOT 万物互联。顺丰有数以万计的传感器设备,它们无时无刻都在产生大量的数据。我们需要对这些数据进行收集、清洗、分析,及时发现问题或者优化业务流程。

除了以上场景,实时计算也在其他场景起着非常重要的作用,给业务带来巨大的价值。

二、实践与优化

2.1 实时数据接入实践

4

下面来看一下我们是怎样支撑这些场景在顺丰落地的。

首先是数据实时接入实践。在顺丰实时数据接入也经历了几个较大版本的迭代和演进。

2017 年,我们基于 JStorm + Canal 实现了第一版实时数据入湖方案。但这个方案存在一些问题,比如 JStorm 不能保障数据一致,且吞吐率较低,比较难维护。

2019 年,随着 Flink 社区不断发展,Flink 逐渐补齐了很多重要的特性。因此也在这一年,我们基于 Flink+ Canal 实现了第二版实时数据集成方案。但这个方案也存在着一些问题,这个问题会在后续章节里详细展开。

为了解决这些问题,我们经历了内部的调研和实践,在 2021 年全面转向 Flink CDC。

5

上图是 Flink + Canal 的架构。首先 Flink 在启动时,它会读取当前 Binlog 的位置,然后 Flink 通过全量采集的方式将数据发往下游。当全量数据读取完后,再从刚刚标记的位置里消费 Binlog 数据。数据采集完后,会将数据发往 Kafka,再由 Spark 程序消费 Kafka 中的数据写往 Hudi。

这个架构它存在以下三个问题。

  • 第一个问题,数据一致性难保障。因为在全量采集过程中,不会进行锁表。如果发生数据变更,全量任务可能就会采集到这些变更的数据,那么就有可能和 Binlog 采集的数据存在一些重复。
  • 第二个问题,架构复杂、链路长。因为需要两套计算引擎和一个消息队列 Kafka 才能将数据写入到数据湖中,所以这个方案涉及的组件多,链路长,消耗资源大,维护复杂。
  • 第三个问题,存储端要求高。它需要下游 Upsert 或者 Merge 写入,才能去除重复的数据,确保数据的最终一致性。如果下游不支持这个能力,它的数据一致性就无法得到保障。

6

基于以上问题,我们整理出了对实时数据接入的一些需求,主要概括为以下三点。

  • 能够保障数据一致。全量增量数据同步自动切换,并能够保障数据的一致。比如刚刚提到的 Flink+Canal,它能做到全量增量的自动切换,但不能做到数据的准确性。
  • 最大限度降低对源数据库的影响。比如不能使用锁,也要能进行流控等等。
  • 具有较好的同步性能,这是非常核心的需求。

在正常的实践过程中,我们往往比较关注同步任务的稳定性和处理的性能,因为它直接可能影响到实时性要求较高的业务下游。我们对比了很多开源的社区的同步框架,最终选择了 Flink CDC,主要有下面几方面的考虑。

  • 它能很好的解决我们的业务痛点。
  • 它的可扩展性和稳定性比较好,社区活跃度也比较高。
  • 它的技术栈和我们的技术栈相匹配。

Flink CDC 的核心特性包括数据转换能力强,它能够复用 Flink 数据清理、数据转换的能力,同时也能利用 Flink 支持 Sink 的能力;支持全量增量的采集自动切换;能够保证数据的一致性;能够做到架构简单,这是之前的架构所不能达到的。

7

在引入 Flink CDC 后,用户怎样使用实时数据同步呢?

  • 第一步,用户 A 需要向数据库管理员申请数据库 A1 的访问权限,这个数据库的相关信息是由管理员去维护的。
  • 第二步,用户 A 得到相关的数据库访问信息后,他通过 SQL/JAR 模式,在实时数据计算平台上进行作业的开发、调试、上线,然后由用户 A 自己维护作业整体的生命周期。同时用户 B 可能也有类似的表的接入需求,他也需要走前面的流程。
  • 第三步,用户可以选择将他的表注册到数据地图供其他人查询使用。

这个基本上也能够满足一些业务的需求,但是我们在接入过程中发现了一些问题。

8

问题一:使用门槛高、维护难。用户对实时数据进行接入的时候,他需要了解 Flink SQL、Flink DataStream API 的使用方式以及相关的参数。对小白用户来说,它的门槛比较高。而且用户只想做表的接入,并不想太了解相关的技术细节。另外,数据库这种分享链接的方式,很容易造成信息泄露,就导致数据源管理员工作量较大,同时也会造成一定安全性的问题。

9

为了解决接入门槛较高这个问题,我们首先对接入过程进行了产品化,使用 Flink CDC 做底座,实现了顺丰实时数据接入产品,实时数据直通车。通过实时数据直通车,用户可以以一种零编码的方式勾选待同步表的相关信息,然后自动生成对应的数据同步任务。同时也能够完成敏感字段自动加密的功能,无需了解 Flink 的相关配置就能实现数据快速入湖。

另外,通过数据源管理授权用户访问、避免密码泄漏,方便用户进行数据管理和数据共享。

10

上图是实现的大概架构,简要步骤如下:

  • 第一步,数据源授权。用户申请数据源读取权限并获得管理员授权。

  • 第二步,作业创建。直通车根据用户勾选的相关信息生成对应的同步作业。

  • 第三步,元数据同步。直通车根据待同步的表信息在数据资产创建对应的元数据。

  • 第四步,数据使用。用户根据数据资产上面的信息,通过查询引擎使用同步后的数据。

11

问题二:实时采集链路不稳定,有可能会对源系统造成较大的影响。以 MySQL CDC 为例,它本质上是伪造了一个从节点。当 MySQL 发生数据变更时,它就会把变更的一些 Binlog 同步给从节点。然后 MySQL CDC 需要解析出 Binlog 信息,获取出当前监听表的数据源。如果需要采取多个表,对应就会分成多个 CDC 任务,Binlog 就会被反复的去拉取,很容易造成机器带宽打满,同时也会造成数据库压力较高。

除此之外,Flink CDC 采集的过程中,也很容易出现 OOM 的问题,导致作业被反复拉起。

12

任务带宽打满的问题,其实就是做任务的合并。任务的合并我们也尝试了很多方案,最终使用了如下的方案。

比如每个表在采集全量数据的过程中,都会启动 Flink CDC 任务读取数据。当任务到达 Binlog 阶段时,Flink CDC 就会采集当前任务具体的 ID 位置汇报给后台的服务。后台的服务会定时检测每个任务,当有任务达到合并状态时,也就是都达到 Binlog 时,他就会把这两个任务合并成一个新的任务。

对于数据库系统压力的问题,我们实现了 Flink CDC 的数据限流。在全量数据读取的过程中,有可能因为突然采集的数据比较多,造成数据库压力较高的问题,数据读取限流就能降低数据全量读取数据时数据库的压力。

同时读取限流对增量读取也有一定的效果。比如当一个 CDC 任务挂掉很长一段时间后再次启动时,它会先去挂掉之前的位置重新消费,这个段时间就可能会产生大量的数据。如果没有限流,数据大量涌入会造成流量暴涨,程序的反压,甚至有可能导致程序内存不足而挂掉。

13

Flink CDC 在读取全量数据块时,有可能会在某一个高低水位中写入大量数据,然后把高低水位中的 Binlog 数据 Merge 到全量的数据块的时候,会导致程序的 OOM 的问题。我们优化了这个步骤,使用流式 Merge 避免了 OOM 的问题。

Flink CDC 在分配 Binlog 时,会默认分配到 Subtask-0 上,而 Flink 程序支持多实例采集。当采集几十个 DB 时,这几十个 DB 的实例都分配在 Subtask-0 上,会导致数据倾斜或者 OOM 的问题,因此我们支持了随机分配策略,即让 Binlog 采集随机分配到不同的 Subtask 上。

同时在顺丰内部,我们很多数据源都是分库分表的,分布在不同的实例上。因此我们支持一个任务采集多个实例上的表,每一个实例对应一个 CDC Source,多个 Source 通过 Union 的方式进行数据合并。另外我们也支持同一个表写入不同的存储后端,这样就不需要为同一个表的采集再起一个任务。

14

问题三:表结构变更无法同步。Flink CDC 支持 DataStream API 的方式获取 DDL 变更的数据,但不同的存储系统处理的 Schema Evolution 的方式是不一样的。如果在写入下游的时候没有处理 Schema Evolution,有可能就会造成数据丢失和原数据不一致的问题。

15

在表结构变更自动同步到 Kafka 的场景中,当我们选择写入的存储后端是 Kafka 时,为了支持 Kafka Schema Evolution,引入了 Schema Registry 组件,写入 Kafka 中的数据都是以 Avro 形式进行存储的,所以当原端系统发生 Schema 变更时,我们只需要把 Schema 信息注册到 Schema Registry 上,同时再把 Schema 同步到数据地图就能够完成 Schema Evolution 的流程。

那么我们如何高效的识别同步的数据是否发生了变更呢?

我们知道 Flink CDC 在采集数据到 Binlog 阶段的时候,它是单线程的,而且它采集的数据是严格有序的。我们的方案就是利用了这个原理,在单线程处理的过程中,实现了带 Schema 签名的 Source Record。

当遇到 DDL 时,会对这个 DDL 之后的第一条数据重新计算 Schema 的签名,且这个 Schema 的签名的有效期只到下个 DDL 为止。此时的数据都包含了 Schema 的签名信息,然后数据会发到不同 Subtask。在每个 Subtask 中,我们只要对比前后数据的签名就能够判断 Schema 发生了变更。

通过这种方式,去判断 Schema 是否一致,实现高效的 Schema 变更识别。

16

在表结构变更自动同步到 Hudi 的场景中,与原来设计的方案相比,Flink 写入 Hudi 的任务在启动之后写入的 Schema 是不能变更的。如果 Schema 进行了变更,需要停止任务,并以新的 Schema 进行启动,这种方式会造比较大的维护成本。而且 Flink 写 Hudi 时候,每一次 Checkpoint 都会触发一次 Commit,每一个 Commit 中的数据对应的 Schema 信息都必须保持一致。因此为了实现表结构同步而不终止任务,我们做了如下两件事。

当遇到 DDL 时,我会对数据进行截段,如上图所示。当我遇到 DDL 时,我就会把 DDL 和 DDL 之后数据存储到 Split Data 中,这样做是为了 Schema 的数据一致。直到下一次 Checkpoint 的时候,他会把 Split Data 中的 DDL 数据取出来,并发往下游的某个 writeFunction。writeFunction 接收到 DDL 之后会将它保存,然后在 Checkpoint 的时候将 DDL 数据发往 coodinator,由 coodinator 执行 DDL 变更。然后动态刷新对应的 Commit 的 Schema 信息,让 Commit 的 Schema 信息在接下来的数据 Schema 的信息里面保持一致。

我们通过表结构自动变更,能够解决大部分生产中比如像简单的类型修改、列增加等一些场景的 Schema 同步的问题,降低了维护的成本。

17

上图是场景化的整体架构。从下往上看,我们支持 MySQL、Oracle、PG 的采集,还有数据直通车、分库分表、敏感字段识别、表结构同步、数据加密、任务合并、读取限流、资产注册等功能。用户只需要在实时计算的实时数据直通车里做一些数据源的申请和表结构的确认,就能实现快速的数据入湖。

2.2 实时数据开发实践

18

首先介绍下实时数据开发的背景。如上图所示,对于离线开发,我们可以通过数据地图对数据资产进行查询和管理,同时资产打通了离线开发平台。然后在开发平台上使用数据地图的元数据,实现了查数据、管数据、用数据的统一。

但对于实时开发,我们如果需要使用实时资产,首先需要获取到对应 Kafka 的相关信息,这些信息是由每一个用户自行维护的,另外需要在实时开发平台上通过 create table 引用这个信息,然后再进行数据开发。这里会出现一些问题,我们没有对元数据进行维护,管理也会很不方便。比如 Kafka 有上百个字段,用户就需要写很长的 SQL,就会造成比较难维护。

19

为了解决查数据的问题,我们对实时数据资产和离线数据资产,在数据地图上进行了统一的管理。数据地图上有详细的字段描述信息以及数据的类型、安全等级等等,用户可以快速了解字段的含义。同时我们也支持数据的快速预览,通过预览数据用户可以快速知道数据的实际情况,以便后续的数据开发。

20

除此之外,我们还支持链路血缘追踪。通过血缘用户可以知道数据上下游之间的关系,对应的计算作业以及具体的影响范围。

21

在管数据方面,为了快速让数据进行实时资产的接入,在资产引入过程中,只要用户填好对应的信息,数据地图就会采集具体数据自动生成对应的 Schema,并让用户确认和补充完善相关的信息。

另外,在数据地图上还支持权限的授权管理,它会生成对应的策略到 Ranger 中做权限进行统一管理。

22

在用数据方面,我们实现了 KafkaCatalog 和离线类似一样的开发体验,就是说用户可以在左边栏通过点击的方式就可以很快对实时字段进行引用。这些数据都统一来自数据地图进行管理,同时还对 SQL 进行一些相关的权限校验。如上图右侧的两个截图,实时开发基本和离线开发保持一致,这样用户用起来就比较方便。

23

另外我们还支持调试功能。比如用户登录实时计算开发平台时,SessionManager 就会识别到这个登录,同时在后台创建一个 Flink Session DEBUG 的服务。这时用户将他需要调试的 SQL 发送到的调试服务,调试服务会根据它的 SQL 获取到对应的目标 Schema 信息。

然后通过获取到的目标 Schema,创建模拟的目标表,同时生成对应的 insert into 语句,再发送到 Flink DEBUG 集群。Flink DEBUG 集群会将调试结果实时写回调试服务。然后用户的前端会以轮询的方式查询这个实时的 DEBUG 数据。通过调试功能,能让用户清楚每一段 SQL 生成的数据结果,提高调试的效率。

24

接下来讲一下数据接入和开发全链路追踪对比。

以前,如果要接入数据和数据开发。首先要知道数据库的地址和 Kafka 的地址,同时申请权限和了解字段的相关信息。之后还要需要了解 Flink CDC 和 Flink SQL 接入等相关知识、配置参数使用,测试环境调试。之后在生产环境发布,验证数据。最后对资产在地图上的维护,同时在生产环境供其他人使用。

优化后,我们只需在直通车上申请权限,然后就可以直接在直通车上进行发布,同时验证数据。之后就可以开放给所有人使用了。发布时间从原来的五天到现在不到一天就能够完成实时数据的接入。

2.3 总结

25

我们从以下三个方面进行了优化。

  • 实时接入方面,我们进行了实时接入的产品化,同时支持表结构实时同步和一些实践的优化。
  • 实时开发方面,我们对实时资产进行了管理,同时我们支持 Flink SQL 权限校验和 KafkaCatalog。
  • 实时调试方面,我们支持单步调试,对用户的 DEBUG 体验进行了优化。

三、未来规划

26

未来的规划主要分为以下三个方面:

  • 资源的弹性伸缩。通过识别有瓶颈的 Flink 作业,为其适配合适的资源、并行度,保障业务时效。

  • 统一的元数据管理。通过构建统一的元数据服务,并集成支持数据湖表管理等功能,推进数据湖在顺丰实时场景落地。

  • 流批一体。基于 Flink、数据湖技术打造流批一体的计算平台。

点击查看原文视频 & 演讲PPT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/43385.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

宝塔的Redis绑定IP

宝塔安装Redis 软件商店搜索Redis 连接宝塔面板的redis服务器失败的解决办法 检查Linux是否放行6379端口修改Redis绑定IP检查阿里云/腾讯云的防火墙策略是否放行6379端口 1.bind 127.0.0.1 修改为 bind 0.0.0.0 127.0.0.1 表示只允许本地访问,无法远程连接 0.0.0.0 表…

51单片机--AT24C02数据存储

文章目录 存储器的介绍AT24C02I2C总线I2C时序结构AT24C02数据帧AT24C02数据存储实例 存储器的介绍 存储器是计算机系统中的一种重要设备,用于存储程序和数据,它可以通过电子、磁性介质等技术来记录和保持数据。在这里,主要介绍的是随机存储器…

TableGPT: Towards Unifying Tables, Nature Language and Commands into One GPT

论文标题:TableGPT: Towards Unifying Tables, Nature Language and Commands into One GPT 论文地址:https://github.com/ZJU-M3/TableGPT-techreport/blob/main/TableGPT_tech_report.pdf 发表机构:浙江大学 发表时间:2023 本文…

BeanFactory容器的构建和使用示例

BeanFactory容器的实现流程: BeanFactory是Spring框架中的一部分,它提供了IoC(控制反转)的实现机制。下面是BeanFactory的IoC实现过程: 定义Bean定义:首先,我们需要在配置文件中定义Bean的定义…

详解分类指标Precision,Recall,F1-Score

文章目录 1. Precision(精度)2. Recall(召回率)3. F1-Score4. Accuracy(准确率)5. P-R 曲线6. TPR、FPR6.1 TPR(真正率)6.2 FPR(假正率) 7. ROC曲线8. AUC曲线…

vscode使用g++编译.c文件或.cpp文件

vscode是一个跨平台、轻量级、插件非常丰厚的IDE,这里介绍在vscode里使用g来编译.cpp文件。g也叫GCC, 在Window中,是使用MinGW方式实现g的,它分为32位和64位2个版本,其中,MinGW-64是64位的,MinGW-32是32位的…

linux学成之路(基础篇)(二十)rsync服务器

目录 前言 一、概述 监听端口 二、特点 快捷 安全 三、数据的同步方式 四、rsync传输方式 本地传输 远程传输 守护进程 五、命令 作为远程命令 作为rsync服务 选项 六、配置文件 全局配置 模块配置 守护进程传输 七、rsyncinotfy实时同步 一、服务端 二、…

力扣奇遇记 [第一章]

文章目录 😦第一题:拿下LeetCode1769. 移动所有球到每个盒子所需的最小操作数学习内容:LeetCode1769. 移动所有球到每个盒子所需的最小操作数🙈思路分析:💖代码产出: 😦第二题&#…

优雅的设计测试用例

⭐️前言⭐️ 入职以后接触到了公司的具体业务,提升了设计测试用例的能力,于是沉淀出这篇文档与大家分享。 🍉欢迎点赞 👍 收藏 ⭐留言评论 📝私信必回哟😁 🍉博主将持续更新学习记录收获&…

【C语言】表达式求值相关问题汇总—>隐式类型转换(整型提升)、算数转换与操作符优先级汇总(收藏查阅)

👀樊梓慕:个人主页 🎥个人专栏:《C语言》《数据结构》《蓝桥杯试题》 🌝每一个不曾起舞的日子,都是对生命的辜负。 目录 前言: 一、隐式类型转换 (一)整型提升的意义…

后端(四):博客系统项目

咱们在这里实现的是后端项目,前端代码就提一提,不全做重点介绍,在开始讲解这个博客系统项目之前,我们先看看这个项目的前端界面: 登录界面: 个人主页: 博客详情页: 写博客页&#x…

再见 Spring Boot 1.X ,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维护到2019年8月1日。 1.5.x 将会…

【Java】重写compareTo()方法给对象数组排序

我们先给一个数组排序,我们肯定用的是Arrays.sort()方法: public class test2 {public static void main(String[] args) {int[] arr{3,5,4,6,9,8,1};System.out.println(Arrays.toString(arr));System.out.println("---------");Arrays.sort…

【C语言初阶】指针的运算or数组与指针的关系你了解吗?

🎬 鸽芷咕:个人主页 🔥 个人专栏:《快速入门C语言》《C语言初阶篇》 ⛺️生活的理想,就是为了理想的生活! 文章目录 📋 前言💬 指针运算💭 指针-整数💭 指针-指针💭 指针…

【Java基础教程】(四十二)多线程篇 · 上:多进程与多线程、并发与并行的关系,多线程的实现方式、线程流转状态、常用操作方法解析~

Java基础教程之多线程 上 🔹本节学习目标1️⃣ 线程与进程🔍关于多进程、多线程、并发与并行之间的概念关系? 2️⃣ 多线程实现2.1 继承 Thread 类2.2 实现 Runnable 接口2.3 多线程两种实现方式的区别2.4 利用 Callable 接口实现多线程2.5 …

数学建模学习(4):TOPSIS 综合评价模型及编程实战

一、数据总览 需求:我们需要对各个银行进行评价,A-G为银行的各个指标,下面是银行的数据: 二、代码逐行实现 清空代码和变量的指令 clear;clc; 层次分析法 每一行代表一个对象的指标评分 p [8,7,6,8;7,8,8,7];%每一行代表一个…

为Android构建现代应用——设计原则

为Android构建现代应用——设计原则 - 掘金 state”是声明性观点的核心 在通过Compose或SwiftUI等框架设计声明性视图时,我们必须明确的第一个范式是State(状态)。UI组件结合了它的图形表示(View)和它的State(状态)。UI组件中发生变化的任何属性或数据都可以…

RuoYi-VUE : make sure to provide the “name“ option

前言 略 错误 错误原因 theme-picker 组件未被注册。 解决 src/App.vue代码恢复成若依的代码即可。&#xff08;PS&#xff1a;不知道代码被谁修改了&#xff09; 缺少这一段&#xff1a; <script> import ThemePicker from "/components/ThemePicker";…

hive基础

目录 DDL&#xff08;data definition language&#xff09; 创建数据库 创建表 hive中数据类型 create table as select建表 create table like语法 修改表名 修改列 更新列 替换列 清空表 关系运算符 聚合函数 字符串函数 substring:截取字符串 replace :替换…

C进阶:内存操作函数

内存操作函数 memcpy 头文件&#xff1a;string.h 基本用途&#xff1a;进行不相关&#xff08;不重叠的内存&#xff09;拷贝。 函数原型&#xff1a;void* memcpy(void* destination,//指向目标数据的指针 const void* source,//指向被拷贝数据的指针 size_t num);//拷贝的数…