2 快速上手使用Paimon数据湖

2.1 基于Flink SQL操作Paimon

在这里我们基于Flink 1.15(ON YARN)、Paimon 0.5版本开发一个案例。

注意:想要使用Paimon是非常简单的,不需要复杂的安装部署,只需要使用一个jar包即可对它进行操作。

我们在使用Paimon的时候其实也可以把它简单理解为Hive,这样便于理解。但是我们要知道,他们两个底层其实是不一样的,一个是数据仓库,一个是数据湖。

目前Paimon主要提供的是SQL层面的API,所以我们在使用Flink操作Paimon的时候需要用到Flink SQL。

还有一点需要注意:Paimon 目前只支持 Flink 1.17、1.16、1.15 和 1.14,低版本的Flink暂时无法使用。

https://paimon.apache.org/docs/master/engines/flink/
在这里插入图片描述

在本案例中,我们使用Flink 1.15版本,同时我们需要使用Flink 1.15版本对应的Paimon jar包。

注意:Paimon 目前有0.5和 0.6版本,0.5是稳定版本,0.6属于正在开发中的版本,目前建议大家使用0.5版本。

0.5稳定版本下载地址如下:

https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.15/0.5.0-incubating/paimon-flink-1.15-0.5.0-incubating.jar

使用Flink SQL操作Paimon的时候,可以在Flink SQL代码中操作,也可以在sql-client.sh中操作。

2.1.1 在Flink sql-client.sh中操作Paimon

下面我们首先来看一下如何在sql-client.sh中操作Paimon。

(1)将这个Paimon jar包下载下来之后,上传到flink客户端节点中flink的lib目录里面。

[root@bigdata04 ~]# cd /data/soft/flink-1.15.0/lib/
[root@bigdata04 lib]# ll paimon-flink-1.15-0.5.0-incubating.jar 
-rw-r--r--. 1 root root 26756622 May 22  2023 paimon-flink-1.15-0.5.0-incubating.jar

(2)确认这个Flink客户端节点中是否有Hadoop的相关环境,有没有配置HADOOP_CLASSPATH环境变量。
在工作中,基本上Flink客户端节点上面也会有Hadoop的相关环境,HADOOP_CLASSPATH我们之前也配置过了。
所以这一步就不需要额外做什么操作了。
(3)启动Hadoop集群。
因为我们要使用Flink ON YARN模式,所以需要启动Hadoop集群。

[root@bigdata01 ~]# cd /data/soft/hadoop-3.2.0/
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh 

(4)启动sql-client客户端
由于使用的是Flink ON YARN模式,所以需要先使用yarn-session.sh脚本在YARN上启动一个Flink集群。

[root@bigdata04 flink-1.15.0]# bin/yarn-session-1-15.sh -jm 1024m -tm 1024m -d

启动sql-client

[root@bigdata04 flink-1.15.0]# bin/sql-client-1-15.sh 
2028-12-14 17:19:02,842 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.

Flink SQL>

(5)创建Paimon类型的Catalog

Flink SQL> CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='hdfs://bigdata01:9000/paimon'
);
Flink SQL> USE CATALOG paimon_catalog;

注意:Paimon中的Catalog也可以支持多种管理元数据的方式,目前我们使用的是默认的filesystem这种Metastore,也就是说Paimon的元数据目前会存储到我们在warehouse中指定的hdfs路径中。

除了这种Metastore之外,Paimon中的Catalog还可以支持Hive Metastore,也就是Paimon共用Hive的Metastore,这块内容后面我们再详细讲解。

此时到HDFS中查看一下,可以看到在/paimon目录下会自动创建default.dbdefault.db相当于是一个默认的数据库了。

[root@bigdata04 ~]# hdfs dfs -ls /paimon
Found 1 items
drwxr-xr-x   - root supergroup          0 2028-11-07 10:35 /paimon/default.db

(6)创建表。
首先创建一个数据源表,这个表负责模拟产生实时数据。

Flink SQL> CREATE TABLE word_source (
    word STRING
) WITH (
    'connector' = 'datagen',
'fields.word.length' = '1',
'rows-per-second' = '1'
);

