6. Syncing Kafka to Hive
6.1 Create the Kafka mapping table
Create a mapping table for the Kafka topic through the Flink SQL Client:
CREATE TABLE kafka_user_topic(
id int,
name string,
birth string,
gender string
) WITH (
'connector' = 'kafka',
'topic' = 'flink-cdc-user',
'properties.bootstrap.servers' = '192.168.0.4:6668',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
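With 'format' = 'json', each record in the flink-cdc-user topic is expected to carry a JSON object whose keys match the column names above, for example (a made-up record):

{"id": 1, "name": "zhangsan", "birth": "1992-05-01", "gender": "male"}

A quick smoke test is to query the mapping table from the SQL Client; with 'scan.startup.mode' = 'earliest-offset' it should replay every record already in the topic:

SELECT * FROM kafka_user_topic;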
6.2 Create the Hive table
Creating the Hive table will fail if the SQL Client is still using the default dialect.
Switch to the Hive dialect first:
SET table.sql-dialect=hive;
Then create the Hive table:
CREATE TABLE ods_user (
id int,
name string,
birth string,
gender string
) STORED AS parquet TBLPROPERTIES (
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='0S',
'sink.partition-commit.policy.kind'='metastore,success-file',
'auto-compaction'='true',
'compaction.file-size'='128MB'
);
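The sink.partition-commit.* settings above are aimed at partitioned streaming writes, but ods_user has no PARTITIONED BY clause. If partitioned output is wanted, a minimal sketch in the Hive dialect (assuming a hypothetical dt partition column that the INSERT would have to populate, following the partitioned-table pattern from the Flink Hive connector documentation) could look like the following; note that the partition-time trigger also requires watermarks on the source table:

CREATE TABLE ods_user_partitioned (
id int,
name string,
birth string,
gender string
) PARTITIONED BY (dt string) STORED AS parquet TBLPROPERTIES (
-- maps each dt value to a timestamp so partition-time commits know when a partition is complete
'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='0S',
'sink.partition-commit.policy.kind'='metastore,success-file',
'auto-compaction'='true',
'compaction.file-size'='128MB'
);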
6.3 Submit the sync job
Populate the Hive table from the Kafka topic with a streaming INSERT:
INSERT INTO ods_user
SELECT * FROM kafka_user_topic;
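This INSERT launches a continuously running streaming job that keeps appending new Kafka records to ods_user. Two practical notes, both following the pattern in the Flink Hive integration documentation rather than anything specific to this setup: the statement is typically submitted under the default dialect (the one the Kafka mapping table was created in), and the sink table can be queried afterwards to confirm data is arriving. A minimal check might look like:

SET table.sql-dialect=default;
-- after the job has been running for a while, check that rows have landed in Hive
SELECT * FROM ods_user;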
Articles in this series
Flink CDC Data Sync (1) Environment Setup: https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Flink CDC Data Sync (2) MySQL Data Sync: https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Flink CDC Data Sync (3) Integrating Flink with Hive: https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Flink CDC Data Sync (4) Syncing MySQL Data to Kafka: https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Flink CDC Data Sync (5) Syncing Kafka Data to Hive: https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501
Flink CDC Data Sync (6) Data Lake Ingestion with Hudi: https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502