Iceberg从入门到精通系列之二十一:Spark集成Iceberg

Iceberg从入门到精通系列之二十一:Spark集成Iceberg

  • 一、在 Spark 3 中使用 Iceberg
  • 二、添加目录
  • 三、创建表
  • 四、写
  • 五、读
  • 六、Catalogs
  • 七、目录配置
  • 八、使用目录
  • 九、替换会话目录
  • 十、使用目录特定的 Hadoop 配置值
  • 十一、加载自定义目录
  • 十二、SQL 扩展
  • 十三、运行时配置-读选项
  • 十四、运行时配置-写选项
  • 十五、Spark Procedures
  • 十六、元数据管理

Iceberg的最新版本是1.4.3。

Spark 是目前用于 Iceberg 操作的功能最丰富的计算引擎。建议您开始使用 Spark,通过示例了解 Iceberg 概念和功能。您还可以在多引擎支持页面下查看将 Iceberg 与其他计算引擎结合使用的文档。

一、在 Spark 3 中使用 Iceberg

要在 Spark shell 中使用 Iceberg,请使用 --packages 选项:

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.3

如果想在 Spark 安装中包含 Iceberg,请将iceberg-spark-runtime-3.2_2.12 Jar 添加到 Spark 的 jars 文件夹中。

二、添加目录

Iceberg 附带了目录,使 SQL 命令能够管理表并按名称加载它们。使用spark.sql.catalog.(catalog_name)下的属性配置目录。

此命令为 $PWD/warehouse 下的表创建一个名为 local 的基于路径的目录,并向 Spark 的内置目录添加对 Iceberg 表的支持:

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.3\
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse

三、创建表

要在 Spark 中创建第一个 Iceberg 表,请使用 Spark-sql shell 或 Spark.sql(…) 运行 CREATE TABLE 命令:

CREATE TABLE local.db.table (id bigint, data string) USING iceberg

Iceberg 目录支持全系列 SQL DDL 命令,包括:

  • CREATE TABLE … PARTITIONED BY
  • CREATE TABLE … AS SELECT
  • ALTER TABLE
  • DROP TABLE

四、写

创建表后,使用 INSERT INTO 插入数据:

INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;

Iceberg 还向 Spark、MERGE INTO 和 DELETE FROM 添加了行级 SQL 更新:

MERGE INTO local.db.target t USING (SELECT * FROM updates) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
WHEN NOT MATCHED THEN INSERT *

这段代码是一个使用 MERGE INTO 语句进行数据合并的示例。它将来自更新表的数据合并到目标表中。具体的操作如下:

  1. 使用子查询 (SELECT * FROM updates) u 获取更新表的数据,并将其作为源表使用。
  2. 使用目标表 local.db.target 作为目标表进行更新。
  3. 使用 ON t.id = u.id 来指定源表和目标表之间的连接条件。这里使用了 id 列作为连接条件。
  4. 当连接条件匹配时,执行 WHEN MATCHED 分支的操作。这里使用 UPDATE SET t.count = t.count + u.count 将目标表中的 count 列的值增加源表中对应行的 count 值。
  5. 当连接条件不匹配时,执行 WHEN NOT MATCHED 分支的操作。这里使用 INSERT * 将源表中的数据插入到目标表中。

总之,这段代码通过 MERGE INTO 语句将更新表的数据合并到目标表中,根据连接条件进行更新或插入操作。

Iceberg 支持使用新的 v2 DataFrame 写入 API 写入 DataFrame:

spark.table("source").select("id", "data")
     .writeTo("local.db.table").append()

支持旧的写入 API,但不推荐。

五、读

要使用 SQL 读取,请在 SELECT 查询中使用 Iceberg 表名称:

SELECT count(1) as count, data
FROM local.db.table
GROUP BY data

SQL 也是检查表的推荐方法。要查看表中的所有快照,请使用快照元数据表:

SELECT * FROM local.db.table.snapshots
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
| committed_at            | snapshot_id    | parent_id | operation | manifest_list                                      | ... |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
| 2019-02-08 03:29:51.215 | 57897183625154 | null      | append    | s3://.../table/metadata/snap-57897183625154-1.avro | ... |
|                         |                |           |           |                                                    | ... |
|                         |                |           |           |                                                    | ... |
| ...                     | ...            | ...       | ...       | ...                                                | ... |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+

支持 DataFrame 读取,现在可以使用 Spark.table 按名称引用表:

val df = spark.table("local.db.table")
df.count()

六、Catalogs

Spark 添加了一个 API 来插入用于加载、创建和管理 Iceberg 表的表目录。 Spark 目录是通过在spark.sql.catalog 下设置Spark 属性来配置的。

这将创建一个名为 hive_prod 的 Iceberg 目录,该目录从 Hive 元存储加载表:

spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml

以下是名为rest_prod 的 REST 目录示例,该目录从 REST URL http://localhost:8080 加载表:

spark.sql.catalog.rest_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type = rest
spark.sql.catalog.rest_prod.uri = http://localhost:8080

Iceberg 还支持 HDFS 中基于目录的目录,可以使用 type=hadoop 进行配置:

spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path

基于 Hive 的目录仅加载 Iceberg 表。要在同一 Hive 元存储中加载非 Iceberg 表,请使用会话目录。