此时会看到如下错误信息:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: Paimon Catalog only supports paimon tables , and you don't need to specify  'connector'= 'paimon' when using Paimon Catalog
 You can create TEMPORARY table instead if you want to create the table of other connector.

解释:此时创建的word_source表不是Paimon类型的表,但是却放在了Paimon类型的Catalog里面,所以就报错了。
当我们在Paimon类型的Catalog里面创建表的时候,表默认会使用'connector'= 'paimon',可以省略不写。

针对这个问题,有两种解决方案:

  • 1:不在Paimon类型的Catalog里面创建这个表
  • 2:在建表语句中增加TEMPORARY关键字来创建一个临时表,这样在建表的时候可以指定其他类型的connector。
Flink SQL> CREATE TEMPORARY TABLE word_source (
    word STRING
) WITH (
    'connector' = 'datagen',
'fields.word.length' = '1',
'rows-per-second' = '1'
);

然后创建一个结果表,这个表负责存储结果数据。

Flink SQL> CREATE TABLE wc_sink (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

注意:此时创建这个表的时候不需要在WITH里面指定'connector',因为我们在Paimon Catalog里面创建的表默认都是paimon类型的表。

此时可以到HDFS中查看到这个表对应的hdfs目录。

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db
Found 1 items
drwxr-xr-x   - root supergroup          0 2028-11-07 10:47 /paimon/default.db/wc_sink

wc_sink目录下面会有一个schema目录,里面维护的是表的schema信息。

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink/schema
Found 1 items
-rw-r--r--   2 root supergroup        265 2028-11-07 10:47 /paimon/default.db/wc_sink/schema/schema-0

咱们前面说了,目前这个paimon类型的catalog使用的metastore是默认的filesystem,所以表的元数据信息会存储在我们指定的hdfs路径里面。

问题:为什么刚才创建的word_source表的元数据信息没有存储在这里呢?

答案:因为word_source表是TEMPORARY(临时)类型的表。

解释:这里面的schema-0表示是这个表的第1个schema,因为表的schema的信息可能会发生变化,所以后期可能会有schema-1schema-2等等。

查看schema-0中的详细内容:

[root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/wc_sink/schema/schema-0
{
  "id" : 0,
  "fields" : [ {
    "id" : 0,
    "name" : "word",
    "type" : "STRING NOT NULL"
  }, {
    "id" : 1,
    "name" : "cnt",
    "type" : "BIGINT"
  } ],
  "highestFieldId" : 1,
  "partitionKeys" : [ ],
  "primaryKeys" : [ "word" ],
  "options" : { }
}

解释:

  • id:对应的就是schema文件的编号。
  • fields:对应的是表中的字段列表,以json数组形式存储,里面包含了id、name、type,分别表示字段的位置编号,字段名称,字段类型。
  • highestFieldId:最大的字段位置编号。
  • partitionKeys:表中的分区字段。
  • primaryKeys:表中的主键字段。
  • options:表的扩展配置。

(7)执行计算逻辑,向结果表中写入数据。

Flink SQL> SET 'execution.checkpointing.interval' = '10 s';
Flink SQL> INSERT INTO wc_sink SELECT word, COUNT(*) FROM word_source GROUP BY word;

注意:在流处理模式中,操作Paimon表时需要开启Checkpoint

此时可以到HDFS中查看一下:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink
Found 4 items
drwxr-xr-x   - root supergroup          0 2028-11-07 11:35 /paimon/default.db/wc_sink/bucket-0
drwxr-xr-x   - root supergroup          0 2028-11-07 11:35 /paimon/default.db/wc_sink/manifest
drwxr-xr-x   - root supergroup          0 2028-11-07 10:47 /paimon/default.db/wc_sink/schema
drwxr-xr-x   - root supergroup          0 2028-11-07 11:35 /paimon/default.db/wc_sink/snapshot

在这里可以看到snapshot、manifest、bucket-0等信息,这些其实就是Paimon中最核心的东西了。这些文件后续会有一个独立章节详细分析,在这大家先有一个大致的概念即可。

(8)OLAP查询。
OLAP查询其实就是离线查询了。

Flink SQL> -- 设置结果数据显示格式
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';

Flink SQL> -- 切换到批处理模式
Flink SQL> RESET 'execution.checkpointing.interval';
Flink SQL> SET 'execution.runtime-mode' = 'batch';

Flink SQL> -- 执行OLAP查询
Flink SQL> SELECT * FROM wc_sink;

结果如下:

+------+--------+
| word |    cnt |
+------+--------+
|    0 | 594057 |
|    1 | 594887 |
|    2 | 594064 |
|    3 | 594812 |
|    4 | 595013 |
|    5 | 594375 |
|    6 | 594052 |
|    7 | 593309 |
|    8 | 594334 |
|    9 | 594878 |
|    a | 596356 |
|    b | 593656 |
|    c | 592675 |
|    d | 595513 |
|    e | 594268 |
|    f | 593751 |
+------+--------+

注意:在这里多次执行这个SQL语句,可以发现结果是不一样的,因为表中的结果数据是一直在变化的,每次执行查询的时候都会读取最新快照中的数据。

(9)流式查询。

Flink SQL> -- 切换到流处理模式
Flink SQL> SET 'execution.runtime-mode' = 'streaming';

Flink SQL> -- 执行流式查询
Flink SQL> SELECT * FROM wc_sink;

结果如下:

+----+--------------------------------+----------------------+
| op |                           word |                  cnt |
+----+--------------------------------+----------------------+
| +I |                              0 |               637764 |
| +I |                              1 |               638705 |
| +I |                              2 |               637742 |
| +I |                              3 |               638773 |
| +I |                              4 |               638829 |
| +I |                              5 |               638090 |
| +I |                              6 |               637866 |
| +I |                              7 |               637241 |
| +I |                              8 |               638128 |
| +I |                              9 |               638221 |
| +I |                              a |               640101 |
| +I |                              b |               637347 |
| +I |                              c |               636275 |
| +I |                              d |               639562 |
| +I |                              e |               637851 |
| +I |                              f |               637505 |
| -U |                              0 |               637764 |
| +U |                              0 |               643996 |
| -U |                              1 |               638705 |
| +U |                              1 |               644960 |
| -U |                              2 |               637742 |
| +U |                              2 |               644017 |
| -U |                              3 |               638773 |
| +U |                              3 |               645018 |
| -U |                              4 |               638829 |
| +U |                              4 |               645143 |
| -U |                              5 |               638090 |
| +U |                              5 |               644230 |
| -U |                              6 |               637866 |
| +U |                              6 |               644086 |
| -U |                              7 |               637241 |
| +U |                              7 |               643529 |
| -U |                              8 |               638128 |
| +U |                              8 |               644379 |
| -U |                              9 |               638221 |
| +U |                              9 |               644502 |
| -U |                              a |               640101 |
| +U |                              a |               646362 |
| -U |                              b |               637347 |
| +U |                              b |               643531 |
| -U |                              c |               636275 |
| +U |                              c |               642611 |
| -U |                              d |               639562 |
| +U |                              d |               645845 |
| -U |                              e |               637851 |
| +U |                              e |               644111 |
| -U |                              f |               637505 |
| +U |                              f |               643680 |
| -U |                              0 |               643996 |
| +U |                              0 |               650285 |
| -U |                              1 |               644960 |
| +U |                              1 |               651196 |
| -U |                              2 |               644017 |
| +U |                              2 |               650205 |
| -U |                              3 |               645018 |
| +U |                              3 |               651225 |
| -U |                              4 |               645143 |
| +U |                              4 |               651405 |
| -U |                              5 |               644230 |
| +U |                              5 |               650612 |
| -U |                              6 |               644086 |
| +U |                              6 |               650322 |
| -U |                              7 |               643529 |
| +U |                              7 |               649680 |
| -U |                              8 |               644379 |
| +U |                              8 |               650659 |
| -U |                              9 |               644502 |
| +U |                              9 |               650721 |
| -U |                              a |               646362 |
| +U |                              a |               652679 |
| -U |                              b |               643531 |
| +U |                              b |               649683 |
| -U |                              c |               642611 |
| +U |                              c |               648862 |
| -U |                              d |               645845 |
| +U |                              d |               652217 |
| -U |                              e |               644111 |
| +U |                              e |               650447 |
| -U |                              f |               643680 |
| +U |                              f |               649802 |

此时可以看到结果数据是一直在发生变化的,因为数据源是一直源源不断在产生数据的。

在Flink SQL控制台,按ctrl+c停止此流式查询任务。

(10)停止任务,退出sql-client
到Flink任务界面中停止Flink核心计算逻辑对应的任务
在这里插入图片描述

然后退出sql-client。

Flink SQL>exit;

最后停止YARN中的Flink集群。

[root@bigdata04 flink-1.15.0]# yarn application -kill application_1857176567822_0001

2.1.2 在Flink SQL代码中操作Paimon

创建一个maven项目:db_paimon

在项目中创建一个scala目录。

pom.xml中引入相关依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- log4j的依赖 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.10</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.10</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.paimon</groupId>
    <artifactId>paimon-flink-1.15</artifactId>
    <version>0.5</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/paimon-flink-1.15-0.5.0-incubating.jar</systemPath>
</dependency>
<!-- hadoop依赖 -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.15.0</version>
</dependency>

注意:由于目前在maven仓库里面还无法查到paimon的依赖,所以通过本地jar包的形式引入paimon的依赖。

resource目录中添加log4j.properties日志配置文件:

log4j.rootLogger=warn,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

首先来看一下如何使用Flink SQL代码向Paimon表中写入数据。

创建package:tech.xuwei.paimon.sql
创建object:FlinkSQLWriteToPaimon

代码如下:

package tech.xuwei.paimon.sql

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用FlinkSQL向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkSQLWriteToPaimon {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //注意:在流处理模式中,操作Paimon表时需要开启Checkpoint。
    env.enableCheckpointing(5000)

    //创建数据源表-普通表
    //注意:此时这个表是在Flink SQL中默认的Catalog里面创建的
    tEnv.executeSql(
      """
        |CREATE TABLE word_source (
        |    word STRING
        |) WITH (
        |    'connector' = 'datagen',
        |    'fields.word.length' = '1',
        |    'rows-per-second' = '1'
        |)
        |""".stripMargin)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建目的地表-Paimon表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS wc_sink_sql (
        |    word STRING,
        |    cnt BIGINT,
        |    PRIMARY KEY (word) NOT ENFORCED
        |)
        |""".stripMargin)

    //向目的地表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `paimon_catalog`.`default`.`wc_sink_sql`
        |SELECT
        |    word,
        |    COUNT(*) as cnt
        |FROM `default_catalog`.`default_database`.`word_source`
        |GROUP BY word
        |""".stripMargin).print()

  }

}

