Paimon 与 Spark 的集成(一)

Paimon

Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。Paimon 采用开放的数据格式和技术理念,可以与 ApacheFlink / Spark / Trino 等诸多业界主流计算引擎进行对接,共同推进 Streaming Lakehouse 架构的普及和发展。

Paimon x Spark

Apache Spark,作为大数据处理的统一计算分析引擎的,不仅支持多种语言的高级 API 使用,也支持了丰富的大数据场景应用,包括结构化数据处理的Spark SQL、用于机器学习的 MLlib,用于图形处理的 GraphX,以及用于增量计算和流处理的Structured Streaming。Spark 已经成为了大数据领域软件栈中必不可少的组成部分。作为数据湖领域新起的 Paimon,与 Spark 的深度、全面的集成也将为 Paimon在准实时场景、离线湖仓场景提供了便利。

接下来我们介绍一些在 Paimon 新版本中基于 Spark 计算引擎支持的主要功能。

Schema Evolution

Schema evolution 是一个数据湖领域一个非常关键的特性,它允许用户方便的修改表的当前 Schema 以适应现有数据,或随时间变化的新数据,同时保持数据的完整性和一致性。

在离线场景中,我们可以通过计算引擎,如 Spark 或者 Flink,提供的 Alter Table 的 SQL 语法来实现对 Schema 的操作。在某些场景下,我们并非都能实时准确的获取上游数据较当前表的 Schema 变化;另外在 Streaming 流式场景中以离线 Alter Table 的方式完成 Schema 的更新需要执行1)停止流作业,2)完成 Schema 更新操作,3)重启流作业这样的流程,这是较为低效的。

Paimon 支持了在数据写入的同时,自动完成 Source 数据和当前表数据的 Schema 合并,并将合并后的 Schema 作为表的最新 Schema,仅需要配置参数  write.merge-schema。

data.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.save(location)
新增列

比较常见的是,在执行数据追加或覆盖操作时使用,以自动调整 Schema 以包含一个或多个新列。

假设原表的 Schema 为:

a INT
b STRING

新数据 data 的 Schema 为:

a INT
b STRING
c LONG
d Map<String, Double>

操作完成后的表的 Schema 变更为:

a INT
b STRING
c LONG
d Map<String, Double>
提升字段类型

Paimon 的 Schema Evolution 也同时支持数据类型的提升,如 Int 提升为 Long,Long提升为 Decimal 等;以上述表继续写入数据,假设新数据的 Schema 为:

a Long
b STRING
c Decimal
d Map<String, Double>

操作完成后的表的 Schema 变更为:

a Long
b STRING
c Decimal
d Map<String, Double>
强制类型转换

如以上示例所示,Paimon 支持数据字段类型的提升,如数值型向更高的精度提升(由 Int 提升至 Long,由 Long 提升至 Decimal),同时 Paimon 也支持一些类型之间的强制转换,如 String 强转成 Date 类型或者 Long 转换成 Int,但需要显式的配置参数 write.merge-schema.explicit-cast。

data.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.option("write.merge-schema.explicit-cast", "true")
.save(location)

假设原表的 Schema为:

a LONG
b STRING //内容为2023-08-01的格式

新数据 data 的 Schema 为:

a INT
b DATE

操作完成后的表的 Schema 变更为:

a INT
b DATE

需要注意的是:

数据写入(追加或覆盖写)时的 Schema Evolution 不支持删除列和重命名列操作的,也不支持不在隐式/显式转换范围内的数据类型提升。当具体数值不能转换成目标类型时,为了避免将表数据破环,当前会报错,终止该操作。

Spark Structured Streaming

Spark Structured Streaming 是一个基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎,可以像表达静态数据的批量计算一样的表达流计算。Spark SQL 引擎将负责增量且持续地运行它,并随着流数据不断到达而更新最终结果。Structured Streaming 支持流之间的聚合、事件时间窗口、流批之间 Join 等。Spark 通过 checkpointing 和 write-ahead logs 实现了端到端的 exactly-once。简而言之,Structured Streaming 提供快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑流处理。

Paimon 在 0.5 和 0.6 两个版本逐步完善了 Spark Structured Streaming 的读写支持,提供了基于 Spark 引擎的流式读写能力。