七、目录配置

通过添加属性spark.sql.catalog.(catalog-name) 及其值的实现类来创建和命名目录。

Iceberg 提供了两种实现:

  • org.apache.iceberg.spark.SparkCatalog 支持 Hive Metastore 或 Hadoop 仓库作为目录
  • org.apache.iceberg.spark.SparkSessionCatalog 在 Spark 的内置目录中添加了对 Iceberg 表的支持,并将非 Iceberg 表委托给内置目录

两个目录均使用嵌套在目录名称下的属性进行配置。 Hive 和 Hadoop 的常见配置属性有:

属性描述
spark.sql.catalog.catalog-name.typehive, hadoop or rest底层 Iceberg 目录实现、HiveCatalog、HadoopCatalog、RESTCatalog 或在使用自定义目录时保持未设置
spark.sql.catalog.catalog-name.catalog-impl自定义 Iceberg 目录实现。如果 type 为 null,则 Catalog-impl 不得为 null。
spark.sql.catalog.catalog-name.io-impl自定义 FileIO 实现。
spark.sql.catalog.catalog-name.metrics-reporter-impl自定义 MetricsReporter 实现。
spark.sql.catalog.catalog-name.default-namespacedefault目录的默认当前命名空间
spark.sql.catalog.catalog-name.urithrift://host:portHive 类型目录的 Hive 元存储 URL、REST 类型目录的 REST URL
spark.sql.catalog.catalog-name.warehousehdfs://nn:8020/warehouse/path仓库目录的基本路径
Spark.sql.catalog.catalog-name.cache-enabledtrue or false是否启用目录缓存,默认值为true
spark.sql.catalog.catalog-name.cache.expiration-interval-ms30000 (30 seconds)缓存的目录条目过期的持续时间;仅当启用缓存为 true 时才有效。 -1 禁用缓存过期,0 完全禁用缓存,无论是否启用缓存。默认值为 30000(30 秒)
spark.sql.catalog.catalog-name.table-default.propertyKey属性键 propertyKey 的默认 Iceberg 表属性值,如果未覆盖,将在此目录创建的表上设置该值
Spark.sql.catalog.catalog-name.table-override.propertyKey属性键 propertyKey 的强制 Iceberg 表属性值,用户无法覆盖该值

八、使用目录

目录名称在 SQL 查询中用于标识表。在上面的示例中, hive_prod 和 hadoop_prod 可用于为将从这些目录加载的数据库和表名称添加前缀。

SELECT * FROM hive_prod.db.table

Spark 3 跟踪当前目录和命名空间,表名称中可以省略它们。

USE hive_prod.db;
SELECT * FROM table

要查看当前目录和命名空间,请运行 SHOW CURRENT NAMESPACE。

九、替换会话目录

要将 Iceberg 表支持添加到 Spark 的内置目录,请配置 Spark_catalog 以使用 Iceberg 的 SparkSessionCatalog。

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

Spark 的内置目录支持 Hive Metastore 中跟踪的现有 v1 和 v2 表。这将 Spark 配置为使用 Iceberg 的 SparkSessionCatalog 作为该会话目录的包装器。当表不是 Iceberg 表时,将使用内置目录来加载它。

此配置可以对 Iceberg 和非 Iceberg 表使用相同的 Hive Metastore。

十、使用目录特定的 Hadoop 配置值

与使用 Spark.hadoop.* 配置 Hadoop 属性类似,在使用 Spark 时,可以通过添加带有前缀 Spark.sql.catalog.(catalog-name).hadoop 的目录属性来设置每个目录的 Hadoop 配置值。 *。这些属性将优先于使用spark.hadoop.*全局配置的值,并且仅影响Iceberg表。

spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint = http://aws-local:9000

十一、加载自定义目录

Spark 支持通过指定catalog-impl 属性来加载自定义Iceberg Catalog 实现。这是一个例子:

spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value

十二、SQL 扩展

Iceberg 0.11.0 及更高版本向 Spark 添加了一个扩展模块,以添加新的 SQL 命令,例如存储过程的 CALL 或 ALTER TABLE … WRITE ORDERED BY。

使用这些 SQL 命令需要使用以下 Spark 属性将 Iceberg 扩展添加到您的 Spark 环境:

Spark 扩展属性Iceberg扩展实施
spark.sql.extensionsorg.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

十三、运行时配置-读选项

配置 DataFrameReader 时会传递 Spark 读取选项,如下所示:

// time travel
spark.read
    .option("snapshot-id", 10963874102873L)
    .table("catalog.db.table")
Spark选项默认值描述
snapshot-id(latest)要读取的表快照的快照 ID
as-of-timestamp(latest)以毫秒为单位的时间戳;使用的快照将是此时的当前快照。
split-size根据表属性覆盖此表的 read.split.target-size 和 read.split.metadata-target-size
lookback根据表属性覆盖此表的 read.split.planning-lookback
file-open-cost根据表属性覆盖此表的 read.split.open-file-cost
vectorization-enabled根据表属性覆盖此表的 read.parquet.vectorization.enabled
batch-size根据表属性覆盖此表的 read.parquet.vectorization.batch-size
stream-from-timestamp(none)流式传输的时间戳(以毫秒为单位);如果在最旧的已知祖先快照之前,则将使用最旧的

