Fink CDC数据同步(六)数据入湖Hudi

数据入湖Hudi

Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势(类别)。

配置

将hudi相关jar包放在flink安装目录的lib下

hudi-flink1.16-bundle-0.13.0.jar

hudi-hadoop-mr-0.13.0.jar

hudi-hive-sync-0.13.0.jar

确保/etc/profile配置了hadoop和hive的环境变量

#HADOOP_HOME
export HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_CONF_DIR=/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop
export HADOOP_COMMON_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_HDFS_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_YARN_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_MAPRED_HOME=/usr/hdp/3.1.5.0-152/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`

#HIVE HOME
export HIVE_HOME=/usr/hdp/3.1.5.0-152/hive
export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/sbin

测试插入hudi表

set sql-client.execution.result-mode = tableau;
set execution.checkpointing.interval=30sec;
SET table.sql-dialect=default;

CREATE TABLE hudi_test(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',  -- 连接器指定hudi
  'path' = 'hdfs://bigdata101:8020/hudi/hudi_test',  -- 数据存储地址
  'table.type' = 'MERGE_ON_READ' -- 表类型,默认COPY_ON_WRITE,可选MERGE_ON_READ
);

INSERT INTO hudi_test VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

MySql数据写入Hudi表

建hudi表

create table hudi_user(
    id string not null,
    name string,
    birth string,
    gender string,
    primary key (id) not enforced
)
with (
    'connector' = 'hudi',
    'path' = 'hdfs://bigdata101:8020/hudi/hudi_user',
    'table.type' = 'MERGE_ON_READ',
    'write.option' = 'bulk_insert',
    'write.precombine.field' = 'id'
);

将MySql映射表的数据插入hudi表,此时会生成一个flink任务

insert into ods.hudi_user select * from mysql_user;

流式查询

上面的查询方式是非流式查询,流式查询会生成一个flink作业,并且实时显示数据源变更的数据。

流式查询(Streaming Query)需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所有数据,设置值为earliest。

使用参数如下:

参数名称

是否必填

默认值

备注

read.streaming.enabled

FALSE

FALSE

设置为true,开启stream query

read.start-commit

FALSE

the latest commit

Instant time的格式为:’yyyyMMddHHmmss’

read.streaming_skip_compaction

FALSE

FALSE

是否不消费compaction commit,消费compaction commit会出现重复数据

clean.retain_commits

FALSE

10

当开启change log mode,保留的最大commit数量。如果checkpoint interval为5分钟,则保留50分钟的change log

建表:

create table hudi_user_read_streaming(
    id int not null ,
    name string,
    birth string,
    gender string,
    primary key (id) not enforced
)
with (
    'connector' = 'hudi',
    'path' = 'hdfs://bigdata101:8020/hudi/hudi_user',
    'table.type' = 'MERGE_ON_READ',
    'write.option' = 'bulk_insert',
    'write.precombine.field' = 'id',
    'read.streaming.enabled' = 'true',  -- 默认值false,设置为true,开启stream query
    'read.start-commit' = '20231008134557', -- start-commit之前提交的数据不显示,
    'read.streaming.check-interval' = '4'  -- 检查间隔,默认60s

);


insert into hudi_user_read_streaming select * from mysql_user;

select * from hudi_user_read_streaming;

此时,执行select 语句就会生成一个flink 作业

源端变更数据会实时展示出来


 

 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502​​​​​​​
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/374516.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Java 数据结构】泛型进阶

泛型 1 什么是泛型2 引出泛型2.1 语法 3 泛型类的使用3.1 语法3.2 示例3.3 类型推导(Type Inference) 泛型是如何编译的擦除机制裸类型4 泛型的上界4.1 语法4.2 示例4.3 复杂示例 5 泛型方法5.1 定义语法5.2 示例5.3 使用示例-可以类型推导5.4 使用示例-不使用类型推导 6 通配符…

编译原理与技术(三)——语法分析(五)自底向上-LR分析

一、自顶向下的LL(1)与自底向上的LR (一)LL(1)非递归预测分析器及分析表 (二)LR分析器及分析表 二、LR分析 举个例子。 从上面不难看出,LR分析也是由分析表驱动的。那么关键在于构造LR分析表。

眼动追踪系统体验实验(脑与认知期末考核)

实验名称:眼动追踪系统体验实验 实验目的: 本实验旨在通过体验脑与认知眼动追踪系统,了解该技术在心理学、认知科学等领域的应用,理解它是怎样工作的,探究眼动追踪技术如何揭示人类认知过程的奥秘。 实验环境&#…

1、将 ChatGPT 集成到数据科学工作流程中:提示和最佳实践

将 ChatGPT 集成到数据科学工作流程中:提示和最佳实践 希望将 ChatGPT 集成到您的数据科学工作流程中吗?这是一个利用 ChatGPT 进行数据科学的提示的实践。 ChatGPT、其继任者 GPT-4 及其开源替代品非常成功。开发人员和数据科学家都希望提高工作效率,并使用 ChatGPT 来简…

docker之程序镜像的制作

目录 一、每种资源的预安装(基础) 安装 nginx安装 redis 二、dockerfile文件制作(基础) 打包 redis 镜像 创建镜像制作空间制作dockerfile 打包 nginx 镜像 三、创建组合镜像(方式一) 生成centos容器并…

环境配置:Udacity的Self-Driving项目安装运行

前言 Udacity的自动驾驶工程师纳米学位项目(Self-Driving Car Engineer Nanodegree Program)是一项面向学习者的前沿技术项目,旨在提供全面的自动驾驶工程师培训。该项目由Udacity与自动驾驶领域的领先公司和专业人士合作开发,涵…

C++ STL精通之旅:向量、集合与映射等容器详解

目录 常用容器 顺序容器 向量vector 构造 尾接 & 尾删 中括号运算符 获取长度 清空 判空 改变长度 提前分配好空间 代码演示 运行结果 关联容器 集合set 构造 遍历 其他 代码演示 运行结果​编辑 映射map 常用方法 构造 遍历 其他 代码演示1​编…

安卓9宫格密码键盘

自定义组件 package com.zx.mocab.views;import android.content.Context; import android.graphics.Canvas; import android.graphics.Paint; import android.graphics.Path; import android.util.AttributeSet; import android.util.Log; import android.view.MotionEvent…

adb push 将电脑中的文件传输到安卓开发板

1. adb remount 重新挂载设备的文件系统,以便可以对设备进行读写操作,通常情况下,安卓开发板在连接到计算机后,设备的文件系统会被挂载为只读文件系统,重新挂载后变成可读可写权限 C:\Users\Administrator>adb re…

工业笔记本丨行业三防笔记本丨亿道加固笔记本定制丨极端温度优势

工业笔记本是专为在恶劣环境条件下工作而设计的高度耐用的计算机设备。与传统消费者级笔记本电脑相比,工业笔记本在极端温度下展现出了许多优势。本文将探讨工业笔记本在极端温度环境中的表现,并介绍其优势。 耐高温性能: 工业笔记本具有更高的耐高温性…

2月6日作业

1.现有无序序列数组为23,24,12,5,33,5347&#xff0c;请使用以下排序实现编程 函数1:请使用冒泡排序实现升序排序 函数2:请使用简单选择排序实现升序排序 函数3:请使用快速排序实现升序排序 函数4:请使用插入排序实现升序排序 #include<stdio.h> #include<string.h&…

2024-01-06-AI 大模型全栈工程师 - 机器学习基础

摘要 2024-01-06 阴 杭州 晴 本节简介: a. 数学模型&算法名词相关概念; b. 学会数学建模相关知识&#xff1b; c. 学会自我思考&#xff0c;提升认知&#xff0c;不要只会模仿&#xff1b; 课程内容 1. Fine-Tuning 有什么作用&#xff1f; a. 什么是模型训练&#xff…

计算机毕业设计 基于SpringBoot的线上教育培训办公系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

掌握Pandas数据转换利器深入解析pd.to_numeric函数与实战技巧【第63篇—python:Pandas数据】

文章目录 引言pd.to_numeric函数简介参数详解实战案例进阶应用&#xff1a;处理缺失值与异常值1. 处理缺失值2. 处理异常值 高效利用downcast参数优化内存占用优化性能&#xff1a;使用apply函数批量处理数据实战案例&#xff1a;处理时间序列数据处理多列数据&#xff1a;结合…

知识管理平台有哪些?帮助企业高效发展

在现代商业环境中&#xff0c;知识被认为是推动企业高效发展的关键因素。一个有效的知识管理平台可以帮助企业收集、整理和分享知识&#xff0c;从而提高工作效率&#xff0c;增强竞争优势。接下来&#xff0c;我将向大家推荐三款优秀的知识管理平台&#xff1a;Helplook&#…

Python初学者学习记录——python基础综合案例:数据可视化——动态柱状图

一、案例效果 通过pyecharts可以实现数据的动态显示&#xff0c;直观的感受1960~2019年世界各国GDP的变化趋势 二、通过Bar构建基础柱状图 反转x轴和y轴 标签数值在右侧 from pyecharts.charts import Bar from pyecharts.options import LabelOpts# 构建柱状图对象 bar Bar()…

理解“无意义的”JavaScript特性

概要 JavaScript可能是世界上最流行的客户端语言&#xff0c;但它远非完美&#xff0c;也不是没有怪癖。Juan Diego Rodriguez研究了几个“荒谬的”JavaScript怪癖&#xff0c;并解释了它们是如何进入语言的&#xff0c;以及如何在自己的代码中避免它们。 特性 为什么JavaS…

【并发编程】手写线程池阻塞队列

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;并发编程 ⛺️稳重求进&#xff0c;晒太阳 示意图 步骤1&#xff1a;自定义任务队列 变量定义 用Deque双端队列来承接任务用ReentrantLock 来做锁并声明两个条件变量 Condition fullWai…

PDF文件格式(一):交叉引用流

在PDF-1.5版本之前&#xff0c;对象的交叉引用信息是存储在交叉引用表(cross-reference table)中的。在PDF-1.5版本之后&#xff0c;引进了交叉引用流(cross-reference stream)对象&#xff0c;可以用它来存储对象的交叉引用信息&#xff0c;就像交叉引用表的功能一样。 采用交…

鸿蒙开发系列教程(十一)--布局应用:层叠布局

层叠布局 &#xff08;Stack&#xff09; 层叠布局&#xff08;StackLayout&#xff09;用于在屏幕上预留一块区域来显示组件中的元素&#xff0c;提供元素可以重叠的布局。层叠布局通过stack容器组件实现位置的固定定位与层叠&#xff0c;容器中的子元素&#xff08;子组件&…