2.1 基于Flink SQL操作Paimon
在这里我们基于Flink 1.15(ON YARN)、Paimon 0.5
版本开发一个案例。
注意:想要使用Paimon是非常简单的,不需要复杂的安装部署,只需要使用一个jar包即可对它进行操作。
我们在使用Paimon的时候其实也可以把它简单理解为Hive
,这样便于理解。但是我们要知道,他们两个底层其实是不一样的,一个是数据仓库,一个是数据湖。
目前Paimon主要提供的是SQL
层面的API
,所以我们在使用Flink操作Paimon的时候需要用到Flink SQL。
还有一点需要注意:Paimon 目前只支持 Flink 1.17、1.16、1.15 和 1.14
,低版本的Flink暂时无法使用。
https://paimon.apache.org/docs/master/engines/flink/
在本案例中,我们使用Flink 1.15版本,同时我们需要使用Flink 1.15版本对应的Paimon jar
包。
注意:Paimon 目前有0.5和 0.6版本,0.5是稳定版本,0.6属于正在开发中的版本,目前建议大家使用0.5版本。
0.5稳定版本下载地址如下:
https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.15/0.5.0-incubating/paimon-flink-1.15-0.5.0-incubating.jar
使用Flink SQL操作Paimon的时候,可以在Flink SQL
代码中操作,也可以在sql-client.sh
中操作。
2.1.1 在Flink sql-client.sh中操作Paimon
下面我们首先来看一下如何在sql-client.sh
中操作Paimon。
(1)将这个Paimon jar包下载下来之后,上传到flink客户端节点中flink的lib目录里面。
[root@bigdata04 ~]# cd /data/soft/flink-1.15.0/lib/
[root@bigdata04 lib]# ll paimon-flink-1.15-0.5.0-incubating.jar
-rw-r--r--. 1 root root 26756622 May 22 2023 paimon-flink-1.15-0.5.0-incubating.jar
(2)确认这个Flink客户端节点中是否有Hadoop的相关环境,有没有配置HADOOP_CLASSPATH
环境变量。
在工作中,基本上Flink客户端节点上面也会有Hadoop的相关环境,HADOOP_CLASSPATH
我们之前也配置过了。
所以这一步就不需要额外做什么操作了。
(3)启动Hadoop集群。
因为我们要使用Flink ON YARN
模式,所以需要启动Hadoop集群。
[root@bigdata01 ~]# cd /data/soft/hadoop-3.2.0/
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh
(4)启动sql-client
客户端
由于使用的是Flink ON YARN
模式,所以需要先使用yarn-session.sh
脚本在YARN上启动一个Flink集群。
[root@bigdata04 flink-1.15.0]# bin/yarn-session-1-15.sh -jm 1024m -tm 1024m -d
启动sql-client
[root@bigdata04 flink-1.15.0]# bin/sql-client-1-15.sh
2028-12-14 17:19:02,842 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Flink SQL>
(5)创建Paimon类型的Catalog
Flink SQL> CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='hdfs://bigdata01:9000/paimon'
);
Flink SQL> USE CATALOG paimon_catalog;
注意:Paimon中的Catalog也可以支持多种管理元数据的方式,目前我们使用的是默认的filesystem这种Metastore,也就是说Paimon的元数据目前会存储到我们在warehouse中指定的hdfs路径中。
除了这种Metastore之外,Paimon中的Catalog还可以支持Hive Metastore
,也就是Paimon共用Hive的Metastore,这块内容后面我们再详细讲解。
此时到HDFS中查看一下,可以看到在/paimon
目录下会自动创建default.db
,default.db
相当于是一个默认的数据库了。
[root@bigdata04 ~]# hdfs dfs -ls /paimon
Found 1 items
drwxr-xr-x - root supergroup 0 2028-11-07 10:35 /paimon/default.db
(6)创建表。
首先创建一个数据源表,这个表负责模拟产生实时数据。
Flink SQL> CREATE TABLE word_source (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1',
'rows-per-second' = '1'
);
此时会看到如下错误信息:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: Paimon Catalog only supports paimon tables , and you don't need to specify 'connector'= 'paimon' when using Paimon Catalog
You can create TEMPORARY table instead if you want to create the table of other connector.
解释:此时创建的word_source
表不是Paimon
类型的表,但是却放在了Paimon
类型的Catalog
里面,所以就报错了。
当我们在Paimon类型的Catalog里面创建表的时候,表默认会使用'connector'= 'paimon'
,可以省略不写。
针对这个问题,有两种解决方案:
- 1:不在Paimon类型的Catalog里面创建这个表
- 2:在建表语句中增加
TEMPORARY
关键字来创建一个临时表,这样在建表的时候可以指定其他类型的connector。
Flink SQL> CREATE TEMPORARY TABLE word_source (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1',
'rows-per-second' = '1'
);
然后创建一个结果表,这个表负责存储结果数据。
Flink SQL> CREATE TABLE wc_sink (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
);
注意:此时创建这个表的时候不需要在WITH里面指定'connector'
,因为我们在Paimon Catalog里面创建的表默认都是paimon类型的表。
此时可以到HDFS中查看到这个表对应的hdfs目录。
[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db
Found 1 items
drwxr-xr-x - root supergroup 0 2028-11-07 10:47 /paimon/default.db/wc_sink
在wc_sink
目录下面会有一个schema
目录,里面维护的是表的schema信息。
[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink/schema
Found 1 items
-rw-r--r-- 2 root supergroup 265 2028-11-07 10:47 /paimon/default.db/wc_sink/schema/schema-0
咱们前面说了,目前这个paimon类型的catalog使用的metastore是默认的filesystem,所以表的元数据信息会存储在我们指定的hdfs路径里面。
问题:为什么刚才创建的word_source表的元数据信息没有存储在这里呢?
答案:因为word_source表是TEMPORARY
(临时)类型的表。
解释:这里面的schema-0
表示是这个表的第1个schema,因为表的schema的信息可能会发生变化,所以后期可能会有schema-1
、schema-2
等等。
查看schema-0
中的详细内容:
[root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/wc_sink/schema/schema-0
{
"id" : 0,
"fields" : [ {
"id" : 0,
"name" : "word",
"type" : "STRING NOT NULL"
}, {
"id" : 1,
"name" : "cnt",
"type" : "BIGINT"
} ],
"highestFieldId" : 1,
"partitionKeys" : [ ],
"primaryKeys" : [ "word" ],
"options" : { }
}
解释:
- id:对应的就是schema文件的编号。
- fields:对应的是表中的字段列表,以json数组形式存储,里面包含了id、name、type,分别表示字段的位置编号,字段名称,字段类型。
- highestFieldId:最大的字段位置编号。
- partitionKeys:表中的分区字段。
- primaryKeys:表中的主键字段。
- options:表的扩展配置。
(7)执行计算逻辑,向结果表中写入数据。
Flink SQL> SET 'execution.checkpointing.interval' = '10 s';
Flink SQL> INSERT INTO wc_sink SELECT word, COUNT(*) FROM word_source GROUP BY word;
注意:在流处理模式中,操作Paimon表时需要开启Checkpoint
。
此时可以到HDFS中查看一下:
[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink
Found 4 items
drwxr-xr-x - root supergroup 0 2028-11-07 11:35 /paimon/default.db/wc_sink/bucket-0
drwxr-xr-x - root supergroup 0 2028-11-07 11:35 /paimon/default.db/wc_sink/manifest
drwxr-xr-x - root supergroup 0 2028-11-07 10:47 /paimon/default.db/wc_sink/schema
drwxr-xr-x - root supergroup 0 2028-11-07 11:35 /paimon/default.db/wc_sink/snapshot
在这里可以看到snapshot、manifest、bucket-0
等信息,这些其实就是Paimon中最核心的东西了。这些文件后续会有一个独立章节详细分析,在这大家先有一个大致的概念即可。
(8)OLAP查询。
OLAP查询其实就是离线查询了。
Flink SQL> -- 设置结果数据显示格式
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> -- 切换到批处理模式
Flink SQL> RESET 'execution.checkpointing.interval';
Flink SQL> SET 'execution.runtime-mode' = 'batch';
Flink SQL> -- 执行OLAP查询
Flink SQL> SELECT * FROM wc_sink;
结果如下:
+------+--------+
| word | cnt |
+------+--------+
| 0 | 594057 |
| 1 | 594887 |
| 2 | 594064 |
| 3 | 594812 |
| 4 | 595013 |
| 5 | 594375 |
| 6 | 594052 |
| 7 | 593309 |
| 8 | 594334 |
| 9 | 594878 |
| a | 596356 |
| b | 593656 |
| c | 592675 |
| d | 595513 |
| e | 594268 |
| f | 593751 |
+------+--------+
注意:在这里多次执行这个SQL语句,可以发现结果是不一样的,因为表中的结果数据是一直在变化的,每次执行查询的时候都会读取最新快照中的数据。
(9)流式查询。
Flink SQL> -- 切换到流处理模式
Flink SQL> SET 'execution.runtime-mode' = 'streaming';
Flink SQL> -- 执行流式查询
Flink SQL> SELECT * FROM wc_sink;
结果如下:
+----+--------------------------------+----------------------+
| op | word | cnt |
+----+--------------------------------+----------------------+
| +I | 0 | 637764 |
| +I | 1 | 638705 |
| +I | 2 | 637742 |
| +I | 3 | 638773 |
| +I | 4 | 638829 |
| +I | 5 | 638090 |
| +I | 6 | 637866 |
| +I | 7 | 637241 |
| +I | 8 | 638128 |
| +I | 9 | 638221 |
| +I | a | 640101 |
| +I | b | 637347 |
| +I | c | 636275 |
| +I | d | 639562 |
| +I | e | 637851 |
| +I | f | 637505 |
| -U | 0 | 637764 |
| +U | 0 | 643996 |
| -U | 1 | 638705 |
| +U | 1 | 644960 |
| -U | 2 | 637742 |
| +U | 2 | 644017 |
| -U | 3 | 638773 |
| +U | 3 | 645018 |
| -U | 4 | 638829 |
| +U | 4 | 645143 |
| -U | 5 | 638090 |
| +U | 5 | 644230 |
| -U | 6 | 637866 |
| +U | 6 | 644086 |
| -U | 7 | 637241 |
| +U | 7 | 643529 |
| -U | 8 | 638128 |
| +U | 8 | 644379 |
| -U | 9 | 638221 |
| +U | 9 | 644502 |
| -U | a | 640101 |
| +U | a | 646362 |
| -U | b | 637347 |
| +U | b | 643531 |
| -U | c | 636275 |
| +U | c | 642611 |
| -U | d | 639562 |
| +U | d | 645845 |
| -U | e | 637851 |
| +U | e | 644111 |
| -U | f | 637505 |
| +U | f | 643680 |
| -U | 0 | 643996 |
| +U | 0 | 650285 |
| -U | 1 | 644960 |
| +U | 1 | 651196 |
| -U | 2 | 644017 |
| +U | 2 | 650205 |
| -U | 3 | 645018 |
| +U | 3 | 651225 |
| -U | 4 | 645143 |
| +U | 4 | 651405 |
| -U | 5 | 644230 |
| +U | 5 | 650612 |
| -U | 6 | 644086 |
| +U | 6 | 650322 |
| -U | 7 | 643529 |
| +U | 7 | 649680 |
| -U | 8 | 644379 |
| +U | 8 | 650659 |
| -U | 9 | 644502 |
| +U | 9 | 650721 |
| -U | a | 646362 |
| +U | a | 652679 |
| -U | b | 643531 |
| +U | b | 649683 |
| -U | c | 642611 |
| +U | c | 648862 |
| -U | d | 645845 |
| +U | d | 652217 |
| -U | e | 644111 |
| +U | e | 650447 |
| -U | f | 643680 |
| +U | f | 649802 |
此时可以看到结果数据是一直在发生变化的,因为数据源是一直源源不断在产生数据的。
在Flink SQL控制台,按ctrl+c
停止此流式查询任务。
(10)停止任务,退出sql-client
到Flink任务界面中停止Flink核心计算逻辑对应的任务
然后退出sql-client。
Flink SQL>exit;
最后停止YARN中的Flink集群。
[root@bigdata04 flink-1.15.0]# yarn application -kill application_1857176567822_0001
2.1.2 在Flink SQL代码中操作Paimon
创建一个maven项目:db_paimon
在项目中创建一个scala
目录。
在pom.xml
中引入相关依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.15.0</version>
</dependency>
<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-1.15</artifactId>
<version>0.5</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/paimon-flink-1.15-0.5.0-incubating.jar</systemPath>
</dependency>
<!-- hadoop依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.15.0</version>
</dependency>
注意:由于目前在maven仓库里面还无法查到paimon的依赖,所以通过本地jar
包的形式引入paimon的依赖。
在resource
目录中添加log4j.properties
日志配置文件:
log4j.rootLogger=warn,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
首先来看一下如何使用Flink SQL代码向Paimon表中写入数据。
创建package:tech.xuwei.paimon.sql
创建object:FlinkSQLWriteToPaimon
代码如下:
package tech.xuwei.paimon.sql
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* 使用FlinkSQL向Paimon表中写入数据
* Created by xuwei
*/
object FlinkSQLWriteToPaimon {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(env)
//注意:在流处理模式中,操作Paimon表时需要开启Checkpoint。
env.enableCheckpointing(5000)
//创建数据源表-普通表
//注意:此时这个表是在Flink SQL中默认的Catalog里面创建的
tEnv.executeSql(
"""
|CREATE TABLE word_source (
| word STRING
|) WITH (
| 'connector' = 'datagen',
| 'fields.word.length' = '1',
| 'rows-per-second' = '1'
|)
|""".stripMargin)
//创建Paimon类型的Catalog
tEnv.executeSql(
"""
|CREATE CATALOG paimon_catalog WITH (
| 'type'='paimon',
| 'warehouse'='hdfs://bigdata01:9000/paimon'
|)
|""".stripMargin)
tEnv.executeSql("USE CATALOG paimon_catalog")
//创建目的地表-Paimon表
tEnv.executeSql(
"""
|CREATE TABLE IF NOT EXISTS wc_sink_sql (
| word STRING,
| cnt BIGINT,
| PRIMARY KEY (word) NOT ENFORCED
|)
|""".stripMargin)
//向目的地表中写入数据
tEnv.executeSql(
"""
|INSERT INTO `paimon_catalog`.`default`.`wc_sink_sql`
|SELECT
| word,
| COUNT(*) as cnt
|FROM `default_catalog`.`default_database`.`word_source`
|GROUP BY word
|""".stripMargin).print()
}
}
注意:在这里我们创建表word_source
的时候没有创建临时表,因为我们不是在Paimon Catalog里面创建的。
运行代码,此时可以在hdfs中看到表中的相关文件内容
[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink_sql
Found 4 items
drwxr-xr-x - yehua supergroup 0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/bucket-0
drwxr-xr-x - yehua supergroup 0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/manifest
drwxr-xr-x - yehua supergroup 0 2028-11-28 17:04 /paimon/default.db/wc_sink_sql/schema
drwxr-xr-x - yehua supergroup 0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/snapshot
停止代码。
接下来我们来使用Flink SQL代码从Paimon表中读取数据。
创建object:FlinkSQLReadFromPaimon
代码如下:
package tech.xuwei.paimon.sql
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* 使用FlinkSQL从Paimon表中读取数据
* Created by xuwei
*/
object FlinkSQLReadFromPaimon {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(env)
//创建Paimon类型的Catalog
tEnv.executeSql(
"""
|CREATE CATALOG paimon_catalog WITH (
| 'type'='paimon',
| 'warehouse'='hdfs://bigdata01:9000/paimon'
|)
|""".stripMargin)
tEnv.executeSql("USE CATALOG paimon_catalog")
//读取Paimon表中的数据,并且打印输出结果
tEnv.executeSql(
"""
|SELECT * FROM `paimon_catalog`.`default`.`wc_sink_sql`
|""".stripMargin)
.print()
}
}
运行代码,此时可以看到类似这样的数据:
+----+--------------------------------+----------------------+
| op | word | cnt |
+----+--------------------------------+----------------------+
| +I | 1 | 26 |
| +I | 7 | 24 |
| +I | e | 14 |
| +I | 2 | 20 |
| +I | 4 | 23 |
| +I | 6 | 23 |
| +I | c | 26 |
| +I | 0 | 21 |
| +I | 3 | 28 |
| +I | 8 | 23 |
| +I | 5 | 20 |
| +I | d | 20 |
| +I | a | 14 |
| +I | b | 19 |
| +I | f | 25 |
| +I | 9 | 18 |
.....
此时再启动FlinkSQLWriteToPaimon
代码向Paimon表中写入数据,可以在控制台看到如下数据:
| -U | 2 | 20 |
| +U | 2 | 4 |
| -U | 4 | 23 |
| +U | 4 | 2 |
| -U | 6 | 23 |
| +U | 6 | 1 |
| -U | c | 26 |
| +U | c | 3 |
| -U | 1 | 26 |
| +U | 1 | 4 |
| -U | 7 | 24 |
| +U | 7 | 2 |
| -U | e | 14 |
| +U | e | 2 |
| -U | a | 14 |
| +U | a | 2 |
| -U | b | 19 |
| +U | b | 3 |
| -U | 5 | 20 |
| +U | 5 | 3 |
| -U | d | 20 |
| +U | d | 1 |
| -U | 0 | 21 |
| +U | 0 | 2 |
| -U | 8 | 23 |
| +U | 8 | 2 |
| -U | 9 | 18 |
| +U | 9 | 1 |
这样就可以看到表中数据的实时变更情况。
基于Flink DataStream API 操作Paimon
Pamion虽然没有提供DataStream API
,但是可以借助于Flink中DataStream
和Table
的转换来操作Pamion。
下面我们来具体演示一下:
首先来看一下如何使用Flink DataStreamAPI
向Paimon表中写入数据。
创建package:tech.xuwei.paimon.datastream
创建object:FlinkDataStreamWriteToPaimon
代码如下:
package tech.xuwei.paimon.datastream
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}
/**
* 使用Flink DataStream API向Paimon表中写入数据
* Created by xuwei
*/
object FlinkDataStreamWriteToPaimon {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(env)
//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
Row.ofKind(RowKind.INSERT, "jack", Int.box(10)),//+I
Row.ofKind(RowKind.INSERT, "tom", Int.box(20)),//+I
Row.ofKind(RowKind.UPDATE_BEFORE, "jack", Int.box(10)),//-U
Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))
//将DataStream转换为Table
val schema = Schema.newBuilder()
.column("name", DataTypes.STRING().notNull())//主键非空
.column("age", DataTypes.INT())
.primaryKey("name")//指定主键
.build()
val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())
//创建Paimon类型的Catalog
tEnv.executeSql(
"""
|CREATE CATALOG paimon_catalog WITH (
| 'type'='paimon',
| 'warehouse'='hdfs://bigdata01:9000/paimon'
|)
|""".stripMargin)
tEnv.executeSql("USE CATALOG paimon_catalog")
//注册临时表
tEnv.createTemporaryView("t1",table)
//创建Paimon类型的表
tEnv.executeSql(
"""
|-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
|CREATE TABLE IF NOT EXISTS `user` (
| name STRING,
| age INT,
| PRIMARY KEY (name) NOT ENFORCED
|) WITH (
| 'changelog-producer' = 'input'
|)
|""".stripMargin)
//向Paimon表中写入数据
tEnv.executeSql(
"""
|INSERT INTO `user`
|SELECT name,age FROM t1
|""".stripMargin)
}
}
运行代码,可以将changlog DataStream
数据写入到Paimon表中。
接下来开发一个从Paimon表中读取数据的代码。
创建object:FlinkDataStreamReadFromPaimon
代码如下:
package tech.xuwei.paimon.datastream
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* 使用Flink DataStream API从Paimon表中读取数据
* Created by xuwei
*/
object FlinkDataStreamReadFromPaimon {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(env)
//创建Paimon类型的Catalog
tEnv.executeSql(
"""
|CREATE CATALOG paimon_catalog WITH (
| 'type'='paimon',
| 'warehouse'='hdfs://bigdata01:9000/paimon'
|)
|""".stripMargin)
tEnv.executeSql("USE CATALOG paimon_catalog")
//将计算结果Table转换为DataStream
val execSql =
"""
|SELECT * FROM `user` -- 此时默认只能查到数据的最新值
| /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
|""".stripMargin
val table = tEnv.sqlQuery(execSql)
//将结果数据转为Changlog DataStream数据流
val dataStream = tEnv.toChangelogStream(table)
//将DataStream中的数据输出打印到控制台
dataStream.print().setParallelism(1)
//执行任务
env.execute("FlinkDataStreamReadFromPaimon")
}
}
运行代码。
注意:如果没有手工指定数据读取模式,那么最终的结果数据是类似这样的,看不到数据的历史变化,只能看到最新的数据:
+U[jack, 11]
+I[tom, 20]
此时如果想要查看到数据历史的变化情况,需要通过动态表选项来指定数据读取(扫描)模式为from-snapshot
。
val execSql =
"""
|SELECT * FROM `user` -- 此时默认只能查到数据的最新值
|/*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
|""".stripMargin
运行代码,结果如下:
+I[jack, 10]
-U[jack, 10]
+U[jack, 11]
+I[tom, 20]
这个结果其实和最开始我们构造的changelog datastream
数据流是一致的。
更多Paimon数据湖内容请关注
:https://edu.51cto.com/course/35051.html