十四、运行时配置-写选项

配置 DataFrameWriter 时会传递 Spark 写入选项,如下所示:

df.write
    .option("write-format", "avro")
    .option("snapshot-property.key", "value")
    .insertInto("catalog.db.table")
Spark选项默认值描述
write-format表 write.format.default用于此写入操作的文件格式;parquet, avro, or orc
target-file-size-bytes根据表属性覆盖此表的 write.target-file-size-bytes
check-nullabilitytrue设置字段可空检查
snapshot-property.custom-keynull在快照摘要中添加具有自定义键和相应值的条目(仅 DSv2 需要快照属性。前缀)
fanout-enabledfalse覆盖此表的 write.spark.fanout.enabled
check-orderingtrue检查输入模式和表模式是否相同
isolation-levelnull数据帧覆盖操作所需的隔离级别。 null => 不检查(对于幂等写入),serializable => 检查目标分区中的并发插入或删除,snapshot => 检查目标分区中的并发删除。
validate-from-snapshot-idnull如果设置了隔离级别,则为用于检查表中并发写入冲突的基本快照的 ID。应该是从表中进行任何读取之前的快照。可以通过 Table API 或 Snapshots 表获取。如果为空,则使用表中最旧的已知快照。
compression-codecTable write.(fileformat).compression-codec覆盖此写入的此表的压缩编解码器
compression-levelTable write.(fileformat).compression-level对于此写入,覆盖此表的 Parquet 和 Avro 表的压缩级别
compression-strategyTable write.orc.compression-strategy针对此写入覆盖此表的 ORC 表压缩策略

CommitMetadata 提供了一个接口,用于在 SQL 执行期间将自定义元数据添加到快照摘要中,这对于审计或更改跟踪等目的非常有用。如果属性以 snapshot-property. 开头,则该前缀将从每个属性中删除。这是一个例子:

import org.apache.iceberg.spark.CommitMetadata;

Map<String, String> properties = Maps.newHashMap();
properties.put("property_key", "property_value");
CommitMetadata.withCommitProperties(properties,
        () -> {
            spark.sql("DELETE FROM " + tableName + " where id = 1");
            return 0;
        },
        RuntimeException.class);

十五、Spark Procedures

要在 Spark 中使用 Iceberg,请首先配置 Spark 目录。存储过程仅在 Spark 3 中使用 Iceberg SQL 扩展时可用。

可以通过 CALL 从任何配置的 Iceberg 目录中使用过程。所有过程都在命名空间系统中。

CALL 支持按名称(推荐)或按位置传递参数。不支持混合位置参数和命名参数。

命名参数
所有过程参数均已命名。按名称传递参数时,参数可以按任何顺序,并且可以省略任何可选参数

CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)

位置参数
按位置传递参数时,只有结尾参数可以省略(如果它们是可选的)。

CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)

快照管理

rollback_to_snapshot

将表回滚到特定快照 ID。

要回滚到特定时间,请使用 rollback_to_timestamp。

此过程会使所有引用受影响表的缓存 Spark 计划失效。

参数名称RequiredType描述
table✔️string要更新的表的名称
snapshot_id✔️long要回滚到的快照 ID
Output NameType描述
previous_snapshot_idlong回滚前当前快照ID
current_snapshot_idlong新的当前快照 ID

将表 db.sample 回滚到快照 ID 1:

CALL catalog_name.system.rollback_to_snapshot('db.sample', 1)

rollback_to_timestamp

将表回滚到某个时间的当前快照。

参数名称RequiredType描述
table✔️string要更新的表的名称
timestamp✔️timestamp要回滚到的时间戳
Output NameType描述
previous_snapshot_idlong回滚前当前快照ID
current_snapshot_idlong新的当前快照 ID

将 db.sample 回滚到特定的日期和时间。

CALL catalog_name.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000')

set_current_snapshot

设置表的当前快照 ID。

与回滚不同,快照不需要是当前表状态的祖先。

CALL catalog_name.system.set_current_snapshot('db.sample', 1)

cherrypick_snapshot

从现有快照创建新快照,而不更改或删除原始快照。

只能选择追加和动态覆盖快照。

CALL catalog_name.system.cherrypick_snapshot('my_table', 1)
CALL catalog_name.system.cherrypick_snapshot(snapshot_id => 1, table => 'my_table' )

fast_forward

将一个分支的当前快照快进到另一个分支的最新快照。

CALL catalog_name.system.fast_forward('my_table', 'main', 'audit-branch')

十六、元数据管理

许多维护操作可以使用 Iceberg 存储过程来执行。

过期快照

Iceberg 中的每次写入/更新/删除/更新插入/压缩都会生成一个新快照,同时保留旧数据和元数据以进行快照隔离和时间旅行。 expire_snapshots 过程可用于删除不再需要的旧快照及其文件。

此过程将删除旧快照以及这些旧快照唯一需要的数据文件。这意味着expire_snapshots过程永远不会删除未过期快照仍然需要的文件。