■ Streaming Sink

Spark Structured Streaming 定义了三种输出模式(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts),Paimon 仅支持 Append 模式和 Complete 模式。

// `df` is the upstream source data.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
■ Streaming Source

结合 Spark 支持的多种 Trigger 策略 [1]和 Paimon 拓展的一些流式处理的能力,Paimon 可以支持丰富的 Streaming Source 的应用场景。

Paimon 提供了多样了 ScanMode,允许用户以合适的参数指定初始状态从 Paimon 表读取的数据。

ScanMode

描述

latest

仅读取后续持续写入的数据。

latest-full

读取当前快照的数据,以及后续持续写入的数据。

from-timestamp

读取参数 scan.timestamp-millis 指定的时间戳之后持续写入的数据。

from-snapshot

读取参数 scan.snapshot-id 指定的版本后续持续写入的数据。

from-snapshot-full

读取参数 scan.snapshot-id 指定的版本快照数据,以及后续持续写入的数据。

default

默认等同于 latest-full 模式;如果指定 scan.snapshot-id,等同于 from-snapshot 模式;如果指定 scan.timestamp-millis,等同于 from-timestamp 模式;

Paimon 通过拓展 SupportsAdmissionControl [2]接口,实现了 Source 端的流量控制,避免了由于要处理的单个 Batch 的数据量过大而引起的流式作业运行失败的问题。Paimon 目前支持以下ReadLimit [3]的实现。

Readlimit 参数

描述

read.stream.maxFilesPerTrigger

一个 Batch 最多返回的Splits数

read.stream.maxBytesPerTrigger

一个 Batch 最多返回的byte数

read.stream.maxRowsPerTrigger

一个 Batch 最多返回的行数

read.stream.minRowsPerTrigger

一个 Batch 最少返回的行数,和 maxTriggerDelayMs 搭配使用构成ReadMinRows [4]

read.stream.maxTriggerDelayMs

一个 Batch 触发的最大延时,和 minRowsPerTrigger 搭配使用构成ReadMinRows [4]

以两个示例说明 Paimon Spark Structured Streaming 的用法。

示例一:

普通的流式增量 ETL 场景。

// Paimon source表的Schema为:time Long, stockId INT, avg_price DOUBLE
val query = spark.readStream
  .format("paimon")
  .option("scan.mode", "latest")
  .load("/path/to/paimon/source/table")
  .selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"), col("stockId"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(180, TimeUnit.SECONDS))
  .start()

该示例以 3 分钟的间隔流式读取 Paimon 后续的增量数据,进行 ETL 转化后同步到下游。

示例二:

适用于追补数据的场景,流式读取 Paimon 表自某个指定快照之后的数据,读取完成后不再读取后续写入的数据,同时限定了每个 Batch 大致的数据规模。

val query = spark.readStream
  .format("paimon")
  .option("scan.mode", "from-snapshot")
  .option("scan.snapshot-id", 345)
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

示例代码中指定 Trigger.AvailableNow()触发器,表示仅读取流式任务启动时当前 Paimon 可用的数据;使用 from-snapshot 的 ScanMode 标识了读取快照 ID=345 之后写入的数据。在配置 maxBytesPerTrigger 等于 128MB 后,Spark Structured Streaming会将待消费的数据按照 128MB 的 Splits 大小进行 Batch 切分,由多个 Batch 完成当前快照数据的消费。

Spark SQL 拓展

■ Insert Overwrite

Insert Overwrite 是一个常用的 SQL 语法,用于重写整张表或者表中指定分区。该功能在 Paimon 新版本中也得到支持,包括了 static 和 dynamic 两种模式。

Static Overwrite

覆盖整张表:无论当前表是否是分区表,通过以下 SQL 可以完成使用新数据覆盖原表数据的操作。

在 Spark 环境下使用 Paimon,请参考这里 [5]

USE paimon;


CREATE TABLE T (a INT, b STRING) TBLPROPERTIES('primary-key'='a');


INSERT OVERWRITE T VALUES (1, "a"), (2, "b");
----------
1 a
2 b
----------


INSERT OVERWRITE T VALUES (1, "a2"), (3, "c");
----------
1 a2
3 c
----------