注意:在这里我们创建表word_source的时候没有创建临时表,因为我们不是在Paimon Catalog里面创建的。

运行代码,此时可以在hdfs中看到表中的相关文件内容

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink_sql
Found 4 items
drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/bucket-0
drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/manifest
drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:04 /paimon/default.db/wc_sink_sql/schema
drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/snapshot

停止代码。

接下来我们来使用Flink SQL代码从Paimon表中读取数据。
创建object:FlinkSQLReadFromPaimon

代码如下:

package tech.xuwei.paimon.sql

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用FlinkSQL从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkSQLReadFromPaimon {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //读取Paimon表中的数据,并且打印输出结果
    tEnv.executeSql(
      """
        |SELECT * FROM  `paimon_catalog`.`default`.`wc_sink_sql`
        |""".stripMargin)
      .print()

  }

}

运行代码,此时可以看到类似这样的数据:

+----+--------------------------------+----------------------+
| op |                           word |                  cnt |
+----+--------------------------------+----------------------+
| +I |                              1 |                   26 |
| +I |                              7 |                   24 |
| +I |                              e |                   14 |
| +I |                              2 |                   20 |
| +I |                              4 |                   23 |
| +I |                              6 |                   23 |
| +I |                              c |                   26 |
| +I |                              0 |                   21 |
| +I |                              3 |                   28 |
| +I |                              8 |                   23 |
| +I |                              5 |                   20 |
| +I |                              d |                   20 |
| +I |                              a |                   14 |
| +I |                              b |                   19 |
| +I |                              f |                   25 |
| +I |                              9 |                   18 |
.....