参数名称RequiredType描述
table✔️string要更新的表的名称
older_thantimestamp删除快照之前的时间戳(默认:5 天前)
retain_lastint无论old_than如何,要保留的祖先快照数量(默认为1)
max_concurrent_deletesint用于删除文件操作的线程池的大小(默认情况下,不使用线程池)
stream_resultsboolean当为true时,删除文件将通过RDD分区发送到Spark驱动程序(默认情况下,所有文件将发送到Spark驱动程序)。建议将此选项设置为 true,以防止 Spark 驱动程序因文件大小而导致 OOM
snapshot_idsarray of long即将过期的快照 ID 数组。

如果省略old_than和retain_last,则将使用表的过期属性。仍被分支或标签引用的快照将不会被删除。默认情况下,分支和标签永远不会过期,但可以使用表属性history.expire.max-ref-age-ms更改它们的保留策略。主分支永远不会过期。

输出名称类型描述
deleted_data_files_countlong该操作删除的数据文件数量
deleted_position_delete_files_countlong本次操作删除的位置删除文件数量
deleted_equality_delete_files_countlong本次操作删除的等式删除文件数量
deleted_manifest_files_countlong此操作删除的清单文件数
deleted_manifest_lists_countlong此操作删除的清单列表文件数

删除早于特定日期和时间的快照,但保留最后 100 个快照:

CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)

删除快照 ID 为 123 的快照(注意该快照 ID 不应该是当前快照):

CALL hive_prod.system.expire_snapshots(table => 'db.sample', snapshot_ids => ARRAY(123))

删除孤立文件
用于删除 Iceberg 表的任何元数据文件中未引用的文件,因此可以被视为“孤立”文件。

通过在此表上执行remove_orphan_files 命令的试运行而不实际删除它们,列出所有要删除的候选文件:

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true)

删除 tablelocation/data 文件夹中表 db.sample 未知的所有文件。

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')

重写数据文件

Iceberg 跟踪表中的每个数据文件。更多的数据文件会导致更多的元数据存储在清单文件中,而小数据文件会导致不必要的元数据量和文件打开成本的低效查询。

Iceberg 可以使用 Spark 和 rewriteDataFiles 操作并行压缩数据文件。这会将小文件组合成较大的文件,以减少元数据开销和运行时文件打开成本。

参数名称RequiredType描述
table✔️string要更新的表的名称
strategystring策略的名称 - binpack 或 sort。默认为 binpack 策略
sort_orderstring对于 Zorder,请在 zorder() 中使用逗号分隔的列列表。 (Spark 3.2 及以上版本支持)示例:zorder(c1,c2,c3)。否则,以逗号分隔排序顺序,格式为 (ColumnName SortDirection NullOrder)。其中 SortDirection 可以是 ASC 或 DESC。 NullOrder 可以是 NULLS FIRST 或 NULLS LAST。默认为表的排序顺序
optionsmap<string, string>用于操作的选项
wherestring谓词作为用于过滤文件的字符串。请注意,所有可能包含与过滤器匹配的数据的文件都将被选择进行重写

常规选项

名称默认值描述
max-concurrent-file-group-rewrites5同时重写的最大文件组数
partial-progress.enabledfalse启用在整个重写完成之前提交文件组
partial-progress.max-commits10如果启用部分进度,则允许此重写产生的最大提交量
use-starting-sequence-numbertrue使用压缩开始时快照的序列号,而不是新生成的快照的序列号
rewrite-job-ordernone根据该值强制重写作业顺序。如果 rewrite-job-order=bytes-asc,则先重写最小的作业组。如果 rewrite-job-order=bytes-desc,则先重写最大的作业组。如果 rewrite-job-order=files-asc,则首先重写文件最少的作业组。如果 rewrite-job-order=files-desc,则首先重写文件最多的作业组。如果 rewrite-job-order=none,则按照它们的顺序重写作业组已计划(无特定顺序)。
target-file-size-bytes536870912(512 MB,表属性中 write.target-file-size-bytes 的默认值)目标输出文件大小
min-file-size-bytes目标文件大小的 75%无论任何其他标准如何,低于此阈值的文件都将被考虑重写
max-file-size-bytes目标文件大小的 180%无论任何其他条件如何,大小高于此阈值的文件都将被考虑重写
min-input-files5无论其他条件如何,超过此文件数量的任何文件组都将被重写
rewrite-allfalse强制重写所有提供的文件,覆盖其他选项
max-file-group-size-bytes107374182400 (100GB)单个文件组中应重写的最大数据量。整个重写操作根据分区被分解为多个部分,并根据文件组的大小在分区内分解。这有助于分解非常大的分区的重写,否则由于集群的资源限制,这些分区可能无法重写。
delete-file-threshold2147483647需要与数据文件关联才能考虑重写的最小删除次数

排序策略选项

名称默认值描述
compression-factor1.0Shuffle 分区的数量以及 Spark 排序创建的输出文件的数量取决于此文件重写器中使用的输入数据文件的大小。由于压缩,磁盘文件大小可能无法准确表示输出中文件的大小。此参数允许用户调整用于估计实际输出数据大小的文件大小。大于 1.0 的系数将生成比我们根据磁盘文件大小预期的更多的文件。小于 1.0 的值将创建比我们基于磁盘大小预期的文件少的文件。
shuffle-partitions-per-file1用于每个输出文件的随机分区数。 Iceberg 将使用自定义合并操作将这些已排序的分区重新拼接成单个已排序的文件。

