简介
Apache Paimon 是一种湖格式,支持使用 Flink 和 Spark 构建实时湖仓一体架构 用于流式处理和批处理操作。Paimon创新性地将湖格式与LSM(Log-structured merge-tree)相结合 结构,将实时流式更新引入 Lake 架构。
Paimon提供以下核心能力:
实时更新:
主键表支持大规模更新的写入,具有非常高的更新性能,通常通过 Flink Streaming。
支持定义合并引擎,随心所欲地更新记录。删除重复数据以保留最后一行、部分更新、聚合记录或第一行,由您决定。
支持定义 changelog-producer,在合并引擎的更新中生成正确和完整的更改日志,简化您的流分析。
附加数据处理:
追加表(无主键)提供大规模批处理和流式处理功能。自动小文件合并。
支持z-order排序的数据压缩以优化文件布局,使用 minmax 等索引提供基于数据跳过的快速查询。
数据湖功能:
可扩展元数据:支持存储PB级大规模数据集,存储大量分区。
支持 ACID 事务 & 时间旅行 & 模式演进。
版本
文中使用相关技术版本为flink-1.16和paimon-flink-1.16-0.8.1.jar
添加jar
将paimon-flink-1.16-0.8.1.jar添加到FLINK_HOME/lib目录下
修改FLINK配置文件
vim <FLINK_HOME>/conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 2
启动FLINK
./bin/start-cluster.sh
执行FLINK SQL
./bin/sql-client.sh
Create a Catalog and a Table
-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH (
'type'='paimon',
'warehouse'='file:/tmp/paimon'
);
USE CATALOG my_catalog;
-- create a word count table
CREATE TABLE word_count (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
);
写数据
-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1'
);
-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';
-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
OLAP 查询
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';
-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
-- olap query the table
SELECT * FROM word_count;
流式查询
-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';
-- track the changes of table and calculate the count interval statistics
SELECT `interval`, COUNT(*) AS interval_cnt FROM
(SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;
停止FLINK
./bin/stop-cluster.sh