Spark Integration with Iceberg: Metrics Collection and Output
Table of contents
- Spark Integration with Iceberg: Metrics Collection and Output
- Iceberg provides two types of metrics reports and two metrics reporters
- ScanReport
- CommitReport
- LoggingMetricsReporter
- RESTMetricsReporter
- Verification example
- Environment setup
- Notes on the results
Iceberg provides two types of metrics reports and two metrics reporters
ScanReport
Contains metrics collected during scan planning for a given table. Besides general information about the table (such as the snapshot id and the table name), it includes the following metrics:
- total duration of scan planning
- number of data/delete files included in the result
- number of data/delete manifests scanned/skipped
- number of data/delete files scanned/skipped
- number of equality/positional delete files scanned
CommitReport
Contains metrics collected after committing changes to a table (that is, after producing a snapshot). Besides general information about the table (such as the snapshot id and the table name), it includes the following metrics:
- total duration
- number of attempts required for the commit to succeed
- number of data/delete files added/removed
- number of equality/positional delete files added/removed
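Both report types are delivered to whatever reporter is configured through Iceberg's `MetricsReporter` interface, which has a single `report(MetricsReport)` method. The sketch below (written against the `org.apache.iceberg.metrics` API of Iceberg 1.6.x; accessor names such as `scanMetrics()` and `totalPlanningDuration()` follow the field names visible in the log output later in this article, but verify them against your Iceberg version) shows a minimal custom reporter that distinguishes the two report types:

```java
package com.donny.demo;

import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.ScanReport;

/**
 * Minimal custom reporter sketch: Iceberg calls report() once per
 * scan-planning pass (ScanReport) or commit (CommitReport).
 */
public class PrintingMetricsReporter implements MetricsReporter {

    @Override
    public void report(MetricsReport report) {
        if (report instanceof ScanReport) {
            ScanReport scan = (ScanReport) report;
            System.out.printf("scan of %s, snapshot %d, planned in %s%n",
                    scan.tableName(),
                    scan.snapshotId(),
                    scan.scanMetrics().totalPlanningDuration().totalDuration());
        } else if (report instanceof CommitReport) {
            CommitReport commit = (CommitReport) report;
            System.out.printf("commit to %s, snapshot %d, operation %s%n",
                    commit.tableName(),
                    commit.snapshotId(),
                    commit.operation());
        }
    }
}
```

A class like this could then be registered the same way `LoggingMetricsReporter` is registered in the demo below, by setting its fully qualified class name as the reporter for a table.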
LoggingMetricsReporter
Writes each metrics report to the log file.
RESTMetricsReporter
Sends each metrics report to a REST service. This reporter can only be used when the table is accessed through a RESTCatalog.
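For reference, a RESTCatalog is selected in Spark by setting the catalog type to `rest` and pointing it at a catalog service. The sketch below is only illustrative: the catalog name `rest_cat` and the URI are placeholders, and it assumes a REST catalog service is already running at that address.

```java
import org.apache.spark.sql.SparkSession;

public class RestCatalogExample {
    public static void main(String[] args) {
        // A REST catalog; with type "rest", Iceberg loads tables through a
        // RESTCatalog, which is the precondition for the RESTMetricsReporter.
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("Iceberg REST catalog example")
                .config("spark.sql.catalog.rest_cat", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.rest_cat.type", "rest")
                .config("spark.sql.catalog.rest_cat.uri", "http://localhost:8181") // placeholder endpoint
                .getOrCreate();

        spark.close();
    }
}
```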
Verification example
Environment setup
pom.xml for the iceberg-demo project:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.donny.demo</groupId>
<artifactId>iceberg-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>iceberg-demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>3.4.2</spark.version>
<iceberg.version>1.6.1</iceberg.version>
<parquet.version>1.13.1</parquet.version>
<avro.version>1.11.3</avro.version>
<parquet.hadoop.bundle.version>1.8.1</parquet.hadoop.bundle.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-3.4_2.12</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-extensions-3.4_2.12</artifactId>
<version>${iceberg.version}</version>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
<version>${parquet.hadoop.bundle.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Override the logging configuration file log4j2.properties so that the metrics logs go to a dedicated log file. Spark's default logging configuration comes from the spark-core jar, at org.apache.spark.log4j2-defaults.properties.
# Set everything to be logged to the console
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console
logger.icebergMetric.appenderRef.file.ref = RollingFile
logger.icebergMetric.appenderRef.stdout.ref = console
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %p %c{1}: %m%n%ex
appender.CUSTOM.type = RollingFile
appender.CUSTOM.name = RollingFile
appender.CUSTOM.fileName = logs/iceberg_metrics.log
appender.CUSTOM.filePattern = logs/iceberg_metrics.%d{yyyy-MM-dd}-%i.log.gz
appender.CUSTOM.layout.type = PatternLayout
appender.CUSTOM.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L - %m%n
appender.CUSTOM.strategy.type = DefaultRolloverStrategy
appender.CUSTOM.strategy.delete.type = Delete
appender.CUSTOM.strategy.delete.basePath = logs
appender.CUSTOM.strategy.delete.0.type = IfFileName
appender.CUSTOM.strategy.delete.0.regex = iceberg_metrics.*.log.gz
appender.CUSTOM.strategy.delete.1.type = IfLastModified
appender.CUSTOM.strategy.delete.1.age = P15D
appender.CUSTOM.policy.type = TimeBasedTriggeringPolicy
# Settings to quiet third party logs that are too verbose
logger.jetty.name = org.sparkproject.jetty
logger.jetty.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.repl1.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.repl1.level = info
logger.repl2.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.repl2.level = info
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs
# in SparkSQL with Hive support
logger.metastore.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.metastore.level = fatal
logger.hive_functionregistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.hive_functionregistry.level = error
# Parquet related logging
logger.parquet.name = org.apache.parquet.CorruptStatistics
logger.parquet.level = error
logger.parquet2.name = parquet.CorruptStatistics
logger.parquet2.level = error
# Custom logger for your application
logger.icebergMetric.name = org.apache.iceberg.metrics.LoggingMetricsReporter
logger.icebergMetric.level = info
logger.icebergMetric.additivity = false
The Java main class. The key step is configuring a metrics reporter as a table property; without it, no metrics are emitted.
package com.donny.demo;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;

/**
 * @author 1792998761@qq.com
 * @version 1.0
 */
public class IcebergSparkDemo {

    public static void main(String[] args) throws AnalysisException, IOException {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Iceberg spark example")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.local.type", "hadoop") // catalog type
                .config("spark.sql.catalog.local.warehouse", "iceberg_warehouse")
                .getOrCreate();

        spark.sql("CREATE TABLE local.iceberg_db.table2 (id bigint, data string, ts timestamp) USING iceberg PARTITIONED BY (day(ts))");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (1, 'a', cast(1727601585 as timestamp)),(2, 'b', cast(1724923185 as timestamp)),(3, 'c', cast(1724919585 as timestamp))");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (4, 'd', cast(1727605185 as timestamp)),(5, 'e', cast(1725963585 as timestamp)),(6, 'f', cast(1726827585 as timestamp))");
        spark.sql("DELETE FROM local.iceberg_db.table2 where id in (2)");

        org.apache.iceberg.Table table = Spark3Util.loadIcebergTable(spark, "local.iceberg_db.table2");

        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (4, 'd', cast(1724750385 as timestamp)),(5, 'e', cast(1724663985 as timestamp)),(6, 'f', cast(1727342385 as timestamp))");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (7, 'h', cast(1727601585 as timestamp)),(8, 'i', cast(1724923185 as timestamp)),(9, 'j', cast(1724836785 as timestamp))");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (10, 'k', cast(1727601585 as timestamp)),(11, 'l', cast(1724923185 as timestamp)),(12, 'm', cast(1724836785 as timestamp))");

        // Configure the table's metrics reporter
        table.updateProperties()
                .set("metrics.reporters", LoggingMetricsReporter.class.getName())
                .commit();

        // Explicit table scan: planFiles() triggers scan planning, which emits a ScanReport
        TableScan tableScan = table.newScan();
        try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
            // The tasks need not be consumed; planning alone produces the report
        }

        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (30, 't', cast(1727605185 as timestamp)),(31, 'y', cast(1725963585 as timestamp)),(32, 'i', cast(1726827585 as timestamp))");

        Dataset<Row> result = spark.sql("SELECT * FROM local.iceberg_db.table2 where ts >= '2024-09-20'");
        result.show();

        spark.close();
    }
}
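As an alternative to the per-table `metrics.reporters` property set in the main class above, Iceberg also accepts a catalog-level reporter via the `metrics-reporter-impl` catalog property, which applies to every table loaded through that catalog. A sketch of that variant, using the same local Hadoop catalog as the demo (this is an assumption about how you might wire it up, not part of the original verification):

```java
import org.apache.spark.sql.SparkSession;

public class CatalogReporterExample {
    public static void main(String[] args) {
        // Same local Hadoop catalog as in the demo, but with a catalog-wide
        // metrics reporter: tables loaded through "local" report via
        // LoggingMetricsReporter without any per-table property.
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("Iceberg catalog-level metrics reporter")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.local.type", "hadoop")
                .config("spark.sql.catalog.local.warehouse", "iceberg_warehouse")
                .config("spark.sql.catalog.local.metrics-reporter-impl",
                        "org.apache.iceberg.metrics.LoggingMetricsReporter")
                .getOrCreate();

        spark.close();
    }
}
```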
Notes on the results
During verification, a ScanReport was observed only after explicitly invoking a scan. Metrics emitted by the explicit scan (actively emitted):
2024-10-07 09:38:11.903 INFO LoggingMetricsReporter:38 - Received metrics report: ScanReport{
tableName=local.iceberg_db.table2,
snapshotId=3288641599702333945,
filter=true,
schemaId=0,
projectedFieldIds=[1, 2, 3],
projectedFieldNames=[id, data, ts],
scanMetrics=ScanMetricsResult{
totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.294853952S, count=1},
resultDataFiles=CounterResult{unit=COUNT, value=0},
resultDeleteFiles=CounterResult{unit=COUNT, value=0},
totalDataManifests=CounterResult{unit=COUNT, value=6},
totalDeleteManifests=CounterResult{unit=COUNT, value=0},
scannedDataManifests=CounterResult{unit=COUNT, value=0},
skippedDataManifests=CounterResult{unit=COUNT, value=0},
totalFileSizeInBytes=CounterResult{unit=BYTES, value=0},
totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0},
skippedDataFiles=CounterResult{unit=COUNT, value=0},
skippedDeleteFiles=CounterResult{unit=COUNT, value=0},
scannedDeleteManifests=CounterResult{unit=COUNT, value=0},
skippedDeleteManifests=CounterResult{unit=COUNT, value=0},
indexedDeleteFiles=CounterResult{unit=COUNT, value=0},
equalityDeleteFiles=CounterResult{unit=COUNT, value=0},
positionalDeleteFiles=CounterResult{unit=COUNT, value=0}},
metadata={
engine-version=3.4.2,
iceberg-version=Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086),
app-id=local-1728265088818,
engine-name=spark}}
Scan metrics triggered by the DELETE statement (passively emitted):
2024-10-07 11:15:54.708 INFO LoggingMetricsReporter:38 - Received metrics report: ScanReport{
tableName=local.iceberg_db.table2,
snapshotId=7181960343136679052,
filter=ref(name="id") == "(1-digit-int)",
schemaId=0,
projectedFieldIds=[1, 2, 3],
projectedFieldNames=[id, data, ts],
scanMetrics=ScanMetricsResult{
totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.098792497S, count=1},
resultDataFiles=CounterResult{unit=COUNT, value=1},
resultDeleteFiles=CounterResult{unit=COUNT, value=0},
totalDataManifests=CounterResult{unit=COUNT, value=2},
totalDeleteManifests=CounterResult{unit=COUNT, value=0},
scannedDataManifests=CounterResult{unit=COUNT, value=2},
skippedDataManifests=CounterResult{unit=COUNT, value=0},
totalFileSizeInBytes=CounterResult{unit=BYTES, value=898},
totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0},
skippedDataFiles=CounterResult{unit=COUNT, value=4},
skippedDeleteFiles=CounterResult{unit=COUNT, value=0},
scannedDeleteManifests=CounterResult{unit=COUNT, value=0},
skippedDeleteManifests=CounterResult{unit=COUNT, value=0},
indexedDeleteFiles=CounterResult{unit=COUNT, value=0},
equalityDeleteFiles=CounterResult{unit=COUNT, value=0},
positionalDeleteFiles=CounterResult{unit=COUNT, value=0}},
metadata={
engine-version=3.4.2,
iceberg-version=Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086),
app-id=local-1728270940331,
engine-name=spark}}
Commit metrics triggered by an INSERT (passively emitted):
2024-10-06 15:48:47 INFO LoggingMetricsReporter:38 - Received metrics report:
CommitReport{
tableName=local.iceberg_db.table2,
snapshotId=3288641599702333945,
sequenceNumber=6,
operation=append,
commitMetrics=CommitMetricsResult{
totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.430784537S, count=1},
attempts=CounterResult{unit=COUNT, value=1},
addedDataFiles=CounterResult{unit=COUNT, value=3},
removedDataFiles=null,
totalDataFiles=CounterResult{unit=COUNT, value=14},
addedDeleteFiles=null,
addedEqualityDeleteFiles=null,
addedPositionalDeleteFiles=null,
removedDeleteFiles=null,
removedEqualityDeleteFiles=null,
removedPositionalDeleteFiles=null,
totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=CounterResult{unit=COUNT, value=3},
removedRecords=null,
totalRecords=CounterResult{unit=COUNT, value=14},
addedFilesSizeInBytes=CounterResult{unit=BYTES, value=2646},
removedFilesSizeInBytes=null,
totalFilesSizeInBytes=CounterResult{unit=BYTES, value=12376},
addedPositionalDeletes=null,
removedPositionalDeletes=null,
totalPositionalDeletes=CounterResult{unit=COUNT, value=0},
addedEqualityDeletes=null,
removedEqualityDeletes=null,
totalEqualityDeletes=CounterResult{unit=COUNT, value=0}},
metadata={
engine-version=3.4.2,
app-id=local-1728200916879,
engine-name=spark,
iceberg-version=Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086)}}