基于 Flink Paimon 实现 Streaming Warehouse 数据一致性管理

摘要:本文整理自字节跳动基础架构工程师李明,在 Apache Paimon Meetup 的分享。本篇内容主要分为四个部分:

  1. 背景

  2. 方案设计

  3. 当前进展

  4. 未来规划

点击查看原文视频 & 演讲PPT

一、背景

1

早期的数仓生产体系主要以离线数仓为主,业务按照自己的业务需求将数仓分为不同的层次,例如 DWD、DWS、ADS 等。在离线数仓中,业务数据会经过离线 ETL 加工进入数仓,层与层之间的数据转换也会使用离线 ETL 来进行处理。ADS 层可以直接对外提供 Serving 能力,中间层通常会使用 Hive 来存储中间数据。基于 Hive 也可以提供一些 OLAP QUERY 的能力。

在离线数仓生产体系下,优势是离线数仓的生产体系非常完整,工具链也比较成熟,存储和维护的成本比较低,对于用户的开发门槛相对也比较低。但劣势也非常明显,首先数据新鲜度非常低,通常是 T+1 级别,一般是小时级,甚至是天级。其次 changelog 支持不完善,虽然是面向Table开发,但中间存储 Hive 主要支持 append 类型的数据,同时离线 ETL 更适合处理全量数据,而不是增量更新。

2

随着数据量的增多,离线 ETL 的执行时间越来越长,同时业务对数据新鲜度的要求也越来越高。业务迫切的需要一种新的低延迟数仓生产体系。因此基于离线数仓进一步演进出了实时数仓生产体系。

比较典型的是 Lambda 架构的实时数仓生产体系。在 Lambda 架构的实时数仓生产体系中,业务需要维护两条链路,将生产链路分为了流处理层和批处理层。流处理层主要用于实时处理增量数据,作为批处理层的加速层,这层通常会选用 Storm、Flink 等实时计算引擎来进行数据处理。而中间结果则采用 Kafka 进行存储,以提供低延迟的流式消费能力。

批处理层和离线数仓相同,完成 T+1 的数据结果产出。服务层则会综合流处理层和批处理层的结果对外提供服务。

随着流式计算引擎的不断发展,以 Flink 为例,已经实现了计算层的流批统一,在一些场景中可以完全移除掉批处理层,由流处理层来完成全量+增量的计算。为了提供中间关键数据的 OLAP 查询能力,仍然需要将 Kafka 的数据再 Dump 到 Hive 中一份。

在实时数仓生产体系中,优势是数据新鲜度非常高,同时基于流处理层也可以做很多的预计算,来降低查询的延迟。

劣势也比较明显:

  • 第一,数仓的维护人员需要维护从计算到存储的两条技术栈完全不同的链路,开发和维护的成本都比较高。
  • 第二,存储成本高。Kafka 为了提供低延迟的流式消费能力,相比于离线常用的 HDFS,S3 等离线存储,存储的成本会更高。同时,为了让中间数据能够提供离线查询的能力,还需要额外存储一份离线的全量数据。
  • 第三,离线和实时链路的数据口径比较难对齐。这是因为采用了完全不同的两套技术栈在构建流处理层和批处理层。虽然逻辑抽象是相同的,但在具体实现上仍然有差别。并且流处理层的数据在不断地进行增量处理,和离线处理层很难基于固定的时间点进行结果对齐。
  • 最后在流处理链路中的中间结果,它是不可以被查询的,因为 Kafka 只支持流式顺序消费,没有点查、batch 查询的能力。虽然可以通过将 Kafka 数据 Dump 到 Hive 中一份,但实时性比较差。

3

尽管计算引擎已经实现了流批统一,但实时数仓其他的痛点很大程度是由于存储功能存在一定的限制而导致的。随着数据湖技术的兴起,一种新的存储技术产生了,它能支持高效的数据流批读写、数据回溯以及数据更新。基于数据湖可以构建出新的数仓生产体系——Streaming Warehouse。

在 Streaming Warehouse 中,每个中间表都被抽象为 Dynamic Table,能够同时支持流式和批式访问,为用户提供了和离线数仓相同的生产体验。基于 Streaming Warehouse 可以带来以下收益。

