大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

我们上次已经了解了Paimon的下载及安装,并且了解了主键表的引擎以及changelog-producer的含义

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1)

今天,我们继续快速了解下最近比较火的Apache Paimon:

  • 官方文档:https://paimon.apache.org/docs/1.0/
  • 推荐阅读:当流计算邂逅数据湖:Paimon 的前生今世

1 利用Paimon做维表join

  • 在流式处理中,Lookup Join用来从另一个表(Paimon)中“查字典”来补充流的信息。一个表需要有时间标记(处理时间属性),另一个表需要支持快速查找(查找源连接器)
  • Paimon在Flink中支持对带有主键的表和追加表进行Lookup Join操作

1.1 常规Lookup

SET 'execution.runtime-mode' = 'streaming';

-- 1、用户维表
CREATE TABLE customers (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING    comment '姓名',
    country STRING comment '城市',
    zip STRING     comment '邮编'
) WITH (
    'connector' = 'paimon'
);

-- 插入维表数据
INSERT INTO customers VALUES
     (1,'tom','伦敦','123')
    ,(2,'hank','纽约','456')
    ,(3,'小明','北京','789');


-- 2、模拟订单表(数据流)
drop TEMPORARY TABLE orders_info;
CREATE TEMPORARY TABLE orders_info (
    order_id INT,
    total INT,
    customer_id INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'datagen', 
    'rows-per-second'='1', 
    'fields.order_id.kind'='sequence', 
    'fields.order_id.start'='1', 
    'fields.order_id.end'='1000000', 
    'fields.total.kind'='random', 
    'fields.total.min'='1', 
    'fields.total.max'='1000', 
    'fields.customer_id.kind'='random', 
    'fields.customer_id.min'='1', 
    'fields.customer_id.max'='3'
);

-- 3、维表join
SELECT 
      o.order_id
    , o.total
    , c.country
    , c.zip
FROM 
    orders_info AS o
JOIN 
    customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON 
    o.customer_id = c.id;

-- 可以看到下面的结果
+----+----------+----------+-------------+---------+
| op | order_id |    total |     country |     zip |
+----+----------+----------+-------------+---------+
| +I |        1 |      179 |        纽约 |     456 |
| +I |        2 |       31 |        北京 |     789 |
| +I |        3 |      148 |        北京 |     789 |
| +I |        4 |      774 |        北京 |     789 |

1.2 同步重试Lookup

  • 如果用户维表(查找表)的数据未准备好,导致订单表(主表)的记录无法完成连接,你可以考虑使用Flink的延迟重试策略进行查找。
  • 这个功能仅适用于Flink 1.16及以上的版本。
  • 下面sql的提示(hints)设置了重试策略:
    • 'table'='c': 指定了应用查找重试策略的目标表的别名,在这个例子中是customers表,其别名为c。
    • 'retry-predicate'='lookup_miss': 定义了触发重试的条件。这里的条件是lookup_miss,意味着如果在查找过程中没有找到对应的数据(即查找缺失),就会触发重试机制。
    • 'retry-strategy'='fixed_delay': 设置了重试的策略为固定延迟,即每次重试之间会有固定的等待时间。
    • 'fixed-delay'='1s': 当采用固定延迟重试策略时,这里设定了每次重试前等待的时间长度为1秒。
    • 'max-attempts'='600': 设定最大重试次数为600次。结合上面的fixed-delay设置,这意味着系统会每隔1秒尝试一次数据查找,最多尝试600次。
-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
	  o.order_id
	, o.total
	, c.country
	, c.zip
FROM 
	orders AS o
JOIN 
	customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON 
	o.customer_id = c.id;

1.3 异步重试Lookup

  • 在同步重试机制下,如果处理某一条记录时遇到了问题(例如查找失败),系统会按照设定的重试策略反复尝试直到成功或者达到最大重试次数。在这期间,这条记录后面的其他记录即使没有问题也无法被处理,因为整个处理流程被阻塞了。这会导致数据处理的整体效率低下,甚至可能造成作业延迟或超时。
  • 使用异步加上allow_unordered,可以在某些记录在查找时缺失也不会再阻塞其他记录。
  • 下面sql配置项解释:
    • 'output-mode'='allow_unordered': 设置输出模式允许无序,意味着当查找失败时不会阻塞其他记录的处理,适合于不需要严格顺序的场景。
    • 'lookup.async'='true': 启用异步查找,提高效率,避免因等待某个查找结果而阻塞其它查找操作。
    • 'lookup.async-thread-number'='16': 设置用于异步查找的线程数为16,这可以加速查找过程,特别是在高并发情况下。
  • 注意:如果主表(orders)是CDC流,allow_unordered会被Flink SQL忽略(只支持追加流),流作业可能会被阻塞。可以尝试使用Paimon的audit_log系统表功能来绕过这个问题(将CDC流转为追加流)。
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
	  o.order_id
	, o.total
	, c.country
	, c.zip
