使用Flink SQL实时入湖Hudi/Hive

文章目录

  • 1 Hudi 简介
  • 2 COW和MOR
  • 3 接入COW模式Hudi表
  • 4 使用Flink SQL查看新接表
  • 5 使用Hive查看新接表
  • 6 总结

1 Hudi 简介

Hudi是一个流式数据湖平台,使用Hudi可以直接打通数据库与数据仓库,连通大数据平台,支持对数据增删改查。Hudi还支持同步数据入库,提供了事务保证、索引优化,是打造实时数仓、实时湖仓一体的新一代技术。下面以我实际工作中遇到的问题,聊下湖仓一体的好处,如有不对,敬请指正。
在这里插入图片描述
像传统关系型数据库,MySQL/Oracle等大多支持OLTP,但不支持OLAP。如果写很复杂的SQL,传统关系型数据库根本跑不动,尤其是需要跨系统/跨数据库联合查询分析,传统关系型数据库并不支持(这个可以使用Presto解决)。

在这里插入图片描述
而离线数仓无法支持实时/准实时需求,无法记录级更新,当业务表数据量很大时,无论使用增量还是全量接入Hive,对业务库都有很大压力(使用从库可缓解)。Hudi能很好解决这个问题,通过配置可以准实时的写入Hudi,并同步到Hive,相当于业务表数据准实时的同步到Hive,这时取快照或者直接当作ODS层都可,再也不用担心ODS接入延迟了。
在这里插入图片描述

2 COW和MOR

Hudi有两种表类型,COW和MOR,如果接入表读多写少可选择COW,如字典表,读少写多使用MOR。
Copy on write:写时复制,使用列式文件格式(如 parquet)存储数据。不同进程在访问同一资源的时候,只有更新操作,才会去复制一份新的数据并更新替换,否则都是访问同一个资源。
Merge on read:读时合并,使用列式+基于行的(例如avro)文件格式的组合存储数据。更新被记录到增量文件中,然后被压缩以同步或异步地生成新版本的列式文件。
在这里插入图片描述
如果Hudi表是COPY_ON_WRITE类型,那么映射的Hive表对应是指定的Hive表名,此表中存储着Hudi所有数据。

如果Hudi表类型是MERGE_ON_READ模式,那么映射的Hive表将会有2张,一张后缀为rt ,另一张表后缀为ro。后缀rt对应的Hive表中存储的是Base文件Parquet格式数据+log Avro格式数据,也就是全量数据。后缀为ro Hive表中存储的是存储的是Base文件对应的数据。

3 接入COW模式Hudi表

开发测试时,可在客户端调试

./bin/sql-client.sh embedded -s yarn-session

调试没问题后,在DolphinScheduler配置上线
在这里插入图片描述
选择FLINK_STREAM
在这里插入图片描述
根据集群类型,选择部署方式

初始化脚本
初始化脚本配置一些参数和建表

SET 'yarn.application.queue' = 'root.etl';
set execution.checkpointing.interval='300s';
SET execution.checkpointing.mode = AT_LEAST_ONCE;
-- 保存checkpoint文件的目录
set state.checkpoints.dir='hdfs://cluster/tmp/flink/checkpoints/h_account_holiday';
-- 恢复时需设置检查点 set execution.savepoint.path='hdfs://cluster/tmp/flink/checkpoints/h_account_holiday/077107d6530a1c63cb9126258cfe2546/chk-72';

set taskmanager.network.memory.buffer-debloat.enabled=true;

SET state.checkpoints.num-retained= 3; 
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;

set execution.checkpointing.min-pause = '180000';
set 'table.exec.sink.upsert-materialize' = 'NONE';
set execution.checkpointing.max-concurrent-checkpoints=1;

set akka.ask.timeout = '1200s';
set web.timeout = '500000';
set heartbeat.timeout=500000;

SET 'connector.mysql-cdc.max-connection-attempts' = '5';
SET 'connector.mysql-cdc.connection-attempts-timeout' = '1200s';

SET restart-strategy='fixed-delay';
SET restart-strategy.fixed-delay.attempts='50';
SET restart-strategy.fixed-delay.delay='1min';
SET execution.checkpointing.timeout='40min';

SET state.backend='rocksdb';
SET state.backend.incremental=true;

set high-availability='zookeeper';
set high-availability.storageDir='hdfs://cluster/tmp/flink/ha-yarn';
set high-availability.zookeeper.quorum='bigdata-093:2181,bigdata-094:2181,bigdata-ds-12-195:2181,bigdata-ds-12-198:2181,bigdata-ds-12-199:2181';
set high-availability.zookeeper.path.root='/flink_yarn';
set yarn.application-attempts='10';