首先,为用户提供了统一的Table抽象,用户只需要维护一套 Schema。同时也统一了技术栈,大幅降低了业务的开发和运维成本。

其次,它采用了流批一体的存储,支持流式消费和 OLAP 查询,可以随时查询实时计算的中间结果。

最后,在保证数据新鲜度的情况下,存储成本相比实时数仓会更低一些。中间存储可以选用相对廉价的 HDFS 和 S3 这样的存储。

4

接下来我们对这三种数仓生产体系做一个整体的对比。

  • 在数据新鲜度方面,实时数仓和 Streaming Warehouse 的数据新鲜度是比较接近的,都是近似于实时的生产体验。

  • 在查询延迟方面,三种数仓生产体系的查询延迟都相对较低,但实时数仓的中间结果查询需要付出更多的成本,比如将中间结果需要导出到Hive等。

  • 在开发成本方面,Streaming Warehouse 和离线数仓的开发成本比较接近,它们的开发模式类似,可以很容易的进行开发和数据验证,门槛较低。实时数仓由于中间结果不可查,想要做 debug 和数据验证的成本开销会比较高。

  • 在运维成本方面,Streaming Warehouse 和离线数仓的运维成本也是比较接近的,因为它们的生产体系类似。对于运维人员,只需要维护一条链路,使用同一套技术栈。同时 Streaming Warehouse 和离线数仓都可以选择更廉价的离线存储,存储成本会更低一些。

5

那么思考一下 Streaming Warehouse 是否真的完全覆盖了我们的需求?

先来看一个业务场景,这是一个比较典型的商品订单关联计算的业务场景。在这个场景中,订单数据和商品数据会经过一些简单的加工,导入到 Streaming Warehouse 中的 ODS 层的表,也就是订单表和商品表。

然后订单表和商品表会进一步拼接为 DWD 层的商品订单明细表。最后对 DWD 层的表做一些聚合计算,产生 DWS 层的数据结果表。例如统计今天所有商品的营收,统计今天销售量 Top 10 的商品信息等。

在这样一个业务场景中,业务在数仓中可能也会进行一些常见的操作,比如业务可能会去修改订单表的字段。那么如果修改了订单表的字段,怎么去判断这次修改可能会影响到下游的哪些表呢?这反映出目前 Streaming Warehouse 中缺乏一个血缘管理的业务能力。

另外如果订单表数据出错了,如何去做生产链路的数据订正呢?在离线数仓中,可以很方便的进行任务重跑、Overwrite 等操作。在 Streaming Warehouse 中目前也可以很方便的去做这样的操作吗?

由于 Streaming Warehouse 是基于实时生产链路,所以不仅需要对这个表进行订正,还需要对它下游的表同时进行处理。在整个订正的过程中,数据的中间变化不应该被服务层可见。比如聚合结果已经到了 10,在订正的过程中,这个结果可能会回退到1,然后再逐渐累加到 10。

除了上述两个问题外,在进行 OLAP 查询时,如果想要分析 Top 10 商品在整个营收中所占的比重如何进行呢?如果是离线数仓,我们可以在两个表就绪之后进行 batch 查询。而在 Streaming Warehouse 中并没有就绪的概念,这两张表又来源于两个不同的任务,任务之间并没有任何的数据对齐的操作。当我们进行多表关联查询的时候,它的计算结果并不是完全一致的,缺少一个一致性的保证。

6

下面我们来总结一下在 Streaming Warehouse 中存在的问题。

  • 缺少血缘管理功能,包括表的血缘关系以及数据的血缘关系。表血缘关系是指这个表的上下游依赖,而数据血缘关系则是指这份数据来源于上游的哪些数据,同时下游基于这份数据生产出了哪些数据。

  • 缺少统一的版本管理能力。在离线数仓中,我们可以按照小时、天来对数据进行对齐。而在 Streaming Warehouse 中,由于我们都是流式进行处理,没有数据对齐、版本划分的概念,就会导致进行多表关联查询的时候缺少一致性的保证。

  • 数据订正困难。在进行订正的过程中,需要进行链路双跑、业务逻辑修正等大量的人工操作,运维成本较高。

基于以上的问题,我们提出了一个基于 Flink 和 Paimon 构建 Streaming Warehouse,并对外提供数据一致性管理的能力。