使用 zorder sort_order 的排序策略选项

名称默认值描述
var-length-contribution8从可变长度类型的输入列考虑的字节数(字符串、二进制)
max-output-size2147483647ZOrder 算法中交织的字节数

Output

名称默认值描述
rewritten_data_files_countint通过该命令重写的数据数
added_data_files_countint此命令写入的新数据文件的数量
rewritten_bytes_countlong该命令写入的字节数
failed_data_files_countintpartial-progress.enabled为true时重写失败的数据文件数量

例子

使用默认的bin-packing重写算法重写表db.sample中的数据文件,合并小文件,同时根据表的默认写入大小拆分大文件。

CALL catalog_name.system.rewrite_data_files('db.sample')

通过使用与 bin-pack 相同的默认值对 id 和 name 上的所有数据进行排序来重写表 db.sample 中的数据文件,以确定要重写的文件。

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')

通过 zOrdering 在 c1 和 c2 列上重写表 db.sample 中的数据文件。使用与 bin-pack 相同的默认值来确定要重写哪些文件。

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)')

在任何需要重写的文件超过 2 个或更多的分区中,使用 bin-pack 策略重写表 db.sample 中的数据文件。

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'))

重写表db.sample中的数据文件,并选择可能包含与过滤器(id = 3且name =“foo”)匹配的数据的文件进行重写。

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"')

重写清单

重写表的清单以优化扫描计划。

清单中的数据文件按分区规范中的字段排序。此过程使用 Spark 作业并行运行。

参数说明RequiredType说明
table✔️string要更新的表的名称
use_cachingboolean运行期间使用Spark缓存(默认为true)

输出

参数说明Type说明
rewritten_manifests_countint由该命令重写的清单数量
added_mainfests_countint此命令写入的新清单文件的数量

重写表 db.sample 中的清单,并将清单文件与表分区对齐。

CALL catalog_name.system.rewrite_manifests('db.sample')

重写表 db.sample 中的清单并禁用 Spark 缓存。这样做可以避免执行器上的内存问题。

CALL catalog_name.system.rewrite_manifests('db.sample', false)

重写位置删除文件

Iceberg可以重写位置删除文件,这有两个目的:

  • 小压缩:将小位置的删除文件压缩为较大的文件。这减少了清单文件中存储的元数据的大小以及打开小删除文件的开销。
  • 删除悬空删除:过滤掉引用不再有效的数据文件的位置删除记录。在 rewrite_data_files 之后,指向重写数据文件的位置删除记录并不总是被标记为删除,并且可以通过表的实时快照元数据保持跟踪。这称为“悬空删除”问题。
参数说明RequiredType说明
table✔️string要更新的表的名称
optionsmap<string, string>程序使用的选项

重写期间,悬空删除始终会被过滤掉。

名称默认值描述
max-concurrent-file-group-rewrites5同时重写的最大文件组数
partial-progress.enabledfalse启用在整个重写完成之前提交文件组
partial-progress.max-commits10如果启用部分进度,则允许此重写产生的最大提交量
rewrite-job-ordernone根据该值强制重写作业顺序。如果 rewrite-job-order=bytes-asc,则先重写最小的作业组。如果 rewrite-job-order=bytes-desc,则先重写最大的作业组。如果 rewrite-job-order=files-asc,则首先重写文件最少的作业组。如果 rewrite-job-order=files-desc,则首先重写文件最多的作业组。如果 rewrite-job-order=none,则按照它们的顺序重写作业组已计划(无特定顺序)。
target-file-size-bytes67108864(64MB,表属性中 write.delete.target-file-size-bytes 的默认值)目标输出文件大小
min-file-size-bytes目标文件大小的 75%无论任何其他标准如何,低于此阈值的文件都将被考虑重写
max-file-size-bytes目标文件大小的 180%无论任何其他条件如何,大小高于此阈值的文件都将被考虑重写
min-input-files5无论其他条件如何,超过此文件数量的任何文件组都将被重写
rewrite-allfalse强制重写所有提供的文件,覆盖其他选项
max-file-group-size-bytes107374182400 (100GB)单个文件组中应重写的最大数据量。整个重写操作根据分区被分解为多个部分,并根据文件组的大小在分区内分解。这有助于分解非常大的分区的重写,否则由于集群的资源限制,这些分区可能无法重写。

Output

输出名称Type描述
rewritten_delete_files_countint通过此命令删除的删除文件数
added_delete_files_countint通过此命令添加的删除文件数
rewritten_bytes_countlong通过此命令删除的删除文件的字节数
added_bytes_countlong通过此命令添加的所有新删除文件的字节数

重写位置删除表db.sample中的文件。这会选择符合默认重写标准的位置删除文件,并写入目标大小 target-file-size-bytes 的新文件。悬空删除将从重写的删除文件中删除。

CALL catalog_name.system.rewrite_position_delete_files('db.sample')

重写表 db.sample 中的所有位置删除文件,写入新文件 target-file-size-bytes。悬空删除将从重写的删除文件中删除。

CALL catalog_name.system.rewrite_position_delete_files(table => 'db.sample', options => map('rewrite-all', 'true'))