FROM 
	orders AS o
JOIN 
	customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON 
	o.customer_id = c.id;

1.4 max_pt功能

  • 在传统的数据仓库中,每个分区通常维护最新的完整数据,因此这种分区表只需要连接最新的分区即可。Paimon特别为此场景开发了max_pt功能。

  • 通过max_pt,Lookup节点能够自动刷新并查询最新分区的数据,确保所使用的客户信息始终是最新的,而不需要手动干预来确定或切换到最新的数据分区。

  • 这种方法特别适用于需要频繁更新和查询最新数据的场景,可以大大提高数据处理的效率和准确性。

-- 1、分区用户维表
drop table if exists customers;
CREATE TABLE customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING,
  dt STRING,
  PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt);

-- 插入数据(维表关联时候,只查找'2025-02-19'分区数据)
INSERT INTO customers VALUES
(1, 'Alice', 'USA', '10001', '2025-02-18'),
(2, 'Bob', 'UK', '20002', '2025-02-18'),
(3, 'Charlie', 'Germany', '30003', '2025-02-18'),
(1, 'Alice', 'USA', '10002', '2025-02-19'), -- 更新了 Alice 的邮编
(4, 'David', 'France', '40004', '2025-02-19');



-- 2、订单信息表(解析kafka数据)
CREATE TEMPORARY TABLE orders (
  order_id BIGINT,
  customer_id INT,
  total DECIMAL(10, 2),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'properties.group.id' = 'testordersGroup',
  'scan.startup.mode' = 'earliest-offset',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);


-- 创建topic
/opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders_topic --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 
-- 启动命令行生产者,生产数据
/opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders_topic --bootstrap-server centos01:9092
{"order_id":1001,"customer_id":1,"total":50.0}
{"order_id":1002,"customer_id":2,"total":30.0}
{"order_id":1004,"customer_id":4,"total":20.0}


-- 'lookup.dynamic-partition'='max_pt()': 这个选项指示查找节点自动定位并使用最新(最大)的分区。max_pt()函数帮助系统识别最新的分区,确保只查询最新的数据。
-- 'lookup.dynamic-partition.refresh-interval'='1 h': 设置了查找节点刷新最新分区的时间间隔为1小时。这意味着系统会每隔1小时检查一次是否有新的分区可用,并自动更新到最新分区的数据。

SELECT 
	  o.order_id
	, o.total
	, c.country
	, c.zip
FROM 
	orders AS o
JOIN 
	customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON 
    o.customer_id = c.id;


