Iceberg从入门到精通系列之十七:Apache InLong往Iceberg同步数据

Iceberg从入门到精通系列之十七:Apache InLong往Iceberg同步数据

  • 一、概览
  • 二、版本支持
  • 三、依赖项
  • 四、SQL API 用法
  • 五、多表写入
  • 六、动态表名映射
  • 七、动态建库、建表
  • 八、动态schema变更
  • 九、Iceberg Load 节点参数
  • 十、数据类型映射

一、概览

  • Apache Iceberg是一种用于大型分析表的高性能格式。

二、版本支持

提取节点版本
IcebergIceberg:0.12.x,0.13.x

三、依赖项

<dependency>
    <groupId>org.apache.inlong</groupId>
    <artifactId>sort-connector-iceberg</artifactId>
    <version>1.7.0</version>
</dependency>

四、SQL API 用法

在 flink 中创建Iceberg表,我们推荐使用Flink SQL Client,因为它更便于用户理解概念。

Step.1 在hadoop环境下启动一个独立的flink集群。

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the flink standalone cluster
./bin/start-cluster.sh

Step.2 启动flink SQL客户端。

flink-runtime在 iceberg 项目中创建了一个单独的模块来生成一个捆绑的 jar,可以直接由 flink SQL 客户端加载。

如果想要flink-runtime手动构建捆绑的 jar,只需构建inlong项目,它将在<inlong-root-dir>/inlong-sort/sort-connectors/iceberg/target。

默认情况下,iceberg 包含用于 hadoop 目录的 hadoop jars。如果我们要使用 hive 目录,我们需要在打开 flink sql 客户端时加载 hive jars。幸运的是,apache inlong将 一个捆绑的hive jar打包进入Iceberg。所以我们可以如下打开sql客户端:

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell

Step.3 在当前 Flink 目录中创建表

默认情况下,我们不需要创建目录,只需使用内存目录即可。在目录中如果catalog-database.catalog-table不存在,会自动创建。这里我们只是加载数据。

在 Hive 目录中管理的表

下面的 SQL 会在当前 Flink 目录中创建一个 Flink 表,映射到 iceberg 目录中default_database.iceberg_table管理的 iceberg 表。由于目录类型默认是 hive,所以这里不需要放catalog-type.

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

如果要创建 Flink 表映射到 Hive 目录中管理的不同Iceberg表(例如hive_db.hive_iceberg_table在 Hive 中),则可以创建 Flink 表,如下所示:

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

将记录写入 Flink 表时,如果底层目录数据库(hive_db上例中)不存在,则会自动创建它。

在 hadoop 目录中管理的表

以下 SQL 将在当前 Flink 目录中创建一个 Flink 表,该表映射到default_database.flink_tablehadoop 目录中管理Iceberg表。

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

Step.6 向Iceberg表中插入数据

INSERT INTO `flink_table` 
    SELECT 
    `id` AS `id`,
    `d` AS `name`
    FROM `source_table`

在自定义Catalog中管理的表

以下 SQL 将在当前 Flink 目录中创建一个 Flink 表,该表映射到default_database.flink_table自定义目录中管理的Iceberg表。

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-type'='custom',
    'catalog-impl'='com.my.custom.CatalogImpl',
     -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value',
     ...
);

五、多表写入

目前 Iceberg 支持多表同时写入,需要在 FLINK SQL 的建表参数上添加 ‘sink.multiple.enable’ = ‘true’ 并且目标表的schema 只能定义成 BYTES 或者 STRING ,以下是一个建表语句举例:

CREATE TABLE `table_2`(
    `data` STRING)
WITH (
    'connector'='iceberg-inlong',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://localhost:8020/hive/warehouse',
    'sink.multiple.enable' = 'true',
    'sink.multiple.format' = 'canal-json',
    'sink.multiple.add-column.policy' = 'TRY_IT_BEST',
    'sink.multiple.database-pattern' = '${database}',
    'sink.multiple.table-pattern' = 'test_${table}'
);

要支持多表写入同时需要设置上游数据的序列化格式(通过选项 ‘sink.multiple.format’ 来设置, 目前仅支持 [canal-json|debezium-json])。

六、动态表名映射

