Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门

1.概览

多源数据目录(Multi-Catalog)功能,旨在能够更方便对接外部数据目录,以增强Doris的数据湖分析和联邦数据查询能力。

在之前的 Doris 版本中,用户数据只有两个层级:Database 和 Table。当我们需要连接一个外部数据目录时,我们只能在Database 或 Table 层级进行对接。比如通过 create external table 的方式创建一个外部数据目录中的表的映射,或通过 create external database 的方式映射一个外部数据目录中的 Database。如果外部数据目录中的 Database 或 Table 非常多,则需要用户手动进行一一映射,使用体验不佳。

而新的 Multi-Catalog 功能在原有的元数据层级上,新增一层Catalog,构成 Catalog -> Database -> Table 的三层元数据层级。其中,Catalog 可以直接对应到外部数据目录。目前支持的外部数据目录包括:

  1. Apache Hive
  2. Apache Iceberg
  3. Apache Hudi
  4. Elasticsearch
  5. JDBC: 对接数据库访问的标准接口(JDBC)来访问各式数据库的数据。
  6. Apache Paimon(Incubating)

该功能将作为之前外表连接方式(External Table)的补充和增强,帮助用户进行快速的多数据目录联邦查询。

这篇教程将展示如何使用 Flink + paimon + Doris 构建实时湖仓一体的联邦查询分析,Doris 2.0.3 版本提供了 的支持,本文主要展示 Doris 和 paimon 怎么使用,同时本教程整个环境是都基于伪分布式环境搭建,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。

2. 环境

本教程的演示环境如下:

  1. Apache doris 2.0.2
  2. Hadoop 3.3.3
  3. hive 3.1.3
  4. Fink 1.17.1
  5. Apache paimon 0.5.0
  6. JDK 1.8.0_311

3. 安装

  1. 下载 Flink 1.17.1
    wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
    ## 解压安装
    tar zxf flink-1.17.1-bin-scala_2.12.tgz
  2. 下载相关的依赖到 Flink/lib 目录
cp /Users/zhangfeng/hadoop/hadoop-3.3.6/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar ./lib/
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.17/0.5.0-incubating/paimon-flink-1.17-0.5.0-incubating.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.1/flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
  1. 配置并启动 Flink

配置环境变量,修改flink-conf.yaml配置文件

env.java.opts.all: "-Dfile.encoding=UTF-8"
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 3
execution.checkpointing.interval: 10s
state.backend: rocksdb
state.checkpoints.dir: hdfs://zhangfeng:9000/flink/myckp
state.savepoints.dir: hdfs://zhangfeng:9000/flink/savepoints
state.backend.incremental: true

启动 Flink

bin/start-cluster.sh
bin/sql-client.sh embedded 
set 'sql-client.execution.result-mode' = 'tableau';

Catalog

Paimon Catalog可以持久化元数据,当前支持两种类型的metastore

  • 文件系统(默认):将元数据和表文件存储在文件系统中。
  • hive:在hive metastore存储元数据,用户可以直接从hive访问表。

文件系统

下面的 Flink SQL 注册并使用一个名为 paimon_catalog 的catalog。元数据和表文件存放在hdfs://localhost:9000/paimon/data下

CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://localhost:9000/paimon/data'
);

show catalogs;

Hive Catalog

我们也可以直接使用 hive metastore 来存储 paimon 元数据。

下面是创建语句

CREATE CATALOG paimon_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://localhost:9083',
    'hive-conf-dir' = '/Users/zhangfeng/hadoop/apache-hive-3.1.3-bin/conf/', 
    'warehouse' = 'hdfs://localhost:9000/paimon/hive'
);
show catalogs;

创建 paimon 表

USE CATALOG paimon_hive;
CREATE TABLE test_paimon_01 (
  userid BIGINT,
  age INT,
  address STRING,
  regiter_dt STRING  ,
  PRIMARY KEY(userid, regiter_dt) NOT ENFORCED
) PARTITIONED BY (regiter_dt);