二、方案设计

7

下面我们介绍一下基于 Flink 和 Paimon 实现数据一致性管理方案的详细设计。

在一致性管理方案的整体设计中,主要包含两个部分。

  • 第一部分,建立上下游的血缘关系,我们会引入 System Database 来记录 Streaming Warehouse 中所有表和数据的血缘关系。同时,在任务提交以及数据生产的过程中,会自动的把表以及数据之间的血缘关系写入到血缘关系表中。

  • 第二部分,我们会在 Streaming Warehouse 中引入数据版本控制的能力,数据会按照版本来保持可见性,并且协调多表数据版本处理的一致性。

8

下面我们详细介绍一下这两部分的方案设计。

首先是血缘关系中的Table血缘关系管理。我们在 Streaming Warehouse 中引入了 System Database,并在这个 System Database 中创建了 Source 和 Sink 的血缘关系表。在任务的提交阶段,会解析这个任务使用到的 Table 表,并将这些信息记录到 Paimon 的血缘关系表中。

上图是我们的一个表结构,主要用来记录表和任务之间的关联关系。基于这个关联关系,我们可以构建出表与表之间的数据血缘关系。

9

在数据血缘关系中会为数据划分一个版本,并将版本信息记录到数据血缘关系的表中。目前我们以 Flink 的 Checkpoint 作为数据版本的一个划分标志,这是因为在 Flink 中目前 Paimon 表是依赖 Checkpoint 来实现数据提交的。

在 Flink 的 Checkpoint 制作成功之后,这意味着一个新的版本的数据产生了,我们会自动记录消费与生产之间的 Snapshot 的关系。

10

接下来介绍数据版本控制的设计,首先介绍一下基本概念。

  • 第一个概念是 Flink Checkpoint。这个是 Flink 定期用来持久化状态,制作快照的一个功能,主要用于容错、两阶段提交等。

  • 第二个概念是 Paimon Snapshot。在 Flink 制作 Checkpoint 的时候 Paimon 会产生 1 个或 2 个Snapshot,这取决于 Paimon 在这个过程中是否有进行过 Compaction,但至少会产生一个 Snapshot 来作为新的数据版本。

  • 第三个概念是 Data Version,也就是数据版本。计算引擎在计算的时候会按照数据的版本进行数据的对齐,然后进行处理,从而实现一个微批模式的处理。

目前,短期内我们是将 Paimon Snapshot 和 Data Version 两个概念进行了对齐,也就是说一个 Paimon Snapshot 就对应数据的一个版本。

11

先简单看一个数据对齐的示例。假设我们有 Job-A 和 Job-B,他们分别基于 Table-A 产出了自己的下游表 Table-B 和 Table-C。当 Job-C 想要对 Table-B 和 Table-C 进行关联查询的时候,它就可以基于一致性的版本去做自己的 QUERY。

比如 Job-A 基于 Table-A 的 Snapshot-20 产出了 Table-B 的 Snapshot-11。Job-B 基于 Table-A 的Snapshot-20产出了 Table-C 的 Snapshot-15。那么 Job-C 的查询就应该基于 Table-B 的 Snapshot-11 和 Table-C 的 Snapshot-15 进行计算,从而实现计算的一致性。

12

接下来介绍一下数据对齐的实现,它的实现分为两个部分。

  • 在提交阶段,需要去血缘关系表中查询上下游表的一致性版本,并且基于查询结果给对应的上游表设置起始的消费位置。
  • 在运行阶段,按照消费的 Snapshot 来协调 Checkpoint,在 Flink 的 Checkpoint Coordinator 向 Source 发出 Checkpoint 的请求时,会强制要求将 Checkpoint 插入到两个 Snapshot 的数据之间。如果当前的 Snapshot 还没有完全被消费,这个 Checkpoint 的触发会被推迟,从而实现按照 Snapshot 对数据进行划分和处理。

    13

在 Flink 的 Checkpoint 成功之后,它会通知Sink的算子来进行 Table 的 commit。在 commit 完成之后,这份 Snapshot 的数据就可以被下游可见了。此时会由 Commit Listener 将数据的血缘关系写入到 System Table 中,用来记录这份血缘关系。

14