Iceberg 在多表写入的时可以自定义映射的数据库名和表名的规则,可以填充占位符然后添加前后缀来修改映射的目标表名称。 Iceberg Load Node 会解析 ‘sink.multiple.database-pattern’ 作为目的端的 数据库名, 解析 ‘sink.multiple.table-pattern’ 作为目的端的表名,占位符是从数据中解析出来的,变量是严格通过 ‘${VARIABLE_NAME}’ 来表示, 变量的取值来自于数据本身, 即可以是通过 ‘sink.multiple.format’ 指定的某种 Format 的元数据字段, 也可以是数据中的物理字段。 关于 ‘topic-parttern’ 的例子如下:

  • ‘sink.multiple.format’ 为 ‘canal-json’:
  • ‘topic-pattern’ 为 ‘{database}_${table}’, 提取后的 Topic 为 ‘inventory_products’ (‘database’, ‘table’ 为元数据字段, ‘id’ 为物理字段)
  • ‘topic-pattern’ 为 ‘{database} t a b l e {table} table{id}’, 提取后的 Topic 为 ‘inventory_products_111’ (‘database’, ‘table’ 为元数据字段, ‘id’ 为物理字段)

七、动态建库、建表

Iceberg在多表写入时遇到不存在的表和不存在的库时会自动创建数据库和数据表,并且支持在运行过程中新增捕获额外的表入库。 默认的Iceberg表参数为:‘format-version’ = ‘2’、‘write.upsert.enabled’ = ‘true’'、‘engine.hive.enabled’ = ‘true’

八、动态schema变更

Iceberg在多表写入时支持同步源表结构变更到目标表(DDL同步),支持的schema变更如下:

在这里插入图片描述

九、Iceberg Load 节点参数

选项是否必须默认值类型描述
connector必需(none)String指定要使用的连接器,应该是’iceberg’
catalog-type必需hiveStringhive或hadoop用于内置目录,或为使用catalog-impl的自定义目录实现未设置
catalog-name必需(none)String目录名称
catalog-database必需(none)String目录名称
catalog-table必需(none)String在Iceberg目录中管理的数据库名称
catalog-impl自定义custom可选(none)String如果未设置,则必须设置完全限定的类名自定义目录实现catalog-type
cache-enabled可选trueBoolean是否启用目录缓存,默认值为true
urihive catalog可选(none)StringHive元存储的thrift URI
clientshive catalog可选2IntegerHive Metastore客户端池大小,默认值为2
warehousehive catalog或hadoop catalog可选(none)String对于Hive目录,是Hive仓库位置,如果既不设置hive-conf-dir指定包含hive-site.xml配置文件的位置也不添加正确hive-site.xml的类路径,用户应指定此路径。对于hadoop目录,HDFS目录存放元数据文件和数据文件
hive-conf-dirhive catalog可选(none)Stringhive-site.xml包含将用于提供自定义 Hive 配置值的配置文件的目录的路径。如果同时设置和创建Iceberg目录时,hive.metastore.warehouse.dirfrom <hive-conf-dir>/hive-site.xml(或来自类路径的 hive 配置文件)的值将被该值覆盖。warehousehive-conf-dirwarehouse
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}。
sink.multiple.enable可选falseBoolean是否开启多路写入
sink.multiple.schema-update.policy可选TRY_IT_BESTEnum遇到数据中schema和目标表不一致时的处理策略TRY_IT_BEST:尽力而为,尽可能处理,处理不了的则忽略IGNORE_WITH_LOG:忽略并且记录日志,后续该表数据不再处理THROW_WITH_STOP:抛异常并且停止任务,直到用户手动处理schema不一致的情况
sink.multiple.pk-auto-generated可选falseBoolean是否自动生成主键,对于多路写入自动建表时当源表无主键时是否将所有字段当作主键
sink.multiple.typemap-compatible-with-spark可选falseBoolean是否适配spark的类型系统,对于多路写入自动建表时是否需要适配spark的类型系统

十、数据类型映射

Iceberg数据类型详细信息。这里介绍了加载数据如何将 Iceberg 类型转换为 Flink 类型。

Flink SQL类型Iceberg类型
CHARSTRING
VARCHARSTRING
STRINGSTRING
BOOLEANBOOLEAN
BINARYFIXED(L)
VARBINARYBINARY
DECIMALDECIMAL(P,S)
TINYINTINT
SMALLINTINT
INTEGERINT
BIGINTLONG
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
TIMESTAMP_LTZTIMESTAMPTZ
INTERVAL-
ARRAYLIST
MULTISETMAP
MAPMAP
ROWSTRUCT
RAW-

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/38006.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Flutter系列文章-Flutter环境搭建和Dart基础