CREATE CATALOG cdc_catalog WITH (
'type' = 'hive',
'default-database' = 'flink_cdc',
'hive-conf-dir' = '/opt/apps/apache-hive-2.1.1-bin/conf'
);
-- 使用刚创建的catalog
use catalog cdc_catalog;
-- 选择flink_cdc库
use flink_cdc;

drop table if exists source_account_holiday;
create table if not exists source_account_holiday(
`id` int primary key not enforced
,workday date
,week int
,next_workday date
,create_time timestamp
,update_time timestamp
) with (
'connector'='mysql-cdc',
'hostname'='10.100.xx.xx',
'port'='3306',
'server-time-zone'='Asia/Shanghai',
'server-id'='6066-6070', -- 注意同一个实例,id不要重复,数字范围要大于并行度
'username'='xxx',
'password'='xxx',
'debezium.snapshot.mode'='initial',
'database-name'='xd_account',
'table-name'='account_holiday',
'connect.timeout'='1000000'
);

drop table if exists sink_account_holiday;
create table if not exists sink_account_holiday(
`id` int primary key not enforced
,workday date
,week int
,next_workday date
,create_time string -- 注意timestamp需转成string
,update_time string -- 注意timestamp需转成string
) with (
'connector' = 'hudi',
'path' = 'hdfs://cluster/tmp/flink/hudi/sink_account_holiday',
'hoodie.datasource.write.recordkey.field'='id', -- 设置主键
'table.type'='COPY_ON_WRITE',
'write.timezone'='Asia/Shanghai',
'hive_sync.enabled'='true',
'hive_sync.mode'='hms',
'hive_sync.metastore.uris'='thrift://bigdata-003:9083,thrift://bigdata-004:9083,thrift://bigdata-009:9083,thrift://bigdata-012:9083,thrift://bigdata-008:9083,thrift://bigdata-007:9083',
'hive_sync.db'='hudi', -- 同步到hive hudi库h_account_holiday,自动建表
'hive_sync.table'='h_account_holiday',
'hive_sync.username'='hive',
'hoodie.datasource.hive_sync.omit_metadata_fields'='true'
);

脚本
从source表写入sink表

insert into sink_account_holiday
select 
 id
,workday 
,week 
,next_workday 
,date_format(create_time, 'yyyy-MM-dd HH:mm:ss') -- 注意timestamp需转成string
,date_format(update_time, 'yyyy-MM-dd HH:mm:ss') -- 注意timestamp需转成string
from source_account_holiday;

在这里插入图片描述
执行后注意看日志,成功会有Application ID 和 Job ID
在这里插入图片描述
可通过Application ID 和 Job ID查看任务运行情况
在这里插入图片描述

4 使用Flink SQL查看新接表

使用Flink SQL,可以实时看到数据更新

cd /opt/apps/flink-1.14.4/
./bin/sql-client.sh embedded -s yarn-session

embedded 内嵌模式

Flink SQL> CREATE CATALOG cdc_catalog WITH (
> 'type' = 'hive',
> 'default-database' = 'flink_cdc',
> 'hive-conf-dir' = '/opt/apps/apache-hive-2.1.1-bin/conf'
> );
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[INFO] Execute statement succeed.

Flink SQL> use catalog cdc_catalog;
[INFO] Execute statement succeed.

Flink SQL> show databases;

Flink SQL> use hudi;
[INFO] Execute statement succeed.
Flink SQL> select * from h_account_holiday limit 10;

在这里插入图片描述

5 使用Hive查看新接表

前面初始化脚本必须配置同步到hive,hive查不了source和sink表,只能查同步到hive的表