+----+----------------------+--------------+--------------------------------+--------------------------------+
| op |             order_id |        total |                        country |                            zip |
+----+----------------------+--------------+--------------------------------+--------------------------------+
| +I |                 1001 |        50.00 |                            USA |                          10002 |
| +I |                 1004 |        20.00 |                         France |                          40004 |

  • 可以运行一个Flink流式作业来启动针对该paimon维表的查询服务。当QueryService存在时,Flink查找连接(Lookup Join)会优先从该服务获取数据,这将有效提高查询性能。

  • 可以通过调用sys.query_service系统函数来实现:
    CALL sys.query_service('paimon_db.customers', 4); -- 设置并行度为4或者通过下面action包开启

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-1.0.0.jar \
        query_service \
        --warehouse <warehouse-path> \
        --database <database-name> \
        --table <table-name> \
        [--parallelism <parallelism>] \
        [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
    
  • 查询服务能够在内存中缓存频繁访问的数据,并以高并发的方式提供这些数据,减少了磁盘I/O操作和网络延迟的影响。

2 集成Mysql CDC

  • 可以通过 Flink SQL 或者 Flink DataStream API 将 Flink CDC 数据写入 Paimon 中,也可以通过Paimon 提供的 CDC 工具来完成入湖。那这两种方式有什么区别呢?

    在这里插入图片描述

  • 上图是使用 Flink SQL 来完成入湖,简单,但是当源表添加新列后,同步作业不会同步新的列,下游 Paimon 表也不会增加新列。

在这里插入图片描述

  • 上图是使用 Paimon CDC 工具来同步数据,可以看到,当源表发生列的新增后,流作业会自动新增列的同步,并传导到下游的 Paimon 表中,完成 Schema Evolution 的同步

  • Paimon CDC 工具也提供了整库同步:

    • 一个作业同步多张表,以低成本的方式同步大量小表
    • 作业里同时自动进行 Schema Evolution
    • 新表将会被自动进行同步,你不用重启作业,全自动完成
  • 推荐阅读:Flink + Paimon 数据 CDC 入湖最佳实践

2.1 MySQL一张表同步到Paimon一张表

2.1.1 环境准备

# mysql-cdc相关jar包的下载地址,同时还需要mysql-connector
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-mysql-cdc/3.1.1/
https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.1/
https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-common/3.1.1/

# 在flink的lib目录添加下面的jar包
[root@centos01 lib]# ll /opt/apps/flink-1.16.0/lib/
-rw-r--r--. 1 root root     259756 Feb 19 15:13 flink-cdc-common-3.1.1.jar
-rw-r--r--. 1 root root   21286022 Feb 19 11:40 flink-cdc-pipeline-connector-mysql-3.1.1.jar
-rw-r--r--. 1 root root     388680 Feb 19 10:57 flink-connector-mysql-cdc-3.1.1.jar
-rw-r--r--. 1 root root    2475087 Feb 19 10:58 mysql-connector-java-8.0.27.jar
......

# 开启MySQL Binlog并重启MySQL
[root@centos01 ~]# vim /etc/my.cnf
#添加如下配置信息,开启`cdc_db`数据库的Binlog
#数据库id
server-id = 1
##启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型
binlog_format=row
##启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=cdc_db

[root@centos01 ~]# systemctl restart mysqld


# 启动hadoop和yarn
[root@centos01 ~]# start-all.sh
# 启动Hive
[root@centos01 ~]# nohup hive --service metastore  2>&1 & 
[root@centos01 ~]# nohup  hive --service hiveserver2   2>&1 & 
# yarn-session模式
# http://centos01:8088/cluster中可以看到Flink session cluster的job ID
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/yarn-session.sh -d
# 启动Flink的sql-client
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/sql-client.sh -s yarn-session
  • 使用Hive的元数据
Flink SQL> CREATE CATALOG paimon_hive_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://centos01:9083'
);

Flink SQL> USE CATALOG paimon_hive_catalog;
Flink SQL> create database if not exists paimon_db;
Flink SQL> use paimon_db;

# 设置显示模式
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SET 'execution.runtime-mode' = 'batch';

2.1.2 案例详解

  • 如果指定的Paimon表不存在,将自动创建表。其schema将从所有指定的MySQL表派生;
  • 如果Paimon 表已存在,则其schema将与所有指定MySQL表的schema进行比较;
  • 仅支持同步具有主键的MySQL表
  • 也可MySQL多张表同步到Paimon一张表,可以查看官网示例。
-- 准备测试数据
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `login_name` varchar(200) DEFAULT NULL COMMENT '用户名称',
  `name` varchar(200) DEFAULT NULL COMMENT '用户姓名',
  `phone_num` varchar(200) DEFAULT NULL COMMENT '手机号',
  `birthday` date DEFAULT NULL COMMENT '用户生日',
  `gender` varchar(1) DEFAULT NULL COMMENT '性别 M男,F女',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `operate_time` datetime DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