覆盖指定的表分区。

USE paimon;


CREATE TABLE T (dt STRING, a INT, b STRING)
TBLPROPERTIES('primary-key'='dt,a')
PARTITIONED BY(dt);


INSERT OVERWRITE T VALUES ("2023-10-01", 1, "a"), ("2023-10-02", 2, "b");
----------------
2023-10-01 1 a
2023-10-02 2 b
----------------


INSERT OVERWRITE T PARTITION (dt = "2023-10-02") VALUES (2, "b2"), (4, "d");
----------------
2023-10-01 1 a
2023-10-02 2 b2
2023-10-02 d 4
----------------

Dynamic Parititon Overwrite(DPO)

默认情况下是在 Static 模式下执行 Insert Overwrite 的,用户需要显式的指定要覆盖的分区信息;我们可以通过参数启用 Dynamic 模式来执行 Insert Overwrite,这样Paimon 将自动判断 source 端数据所涉及到的分区来执行覆盖操作。

Paimon 启动 DPO 需要启动 spark session 时额外指定 paimon 的 extension:

--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
 
 
USE paimon;


CREATE TABLE T (dt STRING, a INT, b STRING)
TBLPROPERTIES('primary-key'='dt,a')
PARTITIONED BY(dt);


INSERT OVERWRITE T VALUES ("2023-10-01", 1, "a"), ("2023-10-02", 2, "b");
----------------
2023-10-01 1 a
2023-10-02 2 b
----------------


SET spark.sql.sources.partitionOverwriteMode=DYNAMIC;


INSERT OVERWRITE T VALUES ("2023-10-02", 2, "b2"), ("2023-10-02", 4, "d");
----------------
2023-10-01 1 a
2023-10-02 2 b2
2023-10-02 d 4
----------------

在配置 spark.sql.sources.partitionOverwriteMode=DYNAMIC 后,不再需要指定要覆盖 dt="2023-10-02"的分区,实现了数据的动态覆盖。

■ Call procedure

除了由 Spark 框架提供了常用的 SQL 语法(包括 DDL,DML,Query 以及一些表信息查询)外,Paimon 还需要拓展一些额外的 SQL 语法来提供自定义功能的操作接口,便于用户对 Paimon 表的管理和探索。Call Procedure 的引入为这种场景的支持提供了框架层面的支持。

procedure 的语法:

CALL procedure_name(table => 'table_identifier', arg1 => '', ...);

目前 Paimon 已经实现了三种 procedure:

Procedure

描述

用法

create_tag

为指定快照创建标签

CALL create_tag(table => 'T', tag => 'test_tag', snapshot => 2)

delete_tag

删除已创建的标签

CALL delete_tag(table => 'T', tag => 'test_tag')

rollback

回滚表到指定标签或者版本

CALL rollback(table => 'T', version => '2')


场景示例

以下构造一个流式开启 Schema Evolution 的示例,上游数据实时同步到 paimon 的 user 表(原表仅有 userId 和 name 两个维度),在某时刻上游数据添加了 age 属性,在无需停止作业运维时通过开启 Schema Evolution 自动完成元数据的合并和新数据的写入。

d29f7b28eeb0f663120ebf2d21e3b4a4.png

 
 
// 原表的定义
// CREATE TABLE T (userId INT, name STRING) TBLPROPERTIES ('primary-key'='userId');


// -- 假设原表的流式写入的数据--
// 1 user1
// 2 user2
// -------------------------


// 使用MemoryStream模拟上游streaming数据
val inputData = MemoryStream[(Int, String, Int)]
val stream = inputData
  .toDS()
  .toDF("userId", "name", "age")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .format("paimon")
  .start("/path/to/user_table")


inputData.addData((1, "user1", 30), (3, "user3", 33))
stream.processAllAvailable()


// -- 该batch数据写入后的表数据--
// 1 user1 30
// 2 user2 null
// 3 user3 33
// ---------------------------

后续规划

Paimon 孵化于 Flink 社区,源于流式数仓,但其远不止于此。Paimon 将在与如 Apache Spark 这样的其他引擎的深度集成上,以及在如离线湖仓的场景支持上持续发力。在接下来的时间上,社区在和 Spark 引擎的支持上将逐渐拓展支持更多的 Spark SQL 语法,比如 Update、Merge Into 等;在读写性能上也会进行深层次优化。


