5 Paimon数据湖之表数据查询详解

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

虽然前面我们已经讲过如何查询Paimon表中的数据了,但是有一些细节的东西还需要详细分析一下。

  • 首先是针对Paimon中系统表的查询,例如snapshots\schemas\options等等这些系统表。
    其实简单理解就是我们可以通过sql的形式查询系统表来查看实体表的快照、schema等信息,这些信息我们也可以直接到hdfs中查看,只是不太方便。

  • 在查询数据的时候,可以细分为批量读取和流式读取,因为Paimon可以同时支持批处理和流处理。

  • 在查询数据的时候,如果想要从之前的某一个时间点开始查询数据,也就说任务启动的时候想要查询一些历史数据,则需要用到时间旅行这个特性,可以在SQL查询语句中通过动态表选项指定scan.mode参数来控制具体查询哪些历史数据。

Scan Mode的值可以有多种,不同的值代表不同的含义,下面我们来具体分析一下:
在这里插入图片描述

注意:在分析的时候,我们需要针对批处理和流处理这两种情况分别进行分析。

  • (1)default:如果我们在执行查询的时候,没有指定scan.mode参数,则默认是default。但是此时需要注意,如果我们也没有同时指定其他参数,例如:timestamp-millis\snapshot-id等scan相关的参数,那么默认会执行latest-full策略。
    所以说,我们在执行查询的时候,如果没有指定任何scan相关的参数,那么默认执行的策略就是latest-full。

  • (2)latest-full:和full是一样的效果,不过full这个参数已经被标记为过期了。针对批处理,表示只读取最新快照中的所有数据,读取完成以后任务就执行结束了。针对流处理,表示第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据,这个任务会一直运行。

  • (3)latest:针对批处理,他的执行效果和latest-full是一样的,只会读取最新快照中的所有数据。但是针对流处理就不一样了,此时表示只读取最新的变更数据,也就是说任务启动之后,只读取新增的数据,之前的历史快照中的数据不读取。类似于kafka中消费者里面的latest消费策略。

  • (4)from-snapshot:使用此策略的时候,需要同时指定snapshot-id参数。针对批处理,表示只读取指定id的快照中的所有数据。针对流处理,表示从指定id的快照开始读取变更数据(注意:此时不是读取这个快照中的所有数据,而是读取此快照中的变更数据,也可以理解为这个快照和上一个快照相比新增的数据),当然,后续新增的变更数据也是可以读取到的,因为这个是流处理,他会一直执行读取操作。

  • (5)from-snapshot-full:使用此策略的时候,也需要同时指定snapshot-id参数。针对批处理,他的执行效果和from-snapshot是一样的。针对流处理,表示第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据,此时任务会一直执行。

  • (6)from-timestamp:使用此策略的时候,需要同时指定timestamp-millis参数。针对批处理,表示只读取指定时间戳的快照中的所有数据。针对流处理,表示从指定时间戳的快照开始读取变更数据,(注意,这里也是读取这个快照中的变更数据,不是所有数据。),然后读取后续新增的变更数据。

  • (7)incremental:表示是增量查询,这个主要是针对批处理的,通过这种策略可以读取开始和结束快照之间的增量变化。开始和结束快照可以通过快照id或者是时间戳进行指定。
    如果是使用快照id,则需要通过incremental-between参数指定。
    如果是使用时间戳,则需要通过incremental-between-timestamp参数指定。

  • (8)compacted-full:想要使用这个参数有一个前提,Paimon表需要开启完全压缩(full compaction)。此时针对批处理,表示只读取最新完全压缩(full compaction)的快照中的所有数据。针对流处理,表示第一次启动时读取最新完全压缩(full compaction)的快照中的所有数据,然后继续读取后续新增的变更数据。

针对这里面的latest、latest-full、compacted-full这几种策略放在一起可能容易混淆,下面我们来通过一个图重新梳理一下:
在这里插入图片描述

首先看中间这条线,表示是数据的时间轴,左边是历史数据,右边是最新产生的数据。

中间这条线上面是批处理,下面是流处理。

我们首先来看批处理:
如果我们指定了scan.modelatest-full或者是latest,则会读取最新的快照中的所有数据,也就是Last Snapshot中的数据。
如果我们指定了scan.modecompacted-full,则会读取最新的完全压缩(full compaction)的快照中的数据,也就是Last Compact Snapshot中的数据。

