摘要
Spark/Flink可以使用Hive的metastore,但是Hive无法通过Hive metastore中的Spark/Flink表直接查询数据。为了解决这个问题,可以配置使用Hive sync。在Spark/Flink操作表的时候,自动同步Hive的元数据。这样就可以通过Hive查询Hudi表的内容。
Hive metastore通过目录结构的来维护元数据,数据的更新是通过覆盖来保证事务。但是数据湖是通过追踪文件来管理元数据,一个目录中可以包含多个版本的文件。这一点和Hive元数据管理是不同的。所以说为了兼容Hive metastore,Hudi需要实时从Timeline同步元数据到Hive metastore。
环境准备
Flink 1.13.2
Hudi 0.11.1
Spark 3.1.1
Hive 3.1.2
Hive 兼容Hudi格式
复制编译后的packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.1.jar到各节点Hive安装目录的auxlib目录中。
进入beeline后执行:
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.stats.autogather=false;
Flink Hudi Hive Sync
如果要使用Hive Sync功能,编译时候需要激活flink-bundle-shade-hive3profile。编译命令如下所示:
mvn clean package -Dflink1.13 -Dscala2.11 -DskipTests -Pflink-bundle-shade-hive3 -T 4
Flink Hive Sync支持两种模式连接Hive:
- Hive Metastore(hms): 连接Hive Metastore 9083端口。
- JDBC: 连接HiveServer 10000端口。
两种使用方式如下所示:
-- hms 模式模板
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${db_path}/t1',
'table.type' = 'COPY_ON_WRITE', -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
'hive_sync.enable' = 'true', -- 必须。启用Hive sync
'hive_sync.mode' = 'hms', -- 必须。设置模式未hms,默认为jdbc
'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- 必须。端口需要在 hive-site.xml上配置
);
-- jdbc 模式模板
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${db_path}/t1',
'table.type' = 'COPY_ON_WRITE', -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
'hive_sync.enable' = 'true', -- 必须。启用Hive sync
'hive_sync.mode' = 'jdbc', -- 必须。设置模式未hms,默认为jdbc
'hive_sync.metastore.uris' = 'thrift://${ip}:9083', -- 必须。端口需要在hive-site.xml上配置
'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000', -- 必须。hiveServer端口
'hive_sync.table'='${table_name}', -- 必须。同步过去的hive表名
'hive_sync.db'='${db_name}', -- 必须。同步过去的hive表所在数据库名
'hive_sync.username'='${user_name}', -- 必须。JDBC 用户名
'hive_sync.password'='${password}' -- 必须。JDBC 密码
);
使用HMS方式配置Hive Sync:
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///zy/hudi/',
'table.type' = 'COPY_ON_WRITE',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://manager127:9083',
'hive_sync.table'='t1',
'hive_sync.db'='default'
);
-- 插入测试数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
进入beeline,执行:
show tables;
更多文章请扫码关注公众号,有问题的小伙伴也可以在公众号上提出。