hive> use hudi;
OK
Time taken: 2.406 seconds
hive> set role admin;
OK
Time taken: 0.093 seconds
hive> select * from h_account_holiday limit 10;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
44      2024-05-12      7       2024-05-13      2024-01-20 15:17:59     2024-01-20 15:17:59
45      2024-05-18      6       2024-05-20      2024-01-20 15:17:59     2024-01-20 15:17:59
89      2024-10-04      5       2024-10-08      2024-01-20 15:17:59     2024-01-20 15:17:59
110     2024-12-14      6       2024-12-16      2024-01-20 15:17:59     2024-01-20 15:17:59
112     2024-12-21      6       2024-12-23      2024-01-20 15:17:59     2024-01-20 15:17:59
115     2024-12-29      7       2024-12-30      2024-01-20 15:17:59     2024-01-20 15:17:59
91      2024-10-06      7       2024-10-08      2024-01-20 15:17:59     2024-01-20 15:17:59
93      2024-10-13      7       2024-10-14      2024-01-20 15:17:59     2024-01-20 15:17:59
50      2024-06-02      7       2024-06-03      2024-01-20 15:17:59     2024-01-20 15:17:59
95      2024-10-20      7       2024-10-21      2024-01-20 15:17:59     2024-01-20 15:17:59
Time taken: 0.147 seconds, Fetched: 10 row(s)

在这里插入图片描述

6 总结

使用这种方案,真正实现了湖仓一体,基本满足了实时和离线需求,且主要使用SQL,开发和维护成本较低。不过,该方案也有个问题,flink cdc 会挂,导致数据没更新,还是要多关注下。

参考链接:
https://blog.csdn.net/qq_32727095/article/details/123863620
https://zhuanlan.zhihu.com/p/471842018
https://zhuanlan.zhihu.com/p/526372429
https://blog.csdn.net/JH_Zhai/article/details/136042662
https://www.jianshu.com/p/0837ada9de76

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/598629.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

支持向量机:抽象难懂?看这里就明白了!

今天给大家分享的知识是关于支持向量机的内容,支持向量机算法是目前学习到的机器学习算法中最抽象、最难以理解的内容,不过支持向量机算法在实际使用过程中还是比较常见,无论是在医学研究还是经济研究中都能看到身影,所有&#xf…

4.4网安学习第四阶段第四周回顾(个人学习记录使用)

本周重点 ①Linux系统提权 ②Linux权限维持 ③Windows 提权 ④Windows权限维持 ⑤SSRF利用 ⑥内网环境 ⑦内网扫描 ⑧漏洞利用 ⑨内网代理 ⑩获取主机控制权其他方案 ⑩①vuln靶场 ⑩②CS代理与ICMP隧道 本周主要内容 ①Linux系统提权 系统提权是成功入侵系统之…

[数据概念|方案实操]清华数据大讲堂1-海南数据基础设施建设思考与实践

“ 全国最大自贸区在数据要素市场改革中都做了什么?” 如鼹鼠哥上一篇文章所介绍,4月17日,在清华公管学院,由杭州数据局局长 徐青山 给大家做了题为《数据要素市场化配置改革杭州实践与思考》的报告,鼹鼠哥自己的一点感…

暗区突围pc端资格发放了吗 暗区突围pc测试资格怎么获取

暗区突围pc端资格发放了吗 暗区突围pc测试资格怎么获取 暗区突围是一款很火爆的第一人称射击网游,现在终于要上线PC端啦!小伙伴们是不是已经迫不及待想要体验电脑上的硬核射击快感了?暗区突围pc端资格已经陆续发放,想要参与PC端…

Excel办公之if函数-是非之争

IF函数是Excel中功能强大的函数,可以帮助用户根据逻辑条件判断并返回不同的值,广泛应用于数据分析、数据处理、报表制作等场景,是日常办公中必不可少的工具。 语法: IF(logical_test, value_if_true, value_if_false) 其中&…

晶振负载对系统有什么影响?

电子系统中,晶振(晶体振荡器)是确保系统各部分同步工作的关键组件。然而,晶振的性能受到其负载电容大小的显著影响。本文将详细探讨晶振负载电容对系统性能的影响,并给出相应的解决方案。 一、晶振负载电容的作用 晶…

药物代谢动力学学习笔记

一、基本概念 二、经典房室模型 三、非线性药物代谢动力学 四、非房室模型 五、药代动力学与药效动力学 六、生物等效性评价 七、生物样品分析方法 基本概念 生物样品:生物机体的全血、血浆、血清、粪便、尿液或其他组织的样品 特异性,specificity&…

服务器关机前未退出xampp导出MySQL无法启动

背景解决 五一放假,服务器关机了,但是关机前没有正常关闭数据库服务,导致数据库无法启动! 查看错误日志如下 从报错信息可以看出是MySQL这个服务相关文件出现问题了,解决思路:重新安装xampp 重新安装xam…

IT 项目管理介绍和资料汇总

IT项目管理到底是什么?是对组织承担的任何信息技术项目的成功监督。IT项目经理负责规划、预算、执行、领导、故障排除和维护这些项目。IT项目经理可能会做的事情包括: 1、硬件安装 2、软件、网站和应用程序开发 3、网络和云计算解决方案的升级和/或推出…

