Apache Doris 整合 FLINK 、 Hudi 构建湖仓一体的联邦查询入门

1.概览

多源数据目录(Multi-Catalog)功能,旨在能够更方便对接外部数据目录,以增强Doris的数据湖分析和联邦数据查询能力。

在之前的 Doris 版本中,用户数据只有两个层级:Database 和 Table。当我们需要连接一个外部数据目录时,我们只能在Database 或 Table 层级进行对接。比如通过 create external table 的方式创建一个外部数据目录中的表的映射,或通过 create external database 的方式映射一个外部数据目录中的 Database。如果外部数据目录中的 Database 或 Table 非常多,则需要用户手动进行一一映射,使用体验不佳。

而新的 Multi-Catalog 功能在原有的元数据层级上,新增一层Catalog,构成 Catalog -> Database -> Table 的三层元数据层级。其中,Catalog 可以直接对应到外部数据目录。目前支持的外部数据目录包括:

  1. Apache Hive
  2. Apache Iceberg
  3. Apache Hudi
  4. Elasticsearch
  5. JDBC: 对接数据库访问的标准接口(JDBC)来访问各式数据库的数据。
  6. Apache Paimon(Incubating)

该功能将作为之前外表连接方式(External Table)的补充和增强,帮助用户进行快速的多数据目录联邦查询。

这篇教程将展示如何使用 Flink + Hudi + Doris 构建实时湖仓一体的联邦查询分析,Doris 2.0.3 版本提供了 的支持,本文主要展示 Doris 和 Hudi 怎么使用,同时本教程整个环境是都基于伪分布式环境搭建,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。

2. 环境

本教程的演示环境如下:

  1. Centos7
  2. Apache doris 2.0.2
  3. Hadoop 3.3.3
  4. hive 3.1.3
  5. Fink 1.17.1
  6. Apache hudi 0.14
  7. JDK 1.8.0_311

3. 安装

  1. 下载 Flink 1.17.1
    wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
    ## 解压安装
    tar zxf flink-1.17.1-bin-scala_2.12.tgz
  2. 下载 Flink 和 Hudi 相关的依赖
wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.12/1.17.1/flink-table-planner_2.12-1.17.1.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-sync-bundle/0.14.0/hudi-hive-sync-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.17-bundle/0.14.0/hudi-flink1.17-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.14.0/hudi-hadoop-mr-bundle-0.14.0.jar

将上面这些依赖下载到 flink-1.17.1/lib 目录,然后将之前的 flink-table-planner-loader-1.17.1.jar 删除或者移除。

3. 创建 Hudi 表并写入数据

3.1 启动 Flink

bin/start-cluster.sh

启动 Flink client

./bin/sql-client.sh embedded shell

#设置返回结果模式为tableau,让结果直接显示

set sql-client.execution.result-mode=tableau;

3.2 启动 Hive MetaStore 和 HiveServer

nohup ./bin/hive --service hiveserver2 >/dev/null 2>&1  &
nohup ./bin/hive --service metastore >/dev/null 2>&1  &

3.3 创建 Hudi 表

我们来创建 Hudi 表,我们这里使用 Hive MetaStore Service 来保存 Hudi 的元数据。

CREATE TABLE table1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
  'connector'='hudi',
  'path' = 'hdfs://localhost:9000/user/hive/warehouse/demo.db',
  'table.type'='COPY_ON_WRITE',       
  'hive_sync.enable'='true',           
  'hive_sync.table'='hudi_hive',        
  'hive_sync.db'='demo',            
  'hive_sync.mode' = 'hms',         
  'hive_sync.metastore.uris' = 'thrift://192.168.31.54:9083' 
);
  1. 'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
  2. 'hive_sync.enable'='true', -- required,开启hive同步功能
  3. 'hive_sync.table'='${hive_table}', -- required, hive 新建的表名
  4. 'hive_sync.db'='${hive_db}', -- required, hive 新建的数据库名
  5. 'hive_sync.mode' = 'hms', -- required, 将hive sync mode设置为hms, 默认jdbc
  6. 'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口

写入数据:

INSERT INTO table1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

通过 Flink 查询 Hudi 表的数据

SELECT * FROM TABLE1

我们可以查看 HDFS 上这个数据文件已经存在,在 hive client 下也可以看到这表

hive> use demo;
OK
Time taken: 0.027 seconds
hive> show tables;
OK
hudi_hive

4. Doris On Hudi

Doris 操作访问 Hudi 的数据很简单,我们只需要创建一个 catalog 就可以,不需要再想之前一样写一个完整的建表语句,同时当 Hudi 数据源中增删表或者增删字段,Doris 这边可以通过配置自动刷新或者手动刷新Catalog 自动感知。

下面我们在Doris 下创建一个 Catalog 来访问 Hudi 外部表的数据

CREATE CATALOG hudi PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://192.168.31.54:9083'
);

这里我们上面Hudi的元数据是使用HMS存储的,我们创建的时候只需要指定上面两个信息即可,如果你的HDFS是高可用的,你需要添加NameNode HA的信息:

'hadoop.username' = 'hive',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'

具体参照Doris 官网文档

创建成功之后我们可以通过下面的红框标识出来的步骤去看到 Hudi 的表。

执行查询 Hudi 表:

将 Hudi 表里的数据迁移到 Doris

这里我们先创建好 Doris的表,建表语句如下:

CREATE TABLE doris_hudi(
  uuid VARCHAR(20) ,
  name VARCHAR(10),
  age INT,
  ts datetime(3),
  `partition` VARCHAR(20)
)
UNIQUE KEY(`uuid`)
DISTRIBUTED BY HASH(`uuid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);

通过 Insert Select 语句将 Hudi 数据迁移到 Doris :

insert into doris_hudi select uuid,name,age,ts,partition from hudi.demo.hudi_hive;

查询 Doris 表

mysql> select * from doris_hudi;
+------+---------+------+-------------------------+-----------+
| uuid | name    | age  | ts                      | partition |
+------+---------+------+-------------------------+-----------+
| id1  | Danny   |   23 | 1970-01-01 08:00:01.000 | par1      |
| id2  | Stephen |   33 | 1970-01-01 08:00:02.000 | par1      |
| id3  | Julian  |   53 | 1970-01-01 08:00:03.000 | par2      |
| id4  | Fabian  |   31 | 1970-01-01 08:00:04.000 | par2      |
| id5  | Sophia  |   18 | 1970-01-01 08:00:05.000 | par3      |
| id6  | Emma    |   20 | 1970-01-01 08:00:06.000 | par3      |
| id7  | Bob     |   44 | 1970-01-01 08:00:07.000 | par4      |
| id8  | Han     |   56 | 1970-01-01 08:00:08.000 | par4      |
+------+---------+------+-------------------------+-----------+
8 rows in set (0.02 sec)

我们那还可以通过 CATS方式将 hudi数据迁移到Doris,Doris 自动完成建表

create table doris_hudi_01
PROPERTIES("replication_num" = "1")  as  
select uuid,name,age,ts,`partition` from hudi.demo.hudi_hive;

5. 总结

是不是使用非常简单,快快体验Doris 湖仓一体,联邦查询的能力,来加速你的数据分析性能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/194069.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式八股 | 笔试面试 | 校招秋招 | 详细讲解

〇、前言 作者:赛博二哈 本嵌入式八股撰写初衷:当时求职翻遍了我能找到的所有八股,不论是嵌入式的,计算机基础的,C艹的,都很难看下去,细究其原因,有个最大的痛点: 大部…

Python读取Ansible playbooks返回信息

一.背景及概要设计 当公司管理维护的服务器到达一定规模后,就必然借助远程自动化运维工具,而ansible是其中备选之一。Ansible基于Python开发,集合了众多运维工具(puppet、chef、func、fabric)的优点&#x…

使用opencv的matchTemplate进行银行卡卡号识别

![字体文件](https://img-blog.csdnimg.cn/3a16c87cf4d34aceb0778c4b20ddadb2.png#pic_center import cv2 import numpy as npdef show_img(img, name"temp"):img cv2.resize(img, (0, 0), fx3, fy3)cv2.imshow(name, img)cv2.waitKey(0)cv2.destroyAllWindows()de…

242. 有效的字母异位词

这篇文章会收录到 :算法通关村第十二关-白银挑战字符串经典题目-CSDN博客 242. 有效的字母异位词 描述 : 给定两个字符串 s 和 t ,编写一个函数来判断 t 是否是 s 的字母异位词。 注意:若 s 和 t 中每个字符出现的次数都相同,则称 s 和 t …

VsCode 调试 MySQL 源码

1. 启动 MySQL 2. 查看 MySQL 进程号 [root ~]# ps -ef | grep mysqld root 21479 1 0 Nov01 ? 00:00:00 /bin/sh /usr/local/mysql/bin/mysqld_safe --datadir/usr/local/mysql/data --pid-file/usr/local/mysql/data/mysqld.pid root 26622 21479 0 …

【JDK21】详解虚拟线程

目录 1.概述 2.虚拟线程是为了解决哪些问题 2.1.线程切换的巨大代价 2.2.哪些情况会造成线程的切换 2.3.线程资源是有限的 3.虚拟线程 4.适用场景 1.概述 你发任你发,我用JAVA8?JDK21可能要对这句话say no了。 现在Oracle JDK是每4个版本&#x…

minio分布式存储系统

目录 拉取docker镜像 minio所需要的依赖 文件存放的位置 手动上传文件到minio中 工具类上传 yml配置 config类 service类 启动类 测试类 图片 视频 删除minio服务器的文件 下载minio服务器的文件 拉取docker镜像 拉取稳定版本:docker pull minio/minio:RELEASE.20…

FLASK博客系列6——数据库之谜

我们上一篇已经实现了简易博客界面,你还记得我们的博客数据是自己手动写的吗?但实际应用中,我们是不可能这样做的。大部分程序都需要保存数据,所以不可避免要使用数据库。我们这里为了简单方便快捷,使用了超级经典的SQ…

MySOL常见四种连接查询

1、内联接 &#xff08;典型的联接运算&#xff0c;使用像 或 <> 之类的比较运算符&#xff09;。包括相等联接和自然联接。 内联接使用比较运算符根据每个表共有的列的值匹配两个表中的行。例如&#xff0c;检索 students和courses表中学生标识号相同的所有行。 2、…

linux下的工具---vim

一、了解vim 1、vim是linux的开发工具 2、vi/vim的区别简单点来说&#xff0c;它们都是多模式编辑器&#xff0c;不同的是vim是vi的升级版本&#xff0c;它不仅兼容vi的所有指令&#xff0c;而且还有一些新的特性在里面。例如语法加亮&#xff0c;可视化操作不仅可以在终端运行…

前端OFD文件预览(vue案例cafe-ofd)

0、提示 下面只有vue的使用示例demo &#xff0c;官文档参考 cafe-ofd - npm 其他平台可以参考 ofd - npm 官方线上demo: ofd 1、安装包 npm install cafe-ofd --save 2、引入 import cafeOfd from cafe-ofd import cafe-ofd/package/index.css Vue.use(cafeOfd) 3、使…

计算机服务器中了mallox勒索病毒如何处理,mallox勒索病毒解密文件恢复

科技技术的发展推动了企业的生产运营&#xff0c;网络技术的不断应用&#xff0c;极大地方便了企业日常生产生活&#xff0c;但网络毕竟是一把双刃剑&#xff0c;网络安全威胁一直存在&#xff0c;近期&#xff0c;云天数据恢复中心接到很多企业的求助&#xff0c;企业的计算机…

文件夹重命名技巧:如何整理过长且混乱的文件夹名称

当浏览计算机文件夹时&#xff0c;有时候会遇到一些过长且混乱的文件夹名称&#xff0c;给文件夹管理带来不便。倘若手动修改文件夹名称会出现错误的机率过大&#xff0c;且这样操作太耗费时间和精力。有什么方法能够避免手动修改文件夹名称&#xff0c;提升工作效率的方法呢&a…

万字详解,和你用RAG+LangChain实现chatpdf

像chatgpt这样的大语言模型(LLM)可以回答很多类型的问题,但是,如果只依赖LLM,它只知道训练过的内容,不知道你的私有数据:如公司内部没有联网的企业文档,或者在LLM训练完成后新产生的数据。(即使是最新的GPT-4 Turbo,训练的数据集也只更新到2023年4月)所以,如果我们…

leetCode 841. 钥匙和房间 图遍历 深度优先遍历+广度优先遍历 + 图解

841. 钥匙和房间 - 力扣&#xff08;LeetCode&#xff09; 有 n 个房间&#xff0c;房间按从 0 到 n - 1 编号。最初&#xff0c;除 0 号房间外的其余所有房间都被锁住。你的目标是进入所有的房间。然而&#xff0c;你不能在没有获得钥匙的时候进入锁住的房间。当你进入一个房…

Android 12 打开网络ADB并禁用USB连接ADB

平台 RK3588 Android 12 Android 调试桥 (adb) Android 调试桥 (adb) 是一种功能多样的命令行工具&#xff0c;可让您与设备进行通信。adb 命令可用于执行各种设备操作&#xff0c;例如安装和调试应用。adb 提供对 Unix shell&#xff08;可用来在设备上运行各种命令&am…

保护IP地址不被窃取的几种方法

随着互联网的普及和信息技术的不断发展&#xff0c;网络安全问题日益凸显。其中&#xff0c;保护个人IP地址不被窃取成为了一个重要的问题。IP地址是我们在互联网上的身份标识&#xff0c;如果被他人获取&#xff0c;就可能导致个人隐私泄露、计算机受到攻击等一系列问题。因此…

笔记62:注意力汇聚 --- Nadaraya_Watson 核回归

本地笔记地址&#xff1a;D:\work_file\&#xff08;4&#xff09;DeepLearning_Learning\03_个人笔记\3.循环神经网络\第10章&#xff1a;动手学深度学习~注意力机制 a a a a a a a a a a a a a a a a

常见面试题-Netty中ByteBuf类

了解 Netty 中的 ByteBuf 类吗&#xff1f; 答&#xff1a; 在 Java NIO 编程中&#xff0c;Java 提供了 ByteBuffer 作为字节缓冲区类型&#xff08;缓冲区可以理解为一段内存区域&#xff09;&#xff0c;来表示一个连续的字节序列。 Netty 中并没有使用 Java 的 ByteBuff…

SpringBoot详解

一、介绍 Spring Boot 是一个基于 Spring 框架的开源框架&#xff0c;用于构建微服务和 Web 应用程序。它可以帮助开发者轻松创建独立的、基于 Spring 的应用程序&#xff0c;并在较短的时间内完成项目的开发。 二、核心 1. 约定大于配置 Spring Boot 通过自动化配置、约定优…