此时再启动FlinkSQLWriteToPaimon代码向Paimon表中写入数据,可以在控制台看到如下数据:

| -U |                              2 |                   20 |
| +U |                              2 |                    4 |
| -U |                              4 |                   23 |
| +U |                              4 |                    2 |
| -U |                              6 |                   23 |
| +U |                              6 |                    1 |
| -U |                              c |                   26 |
| +U |                              c |                    3 |
| -U |                              1 |                   26 |
| +U |                              1 |                    4 |
| -U |                              7 |                   24 |
| +U |                              7 |                    2 |
| -U |                              e |                   14 |
| +U |                              e |                    2 |
| -U |                              a |                   14 |
| +U |                              a |                    2 |
| -U |                              b |                   19 |
| +U |                              b |                    3 |
| -U |                              5 |                   20 |
| +U |                              5 |                    3 |
| -U |                              d |                   20 |
| +U |                              d |                    1 |
| -U |                              0 |                   21 |
| +U |                              0 |                    2 |
| -U |                              8 |                   23 |
| +U |                              8 |                    2 |
| -U |                              9 |                   18 |
| +U |                              9 |                    1 |

这样就可以看到表中数据的实时变更情况。

基于Flink DataStream API 操作Paimon

Pamion虽然没有提供DataStream API,但是可以借助于Flink中DataStreamTable的转换来操作Pamion。