-- ----------------------------
-- Records of user_info
-- ----------------------------
INSERT INTO `user_info`(`login_name`, `name`, `phone_num`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES 
('zhangsan', '张三', '13800001234', '1990-01-01', 'M', NOW(), NOW()),
('lisi', '李四', '13800005678', '1992-02-02', 'F', NOW(), NOW()),
('wangwu', '王五', '13800009012', '1995-03-03', 'M', NOW(), NOW()),
('zhaoliu', '赵六', '13800003456', '1998-04-04', 'F', NOW(), NOW()),
('sunqi', '孙七', '13800007890', '2000-05-05', 'M', NOW(), NOW());


CREATE TABLE `orders` (
  `order_id` bigint(20) NOT NULL  AUTO_INCREMENT COMMENT '订单编号',
  `user_id` bigint(20) NOT NULL COMMENT '用户编号',
  `order_date` datetime DEFAULT NULL COMMENT '订单日期',
  `total_price` decimal(10,2) DEFAULT NULL COMMENT '订单总价',
  PRIMARY KEY (`order_id`) USING BTREE,
  KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;


INSERT INTO `orders`(`user_id`, `order_date`, `total_price`) VALUES 
(1001, '2025-02-18 14:32:10', 199.99),
(1002, '2025-02-19 09:15:30', 299.50),
(1001, '2025-02-19 12:47:05', 79.99),
(1003, '2025-02-19 13:00:00', 499.00),
(1004, '2025-02-20 08:20:25', 159.99);
# 按照天进行分区
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \
    /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \
    mysql-sync-table \
    --warehouse hdfs://centos01:8020/user/hive/warehouse \
    --database paimon_db \
    --table ods_user_info_cdc \
    --primary-keys pt,id \
    # 指定分区键,分区键是通过函数转换而来
	--partition_keys pt \
    --computed_column 'pt=date_format(operate_time, yyyyMMdd)' \
    --mysql-conf hostname=centos01 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=cdc_db \
    --mysql-conf table-name='user_info' \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://centos01:9083 \
    --table-conf bucket=2 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=2 
  • 我们可以进行测试了
-- 此时会自动创建paimon表(无需我们手动建表了)
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+


-- 修改原始表的schema
mysql> ALTER TABLE user_info ADD COLUMN email varchar(255);
mysql> INSERT INTO `user_info`(`login_name`, `name`, `phone_num`, `birthday`, `gender`, `create_time`, `operate_time`, `email`)
values('hank', '汉克', '18600007890', '2000-05-05', 'M', NOW(), NOW(), 'hank@163.com');

-- 注意:我们此时在原始sql-client窗口查询,`email`字段并没有加上
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  6 |       hank | 汉克 | 18600007890 | 2000-05-05 |      M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+

-- 其实,此时的schema已经改变,我们查看hdfs上的schema信息会发现`email`字实际上已经添加
[root@centos01 ~]# hdfs dfs -cat /user/hive/warehouse/paimon_db.db/ods_user_info_cdc/schema/schema-1


-- 因此,我们需要另外重新启动一个sql-client窗口进行查询,可以发现是正确的
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |        email |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  6 |       hank | 汉克 | 18600007890 | 2000-05-05 |      M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 | hank@163.com |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+

注:

  • 上面自动创建的paimon表是分区表,但是在Hive中不是分区表
-- 报错
0: jdbc:hive2://192.168.42.101:10000> show partitions ods_user_info_cdc ;
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Table ods_user_info_cdc is not a partitioned table (state=42000,code=1)
  • 要想在hive中也是分区表,需要在同步时候设置参数metastore.partitioned-table=true
# 同步到另一张paimon表,并设置hive分区参数
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \
    /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \
    mysql-sync-table \
    --warehouse hdfs://centos01:8020/user/hive/warehouse \
    --database paimon_db \
    --table ods_user_info_partition_cdc \
    --primary-keys pt,id \
	--partition_keys pt \
    --computed_column 'pt=date_format(operate_time, yyyyMMdd)' \
    --mysql-conf hostname=centos01 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=cdc_db \
    --mysql-conf table-name='user_info' \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://centos01:9083 \
    --table-conf bucket=2 \
    --table-conf changelog-producer=input \
    --table-conf metastore.partitioned-table=true \
    --table-conf sink.parallelism=2 
  • 然后可以从hive中读取相关分区了
0: jdbc:hive2://192.168.42.101:10000> show partitions ods_user_info_partition_cdc ;
+--------------+
|  partition   |
+--------------+
| pt=20250219  |
+--------------+

-- 读取相关分区数据
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_cdc a where  pt=20250219;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |   a.pt    |    a.email    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+
| 1     | zhangsan      | 张三      | 13800001234  | 1990-01-01  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 2     | lisi          | 李四      | 13800005678  | 1992-02-02  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | lisi@qq.com   |
| 4     | zhaoliu       | 赵六      | 13800003456  | 1998-04-04  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 5     | sunqi         | 孙七      | 13800007890  | 2000-05-05  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 6     | hank          | 汉克      | 18600007890  | 2000-05-05  | M         | 2025-02-19 18:01:43.0  | 2025-02-19 18:01:43.0  | 20250219  | hank@163.com  |
| 3     | wangwu        | 王五      | 13800009012  | 1995-03-03  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+

2.2 整库同步

  • 通过官方提供的paimon-action的jar包可以很方便的将 MySQL、Kafka、Mongo等中的数据实时摄入到Paimon中
  • 使用paimon-flink-action,采用mysql-sync-database(整库同步),并通过"–including_tables"参数选择要同步的表
  • 这种同步模式有效地节省了大量资源开销,相比每个表启动一个 Flink 任务而言,避免了资源的大量浪费。
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \
    /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \
    mysql-sync-database \
    --warehouse hdfs://centos01:8020/user/hive/warehouse \
    --database paimon_db \
    --table-prefix "ods_" \
    --table-suffix "_mysql_cdc" \
    --mysql-conf hostname=centos01 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=cdc_db \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://centos01:9083 \
    --table-conf bucket=2 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=2 \
	--including-tables 'user_info|orders'

-- 在一个flink任务中同步多个mysql表
-- 在paimon中会自动创建多张表
Flink SQL> select * from ods_orders_mysql_cdc;
Flink SQL> select * from ods_user_info_mysql_cdc;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/972571.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

⭐ Unity 横向滑动列表 首尾相连 轮转图

效果如下&#xff1a; 场景挂载&#xff1a; 代码部分&#xff1a; using DG.Tweening; using System; using System.Collections; using System.Collections.Generic; using System.Drawing.Printing; using UnityEngine; using UnityEngine.EventSystems; using UnityEngine…

大白话实战Sentinel

Sentinel是SpringCloudAlibaba提供的用来做服务保护的框架,而服务保护的常见手段就是限流和熔断降级。在大型分布式系统里面,由于微服务众多,所以服务之间的稳定性需要做特别关注,Sentinel的核心包就提供了从多个维度去保护服务稳定的策略,而且这些保护策略都可以连接上Se…

【C语言】C语言 哈夫曼编码传输(源码+数据文件)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;专__注&#x1f448;&#xff1a;专注主流机器人、人工智能等相关领域的开发、测试技术。 C语言 哈夫曼编码传输&#xff08;源码数据文件&am…

用命令模式设计一个JSBridge用于JavaScript与Android交互通信

用命令模式设计一个JSBridge用于JavaScript与Android交互通信 在开发APP的过程中&#xff0c;通常会遇到Android需要与H5页面互相传递数据的情况&#xff0c;而Android与H5交互的容器就是WebView。 因此要想设计一个高可用的 J S B r i d g e JSBridge JSBridge&#xff0c;不…

3月营销日历:开启春日盛宴,绽放生活魅力

关键营销节点∶惊蛰、女生节、妇女节、 植树节、315消费者权益日、春分 营销关键词 养生、女生魅力、感恩女性、环保、品质 01.重点关注品类 春季服饰&#xff1a;如轻薄外套、春装等&#xff0c;适合惊蛰后的市场需求&#xff1b; 美妆护肤&#xff1a;妇女节期间&#xf…

GPT-SoVITS更新V3 win整合包

GPT-SoVITS 是由社区开发者联合打造的开源语音生成框架&#xff0c;其创新性地融合了GPT语言模型与SoVITS&#xff08;Singing Voice Inference and Timbre Synthesis&#xff09;语音合成技术&#xff0c;实现了仅需5秒语音样本即可生成高保真目标音色的突破。该项目凭借其开箱…

AI芯片:科技变革的核心驱动力

近年来&#xff0c;人工智能&#xff08;AI&#xff09;的飞速发展对众多行业产生了深远影响&#xff0c;芯片领域也不例外。AI在芯片设计、制造及应用等方面带来了革新性的改变&#xff0c;成为推动芯片行业发展的关键力量。 AI助力芯片设计效率飞升 传统芯片设计极为复杂&am…

【phpstudy】关于实现两个不同版本的mysql并存。

1.首先是先安装好两个版本的mysql mysql5.7用默认的就行 2.更改mysql8.0的配置&#xff0c;如图 3.找到mysql8.0的路径&#xff0c;看着个里面就可以知道了 4.进入后&#xff0c;可以把data里面的数据情况&#xff0c;就是把data文件夹里的东西删除&#xff08;我是先备份好了一…

Coze扣子新功能详解

今晚(2025-01-24)扣子再次进行更新 主要更新内容&#xff1a; 搭建小程序和 H5 用户界面时&#xff0c;支持使用音频组件播放音频内容 数据库操作体验提升 界面优化&#xff1a;对数据库详情界面进行了重新设计&#xff0c;并将工作流运行数据库的测试数据位置从原工作流底…

Pytorch深度学习教程_3_初识pytorch

欢迎来到《PyTorch深度学习教程》系列的第三篇&#xff01;在前面的两篇中&#xff0c;我们已经介绍了Python及numpy的基本使用。今天&#xff0c;我们将深入探索PyTorch的核心功能&#xff0c;帮助你更好地理解和使用这个强大的深度学习框架。 欢迎订阅专栏&#xff1a; 深度…

第4章 信息系统架构(二)

4.2 系统架构 信息系统架构是一种体系结构&#xff0c;它反映了一个组织信息系统的各个组成部分之间的关系&#xff0c;以及信息系统与相关业务、信息系统与相关技术之间的关系。 4.2.1 架构定义 对于大规模的复杂系统来说&#xff0c;对总体的系统结构设计比起对计算算法和…

剑指 Offer II 024. 反转链表

comments: true edit_url: https://github.com/doocs/leetcode/edit/main/lcof2/%E5%89%91%E6%8C%87%20Offer%20II%20024.%20%E5%8F%8D%E8%BD%AC%E9%93%BE%E8%A1%A8/README.md 剑指 Offer II 024. 反转链表 题目描述 给定单链表的头节点 head &#xff0c;请反转链表&#xff…

Python----数据结构(单链表:节点,是否为空,长度,遍历,添加,删除,查找)

一、链表 链表是一种线性数据结构&#xff0c;由一系列按特定顺序排列的节点组成&#xff0c;这些节点通过指针相互连接。每个节点包含两部分&#xff1a;元素和指向下一个节点的指针。其中&#xff0c;最简单的形式是单向链表&#xff0c;每个节点含有一个信息域和一个指针域&…

Java开发实习面试笔试题(含答案)

在广州一家中大公司面试&#xff08;BOSS标注是1000-9999人&#xff0c;薪资2-3k&#xff09;&#xff0c;招聘上写着Java开发&#xff0c;基本没有标注前端要求&#xff0c;但是到场知道是前后端分离人不分离。开始先让你做笔试&#xff08;12道问答4道SQL题&#xff09;&…

Docker:3、在VSCode上安装并运行python程序或JavaScript程序

1.VSCode上安装并运行python程序&#xff1a; 1.1.安装Docker插件 1.2.新建自动化脚本DockerFile FROM python:3.-slim-buster WORKDIR /app COPY .. RUN pip3 install -r requirements.txt CMD ["python3", "app.py"]COPY <本地路径><目标…

MOS管炸了,PWM“死区”时间得了解一下

从字面上来看“死区”的意思就是&#xff1a;如果处于这个区&#xff0c;那就会出现“损坏”的现象&#xff0c;直白点&#xff0c;就是“禁区”&#xff01; 实际应用中&#xff0c;比如大功率设备的电机&#xff0c;还有变频器等驱动电路&#xff0c;多部分都是采用MOS管和IG…

idea-代码补全快捷键

文章目录 前言idea-代码补全快捷键1. 基本补全2. 类型匹配补全3. 后缀补全4. 代码补全 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff0c;…

保姆级教程:利用Ollama与Open-WebUI本地部署 DeedSeek-R1大模型

1. 安装Ollama 根据自己的系统下载Ollama&#xff0c;我的是Linux&#xff0c;所以我使用如下命令进行下载安装&#xff1a; curl -fsSL https://ollama.com/install.sh | sh2. 安装Open-WebUI 使用 Docker 的方式部署 open-webui &#xff0c;使用gpu的话按照如下命令进行 …

win10 系统 自定义Ollama安装路径 及模型下载位置

win10 系统 自定义Ollama安装路径 及模型下载位置 由于Ollama的exe安装软件双击安装的时候默认是在C盘&#xff0c;以及后续的模型数据下载也在C盘&#xff0c;导致会占用C盘空间&#xff0c;所以这里单独写了一个自定义安装Ollama安装目录的教程。 Ollama官网地址&#xff1…

以教代学——费曼学习法

本文是思维导图&#xff0c;算是费曼学习法精髓我的个人总结&#xff0c;如果条件允许的话可以去看看费曼自传性质的书&#xff0c;书名见文末。 《别闹了&#xff0c;费曼先生》&#xff1a;英文书名为《Surely Youre Joking, Mr. Feynman!》&#xff0c;这是一本自传性质的书…