接下来看一下流处理:
如果我们指定了scan.modelatest-full,则会在任务第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据。也就是第一次启动时先读取Last Snapshot中的所有数据,接着读取后续新产生的数据。
如果我们指定了scan.modelatest,则此时只读取最新的变更数据,不读取LastSnapshot快照中的数据。
如果我们指定了scan.modecompacted-full,则第一次启动时会读取最新完全压缩(full compaction)的快照中的所有数据,也就是Last Compact Snapshot中的数据,接着读取后续新产生的数据。

这就是这些策略在批处理和流处理中的执行流程。

(1)查询系统表

下面我们来通过具体的案例来演示一下前面提到的查询数据相关的用法。

首先创建一个向Paimon表中模拟写入数据的类,便于一会测试使用
创建package:tech.xuwei.paimon.query

创建object:FlinkSQLWriteToPaimon

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 通过FlinkSQL 向Paimon中模拟写入数据
 * Created by xuwei
 */
object FlinkSQLWriteToPaimon {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS `query_table`(
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |)
        |""".stripMargin)

    //写入数据
    tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jack',18)")
    tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('tom',19)")
    tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('mick',20)")

  }

}

在idea中运行这个代码。

接下来创建一个类来查询一下Paimon中的系统表。

创建object:FlinkPaimonSystemTable

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 查询Paimon中的系统表
 * Created by xuwei
 */
object FlinkPaimonSystemTable {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")


    //snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息
    println("====================snapshot信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$snapshots").print()

    //schema信息表,对应的其实就是hdfs中表的schema目录下的schema-*文件信息
    println("====================schema信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$schemas").print()

    //manifest信息表,对应的其实就是hdfs中表的manifest目录下的manifest-*文件信息
    println("====================manifest信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$manifests").print()

    //file信息表,对应的其实就是hdfs中表的bucket-*目录下的data-*文件信息
    println("====================file信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$files").print()

    //option信息表,对应的就是建表语句中with里面指定的参数信息,在表的schema-*文件中也能看到option信息
    println("====================option信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$options").print()

    //consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到
    println("====================consumer信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$consumers").print()


    //audit log信息表,相当于是表的审核日志,可以看到表中每条数据的rowkind,也就是+I\-U\+U\-D
    println("====================audit log信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$audit_log").print()


  }
}

运行代码。
注意:在本地执行flink sql中的print,会看到下面错误:

java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1026)
	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:899)
	at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:823)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:219)
	at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
	at tech.xuwei.paimon.query.FlinkPaimonSystemTable$.main(FlinkPaimonSystemTable.scala:35)
	at tech.xuwei.paimon.query.FlinkPaimonSystemTable.main(FlinkPaimonSystemTable.scala)

这个异常不影响程序执行,实际工作中我们不会写这种代码,一般都是在sql中写insert into select语句了,在这主要是为了方便测试,忽略这个异常即可。

如果感觉看起来比较乱,可以修改一下log4j.properties日志中的告警级别,改为error级别即可。

log4j.rootLogger=error,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

重新运行代码,可以看到如下结果:

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 8f74d97b-bf6b-4ac7-bb47-3bb... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:22.859 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 49412497-1749-4566-8bf8-1c5... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:24.802 |                    2 |                    1 |                      0 | -9223372036854775808 |
| +I |                    3 |                    0 | e55e756d-e528-4b7c-97f0-a01... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:26.409 |                    3 |                    1 |                      0 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
3 rows in set
====================schema信息表===========================
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |            schema_id |                         fields |                 partition_keys |                   primary_keys |                        options |                        comment |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    0 | [{"id":0,"name":"name","typ... |                             [] |                       ["name"] |                             {} |                                |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
1 row in set
====================manifest信息表===========================
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| op |                      file_name |            file_size |      num_added_files |    num_deleted_files |            schema_id |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| +I | manifest-800ac729-22d3-494b... |                 1665 |                    1 |                    0 |                    0 |
| +I | manifest-61d14e4e-d2a0-42ac... |                 1675 |                    1 |                    0 |                    0 |
| +I | manifest-fd8e45b0-d456-467a... |                 1673 |                    1 |                    0 |                    0 |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
3 rows in set
====================file信息表===========================
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| op |                      partition |      bucket |                      file_path |                    file_format |            schema_id |       level |         record_count |   file_size_in_bytes |                        min_key |                        max_key |              null_value_counts |                min_value_stats |                max_value_stats |           creation_time |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| +I |                             [] |           0 | data-6b23bcaf-3dbe-46c0-a67... |                            orc |                    0 |           0 |                    1 |                  566 |                         [jack] |                         [jack] |                {age=0, name=0} |            {age=18, name=jack} |            {age=18, name=jack} | 2023-07-28 17:35:22.453 |
| +I |                             [] |           0 | data-ce40f0df-aa2a-4682-8b6... |                            orc |                    0 |           0 |                    1 |                  581 |                         [mick] |                         [mick] |                {age=0, name=0} |            {age=20, name=mick} |            {age=20, name=mick} | 2023-07-28 17:35:26.257 |
| +I |                             [] |           0 | data-ac9bd895-2b8e-4efe-969... |                            orc |                    0 |           0 |                    1 |                  572 |                          [tom] |                          [tom] |                {age=0, name=0} |             {age=19, name=tom} |             {age=19, name=tom} | 2023-07-28 17:35:24.603 |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
3 rows in set
====================option信息表===========================
Empty set
====================tag信息表===========================
Empty set
====================consumer信息表===========================
Empty set
====================audit log信息表===========================
+----+--------------------------------+--------------------------------+-------------+
| op |                        rowkind |                           name |         age |
+----+--------------------------------+--------------------------------+-------------+
| +I |                             +I |                           jack |          18 |
| +I |                             +I |                           mick |          20 |
| +I |                             +I |                            tom |          19 |
+----+--------------------------------+--------------------------------+-------------+
3 rows in set
(2)批量读取

下面演示一下如何在批量读取中使用时间旅行功能。

创建object:tech.xuwei.paimon.query.FlinkPaimonBatchQuery

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 批量读取
 * Created by xuwei
 */
object FlinkPaimonBatchQuery {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //SET 'execution.runtime-mode' = 'batch';
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //批量查询数据
    tEnv.executeSql(
      """
        |SELECT * FROM query_table
        |-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,只读取最新快照中的所有数据
        |-- /*+ OPTIONS('scan.mode'='latest') */ -- 在批处理模式下和latest-full的效果一致
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 只读取指定id的快照中的所有数据
        |-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 在批处理模式下和from-snapshot的效果一致
        |-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 只读取指定时间戳的快照中的所有数据
        |-- /*+ OPTIONS('scan.mode'='incremental','incremental-between' = '1,3') */ -- 指定两个快照id,查询这两个快照之间的增量变化
        |-- /*+ OPTIONS('scan.mode'='incremental','incremental-between-timestamp' = '1690536922859,1690536926409') */ -- 指定两个时间戳,查询这两个快照之间的增量变化
        |""".stripMargin)
      .print()

  }
}

运行代码,查看每一种策略的数据结果。

注意:在演示compacted-full这种策略的时候需要给表开启full-compaction

所以重新创建一个新的表。

创建object:FlinkSQLWriteToPaimonForCompact

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 通过FlinkSQL 向Paimon中模拟写入数据
 * Created by xuwei
 */
object FlinkSQLWriteToPaimonForCompact {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS `query_table_compact`(
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |)WITH(
        |    'changelog-producer' = 'full-compaction',
        |    'full-compaction.delta-commits' = '1'
        |)
        |""".stripMargin)

    //写入数据
    tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('jack',18)")
    tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('tom',19)")
    tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('mick',20)")

  }

}