重写位置删除表db.sample中的文件。这会选择分区中的位置删除文件,其中需要根据大小标准重写 2 个或更多位置删除文件。悬空删除将从重写的删除文件中删除。

CALL catalog_name.system.rewrite_position_delete_files(table => 'db.sample', options => map('min-input-files','2'))

migrate

将表替换为加载了源数据文件的 Iceberg 表。

表架构、分区、属性和位置将从源表复制。

如果任何表分区使用不支持的格式,迁移将会失败。支持的格式有 Avro、Parquet 和 ORC。现有数据文件被添加到 Iceberg 表的元数据中,并且可以使用从原始表架构创建的名称到 ID 映射来读取。

要在测试时保持原始表完整,请使用快照创建共享源数据文件和架构的新临时表。

默认情况下,原始表保留为名称 table_BACKUP_。

参数说明RequiredType说明
table✔️string要迁移的表的名称
propertiesmap<string, string>新 Iceberg 表的属性
drop_backupboolean当 true 时,原始表将不会保留作为备份(默认为 false)
backup_table_namestring将保留作为备份的表的名称(默认为 table_BACKUP_)

输出:

输出名称Type描述
migrated_files_countlong附加到 Iceberg 表的文件数

例子
将 Spark 默认目录中的表 db.sample 迁移到 Iceberg 表,并添加属性“foo”设置为“bar”:

CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'))

将当前目录中的 db.sample 迁移到 Iceberg 表,而不添加任何其他属性:

CALL catalog_name.system.migrate('db.sample')

add_files

尝试直接将文件从 Hive 或基于文件的表添加到给定的 Iceberg 表中。与 migrate 或 snapshot 不同,add_files 可以从一个或多个特定分区导入文件,并且不会创建新的 Iceberg 表。此命令将为新文件创建元数据,但不会移动它们。此过程不会分析文件的架构来确定它们是否确实与 Iceberg 表的架构匹配。完成后,Iceberg 表将把这些文件视为 Iceberg 拥有的文件集的一部分。这意味着任何后续的 expire_snapshot 调用都将能够物理删除添加的文件。如果可以进行迁移或快照,则不应使用此方法。

请记住,add_files 过程将从仅添加一次的每个文件中获取 Parquet 元数据。如果您使用分层存储(例如 Amazon S3 智能分层存储类),将从存档中检索底层文件,并将在设定的时间段内保留在较高层上。

register_table
为已存在但没有相应目录标识符的metadata.json 文件创建目录条目。

将新表注册为 db.tbl 到spark_catalog,指向metadata.json文件路径/to/metadata/file.json。

CALL spark_catalog.system.register_table(
  table => 'db.tbl',
  metadata_file => 'path/to/metadata/file.json'
)

元数据信息

ancestors_of

报告指定快照的父级实时快照ID

获取当前快照的所有快照祖先(默认)

CALL spark_catalog.system.ancestors_of('db.tbl')

获取特定快照的所有快照祖先

CALL spark_catalog.system.ancestors_of('db.tbl', 1)
CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')

Change Data Capture

创建变更日志视图

创建一个包含给定表中的更改的视图。

参数名称RequiredType说明
table✔️string变更日志的源表的名称
changelog_viewstring要创建的视图的名称
optionsmap<string, string>要使用的 Spark 读取选项图
net_changesboolean是否输出净变化(更多信息见下文)。默认为 false。
compute_updatesboolean是否计算更新前/更新后图像(有关详细信息,请参阅下文)。默认为 false。
identifier_columnsarray用于计算更新的标识符列的列表。如果参数compute_updates设置为true并且未提供identifier_columns,则将使用表的当前标识符字段。
remove_carryoversboolean是否删除结转行(有关详细信息,请参阅下文)。默认为 true。自 1.4.0 起已弃用,将在 1.5.0 中删除;请查询 SparkChangelogTable 以查看结转行

以下是常用的 Spark 读取选项列表:

  • start-snapshot-id:独占的启动快照ID。如果未提供,它将从表的第一个快照中读取。
  • end-snapshot-id:包含的结束快照id,默认为表的当前快照。
  • start-timestamp:唯一的开始时间戳。如果未提供,它将从表的第一个快照中读取。
  • end-timestamp:包含的结束时间戳,默认为表的当前快照。

输出:

输出名称Type描述
changelog_viewstring创建的变更日志视图的名称

根据快照 1(不包括)和快照 2(包括)之间发生的更改创建变更日志视图 tbl_changes。

CALL spark_catalog.system.create_changelog_view(
  table => 'db.tbl',
  options => map('start-snapshot-id','1','end-snapshot-id', '2')
)

根据时间戳 1678335750489(不包括)和 1678992105265(包括)之间发生的更改创建变更日志视图 my_changelog_view。

CALL spark_catalog.system.create_changelog_view(
  table => 'db.tbl',
  options => map('start-timestamp','1678335750489','end-timestamp', '1678992105265'),
  changelog_view => 'my_changelog_view'
)

创建一个更改日志视图,根据标识符列 id 和 name 计算更新。

CALL spark_catalog.system.create_changelog_view(
  table => 'db.tbl',
  options => map('start-snapshot-id','1','end-snapshot-id', '2'),
  identifier_columns => array('id', 'name')
)

