Flink Hive Catalog操作案例

在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。

基础环境

Hive-3.1.3
Flink-1.17.1

基本操作与准备

1、上传依赖jar包到flink/lib目录下

cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
cp mysql-connector-j-8.1.0.jar

2、更换planner依赖(Hive集成的推荐设置)

mv /usr/sft/flink-1.17.1/opt/flink-table-planner_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
mv /usr/sft/flink-1.17.1/lib/flink-table-planner-loader-1.17.1.jar /usr/sft/flink-1.17.1/opt/

3、启动Hive MetaStore

nohup hive --service metastore 2>&1 &

4、启动flink集群和sql-client

yarn-session.sh -d -nm flink-cluster
sql-client.sh embedded -s yarn-session

5、在flink sql-client中创建hive catalog

CREATE CATALOG hive WITH (
    'type' = 'hive',
    'default-database' = 'sty',
    'hive-conf-dir' = '/usr/sft/hive-3.1.3/conf'
);

非分区表读写

1、Hive中建表并插入数据

create table behavior(
username string,
behavior string
);
insert into behavior values('lisi','buy'),('zhangsan','read');

2、使用hive catalog

use catalog hive;

2、flink sql-client中执行数据插入与数据查询(和常规sql一致)

insert into behavior values('sisi','buy'),('tracy','read');
select *from behavior;

在这里插入图片描述

分区表读写

这里和非分区表有所不同,主要体现在建表层面,参考博客:https://www.jianshu.com/p/295066a24092

写入到hive分区表
streamEnv需要开启checkpoint,保证flink写入hive分区表的写入一致性
hive表ddl中需要指定以下TBLPROPERTIES:
sink.partition-commit.trigger:分区提交触发器,单选,可选值为partition-time、process-time(默认), 其中partition-time需要根据当前数据的watermark来判断分区是否需要提交,当watermark + delay大于等于分区上的时间时就会提交该分区元数据;process-time的话根据当前系统处理时间来判断分区是否需要提交,当系统处理时间大于等于分区上的时间就会提交该分区元数据
partition.time-extractor.timestamp-pattern:使用partition-time触发器时使用该配置项。表示从表字段中提取出表达某个分区的时间的格式,需要提取到的时间必须为yyyy-MM-dd HH:mm:ss的格式。比如字段dt的格式为yyyy-MM-dd,则配置为$dt 00:00:00则表示分区时间取值为dt的value的0点0分0秒,可以选择多个表字段组合。当表字段无法抽取出符合的格式时,则使用自定义提取器partition.time-extractor.class。
sink.partition-commit.delay: 表示watermark允许event time的最大乱序时间,使用partition-time触发器时可以使用,默认为0s
sink.partition-commit.policy.kind:分区提交方式,多选,可选值为metastore、success-file、custom,metastore表示写入元数据库,success-file表示往hdfs分区目录写入一个标志文件,custom表示使用自定义提交方式,通常使用metastore,success-file组合
partition.time-extractor.kind:当要使用自定义分区时间提取器时需要配置此项,值配置为custom
partition.time-extractor.class:当要使用自定义分区时间提取器时需要配置此项,值配置为自定义提取器的类路径。在集群中运行时,需要把该类打成jar包放到flink lib目录下。
某个分区触发提交后,后续再有此分区的数据进来,仍然会写入hive该分区。
作者:spongebobZ
链接:https://www.jianshu.com/p/295066a24092
来源:简书

1、hive创建分区表并插入数据

create table userinfo(
name string,
age int
)
partitioned by (dt string)
stored as orc
tblproperties(
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.policy.kind'='metastore,success-file',
    'partition.time-extractor.timestamp-pattern' ='yyyy-MM-dd HH:mm:ss',
    'sink.partition-commit.delay' = '10'
);

insert into table userInfo partition(dt='2023-10-26') values('zhangsan',23);
insert into table userInfo partition(dt='2023-10-26') values('lisi',26),('wangwu',27);