运行代码。

再创建一个新的读取数据的类:

创建object:FlinkPaimonBatchQueryForCompact

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 批量读取
 * Created by xuwei
 */
object FlinkPaimonBatchQueryForCompact {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //SET 'execution.runtime-mode' = 'batch';
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //批量查询数据
    tEnv.executeSql(
      """
        |SELECT * FROM query_table_compact
        |/*+ OPTIONS('scan.mode' = 'compacted-full') */ --表需要开启full-compaction,设置changelog-producer和full-compaction.delta-commits
        |""".stripMargin)
      .print()

  }
}

运行代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+

由于目前每一次提交数据都会触发完全压缩,所以我们查询最新的完全压缩快照中的数据是可以获取到所有数据的。

此时可以通过系统表查看一下这个表的snapshot信息:
创建object:FlinkPaimonSystemTableForCompact

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 查询Paimon中的系统表
 * Created by xuwei
 */
object FlinkPaimonSystemTableForCompact {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")


    //snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息
    println("====================snapshot信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table_compact$snapshots").print()

  }
}

执行代码,可以看到如下结果

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:07.293 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:08.211 |                    3 |                    2 |                      1 | -9223372036854775808 |
| +I |                    3 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:09.423 |                    4 |                    1 |                      0 | -9223372036854775808 |
| +I |                    4 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:09.641 |                    8 |                    4 |                      1 | -9223372036854775808 |
| +I |                    5 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:11.500 |                    9 |                    1 |                      0 | -9223372036854775808 |
| +I |                    6 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:12.130 |                   15 |                    6 |                      1 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+