show tables

4. 同步MySQL 数据到 Paimon表

下面我们演示怎么基于Flink CDC 快速实时同步 MySQL 表的数据到 Paimon表里。

这里首先你的MySQL 数据库要开启 binlog,具体的方法网上很多,这里不在叙述。

MySQL 表:

CREATE DATABASE emp_1;
 USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);


INSERT INTO `employees_1` VALUES  (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');

在Flink sql-client 下创建 MySQL CDC 表:

CREATE TABLE employees_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    emp_no int NOT NULL,
    birth_date date,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date date,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'zhangfeng',
    'database-name' = 'emp_1',
    'table-name' = 'employees_1'
  );

使用Create table as select 创建Paimon表,并将数据实时同步到Paimon表里:

create table mysql_to_paimon_01 as select * from default_catalog.default_database.employees_source;

查看Job

我们这个时候可以在Flink sql-client 下查询 paimon ,看到 Paimon 表里已经有数据了。

5. Doris On Paimon

Doris 提供了 Paimon 的 catalog 支持,我们可以通过这种方式,通过Doris 快速的去读 Paimon 表的数据,同时也可以通过 catalog 方式将 paimon 表的数据迁移到 Doris 表里

5.1 Doris 整合查询Paimon表

首先我们创建 Paimon catalog,有两种方式:

  1. 一种是基于 Hive metastore service
  2. 一种是基于 HDFS 文件系统
CREATE CATALOG `paimon_hdfs` PROPERTIES (
    "type" = "paimon",
    "warehouse" = "hdfs://localhost:9000/paimon/hive",
    "hadoop.username" = "hadoop"
);


CREATE CATALOG `paimon_hms` PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "hms",
    "warehouse" = "hdfs://localhost:9000/paimon/hive",
    "hive.metastore.uris" = "thrift://localhost:9083"
);

创建成功之后我们通过 show catalogs方式可以看到我们创建好的 paimon catalog;

mysql> show catalogs;
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
| CatalogId | CatalogName | Type     | IsCurrent | CreateTime              | LastUpdateTime      | Comment                |
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
|   1308010 | hive        | hms      |           | 2023-11-17 09:42:22.872 | 2023-11-17 09:42:46 | NULL                   |
|   1326307 | hudi        | hms      |           | 2023-11-27 11:33:22.231 | 2023-11-27 11:33:35 | NULL                   |
|         0 | internal    | internal |           | UNRECORDED              | NULL                | Doris internal catalog |
|     35689 | jdbc        | jdbc     |           | 2023-11-03 12:52:24.695 | 2023-11-03 12:52:59 | NULL                   |
|     38003 | mysql       | jdbc     |           | 2023-11-07 11:46:40.006 | 2023-11-07 11:46:54 | NULL                   |
|   1329142 | paimon_hdfs | paimon   |           | 2023-11-27 14:06:13.744 | 2023-11-27 14:06:41 |                        |
|   1328144 | paimon_hms  | paimon   | yes       | 2023-11-27 14:00:32.925 | 2023-11-27 14:00:44 | NULL                   |
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
7 rows in set (0.00 sec)

切换 paimon catalog,通过下面这些操作我们可以看到我们在 paimon 里创建的表

mysql> switch  paimon_hdfs;
Query OK, 0 rows affected (0.00 sec)

mysql> show databases;
+----------+
| Database |
+----------+
| default  |
+----------+
1 row in set (0.02 sec)

mysql> use default;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+--------------------------+
| Tables_in_default        |
+--------------------------+
| example_tbl_partition_01 |
| example_tbl_unique_01    |
| mysql_to_paimon_01       |
| test_paimon_01           |
+--------------------------+
4 rows in set (0.00 sec)

通过 Doris 查询 Paimon 表

select * from mysql_to_paimon_01;