下面我们来具体演示一下:

首先来看一下如何使用Flink DataStreamAPI向Paimon表中写入数据。

创建package:tech.xuwei.paimon.datastream
创建object:FlinkDataStreamWriteToPaimon

代码如下:

package tech.xuwei.paimon.datastream

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimon {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(10)),//+I
      Row.ofKind(RowKind.INSERT, "tom", Int.box(20)),//+I
      Row.ofKind(RowKind.UPDATE_BEFORE, "jack", Int.box(10)),//-U
      Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
    )(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING().notNull())//主键非空
      .column("age", DataTypes.INT())
      .primaryKey("name")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `user` (
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'changelog-producer' = 'input'
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `user`
        |SELECT name,age FROM t1
        |""".stripMargin)
  }

}

运行代码,可以将changlog DataStream数据写入到Paimon表中。

接下来开发一个从Paimon表中读取数据的代码。
创建object:FlinkDataStreamReadFromPaimon

代码如下:

package tech.xuwei.paimon.datastream

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimon {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //将计算结果Table转换为DataStream
    val execSql =
      """
        |SELECT * FROM `user` -- 此时默认只能查到数据的最新值
        | /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)

    //将结果数据转为Changlog DataStream数据流
    val dataStream = tEnv.toChangelogStream(table)

    //将DataStream中的数据输出打印到控制台
    dataStream.print().setParallelism(1)

    //执行任务
    env.execute("FlinkDataStreamReadFromPaimon")
  }

}

运行代码。
注意:如果没有手工指定数据读取模式,那么最终的结果数据是类似这样的,看不到数据的历史变化,只能看到最新的数据:

+U[jack, 11]
+I[tom, 20]

此时如果想要查看到数据历史的变化情况,需要通过动态表选项来指定数据读取(扫描)模式为from-snapshot