此时可以看到在commit_kind这一列中显示的有APPENDCOMPACT,表示这个快照是追加产生的还是完全压缩产生的。

由于我们配置的每一次提交数据都会触发完全压缩,所以对应的有3个完全压缩产生的快照。

为了便于验证,我们可以把最新的那个完全压缩的快照删除掉,再执行查询,看看结果是什么样的:

删除最新的完全压缩的快照:

[root@bigdata04 ~]# hdfs dfs -rm -r /paimon/default.db/query_table_compact/snapshot/snapshot-6

注意:这个删除操作建议大家在命令行执行,不要在web页面执行,在web页面删除可能会直接把这个表的目录删除掉!!!!!

然后再执行FlinkPaimonBatchQueryForCompact,结果如下:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+
2 rows in set

注意:此时最新的完全压缩的快照就是snapshot-4了,这个快照中只有2条数据。

这就是批量读取中时间旅行参数的使用。

(3)流式读取

下面演示一下如何在流式读取中使用时间旅行功能。

创建object:FlinkPaimonStreamingQuery

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 流式读取
 * Created by xuwei
 */
object FlinkPaimonStreamingQuery {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //SET 'execution.runtime-mode' = 'streaming';
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //流式查询数据
    tEnv.executeSql(
      """
        |SELECT * FROM query_table
        |-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据
        |-- /*+ OPTIONS('scan.mode'='latest') */ -- 只读取最新的变更数据
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 从指定id的快照开始读取变更数据(包含后续新增)
        |-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据
        |-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 从指定时间戳的快照开始读取变更数据(包含后续新增)
        |""".stripMargin)
      .print()
  }
}
(4)Consumer ID

最后我们在流式读取这里扩展一个知识点:Consumer ID,这个功能是针对流式读取设计的。

相当于我们在kafka消费者中指定一个groupid,这样可以通过groupid维护消费数据的偏移量信息,便于任务停止以后重启的时候继续基于之前的进度进行查询。

在这里Consumer ID的主要作用是为了方便记录每次查询到的数据快照的位置,他会把下一个还未读取的快照id记录到hdfs文件中。
当之前的任务停止以后,新启动的任务可以基于之前任务记录的快照id继续查询数据,不需要从状态中恢复位置信息。

这个特性目前属于实验特性,还没有经过大量生产环境的验证,大家可以先提前了解一下。

下面来结合一个案例演示一下:
具体的思路是这样的:

  • 1:首先使用Consumer ID查询一次query_table表中的数据。
  • 2:然后停止之前的查询任务,向query_table表中模拟产生1条数据。
  • 3:重新启动第1步骤中的任务,验证一下是否只读取到了新增的那1条数据

创建object:FlinkPaimonStreamingQueryForConsumerid

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 流式读取
 * Created by xuwei
 */
object FlinkPaimonStreamingQueryForConsumerid {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //SET 'execution.runtime-mode' = 'streaming';
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //注意:在流处理模式中,操作Paimon表时需要开启Checkpoint。
    env.enableCheckpointing(5000)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //流式查询数据
    tEnv.executeSql(
      """
        |SELECT * FROM query_table
        |/*+ OPTIONS('consumer-id'='con-1') */ -- 指定消费者id
        |""".stripMargin)
      .print()
  }
}

注意:在这需要开启checkpoint,否则Consumer ID的功能无法正常触发。

第一次执行此代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |

停止此代码。

此时其实可以到hdfs中查看一下维护的Consumer ID信息:

[root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/query_table/consumer/consumer-con-1
{
  "nextSnapshot" : 4
}

这里面记录的是下一次需要读取的快照id,数值为4,此时最新的快照id是3,因为快照id为3的快照已经读取过了,下一个快照id就是4了。

其实直接查询consumer系统表也是可以看到这些信息的。

创建object:FlinkPaimonSystemTableForConsumerid

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 查询Paimon中的系统表
 * Created by xuwei
 */
object FlinkPaimonSystemTableForConsumerid {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")


    //consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到
    println("====================consumer信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$consumers").print()

  }
}