5.2 将Paimon 表的数据导入到 Doris

我们也可以快速的利用catalog 方式将 paimon 数据迁移到 Doris 里,我们可以使用 CATS方式:

create table doris_paimon_01
PROPERTIES("replication_num" = "1")  as  select * from paimon_hdfs.`default`.mysql_to_paimon_01;

注意:

1. 查询paimon的时候如果报下面的错误:

org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "hdfs"

需要再 hdfs 需要再core-site.xml 文件中加上下面的配置:

<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  <description>The FileSystem for hdfs: uris.</description>
</property>

6. 总结

是不是使用非常简单,快快体验Doris 湖仓一体,联邦查询的能力,来加速你的数据分析性能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/204094.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络视频怎么更改IP?使用静态IP更改地址有哪些好处?

随着互联网的普及&#xff0c;越来越多的人开始使用网络视频。有时候&#xff0c;我们可能会遇到需要更改网络视频的IP地址的情况。那么&#xff0c;如何更改IP地址呢&#xff1f;使用静态IP更改地址又有哪些好处呢&#xff1f; 首先&#xff0c;我们来了解一下什么是静态IP地址…

外贸行业多人文件共享云盘推荐

Zoho WorkDrive外贸行业解决方案致力于为各类外贸企业客户提供数字化转型的支持&#xff0c;全面覆盖市场调研、客户服务与管理、产品设计与制作、采购、供应商管理、财务对账、单证报关、仓储管理以及物流运输等环节。Zoho WorkDrive企业网盘提供文件资料在线存储、共享、同步…

C/C++转义符:\x

文章目录 什么是转义符使用"\x"定义char数组宏定义中的\ 什么是转义符 在C语言中&#xff0c;转义符用于将一些特殊字符表示为单个字符&#xff0c;常用的转义符有&#xff1a; \\&#xff1a;反斜杠符号\&#xff1a;单引号\"&#xff1a;双引号\a&#xff1…

动态规划--使用最小花费爬楼梯

题目描述 给你一个整数数组 cost &#xff0c;其中 cost[i] 是从楼梯第 i 个台阶向上爬需要支付的费用。一旦你支付此费用&#xff0c;即可选择向上爬一个或者两个台阶。 你可以选择从下标为 0 或下标为 1 的台阶开始爬楼梯。 请你计算并返回达到楼梯顶部的最低花费。 示例…

如何有效避免七个常见的身份验证漏洞

引言 随着网络威胁的数量不断增加&#xff0c;了解学习可能会危及到客户在线身份的常见身份验证漏洞就显得格外重要。如果需要在网上满足客户的需求&#xff0c;并使用传统的身份验证机制时&#xff0c;就要对身份验证漏洞保持警惕。 只有了解了这些漏洞&#xff0c;才可以更…

vue3随机生成8位字母+数字

// 随机生成8位字母数字 export const autoPassword: any () > {// console.log("自动生成");//可获取的字符串const chars ABCDEFGHIJKLMNOPQRSTUVWSYZabcdefghijklmnopqrstuvwsyz0123456789;const list [];//通过随机获取八个字符串的索引下标for (let i 0;…

谁可以从使用 Amazon Lightsail 进行 VPS 托管中受益?

文章作者&#xff1a;Libai 介绍 在当今数字化的环境中&#xff0c;拥有可靠和高效的托管解决方案对于企业和个人来说至关重要。由于其灵活性、可扩展性和成本效益&#xff0c;虚拟专用服务器&#xff08;VPS&#xff09;托管已经在市场上获得了巨大的流行。Amazon Lightsail …

样品实验Fortegra202环氧树脂增韧剂TDS说明书

样品实验Fortegra202环氧树脂增韧剂TDS说明书 150克/瓶

关于pyqt5与moviepy到打包的坑点