val execSql =
      """
        |SELECT * FROM `user` -- 此时默认只能查到数据的最新值
        |/*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin

运行代码,结果如下:

+I[jack, 10]
-U[jack, 10]
+U[jack, 11]
+I[tom, 20]

这个结果其实和最开始我们构造的changelog datastream数据流是一致的。


更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/120009.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

云闪付支付接口的技术实现方式

&#xff08;一&#xff09;整体框架。      云闪付的整体架构如图 1 所示&#xff0c;总体与原有的支付清算体系相同&#xff0c;只是增加了云端支付平台、移动应用平台和移动应用。云端支付平台主要对移动应用端的限制密钥进行更新和管理&#xff0c;同时对云端支付账户进…

2011年408计网

第33题 TCP/IP 参考模型的网络层提供的是&#xff08;&#xff09;A. 无连接不可靠的数据报服务B. 无连接可靠的数据报服务C. 有连接不可靠的虚电路服务D. 有连接可靠的虚电路服务 本题考查TCP/IP 参考模型的网络层 若网络层提供的是虚电路服务&#xff0c;则必须建立网络层的…

WPF中依赖属性及附加属性的概念及用法

完全来源于十月的寒流&#xff0c;感谢大佬讲解 依赖属性 由依赖属性提供的属性功能 与字段支持的属性不同&#xff0c;依赖属性扩展了属性的功能。 通常&#xff0c;添加的功能表示或支持以下功能之一&#xff1a; 资源数据绑定样式动画元数据重写属性值继承WPF 设计器集成 …

佳能相机拍出来的dat文件怎么修复为正常视频

3-3 佳能相机是普通人用得最多的相机之一&#xff0c;也有一些专业机会用于比较重要的场景&#xff0c;比如婚庆、会议录像、家庭录像使用等。 但作为电子产品&#xff0c;经常会出现一些奇怪的故障&#xff0c;最严重的应该就是拍出来的东西打不开了。 本文案例是佳能相机拍…

校园安防监控系统升级改造方案:如何实现设备利旧上云与AI视频识别感知?

一、背景与需求分析 随着现代安防监控科技的兴起和在各行各业的广泛应用&#xff0c;监控摄像头成为众所周知的产品&#xff0c;也为人类的工作生活提供了很大的便利。由于科技的发达&#xff0c;监控摄像头的升级换代也日益频繁。每年都有不计其数的摄像头被拆掉闲置&#xf…

51单片机-串口通信

文章目录 前言1.基础介绍2.串口实战3.4. 前言 1.基础介绍 常见1&#xff0c;2&#xff0c;3,电源 常用方式1 fosc外部晶振 2.串口实战 3. 4.

软件测试/测试开发丨探索Python的魔力:从第一个程序到快捷键大揭秘

点此获取更多相关资料 第一个 Python 程序 通过程序输出 Hello World 是在学习每一门编程语言时&#xff0c;都会接触到的第一个程序。 在 Python 中&#xff0c;可以通过内置函数 print() 实现向控制台输出 Hello World 。 使用 print()输出 可以进入 命令行交互模式 或使…

从研发域到量产域的自动驾驶工具链探索与实践

导读 本文整理自 2023 年 9 月 5 日百度云智大会 - 智能汽车分论坛&#xff0c;百度智能云自动驾驶云研发高级经理徐鹏的主题演讲《从研发域到量产域的自动驾驶工具链探索与实践》。 全文中部段落附有演讲中 2 个产品演示视频的完整版&#xff0c;精彩不容错过。 (视频观看&…

生态环境领域基于R语言piecewiseSEM结构方程模型

结构方程模型&#xff08;Sructural Equation Modeling&#xff0c;SEM&#xff09;可分析系统内变量间的相互关系&#xff0c;并通过图形化方式清晰展示系统中多变量因果关系网&#xff0c;具有强大的数据分析功能和广泛的适用性&#xff0c;是近年来生态、进化、环境、地学、…

【网络】五中IO模型介绍 + 多路转接中select和poll服务器的简单编写

高级IO 前言正式开始前面的IO函数简单过一遍什么叫做低效的IO钓鱼的例子同步IO和异步IO五种IO模型阻塞IO非阻塞IO信号驱动多路转接异步IO 小结 代码演示非阻塞IO多路转接select介绍简易select服务器timeout 为 nullptrtimeout 为 {0, 0}timeout 为 {5, 0}调用accept select编写…

macos端串口调试推荐 serial直装激活 for mac

serial for mac版软件特色 1.准备好macOS High Sierra 最近的升级是否会让您的设备落后&#xff1f;Serial将使其恢复正常工作&#xff0c;同时保持Mac的安全功能完好无损。 2.完美无瑕的仿真 Serial是一个全功能的终端仿真器&#xff0c;支持Xterm&#xff0c;VT102和ANSI…

pyspark连接mysql数据库报错

使用pyspark连接mysql数据库代码如下 spark_conf SparkConf().setAppName("MyApp").setMaster("local")spark SparkSession.builder.config(confspark_conf).getOrCreate()url "jdbc:mysql://localhost:3306/test?useUnicodetrue&characterE…

Mactracker for mac(硬件信息查询工具)免费下载

想知道你电脑的信息吗&#xff1f;Mactracker Mac版是Macos上一款硬件信息查询工具&#xff0c;可以查询电脑中的硬件信息&#xff0c;还可以查看您使用软件的具体情况&#xff0c;苹果电脑产品和周边产品的信息&#xff0c;售价等等&#xff0c;让您对电脑有更多深刻的了解。 …

NowCoder | 环形链表的约瑟夫问题

NowCoder | 环形链表的约瑟夫问题 OJ链接 思路&#xff1a; 创建带环链表带环链表的删除节点 代码如下&#xff1a; #include<stdlib.h>typedef struct ListNode ListNode; ListNode* ListBuyNode(int x) {ListNode* node (ListNode*)malloc(sizeof(ListNode));node…

Zabbix如何监控腾讯云NAT网关

1、NAT网关介绍 NAT 网关&#xff08;NAT Gateway&#xff09;是一种支持 IP 地址转换服务&#xff0c;提供网络地址转换能力&#xff0c;主要包括SNAT&#xff08;Source Network Address Translation&#xff0c;源网络地址转换&#xff09;和DNAT&#xff08;Destination N…

在Python中使用deepfakes实现AI换脸功能

目录 一、Deepfakes技术原理 二、Deepfakes技术实现方法 三、Deepfakes技术应用与实现代码 四、结论 近年来&#xff0c;深度学习技术在图像处理、计算机视觉和人工智能领域取得了显著的进步。其中&#xff0c;Deepfakes技术是一种基于深度学习的图像合成技术&#xff0c;可…

AI时代产品经理升级之道:ChatGPT让产品经理插上翅膀

文章目录 一、ChatGPT简介二、ChatGPT在产品经理工作中的应用1. 快速获取用户反馈2. 智能分析竞品3. 智能推荐产品4.分析市场趋势5.优化产品功能 三、总结与展望《AI时代产品经理升级之道&#xff1a;ChatGPT让产品经理插上翅膀》亮点内容简介目录作者简介获取方式 随着人工智能…

使用oracle虚拟机添加新硬盘

1、关闭运行的虚拟机后配置 单击选择要配置的oracle虚拟机&#xff0c;单击设置–>存储—>控制器&#xff0c;单击添加虚拟硬盘图标。 2、配置硬盘 单击“创建”&#xff0c;单击“下一步”&#xff0c;选择需要创建的虚拟硬盘大小&#xff0c;完成创建。 完成创建后…

2023中国视频云市场报告:腾讯云音视频解决方案份额连续六次蝉联榜首,加速全球化布局

近日&#xff0c;国际数据公司&#xff08;IDC&#xff09;发布了《中国视频云市场跟踪&#xff08;2023上半年&#xff09;》报告&#xff0c;腾讯云音视频的解决方案份额连续六次蝉联榜首&#xff0c;并在视频生产创作与媒资管理市场份额中排名第一。同时&#xff0c;在实时音…

[云原生案例2.1 ] Kubernetes的部署安装 【单master集群架构 ---- (二进制安装部署)】节点部分

文章目录 1. 常见的K8S安装部署方式1.1 Minikube1.2 Kubeadm1.3 二进制安装部署 2. Kubernetes单master集群架构 ---- &#xff08;二进制安装部署&#xff09;2.1 前置准备2.2 操作系统初始化2.3 部署 docker引擎 ---- &#xff08;所有 node 节点&#xff09;2.4 部署 etcd 集…