注意:若建表时未在tblproperties中配置恰当的sink.partition-commit.policy.kind,flink sql-client插入数据时将遇到如下报错:

Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive`.`sty`.`userInfo` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind

2、flink sql-client插入与查询数据

insert into  userinfo partition(dt='2023-10-24') values('tracy',26),('lily',27);
select *from userinfo;

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/110070.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java工具库——commons-lang3的50个常用方法

未来的你,我亲爱的女孩,愿此刻无忧无虑,开心,快乐… 工具库介绍 Apache Commons Lang 3(通常简称为Commons Lang 3)是Apache Commons项目中的一个Java工具库,它提供了一系列实用的工具类和方法…

Linux|安装Nomachine

参考:2022 Nomachine 最简安装与使用指南(https://blog.csdn.net/qq_51116518/article/details/127450253) 解压 先将目录调整到压缩包所在目录,输入sudo tar zxvf nomachine_7.6.2_3_aarch64.tar.gz 添加权限 sudo chmod -R…

GZ035 5G组网与运维赛题第6套

2023年全国职业院校技能大赛 GZ035 5G组网与运维赛项(高职组) 赛题第6套 一、竞赛须知 1.竞赛内容分布 竞赛模块1--5G公共网络规划部署与开通(35分) 子任务1:5G公共网络部署与调试(15分) …

RK3568-clock

pll锁相环 总线 gating rk3568.dtsi pmucru: clock-controller@fdd00000 {compatible = "rockchip,rk3568-pmucru";reg = <0x0 0xfdd00000 0x0 0x1000>;rockchip,grf = <&grf>;rockchip,pmugrf = <&pmugrf>;#clock-cells = <1>;#re…

SQL BETWEEN运算符

SQL BETWEEN 运算符 BETWEEN运算符用于选取介于两个值之间的数据范围内的值。 BETWEEN运算符选择给定范围内的值。值可以是数字&#xff0c;文本或日期。 BETWEEN运算符是包含性的&#xff1a;包括开始和结束值&#xff0c;且开始值需小于结束值。 SQL BETWEEN 语法 SELECT …

使用 SQL 的方式查询消息队列数据以及踩坑指南

背景 为了让业务团队可以更好的跟踪自己消息的生产和消费状态&#xff0c;需要一个类似于表格视图的消息列表&#xff0c;用户可以直观的看到发送的消息&#xff1b;同时点击详情后也能查到消息的整个轨迹。 消息列表 点击详情后查看轨迹 原理介绍 由于 Pulsar 并没有关系型数…

Angular-03:组件模板

各种学习后的知识点整理归纳&#xff0c;非原创&#xff01; 组件模板 ① 数据绑定② 属性绑定③ 类名绑定④ 样式绑定⑤ 事件绑定⑥ 获取原生DOM对象6.1 在组件模板中获取6.2 在组件类中获取 ⑦ 双向数据绑定⑧ 内容投影8.1 select选择器8.2 单槽投影8.3 多槽投影 ⑨ 安全操作…

Vue图片路径问题(动态引入)

vue项目中我们经常会遇到动态路径的图片无法显示的问题&#xff0c;以下是静态路径和动态路径的常见使用方法。 1.静态路径 在日常的开发中&#xff0c;图片的静态路径通过相对路径和绝对路径的方式引入。 相对路径&#xff1a;以.开头的&#xff0c;例如./、../之类的。就是…

LangChain+LLM实战---Langchain-Chatchat概述

LangChain介绍 LangChain是个开源的框架&#xff0c;它可以让AI开发人员把像GPT-4这样的大型语言模型(LLM)和外部数据结合起来。可以简单认为LangChain是LLM领域的Spring&#xff0c;以及开源版的ChatGPT插件系统。 LangChain的强大之处不仅能通过API调用语言模型&#xff0c;…

Panda3d 介绍

Panda3d 介绍 文章目录 Panda3d 介绍Panda3d 的安装Panda3d 的坐标系统介绍Panda3d 的运行Panda3d 加载一个熊猫父节点和子节点之间的关系 验证Panda3d 的坐标系统X 轴的平移Y 轴的平移Z 轴的平移X 轴的旋转Y 轴的旋转Z 轴的旋转 Panda3D是一个3D引擎:一个用于3D渲染和游戏开发…

为什么看了那么多测试技术帖,感觉自己还是菜?

作为测试新手&#xff0c;最爱莫过于看各大牛发的技术贴&#xff0c;这篇很牛叉&#xff0c;那篇也很有道理&#xff0c;似乎自己看着看着也会成为高手。然而几年后&#xff0c;发现自己对专业知识的理解乱的很&#xff0c;里面更有很多自相矛盾的地方&#xff0c;这到底是哪里…

iTransformer: INVERTED TRANSFORMERS ARE EFFECTIVE FOR TIME SERIES FORECASTING

#论文题目&#xff1a;ITRANSFORMER: INVERTED TRANSFORMERS ARE EFFECTIVE FOR TIME SERIES FORECASTING #论文地址&#xff1a;https://arxiv.org/abs/2310.06625 #论文源码开源地址&#xff1a;https://github.com/thuml/Time-Series-Library #论文所属会议&#xff1a;Mach…

Flask基本教程以及Jinjia2模板引擎简介

flask基本使用 直接看代码吧&#xff0c;非常容易上手&#xff1a; # 创建flask应用 app Flask(__name__)# 路由 app.route("/index", methods[GET]) def index():return "FLASK&#xff1a;欢迎访问主页&#xff01;"if __name__ "__main__"…

vulnhub靶机Venus

下载地址&#xff1a;The Planets: Venus ~ VulnHub 主机发现 arp-scan -l 端口扫描 nmap --min-rate 1000 -p- 192.168.21.132 端口版本扫描 nmap -sV -sT -O -p22,8080 192.168.21.132 对于http-alt HTTP Alternative Services 介绍 | JerryQu 的小站 (imququ.com) 总结…

[迁移学习]UniDAformer域自适应全景分割网络

原文的标题为&#xff1a;UniDAformer: Unified Domain Adaptive Panoptic Segmentation Transformer via Hierarchical Mask Calibration&#xff0c;发表于CVPR2023。 一、概述 域自适应全景分割是指利用一个或多个相关域中的现成标注数据来缓解语义分割数据标注复杂的问题。…

MyBatis的增删改查

2023.10.29 本章学习MyBatis的基本crud操作。 insert java程序如下&#xff1a; ①使用map集合传参 Testpublic void testInsertCar(){SqlSession sqlSession SqlSessionUtil.openSession();//先将数据放到Map集合中&#xff0c;在sql语句中使用 #{map集合的key} 来完成传…

Steger算法实现结构光光条中心提取(python版本)

Steger算法原理 对结构光进行光条中心提取时&#xff0c;Steger算法是以Hessian矩阵为基础的。它的基础步骤如下所示&#xff1a; 从Hessian矩阵中求出线激光条纹的法线方向在光条纹法线方向上将其灰度分布按照泰勒多项式展开&#xff0c;求取的极大值即为光条在该法线方向上…

【Flutter】自定义分段选择器Slider

【Flutter】ZFJ自定义分段选择器Slider 前言 在开发一个APP的时候&#xff0c;需要用到一个分段选择器&#xff0c;系统的不满足就自己自定义了一个&#xff1b; 可以自定义节点的数量、自定义节点的大小、自定义滑竿的粗细&#xff0c;自定义气泡的有无等等… 基本上满足你…

Springboot的Container Images,docker加springboot

Spring Boot应用程序可以使用Dockerfiles容器化&#xff0c;或者使用Cloud Native Buildpacks来创建优化的docker兼容的容器映像&#xff0c;您可以在任何地方运行。 1. Efficient Container Images 很容易将Spring Boot fat jar打包为docker映像。然而&#xff0c;像在docke…

合肥中科深谷嵌入式项目实战——人工智能与机械臂(三)

订阅&#xff1a;新手可以订阅我的其他专栏。免费阶段订阅量1000 python项目实战 Python编程基础教程系列&#xff08;零基础小白搬砖逆袭) 作者&#xff1a;爱吃饼干的小白鼠。Python领域优质创作者&#xff0c;2022年度博客新星top100入围&#xff0c;荣获多家平台专家称号。…