执行代码,可以看到如下结果:

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    4 |
+----+--------------------------------+----------------------+
1 row in set

从这可以看出来,next_snapshot_id4,查出来的结果是一样的。

接下来我们向query_table中新增一条数据。

创建object:FlinkSQLWriteToPaimonForConsumerid

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 通过FlinkSQL 向Paimon中模拟写入数据
 * Created by xuwei
 */
object FlinkSQLWriteToPaimonForConsumerid {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS `query_table`(
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |)
        |""".stripMargin)

    //写入数据
    tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jessic',30)")

  }

}

执行代码。

最后,我们再重新启动FlinkPaimonStreamingQueryForConsumerid,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                         jessic |          30 |

能看到这个结果,说明这个consumer id生效了,当我们第二次使用相同的consumer id读取这个表的时候,是可以基于之前的进度继续读取的。

停止此任务。

此时再执行FlinkPaimonSystemTableForConsumerid,查看最新的next_snapshot_id

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    5 |
+----+--------------------------------+----------------------+
1 row in set

此时next_snapshot_id变成了5,这是正确的。

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/130811.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# 同步异步大白话

同步异步大白话 背景 任务异步编程模型(TAP)提供了对异步代码的抽象。您可以像往常一样,将代码编写为一系列语句。您可以阅读该代码,就好像每条语句都在下一条语句开始之前完成一样。编译器执行许多转换,因为其中一些…

log4j CVE-2021-44228 RCE漏洞复现

一、漏洞特征 Apache Log4j 是 Apache 的一个开源项目,Apache Log4j2是一个基于Java的日志记录工具。该工具重写了Log4j框架,并且引入了大量丰富的特性。我们可以控制日志信息输送的目的地为控制台、文件、GUI组件等,通过定义每一条日志信息的…

双编码器构建机器人零力拖动/导纳控制思路

前言 这篇博客主要记录昨日与实验室大佬针对UR5机器人拖动示教功能实现的思路。由于本人并非主攻力控方面。直到昨天在做实验的时候,与力控组的大佬讨论过后才了解UR机器人实现导纳控制的思路。 关于导纳控制/零力拖动 导纳控制与阻抗控制单从字面去理解很容易记…

多因素验证如何让企业邮箱系统登录更安全?

企业邮箱系统作为基础的办公软件之一,既是企业内外沟通的重要工具,也是连接企业多个办公平台的桥梁,往往涉及到客户隐私、业务信息、企业机密等等。为了保护邮箱账户的安全,设置登陆密码无疑是保护账户安全的常用措施之一。然而随…

如何设计一个网盘系统的架构

1. 概述 现代生活中已经离不开网盘,比如百度网盘。在使用网盘的过程中,有没有想过它是如何工作的?在本文中,我们将讨论如何设计像百度网盘这样的系统的基础架构。 2. 系统需求 2.1. 功能性需求 用户能够上传照片/文件。用户能…

一题三解(暴力、二分查找算法、单指针):鸡蛋掉落

涉及知识点 暴力、二分查找算法、单指针 题目 给你 k 枚相同的鸡蛋&#xff0c;并可以使用一栋从第 1 层到第 n 层共有 n 层楼的建筑。 已知存在楼层 f &#xff0c;满足 0 < f < n &#xff0c;任何从 高于 f 的楼层落下的鸡蛋都会碎&#xff0c;从 f 楼层或比它低的…

归并分治 归并排序的应用 + 图解 + 笔记

归并分治 前置知识&#xff1a;讲解021-归并排序 归并排序 图解 递归 非递归 笔记-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/134338789?spm1001.2014.3001.5501原理&#xff1a; (1&#xff09;思考一个问题在大范围上的答案&#xff0c;是否等于&…

postman 参数化使用csv导入外部数据

一、参数化脚本入参 postman中变量用{{变量名}}表示变量 二、创建外部数据文件 csv文件逗号分割多个变量和对应值注意编码格式必须为utf-8 三、run collection导入数据文件 四、设置运行参数run 浏览数据 可调试设置迭代次数&#xff1a;防止批量出错&#xff0c;可先设定…

新风机小助手-风压变速器

风压变速器是一种用于调节系统中风量和风压的装置&#xff0c;常用于通风系统中。它通过改变进出风口的开度来调整风流的速度和风压。 风压变速器通常由进出风口和可调节的风门组成。风门可以手动或自动调节&#xff0c;控制进出风口的开度&#xff0c;从而改变风量和风压。根据…

uni-app基于vite和vue3创建并集成pinia实现数据持久化

一、uni-app基于Vite和Vue3创建并集成pinia实现数据持久化 文章目录 一、uni-app基于Vite和Vue3创建并集成pinia实现数据持久化1.如何创建基于Vite和Vue3的uni-app项目&#xff1f;2.选择其中一个分支&#xff0c;就是一个脚手架 二、以下都是基于vite-ts版本创建和配置1.目录结…

第一章 Object-XML 映射简介

文章目录 第一章 Object-XML 映射简介基础如何工作的映射选项IRIS 中的相关工具XML 文档的可能应用 第一章 Object-XML 映射简介 基础 将对象映射到 XML 一词意味着定义如何将该对象用作 XML 文档。要将对象映射到 XML&#xff0c;请将 %XML.Adaptor 添加到定义该对象的类的超…

线性代数-Python-04:线性系统+高斯消元的实现

文章目录 1 线性系统2 高斯-jordon消元法的实现2.1 Matrix2.2 Vector2.3 线性系统 3 行最简形式4 线性方程组的结构5 线性方程组-通用高斯消元的实现5.1 global5.2 Vector-引入is_zero5.3 LinearSystem5.4 main 1 线性系统 2 高斯-jordon消元法的实现 2.1 Matrix from .Vecto…

搭建完全分布式Hadoop

文章目录 一、Hadoop集群规划二、在主节点上配置Hadoop&#xff08;一&#xff09;登录虚拟机&#xff08;二&#xff09;设置主机名&#xff08;三&#xff09;主机名与IP地址映射&#xff08;四&#xff09;关闭与禁用防火墙&#xff08;五&#xff09;配置免密登录&#xff…

根据DataFrame指定的列该列中如果有n个不同元素则将其转化为n行显示explode()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 根据DataFrame指定的列 该列中如果有n个不同元素 则将其转化为n行显示 explode() 选择题 以下代码两次输出结果分别为几行&#xff1f; import pandas as pd df pd.DataFrame({种类:[蔬菜,水…

什么是 CASB,在网络安全中的作用

数字化转型正在稳步攀升&#xff0c;组织现在越来越关注在线生产力系统和协作平台&#xff0c;各行各业的企业都采用了不同的云基础设施服务模式。云基础架构提供按需服务&#xff0c;可提高易用性、访问控制、内容协作和减少内部存储资源&#xff0c;以及许多其他好处。迁移到…

【Mysql】查询mysql的版本

目录 cmd命令查询 mysql -- help(命令&#xff09; mysql -u root -p(命令&#xff09; 数据库管理工具查询 select version(); cmd命令查询 mysql -- help(命令&#xff09; mysql -u root -p(命令&#xff09; 执行该命令并且输入数据库密码 数据库管理工具查询 selec…

DAY50 309.最佳买卖股票时机含冷冻期 + 714.买卖股票的最佳时机含手续费

309.最佳买卖股票时机含冷冻期 题目要求&#xff1a;给定一个整数数组&#xff0c;其中第 i 个元素代表了第 i 天的股票价格 。 设计一个算法计算出最大利润。在满足以下约束条件下&#xff0c;你可以尽可能地完成更多的交易&#xff08;多次买卖一支股票&#xff09;: 你不…

如何解决msvcr90.dll文件丢失,电脑丢失msvcr90.dll什么意思

在计算机使用过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;比如“msvcr90.dll丢失”。这是因为msvcr90.dll是Microsoft Visual C 2008运行库的一部分&#xff0c;当这个文件丢失或者损坏时&#xff0c;就会导致程序无法正常运行。那么&#xff0c;如何解决这个问题…

麒麟V10系统下编译libcef_dll

目录 前言1、下载cef2、编译libcef_dll2.1 问题一 cmake版本太低2.2 问题二 无法识别的编译选项 -m64 3、总结 前言 本篇主要记录了在飞腾PC麒麟V10系统下编译libcef_dll时遇到的问题及解决方法。在Qt应用程序中使用QWebEngine加载HTML网页算是常规操作&#xff0c;但是涉及到…

Clickhouse SQL

insert insert操作和mysql一致 标准语法&#xff1a;insert into [table_name] values(…),(….)从表到表的插入&#xff1a;insert into [table_name] select a,b,c from [table_name_2] update 和 delete ClickHouse 提供了 Delete 和 Update 的能力&#xff0c;这类操作…