创建变更日志视图后,您可以查询该视图以查看快照之间发生的更改。

SELECT * FROM tbl_changes
SELECT * FROM tbl_changes where _change_type = 'INSERT' AND id = 3 ORDER BY _change_ordinal

请注意,更改日志视图包括更改数据捕获 (CDC) 元数据列,这些列提供有关正在跟踪的更改的附加信息。这些列是:

  • _change_type:更改的类型。它具有以下值之一:INSERT、DELETE、UPDATE_BEFORE 或 UPDATE_AFTER。
  • _change_ordinal:更改的顺序
  • _commit_snapshot_id:发生更改的快照 ID

这是相应结果的示例。显示第一个快照插入了2条记录,第二个快照删除了1条记录。
在这里插入图片描述
创建计算净更改的变更日志视图。它删除中间更改并仅输出净更改。

CALL spark_catalog.system.create_changelog_view(
  table => 'db.tbl',
  options => map('end-snapshot-id', '87647489814522183702'),
  net_changes => true
)

对于净更改,上述更改日志视图仅包含以下行,因为 Alice 被插入到第一个快照中并在第二个快照中被删除。
在这里插入图片描述
Carry-over Rows

该过程默认删除结转行。结转行是使用写时复制时行级操作(MERGE、UPDATE 和 DELETE)的结果。例如,给定一个包含 row1 (id=1, name=‘Alice’) 和 row2 (id=2, name=‘Bob’) 的文件。对 row2 进行写时复制删除需要擦除该文件并将 row1 保留在新文件中。更改日志表将其报告为以下两行,尽管这不是对该表的实际更改。

在这里插入图片描述
要查看结转行,请按如下方式查询 SparkChangelogTable:

SELECT * FROM spark_catalog.db.tbl.changes

更新前/后图像

该过程计算更新前/更新后图像(如果已配置)。更新前/更新后图像是从一对删除行和插入行转换而来的。标识符列用于确定插入和删除记录是否引用同一行。如果两条记录共享相同的标识列值,则它们被视为同一行的之前和之后状态。您可以在表模式中设置标识符字段,也可以将它们作为过程参数输入。

以下示例显示了使用标识符列 (id) 进行更新前/更新后图像计算,其中具有相同 id 的行删除和插入被视为单个更新操作。具体来说,假设我们有以下两行:

在这里插入图片描述
在这种情况下,该过程将更新之前的行标记为 UPDATE_BEFORE 图像,将更新之后的行标记为 UPDATE_AFTER 图像,从而产生以下更新前/更新后图像:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/364967.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL原理(一)架构组成之逻辑模块(2)缓存机制

前面提到了mysql的逻辑模块中包含Query Cache 。 一、查询缓存 1、作用 MySQL查询缓存即缓存查询数据的SQL文本及查询结果,用Key-Value的形式保存在服务器内存中。当查询命中缓存,MySQL会立刻返回结果,跳过了解析,优化和执行阶段。 2、查询缓存的命中条件 &#xff08;1&a…

linux查看mysql状态重启

1.linux怎么看mysql数据库是不是宕机了&#xff1f; MySQL/MariaDB数据库的状态&#xff1a;使用systemctl status mysql或者service mysqld status命令。如果显示"active (running)"表示MySQL正常运行&#xff1b;如果显示"inactive (dead)"则表示MySQL已…

2024年电子数据取证“獬豸杯”比赛解析WP

2024年电子数据取证“獬豸杯”比赛解析WP 项目介绍&#xff1a;参赛对象:任务目标&#xff1a;第一部分&#xff1a;手机取证第二部分&#xff1a;计算机取证第二部分&#xff1a;APK分析 项目介绍&#xff1a; 简介&#xff1a; 竞赛为个人赛&#xff0c;工具自备&#xff0c…

Mysql篇----第一篇

系列文章目录 文章目录 系列文章目录前言一、数据库存储引擎二、InnoDB(B+树)三、TokuDB( Fractal Tree-节点带数据)前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码…

云上自动部署丨使用 Terraform 在 AWS 上搭建 DolphinDB

HashiCorp Terraform 是一款基础架构即代码工具&#xff0c;旨在实现 "Write, Plan, and Create Infrastructure as Code"。它通过配置文件来描述云资源的拓扑结构&#xff0c;包括虚拟机、存储账户和网络接口。Terraform 几乎支持市面上所有的云服务&#xff0c;能够…

Sharding-JDBC之ComplexKeysShardingAlgorithm(复合分片算法)

目录 一、简介二、maven依赖三、数据库3.1、创建数据库3.2、创建表 四、配置&#xff08;二选一&#xff09;4.1、properties配置4.2、yml配置 五、复合分片算法六、实现6.1、实体层6.2、持久层6.3、服务层6.4、测试类6.4.2、根据时间范围查询订单 一、简介 实际工作中&#xf…

移动应用开发的方式

移动应用开发的方式(三种) Native App&#xff1a; 本地应用程序&#xff08;原生App&#xff09; Web App&#xff1a;网页应用程序&#xff08;移动web&#xff09; Hybrid App&#xff1a;混合应用程序&#xff08;混合App&#xff09; hybrid应用场景 1、微信公众号&…