当我们实现上面两个功能之后,具体有哪些应用场景呢?

  • 第一,数据血缘的自动化管理。数据血缘关系在整个数仓中是非常重要的一个部分。基于血缘关系我们可以快速的进行数据溯源,风险评估等。同时也可以基于血缘关系分析这些表的使用方、使用数量、数据走向,从而进行实际应用价值的评估。
  • 第二,查询一致性的能力,我们可以为 OLAP 查询自动按照数据版本来做数据对齐,并且保证查询结果的一致性。同时基于一致性数据进行开发和 debug,可以降低开发和运维成本,不再需要业务方手动进行多表对齐的操作。
  • 第三,数据订正。基于数据一致性管理以及数据血缘关系,可以简化数据订正的过程。首先按照血缘关系我们可以自动的创建下游需要订正的表的镜像表,然后再进行订正。可以提供两种订正方式,全量订正和增量订正。
    • 全量订正,可以基于一致性版本的数据从上游进行全量消费,产生一个全链路的新数据。在整个数据生产追上延迟之后,可以对表进行一个自动切换。
    • 增量订正,可以考虑和 Flink 的 Savepoint 机制相结合,从而不用再从零开始去初始化状态,减少需要回溯的数据量。

三、当前进展

15

下面我们介绍一下目前数据一致性管理的阶段性进展。

在社区里,目前我们发起了相关的 issue、PIP 以及邮件进行讨论,大家感兴趣的话可以关注一下相应的进展。如果有新的需求和想法的话,也欢迎大家一起来交流。

16

在字节内部,目前我们完成了一个 POC 版本的开发和测试。在这个版本中,我们提供了一个第三方的外部服务,用来管理血缘关系,协调数据版本等。

四、未来规划

17

最后介绍一下在 Streaming Warehouse 上的未来规划。

  • 第一,端到端延迟优化。在 POC 的过程中,我们发现端到端的延迟很大程度上取决于 Flink Checkpoint 的间隔,同时在内部收集一些业务需求的时候,业务对端到端延迟要求比较高。这样会带来一个问题,当我们降低 Checkpoint 的频率时,会导致比较多的小文件,这需要做一些权衡。下一阶段我们会着重解决端到端延迟的问题。
  • 第二,数据订正能力增强。目前这个是业务在实时数仓生产中反馈比较多的痛点,业务希望数据订正的成本可以足够低,同时订正过程产生的中间结果对外不可见。
  • 第三,状态复用。在数仓生产中有很多场景是多表关联。目前在 Flink 中,Join 算子会存储左右两条流的数据明细,在多表级联 Join 的场景下,每个 Join 算子都会存储之前的 Join 结果,相当于多存储了一次前面表的明细,会产生非常严重的状态膨胀的问题。业务希望这些状态可以被复用,也就是说相同表的数据只用被存储一份,这样的话可以大幅度的减少状态存储的开销。同时业务也希望这个中间状态是可以被查询的。假设这些状态可以被存储到 Paimon 的表中,采用 Lookup Join 的方式去访问。那么我们就可以使用 Flink 的 SQL 直接查询中间状态。

Q&A

问:血缘关系解析是基于 Flink 的 calcite 吗?

答:不是,是基于 FlinkTableFactory 进行实现,在创建 DynamicTableSource 和 DynamicTableSink 时,提取相关的 Table 信息和任务信息,然后写入到 Paimon 的血缘关系表中。

问:针对于任务出错,数据订正,具体是怎么操作的呢?也就是恢复正常的一个处理流程是怎么样的,大概需要多长时间能够恢复正常呢?

答:我们的目标是希望数据订正的流程可以在系统内自动完成,初期设想是在订正时,基于表的血缘关系对下游表产生相应的镜像表,然后将任务双跑在这条镜像链路上,基于数据血缘关系可以实现数据仍然按照相同的版本进行处理。在两条链路的延迟基本对齐时,进行任务以及表的切换。处理时间依赖处理的数据量,链路的复杂度等。

问:大佬有考虑在此基础上做一个统一的 Paimon 管理服务吗?例如 Paimon 的元数据管理,Compaction 管理,血缘管理等等

答:目前只考虑了实现元数据管理、血缘管理等,对于 Compaction 管理,可能更适合在 Table Service 这样的服务中进行。