Flutter是Google推出的一个开源的、高性能的移动应用开发框架&#xff0c;可以用一套代码库开发Android和iOS应用。Dart则是Flutter所使用的编程语言。让我们来看看如何搭建Flutter开发环境&#xff0c;并了解Dart语言的基础知识。 一、Flutter环境搭建 1. 安装Flutter SDK …

springboot+ElasticSearch+Logstash+Kibana实现日志采集ELK

ElasticSearchLogstashKibana日志管理 一、什么是ELK? ELK是Elasticsearch、Logstash、Kibana的简称&#xff0c;这三者是核心套件&#xff0c;但并非全部。一般情况下我们可以把日志保存在日志文件当中&#xff0c;也可以把日志存入数据库当中。但随着业务量的增加&#xf…

搭建 Java 部署环境,部署 Web 项目到 Linux

为了进行部署&#xff0c;把写好的 java web 程序放到 Linux 上&#xff0c;需要先把对应的依赖的软件 (环境) 搭建好&#xff0c;安装一些必要的软件程序 JDKTomcatMySqL jdk 直接使用包管理器进行安装(基于yum安装) 一、yum 1、认识 yum yum (Yellow dog Updater, Modified…

6. Java + Selenium 环境搭建

前提&#xff1a;Java 版本最低要求为 8&#xff1b;推荐使用 chrome 浏览器 chrome Java 1. 下载 chrome 浏览器&#xff08;推荐&#xff09; 2. 查看 chrome 浏览器版本 重点记住前两位即可。 3. 下载 chrome 浏览器驱动 下载链接&#xff1a; https://chromedriver.…

我爱学QT-仿写智能家居界面 上 中 下

学习链接&#xff1a; 仿写一个智能家居界面&#xff08;上&#xff09;_哔哩哔哩_bilibili 上 给QT工程添加资源文件 在这里 然后选这个&#xff0c;choose后会有起名&#xff0c;之一千万不能是中文&#xff0c;要不就等报错吧 然后把你要添加的图片托到文件夹下&#xf…

云计算基础教程(第2版)笔记——基础篇与技术篇介绍

文章目录 前言 第一篇 基础篇 一 绪论 1.1 云计算的概念以及特征 1.1.1云计算的基本概念 1.1.2云计算的基本特征 1.2 云计算发展简史 1.3 三种业务模式介绍 1. 基础设施即服务&#xff08;IaaS&#xff09; 2. 平台即服务&#xff08;PaaS&#xff09; 3. 软…

F#奇妙游(12):并行编程与π

核越多&#xff0c;越快乐 多核CPU对于计算机程序的开发带来了很大的挑战&#xff0c;但是也带来了很大的机遇。在多核CPU上&#xff0c;程序的性能可以通过并行化来提升&#xff0c;但是并行化的难度也随之提升。本文将介绍多核CPU的基本概念&#xff0c;以及如何在多核CPU上…

OpenCv图像基本变换

目录 一、图像翻转 二、图像旋转 三、仿射变换之平移 四、仿射变换之获取变换矩阵 五、仿射变换之透视变换 一、图像翻转 图像翻转不等同于旋转&#xff0c;类似于一些视频的拍摄&#xff0c;拍摄后实际是左右颠倒的&#xff0c;通过图像翻转可进行还原 案例代码如下: …

Android之Intent

意图介绍 一个意图(Intent)对象包含了目标组件、动作、数据、类别、附加数据、标志六个部分。 目标组件 目标组件可以帮助应用发送显式意图调用请求。在创建Intent时&#xff0c;可以通过setComponent方法来设置一个组件&#xff0c;如&#xff1a; //设置组件 intent.setC…

Linux - CentOS 二进制安装 MySQL 8.0.31(非常实用)

一、下载 mysql-8.0.31-linux-glibc2.12-x86_64.tar.xz 下载地址&#xff1a;MySQL :: Download MySQL Community Server (Archived Versions) 具体如下图所示&#xff1a; 二、将 mysql-8.0.31-linux-glibc2.12-x86_64.tar.xz 放入到服务器的 /usr/local &#xff08;路径可…