使用Pycharm在本地调用chatgpt的接口

目录 1.安装环境 2.建立多轮对话的完整代码&#xff08;根据自己使用的不同代理需要修改端口&#xff08;port&#xff09;&#xff09; 3.修改代码在自己的Pycharm上访问chagpt的api并实现多轮对话&#xff0c;如果不修改是无法成功运行的。需要确定秘钥和端口以保证正常访…

npm 包管理工具

简介 Node官网&#xff1a;https://nodejs.org/ Npm官网&#xff1a;https://www.npmjs.com/ 在现代的软件开发世界中&#xff0c;包管理工具起着至关重要的作用。它们为开发者社区提供了一种高效共享和使用代码的方式。在 JavaScript 的范畴内&#xff0c;npm&#xff08;Node…

展台设计搭建中6个关键元素

一、哪种风格的会展展台设计更显示设计感 从已有的展台布置局面可以看出&#xff0c;不同展台设计有着不同的标准与选择原则&#xff0c;现有的一系列展台设计标识会随着现代化会展的提升重新进入更新诉求阶段。 二、展台设计一般会有那种可以选择的类别 从出现在展台设计优化阶…

【MySQL】MySQL库

使用C/C语言链接MySQL 一、mysql connect二、mysql 接口介绍1. 初始化 mysql_init()2. 链接数据库 mysql_real_connect()3. 执行 mysql 命令 mysql_query()4. 获取执行结果 mysql_store_result()5. 释放空间5. 关闭 mysql 链接 mysql_close() 一、mysql connect 要使用C语言连…

字节大佬含泪吐血总结系列之 应用层常见协议

字节大佬含泪吐血总结系列之 应用层常见协议 原文地址&#xff1a;https://github.com/Snailclimb/JavaGuide 文章目录 字节大佬含泪吐血总结系列之 应用层常见协议HTTP:超文本传输协议Websocket&#xff1a;全双工通信协议SMTP:简单邮件传输(发送)协议POP3/IMAP:邮件接收的协…

Jmeter高级使用

文章目录 JMeter之计数器JMeter之集合点JMeter之断言JMeter之动态关联后置处理器&#xff1a;正则表达式提取器 JMeter之分布式测试JMeter之组件执行顺序元件的作用域元件的执行顺序配置元件Http Cookie管理器 多协议接口的性能测试Debug采样器Http请求中文乱码的解决Post参数设…

cip、ethernet/ip开源协议栈:开发源代码

EtherNet/IP是一个工业以太网协议&#xff0c;它结合标准协议TCP和UDP&#xff0c;在以太网上基础上的通用工业协议&#xff08;CIP&#xff09;。 该协议由ODVA维护。ODVA还管理其他CIP实现&#xff0c;如DeviceNet。 协议栈和源代码下载 www.jngbus.com 在开发Ethernet/Ip…

如何在CentOS安装DataEase数据分析服务并实现远程访问管理界面

如何在CentOS安装DataEase数据分析服务并实现远程访问管理界面 前言1. 安装DataEase2. 本地访问测试3. 安装 cpolar内网穿透软件4. 配置DataEase公网访问地址5. 公网远程访问Data Ease6. 固定Data Ease公网地址 &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f30c; 202…

QSqlRelationalTableModel 关系表格模型

一、 1.1 QSqlRelationalTableModel继承自QSqlTableModel&#xff0c;并且对其进行了扩展&#xff0c;提供了对外键的支持。一个外键就是一个表中的一个字段 和 其他表中的主键字段之间的一对一的映射。例如&#xff0c;“studInfo”表中的departID字段对应的是“departments…

【SpringCloud】使用OpenFeign进行微服务化改造

目录 一、需求与背景二、OpenFeign 远程调用技术原理三、项目代码演示3.1 引入依赖3.2 实现OpenFeign注解修饰接口3.3 指定 OpenFeign 远程调用接口的扫描路径 四、OpenFeign 在日志中打印Request和Response五、OpenFeign 客户端超时配置六、使用 OpenFeign 实现服务降级6.1 实…

【python】OpenCV—Tracking(10.1)

学习来自《Learning OpenCV 3 Computer Vision with Python》Second Edition by Joe Minichino and Joseph Howse 文章目录 检测移动的目标涉及到的 opencv 库cv2.GaussianBlurcv2.absdiffcv2.thresholdcv2.dilatecv2.getStructuringElementcv2.findContourscv2.contourAreacv2…

STM32 IIC电量计LTC2944

1 描述 LTC2944 可在便携式产品应用中测量电池充电状态、电池电压、电池电流及其自身温度。宽输入电压范围允许使用高达 60V 的多节电池。精密库仑反向积分电流通过电池正极端子与负载或充电器之间的检测电阻器。 电压、电流和温度由内部 14 位无延迟 ΔΣ™ ADC 测量。测量结…

09-信息收集-APP及其他资产等

信息收集-APP及其他资产等 信息收集-APP及其他资产等一、APP提取季抓包及后续配合1、某APK一键提取反编译2、利用bp抓取更多URL 二、某IP无web框架下的第三方测试1、各种端口一顿乱扫 —— 思路2、各种接口一顿乱扫 —— 思路3、接口部分一顿测试 —— 思路 三、**案例演示**1、…