参考

[1] Trigger 策略

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

[2] SupportsAdmissionControl

https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.html

[3] ReadLimit

https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.html

[4] ReadMinRows

https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/ReadMinRows.html

[5] 在 Spark 环境下使用 Paimon

https://paimon.apache.org/docs/master/engines/spark3/#setup


▼ 关注「Apache Spark 技术交流社区」,获取更多技术干货 ▼

 f6b637bb03c3661ca0875130168999b7.gif  点击「阅读原文」,跳转  Apache Paimon 官网

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/130597.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

听GPT 讲Rust源代码--library/core/src(2)

题图来自 5 Ways Rust Programming Language Is Used[1] File: rust/library/core/src/iter/adapters/by_ref_sized.rs 在Rust的源代码中&#xff0c;rust/library/core/src/iter/adapters/by_ref_sized.rs 文件实现了 ByRefSized 适配器&#xff0c;该适配器用于创建一个可以以…

在Node.js中,什么是事件发射器(EventEmitter)?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

全新Inner-IoU损失函数!!!通过辅助边界框计算IoU有效提升检测效果

摘要 1 简介 2 方法 2.1 边界框回归模式分析 2.2 Inner-IoU 损失 3 实验 3.1 模拟实验 3.2 对比实验 3.2.1 PASCAL VOC上的YOLOv7 3.2.2 YOLOv5 在 AI-TOD 上 4. 参考 摘要 随着检测器的快速发展&#xff0c;边界框回归&#xff08;BBR&#xff09;损失函数不断进…

11月份 四川汽车托运报价已经上线

中国人不骗中国人!! 国庆小长假的高峰期过后 放假综合症的你还没痊愈吧 今天给大家整理了9条最新线路 广州到四川的托运单价便宜到&#x1f4a5; 核算下来不过几毛钱&#x1f4b0; 相比起自驾的漫长和疲惫&#x1f697; 托运不得不说真的很省事 - 赠送保险 很多客户第一次运车 …

多目标优化框架

随着模型越来越复杂&#xff0c;优化目标越来越多&#xff0c;传统算法都慢慢地无法胜任复杂优化任务&#xff0c;更为智能的优化方法也就应运而生了。其中有一类是进化优化算法&#xff0c;这类算法的思想来源是自然界的“优胜劣汰”法则&#xff0c;通过不停地保留好的个体最…

艾默生Emerson EDI需求分析

艾默生Emerson是一家全球领先的工程技术和解决方案提供商。该公司总部位于美国&#xff0c;成立于1890年&#xff0c;经过多年的发展&#xff0c;已经发展成为一个多元化的跨国企业&#xff0c;业务遍及工业、商业和消费者市场。艾默生提供各种产品和服务&#xff0c;包括自动化…

CSS3 过度效果、动画、多列

一、CSS3过度&#xff1a; CSS3过渡是元素从一种样式逐渐改变为另一种的效果。要实现这一点&#xff0c;必须规定两相内容&#xff1a;指定要添加效果的CSS属性&#xff1b;指定效果的持续时间。如果为指定持续时间&#xff0c;transition将没有任何效果。 <style> div…

Python 的 datetime 模块

目录 简介 一、date类 &#xff08;一&#xff09;date 类属性 &#xff08;二&#xff09;date 类方法 &#xff08;三&#xff09;实例属性 &#xff08;四&#xff09;实例的方法 二、time类 &#xff08;一&#xff09;time 类属性 &#xff08;二&#xff09;tim…

python调用chrome实现网页自动操作

一. 内容简介 python调用chrome实现网页自动操作。 二. 软件环境 2.1vsCode 2.2Anaconda version: conda 22.9.0 2.3代码 链接&#xff1a; 三.主要流程 3.1 下载驱动和插件 调用谷歌浏览器&#xff0c;需要下载浏览器驱动&#xff08;https://registry.npmmirror.co…

确定性 vs 非确定性:GPT 时代的新编程范式