C++万字自学笔记

[TOC] 一、 C基础 C的IDE有CLion、Visual Studio、DEV C、eclipse等等&#xff0c;这里使用CLion进行学习。 0. C初识 0.1 第一个C程序 编写一个C程序总共分为4个步骤 创建项目创建文件编写代码运行程序 #include <iostream>int main() {using namespace std;cout…

SpringCloud Alibaba——Ribbon底层怎样实现不同服务的不同配置

目录 一、Ribbon底层怎样实现不同服务的不同配置二、源码角度分析 一、Ribbon底层怎样实现不同服务的不同配置 为不同服务创建不同的spring上下文&#xff0c;不同的spring上下文中存放对应这个服务所有的配置。 二、源码角度分析 SpringClientFactory中可以获取到所有ribbon…

基于MSP432P401R跟随小车(一)【2022年电赛】

文章目录 一、赛前准备1. 硬件清单2. 工程环境 二、赛题思考三、软件设计1. 路程、时间、速度计算2. 距离测量3. 双机通信4. 红外循迹 四、技术交流 一、赛前准备 1. 硬件清单 主控板&#xff1a; MSP432P401R测距模块&#xff1a; GY56数据显示&#xff1a; OLED电机&#x…

7.5 SpringBoot 拦截器Interceptor实战 统一角色权限校验

文章目录 前言一、定义注解annotation二、拦截角色注解1. 在拦截器哪里拦截&#xff1f;2. 如何拦截角色注解&#xff1f;3. 角色如何读取?4. 最后做角色校验 三、应用&#xff1a;给管理员操作接口加注解四、PostMan测试最后 前言 在【7.1】管理员图书录入和修改API&#xf…

原生js实现将图片内容复制到剪贴板

核心代码 /*复制图片*/ copyImg(dom) {/* 警告&#xff1a;dom不能是img标签&#xff0c;建议用DIV标签包裹img标签&#xff0c;否者会报错&#xff01;不支持复制背景图&#xff01; */dom.style.userSelect auto;let selection getSelection(), range document.createRan…

支持向量机(SVM)的超参数调整 C 和 Gamma 参数

作者:CSDN @ _养乐多_ 支持向量机(Support Vector Machine,SVM)是一种广泛应用的监督式机器学习算法。它主要用于分类任务,但也适用于回归任务。在本文中,我们将深入探讨支持向量机的两个重要参数:C和gamma。在阅读本文前,我假设您对该算法有基本的了解,并专注于这些…

从网络安全行业人才需求讲讲【个人规划】

如果你是一名正在找工作的网络安全方向大学生&#xff0c;或者是刚刚踏入网络安全领域的新手&#xff0c;这篇文章很适合你&#xff0c;如果你是一名老网安人&#xff0c;看看有哪些你中招了。 当你打开BOSS直聘、拉钩等招聘类网站后&#xff0c;在首页的快速导航页面很难找到关…

微信小程序安装和使用 Vant Weapp 组件库

微信小程序安装和使用 Vant Weapp 组件库 1. Vant Weapp 介绍2. Vant Weapp 的 安装2.1. 通过npm安装2.2. 构建npm2.3. 修改 app.json2.4. 修改 project.congfig.json2.5. 测试一下&#xff0c;使用Vant Weapp提供的组件 1. Vant Weapp 介绍 Vant 是一个轻量、可靠的移动端组件…

《遗留系统现代化》读书笔记(基础篇)

你现在所写的每一行代码&#xff0c;都是未来的遗留系统 为什么要对遗留系统进行现代化&#xff1f; 什么是遗留系统&#xff1f; 判断遗留系统的几个维度&#xff1a;代码、架构、测试、DevOps 以及技术和工具。时间长短并不是衡量遗留系统的标准。代码质量差、架构混乱、没…

【深度学习笔记】正则化与 Dropout

本专栏是网易云课堂人工智能课程《神经网络与深度学习》的学习笔记&#xff0c;视频由网易云课堂与 deeplearning.ai 联合出品&#xff0c;主讲人是吴恩达 Andrew Ng 教授。感兴趣的网友可以观看网易云课堂的视频进行深入学习&#xff0c;视频的链接如下&#xff1a; 神经网络和…