Python轴承故障诊断 (18)基于CNN-TCN-Attention的创新诊断模型

往期精彩内容: Python-凯斯西储大学(CWRU)轴承数据解读与分类处理 Python轴承故障诊断 (一)短时傅里叶变换STFT Python轴承故障诊断 (二)连续小波变换CWT_pyts 小波变换 故障-CSDN博客 Python轴承故障诊断 (三)经验模态分解EMD_轴承诊断 …

H5页面跳转去微信的客服页面不需要添加客服就可以直接聊天

我并没有添加客服的微信。但是页面直接跳转了进来。可以直接聊天。 首先你公司要有个企业微信。然后登陆公司的企业微信。搜索框找到应用里面的企业客服 然后你就看到了客服账号的接入连接。代码上直接写个 <div οnclick"window.location.href接入链接粘贴到这里&q…

关闭前端统一请求库设计与落地

前言 对于一个前端工程师而言&#xff0c;每天都在面对的较多的需求场景就是调用后端的接口&#xff0c;但是因为众所周知的原因&#xff0c;前端目前已经有无数种调用接口的方式&#xff0c;例如&#xff1a;之前有基于 XHR、Axios、Fetch 进行封装的工具&#xff0c;大家都试…

有没有电脑桌面监控软件|十大电脑屏幕监控软件超全盘点!

当然&#xff0c;目前市场上有许多电脑桌面监控软件可供选择&#xff0c;它们各有特色&#xff0c;旨在满足不同企业和个人对于远程监控、安全管理、提高工作效率等方面的需求。以下是根据近期资料整理的十大电脑屏幕监控软件盘点&#xff0c;包括它们的一些特点和优势&#xf…

Web3:下一代互联网的科技进化

随着科技的不断演进&#xff0c;互联网已经成为了我们生活中不可或缺的一部分。而在Web3时代&#xff0c;我们将会见证互联网进化的下一个阶段。本文将探讨Web3作为下一代互联网的科技进化&#xff0c;以及它所带来的重要变革和影响。 传统互联网的局限性 传统互联网存在诸多…

如何从零开始学习数据结构?

在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「数据结构的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;数据结构 算法&#xff1d;程…

MySQL日志机制【undo log、redo log、binlog 】

前言 SQL执行流程图文分析&#xff1a;从连接到执行的全貌_一条 sql 执行的全流程?-CSDN博客文章浏览阅读1.1k次&#xff0c;点赞20次&#xff0c;收藏12次。本文探讨 MySQL 执行一条 SQL 查询语句的详细流程&#xff0c;从连接器开始&#xff0c;逐步介绍了查询缓存、解析 S…

xmind的13个快捷方式

1.新建导图 CtrlshiftN 2.编辑文字 空格键 3.插入图片 Ctrli 4. 插入主题 Enter键 5. 插入主题之前 ShiftEnter键 6. 插入子主题 Tab键 7. 放大导图 “Ctrl”“” 8. 缩小导图 “Ctrl”“-” 9. 复制 CtrlInsert 10. 粘贴 Shift Insert 11. 剪切 ShiftDelete 12. 截图 F7 13. 保…

【Pytorch】5.DataLoder的使用

什么是DataLoader 个人理解是&#xff0c;如果Dataset的所有数据相当于一副扑克牌&#xff0c;DataLoader就相当于从扑克牌中抽取几张&#xff0c;我们可以规定一次抽取的张数&#xff0c;或者以什么规则进行抽取 DataLoader的使用 查阅官网的文档&#xff0c;主要有这几个参数…

Unity Shader中获取像素点深度信息

1.顶点着色器中对深度进行计算 v2f vert(appdata v) {v2f o;o.pos UnityObjectToClipPos(v.vertex);o.uv TRANSFORM_TEX(v.uv, _MainTex);o.depth (o.pos.z / o.pos.w 1.0) * 0.5; // Normalize depth to [0, 1]return o; }但是达不到预期&#xff0c;最后返回的值一直大于…

SpringMVC响应数据

三、SpringMVC响应数据 3.1 handler方法分析 理解handler方法的作用和组成&#xff1a; /*** TODO: 一个controller的方法是控制层的一个处理器,我们称为handler* TODO: handler需要使用RequestMapping/GetMapping系列,声明路径,在HandlerMapping中注册,供DS查找!* TODO: ha…