问:业务周期跨度比较大,Flink Join 缓存全量的数据?

答:Flink 全量 Join 数据会在状态中存储 Table 的所有数据,同时对于级联 Join 会产生非常严重的状态膨胀问题。根据 Join 的原理,可以考虑将 Join 实现为 Lookup Join + Delta Join,对于历史数据,采用 Lookup join 去查历史表数据,而对于最近的增量数据,将其存储在状态中,通过状态查询进行 Join,这样可以将大量的全量数据存储在 Paimon 表中,状态里只缓存少部分数据。这依赖版本管理的能力来区分数据是 Join 历史数据还是增量数据。

问:字段血缘关系会做吗?要根据 SQL 语法解析的吧

答:暂时不考虑字段血缘关系的实现。

点击查看原文视频 & 演讲PPT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/61041.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue-baidu-map-3x 使用记录

在 Vue3 TypeScript 项目中,为了采用 标签组件 的方式,使用百度地图组件,冲浪发现了一个开源库 ovo,很方便!喜欢的朋友记得帮 原作者 点下 star ~ vue-baidu-map-3xbaidu-map的vue3/vue2版本(支持v2.0、v…

论文笔记:SUPERVISED CONTRASTIVE REGRESSION

2022arxiv的论文,没有中,但一作是P大图班本MIT博,可信度应该还是可以的 0 摘要 深度回归模型通常以端到端的方式进行学习,不明确尝试学习具有回归意识的表示。 它们的表示往往是分散的,未能捕捉回归任务的连续性质。…

MCU的类型和应用领域简介

MCU(Microcontroller Unit)根据存储器类型可分为无片内ROM型和带片内ROM型。无片内ROM型的芯片需要外接EPROM才能应用,而带片内ROM型则有不同的子类型,如片内EPROM型、MASK片内掩模ROM型和片内Flash型。 MCU还可以按照用途分为通…

策略模式——算法的封装与切换

1、简介 1.1、概述 在软件开发中,常常会遇到这种情况,实现某一个功能有多条途径。每一条途径对应一种算法,此时可以使用一种设计模式来实现灵活地选择解决途径,也能够方便地增加新的解决途径。为了适应算法灵活性而产生的设计模…

【分布式应用】ELK企业级日志分析系统

目录 一、ELK 简介 1.1 ELK各组件介绍 ElasticSearch: Kiabana: Logstash: 1.2 可以添加的其它组件: Filebeat: 缓存/消息队列(redis、kafka、RabbitMQ等): Fluentd&#xf…

向表中随机插入字符串数据

已知表 向该表中插入指定次数的随机字符串: 代码如下: DROP PROCEDURE sc //CREATE PROCEDURE sc(num INT) BEGIN DECLARE str VARCHAR(26) DEFAULT "abcdefghijklmnopqrstuvwxyz"; DECLARE cnt INT DEFAULT 0; DECLARE startIndex INT DEFAULT 1; DE…

React Native获取手机屏幕宽高(Dimensions)

import { Dimensions } from react-nativeconsole.log(Dimensions, Dimensions.get(window)) 参考链接: https://www.reactnative.cn/docs/next/dimensions#%E6%96%B9%E6%B3%95 https://chat.xutongbao.top/

【电源专题】充电IC与DC-DC有什么区别

充电IC和DC-DC一样使用很广泛,如手机、平板等需要电池供电的系统中,一般都会见到充电IC的身影。那么大家有没有考虑过一个问题。充电IC与DC-DC有什么区别? 首先如下所示为充电IC的两个阶段,一个阶段是恒流充电阶段,我们一般称之为CC阶段,另一个是恒压充电阶段,我们称之为…

EtherCAT转Profinet网关连接西门子PLC与凯福科技总线步进驱动器通讯

西门子S7-1200/1500系列的PLC,采用Profinet实时以太网通讯协议,需要连接带EtherCAT的通讯功能的伺服驱动器等设备,就必须进行通讯协议转换。捷米特JM-EIP-RTU系列的网关提供了,快速可行的解决方案 捷米特JM-ECTM-PN在PROFINET一侧…

学习左耳听风栏目90天——第一天 1-90(学习左耳朵耗子的工匠精神,对技术的热爱)【洞悉技术的本质,享受科技的乐趣】

洞悉技术的本质,享受科技的乐趣 第一篇,我的感受就是 耗叔是一个热爱技术,可以通过代码找到快乐的技术人。 作为it从业者,我们如何可以通过代码找到快乐呢?这是一个问题? 至少目前,我还没有这种…

wordpress发表文章时报错: rest_cannot_create,抱歉,您不能为此用户创建文章(已解决)

使用wordpress 的rest api发布文章,首先使用wp-json/jwt-auth/v1/token接口获取token,然后再使用/wp-json/wp/v2/posts 接口发表文章,但是使用axios请求时,却报错: 但是,我在postman上却是可以的&#xff0…

目标检测与跟踪 (1)- 机器人视觉与YOLO V8

目录 1、研究背景 2. 算法原理及对比 2.1 点对特征(Point Pairs) 2.2 模板匹配 2.3 霍夫森林 2.4 深度学习 3、YOLO家族模型演变 4、YOLO V8 1、研究背景 机器人视觉识别技术是移动机器人平台十分关键的技术,代表着机器人智能化、自动化…

C语言----动态内存分配(malloc calloc relloc free)超全知识点

目录 一.动态内存函数 1.malloc 2.free 3.calloc 4.malloc和calloc的区别 5.realloc 二.动态内存分配的常见错误 1.对null进行解引用操作 2.对动态开辟空间的越界访问 3.对非动态开辟内存使用free释放 4.使用free释放动态开辟内存的一部分 5.对同一块动态内存多次…

基于 Redux + TypeScript 实现强类型检查和对 Json 的数据清理

基于 Redux TypeScript 实现强类型检查和对 Json 的数据清理 突然像是打通了任督二脉一样就用了 generics 搞定了之前一直用 any 实现的类型…… 关于 Redux 的部分,这里不多赘述,基本的实现都在这里:Redux Toolkit 调用 API 的四种方式 和…

visio,word添加缺少字体,仿宋_GB2312、楷体_GB2312、方正小标宋简体等字体下载

一. 内容简介 visio,word添加缺少字体,仿宋_GB2312、楷体_GB2312、方正小标宋简体等字体下载 二. 软件环境 2.1 visio 三.主要流程 3.1 下载字体 http://www.downza.cn/ 微软官方给的链接好多字体没有,其他好多字体网站,就是给你看个样式&#xff…

【雕爷学编程】MicroPython动手做(31)——物联网之Easy IoT 2

1、物联网的诞生 美国计算机巨头微软(Microsoft)创办人、世界首富比尔盖茨,在1995年出版的《未来之路》一书中,提及“物物互联”。1998年麻省理工学院提出,当时被称作EPC系统的物联网构想。2005年11月,国际电信联盟发布《ITU互联网…

记一次ubuntu16误删libc.so.6操作的恢复过程

背景 操作系统:ubuntu16 glibc版本:2.23 修改原因: 经过一系列报错和手工构建之后,vulkansdk成功安装(起码运行./vulkansdu成功),在进行./vulkaninfo进行验证时,报错&#xff1a…

G-channel 实现低光图像增强

G-channel 之前研究低光图像增强时,看到一篇博客,里面介绍了一种方法,没有说明出处,也没有说明方法的名字,这里暂时叫做 G-channel 算法。 博客地址:低照度图像增强(附步骤及源码)…

vue+element中如何设置单个el-date-picker开始时间和结束时间关联

功能&#xff1a;选了开始时间&#xff0c;则结束时间只能选择开始时间之后的&#xff1b;选了结束时间&#xff0c;则开始时间只能选择结束时间之前的 重点是picker-options属性 图示&#xff1a; 代码展示: // body 内部<el-form-item><el-date-pickerv-model&qu…

AI抠图使用指南:Stable Diffusion WebUI Rembg实用技巧

抠图是图像处理工具的一项必备能力&#xff0c;可以用在重绘、重组、更换背景等场景。最近我一直在探索 Stable Diffusion WebUI 的各项能力&#xff0c;那么 SD WebUI 的抠图能力表现如何呢&#xff1f;这篇文章就给大家分享一下。 安装插件 作为一个生成式AI&#xff0c;SD…