分享嘉宾 | 王咏刚 责编 | 梦依丹 出品 | 《新程序员》编辑部 在 ChatGPT 所引爆的新一轮编程革命中&#xff0c;自然语言取代编程语言&#xff0c;在只需编写提示词/拍照就能出程序的时代&#xff0c;未来程序员真的会被简化为提示词的编写员吗&#xff1f;通过提示词操纵 …

jQuery HTML/CSS 参考文档

jQuery HTML/CSS 参考文档 文章目录 应用样式 示例属性方法示例 jQuery HTML/CSS 参考文档 应用样式 addClass( classes ) 方法可用于将定义好的样式表应用于所有匹配的元素上。可以通过空格分隔指定多个类。 示例 以下是一个简单示例&#xff0c;设置了para标签 <p&g…

【机试题】LazyIterator迭代器懒加载问题

将下面这个未完成的Java工具类补充完成&#xff0c;实现懒加载的功能&#xff0c;该类需要实现Iterable接口&#xff0c;能够遍历所有数据。具体要求如下&#xff1a; 工具类提供了一个ValueLoader接口&#xff0c;用于获取数据&#xff0c;其中ValueLoader的接口定义为&#x…

css:clip元素裁剪实现Loading加载效果边框

clip 属性定义了元素的哪一部分是可见的。clip 属性只适用于 position:absolute 的元素。 警告&#xff1a; 这个属性已被废弃。建议使用 clip-path 文档 https://developer.mozilla.org/zh-CN/docs/Web/CSS/cliphttps://developer.mozilla.org/zh-CN/docs/Web/CSS/clip-path …

AndroidStudio gitee令牌过期 解决方式 remote:Oauth: Access token is expired

记一次&#xff0c;gitee令牌过期 解决方式 Oauth: Access token is expired fatal: unable to access ‘https://gitee.com/xxxx.git/’: The requested URL returned error: 403 remote: [session-e14669a3] Oauth: Access token is expired fatal: unable to access https…

SpringBoot3+Vue3+Mysql+Element Plus完成数据库存储blob类型图片,前端渲染后端传来的base64类型图片

前言 如果你的前后端分离项目采用SpringBoot3Vue3Element Plus&#xff0c;且在没有OSS&#xff08;对象存储&#xff09;的情况下&#xff0c;使用mysql读写图片&#xff08;可能不限于图片&#xff0c;待测试&#xff09;。 耗时三天&#xff0c;在踩了无数雷后&#xff0c…

C# PaddleDetection yolo 印章检测

效果 项目 代码 using OpenCvSharp; using OpenCvSharp.Extensions; using Sdcb.PaddleDetection; using Sdcb.PaddleInference; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq…

GEE:将鼠标变成十字指针,点击获取影像值,显示值到UI中

作者:CSDN @ _养乐多_ 本文记录了在 Google Earth Engine(GEE)开发中,将鼠标变成十字指针,点击获取影像值,显示值到UI中的代码片段。这段代码复制过去修改变量名就可以用了。 效果如下图所示, 文章目录 一、代码片段一、代码片段 使用的时候将 YLDImage 变量换成你屏…

使用WinDbg分析软件突然崩溃的问题

为了测试windbg有多么牛逼&#xff0c;所以仅仅只是测试一下&#xff0c;属于事后诸葛亮型&#xff0c;也只是为了验证一下&#xff0c;把此方法学会即可。 模拟场景&#xff1a; 软件运行后&#xff0c;点击按钮&#xff0c;直接崩溃掉&#xff0c;什么提示都没有。因此&…

uniapp vue2 vuex 持久化

1.vuex的使用 一、uniapp中有自带vuex插件&#xff0c;直接引用即可 二、在项目中新建文件夹store,在main.js中导入 在根目录下新建文件夹store,在此目录下新建index.js文件 index.js import Vue from vueimport Vuex from vuexVue.use(Vuex)const store new Vuex.Store(…

【Java SE】类和对象(上)

目录 一. 面向对象的初步认知 1.1 什么是面向对象 1.2 面向对象与面向过程 二. 类定义和使用 2.1 简单认识类 2.2 类的定义格式 三. 类的实例化 3.1 什么是实例化 3.2 实例化对象 四. this引用(重点&#xff09; 4.1 为什么要有this引用 4.2 this的使用 4.3 this引…