1,pyqt5 关于pyqt5 designer.exe 的使用主要就是了解pyqt5右侧菜单栏的功能使用 打包后的文件&#xff0c;需要继承改类&#xff0c;进行图形指令交互 关于pyqt5&#xff0c;要了解信号&#xff0c;和槽点的相互关系。 我在pyqt5中使用moviepy的时候&#xff0c;需要用到异步…

厦门城市建设与建筑结构健康监测系统的重要性与作用

厦门&#xff0c;这座美丽的海滨城市&#xff0c;随着经济的快速发展和城市化的不断推进&#xff0c;城市建设已成为人们关注的焦点。其中&#xff0c;建筑结构健康监测系统对于保障城市建设和建筑的安全具有举足轻重的地位。 WITBEE万宾针对建筑的动态平衡&#xff0c;温湿度&…

虾皮、Lazada稳定的测评系统需要哪些技术要求

测评作为一项高效运营手段&#xff0c;具有显著的重要性。然而&#xff0c;对于卖家而言&#xff0c;自行建立一套测评系统所需的技术条件并非易事。 在构建系统之前&#xff0c;必须深入理解每个平台的控制风险机制&#xff0c;而后才能开展下一步的建设工作。 1.首先&#…

基于ASP.Net的图书管理系统的设计与实现

摘 要 图书馆管理系统是一整套高科技技术与书本管理知识结合的产物。它把传统书籍静态的服务这个缺陷完美化&#xff0c;完成多媒体数据的交互、远程网络连接、检查搜索智能化、多数据库无障碍联系、跨时空信息服务。图书管理系统用计算机程序替代了传统手工记录的工作模式&am…

四川芸鹰蓬飞带货可靠吗?

随着数字时代的到来&#xff0c;抖音等短视频平台逐渐成为人们生活的一部分。不仅年轻人喜欢在抖音上分享日常生活&#xff0c;越来越多的商家也看到了抖音带货的巨大潜力。在这个充满机遇与挑战的环境中&#xff0c;四川芸鹰蓬飞商务信息咨询有限公司凭借其专业的服务和良好的…

cs11C programming language

cs11C programming language WeChat&#xff1a;yj4399_ Sina Visitor System

T-Rex:检测一切 | 基于视觉提示的开集检测器,检测并计数

图1. 我们引入了一个交互式对象计数模型T-Rex。给定参考图像上指定的框或点&#xff0c;T-Rex 可以检测目标图像上的所有与指定对象表现出相似模式的实例&#xff0c;然后将其相加得到计数结果。我们先通过T-Rex生成检测到框提示&#xff0c;再使用SAM得到mask&#xff0c;以获…

【软件测试】银行核心业务系统性能测试总结,一篇通透...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 下面讨论的是字符…

C++基础 -17-继承中 基类与派生构造和析构调用顺序

首先声明 定义了派生类会同时调用基类和派生的构造函数 定义了派生类会同时调用基类和派生的析构函数 那么顺序如何如下图 构造由上往下顺序执行 析构则完全相反 #include "iostream"using namespace std;class base {public:base(){cout << "base-bui…

力扣283题 移动零 双指针解法

移动零 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0]示例 2: 输入: nums [0] 输出…

Linux:虚拟机安装Ubuntu系统

一、下载Ubuntu 地址&#xff1a;https://cn.ubuntu.com/download/desktop 二、安装 以上配置完成后&#xff0c;点击完成按钮&#xff0c;接下来就是一段较长时间的等待安装过程。 安装完成后&#xff0c;还有一些系统性配置。 系统配置非常简单&#xff0c;全部next即可。…

Linux设置Nginx开机自启

文章目录 获取linux系统是多少位: getconf LONG_BIT获取CentOS版本: lsb_release -a获取nginx的版本: nginx -version第一步配置文件 vim /etc/rc.local最底部增加这一行&#xff1a; /usr/local/nginx/sbin/nginx 第二步注册systemctl服务 在/usr/lib/systemd/system目录…