Hudi 多表摄取工具 HoodieMultiTableStreamer 配置方法与示例

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

1. 多表公共配置和差异配置的两种处理方式


由于 Hudi 的 HoodieMultiTableStreamer / HoodieMultiTableDeltaStreamer 是一次处理多张 Hudi 表的写入,这些表既会有如 hoodie.deltastreamer.source.kafka.value.deserializer.class 这样相同的公共配置,也会有如 hoodie.datasource.write.recordkey.field 这样每张表每张表都不同的个性化配置,为此,HoodieMultiTableStreamer / HoodieMultiTableDeltaStreamer 给出的解决方案是:将公共配置提取到一个配置文件,将每张表的个性化配置放置到多个对应文件中,至于如何将每张表的表名和它的配置文件映射起来,Hudi 提供两种方案:

方式一:

在公共配置文件中通过 hoodie.deltastreamer.ingestion.<db>.<table>.configFile 显式指定 <db>.<table> 对应的配置文件,以下是一个示例:

hoodie.deltastreamer.ingestion.tablesToBeIngested=db1.table1,db2.table2
hoodie.deltastreamer.ingestion.db1.table1.configFile=/tmp/config_table1.properties
hoodie.deltastreamer.ingestion.db2.table2.configFile=/tmp/config_table2.properties

方式二:

将所有表的配置文件统一放置到一个文件夹,并按照 <database>_<table>_config.properties 形式统一命名,通过 --config-folder 参数指明文件夹的路径后,Hudi 就能根据文件名自动映射到对应表,不必再向方式一那样显式配置。这是使用了“约定大约配置”的处理方式,方式二更加简洁,是首选的配置方式,我们接下来就详细介绍一下。

2. 首选方式:使用约定的多表文件命名规则简化配置


这一配置方式可简述为:将所有表的配置文件统一放置到一个文件夹下,并按照 <database>_<table>_config.properties 形式统一命名,同时,在公共配置文件中通过 hoodie.deltastreamer.ingestion.tablesToBeIngested 配置项以 <db1>.<table1>,<db2>.<table2>,... 的形式列出所有表,最后,在命令行中通过参数 --config-folder 指明文件夹的路径,这样 Hudi 就能根据约定的命名规则找到每张表的对应配置文件,那就不必再通过 hoodie.streamer.ingestion.<database>.<table>.configFile 显式地逐一配置。以下是一个示例:

1. common.properties

hoodie.deltastreamer.ingestion.tablesToBeIngested=db1.table1,db2.table2

2. config folder 目录结构

/tmp
├── db1_table1_config.properties
├── db2_table2_config.properties

3. 作业提交命令

spark-submit \
    ...
    --props file://common.properties \
    --config-folder file://tmp \
    ...

3. 启用 Schema Registry 时多个 Topic 的 Schema URL 的配置方法


另一个涉及多表特化配置的地方是在 HoodieMultiTableStreamer 摄取 Debezium CDC 数据写入 Hudi 表时,由于 Hudi 的 Streamer 在处理 Debezium CDC 时强依赖 Confluent Schema Registry,在摄取每一张表对应的 Topic 时都需要指定 Topic 的 Schema Url,为了避免大量的手动配置,HoodieMultiTableStreamer 再次使用了“约定大约配置”的处理方式,它通过hoodie.streamer.schemaprovider.registry.baseUrl 指定 url 的 base 部分,通过 hoodie.streamer.schemaprovider.registry.urlSuffix 指定 url 的后缀部分,中间部分是 Topic 的名称,由 Hudi 自动拼接,这样动态地获得了每张表对应 Topic 的 Schema Url。

4. 重点参数


我们上面提到的几个重点参数再集中梳理一下:

4.1 命令行中的重要参数


  • --base-path-prefix 指定摄取数据后 Hudi 数据集存放的 base 目录,数据集将按照:<base-path-prefix>/<database>/<table> 格式存放
  • --config-folderHoodieMultiTableStreamer 下专门用于指定存放所有表配置文件的路径,配置约定的文件命名 pattern:<database>_<table>_config.properties,Hudi 就能自动找到每张表的配置文件,那不必再通过 hoodie.streamer.ingestion.<database>.<table>.configFile 单独配置

4.2 配置文件中的重要参数


  • hoodie.streamer.ingestion.tablesToBeIngested:需要被实时摄取并同步的表,单表使用 <database>.<table> 形式,多表用逗号分隔,例如:db1.table1,db1.table2

  • hoodie.streamer.ingestion.<database>.<table>.configFile:每张表需要提供的 Hudi 配置文件的存放路径。由于数据表可能非常多,逐一配置所有的表非常繁琐,因此 Hudi Streamer 提供一种文件命名模式:<database>_<table>_config.properties,只要我们将对应表的配置文件以此模式命名并放置于 --config-folder 配置的文件夹下,Hudi 就能自动映射为对应表的配置,不必再显式地配置这一项!

  • hoodie.streamer.schemaprovider.registry.url 是给单表(HoodieStreamer)用的

  • hoodie.streamer.schemaprovider.registry.baseUrl + hoodie.streamer.schemaprovider.registry.urlSuffix 联合起来给多表 用的!!

5. 完整示例


最后,我们引用《CDC 数据入湖方案:Kafka Connect > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》一文第 6 节给出一个完整示例作为一个参考:

tee global-config.properties << EOF
# deltastreamer props
hoodie.deltastreamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter
hoodie.deltastreamer.ingestion.tablesToBeIngested=inventory.orders
hoodie.deltastreamer.schemaprovider.class=org.apache.hudi.utilities.schema.SchemaRegistryProvider
hoodie.deltastreamer.schemaprovider.registry.baseUrl=${SCHEMA_REGISTRY_URL}/subjects/
hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
# kafka props
bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS
auto.offset.reset=earliest
# schema registry props
schema.registry.url=http://10.0.13.30:8085
EOF

tee inventory_orders_config.properties << EOF
include=global-config.properties
hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders
hoodie.datasource.write.recordkey.field=order_number
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.hive_sync.database=inventory
hoodie.datasource.hive_sync.table=orders
hoodie.datasource.hive_sync.partition_fields=order_date
EOF

aws s3 rm --recursive $APP_S3_HOME/inventory_orders

spark-submit \
    --master yarn \
    --deploy-mode client \
    --jars /usr/lib/spark/connector/lib/spark-avro.jar \
    --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer \
    /usr/lib/hudi/hudi-utilities-bundle.jar \
    --props file://$HOME/global-config.properties \
    --table-type COPY_ON_WRITE \
    --op UPSERT \
    --config-folder file://$HOME \
    --base-path-prefix $APP_S3_HOME \
    --target-table inventory.orders \
    --continuous \
    --min-sync-interval-seconds 60 \
    --source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \
    --payload-class org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider

关联阅读

  • 《CDC 数据入湖方案:Kafka Connect > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》

  • 《CDC 数据入湖方案:Flink CDC > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/645851.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux初探】:解锁开源世界的神秘钥匙

文章目录 &#x1f680;一、了解Linux&#x1f525;二、Linux 的发行版❤️三、Linux应用领域&#x1f4a5;四、Linux vs Windows & mac &#x1f680;一、了解Linux Linux是一种自由、开放源代码的操作系统&#xff0c;它的内核由芬兰计算机科学家Linus Torvalds在1991年创…

【九十四】【算法分析与设计】练习四蛮力法练习,排列问题和组合问题,求解最大连续子序列和问题,求解幂集问题,求解0/1背包问题,求解任务分配问题

求解最大连续子序列和问题 给定一个有n&#xff08;n≥1&#xff09;个整数的序列&#xff0c;要求求出其中最大连续子序列的和。 例如&#xff1a; 序列&#xff08;-2&#xff0c;11&#xff0c;-4&#xff0c;13&#xff0c;-5&#xff0c;-2&#xff09;的最大子序列和为20…

机器人支持回调接口配置(详细教程)

大家伙&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂。 一、前言 今天&#xff0c;给大家介绍一下&#xff0c;如何在机器人中配置回调地址和接口编写。很多时候我们可能有这样的场景&#xff0c;收到消息后&#xff0c;想自己处理一下消息的内…

用Python一键生成PNG图片的PowerPoint幻灯片

在当今的商业环境中,PowerPoint演示是展示和传递信息的常用方式。然而,手动将大量图像插入到幻灯片中往往是一项乏味且耗时的工作。但是,通过Python编程,我们可以轻松自动化这个过程,节省时间和精力。 C:\pythoncode\new\folderTOppt.py 在本文中,我将介绍如何使用Python、wx…

Rust开源Web框架Salvo源码编译

1.克隆源码: https://github.com/salvo-rs/salvo.git 2.进入salve目录并运行cargo build编译 编译成功 3.编译生成的库 4.安装salve-cli git clone --recursive https://github.com/salvo-rs/salvo-cli.git 编译salve-cli

人工智能万卡 GPU 集群的硬件和网络架构

万卡 GPU 集群互联:硬件配置和网络设计 一、背景 自从 OpenAI 推出 ChatGPT 以来,LLM 迅速成为焦点关注的对象,并取得快速发展。众多企业纷纷投入 LLM 预训练,希望跟上这一波浪潮。然而,要训练一个 100B 规模的 LLM,通常需要庞大的计算资源,例如拥有万卡 GPU 的集群。以…

Google Play 提示 “您的设备与此版本不兼容“ 解决方案

一、 问题概述Google Play提示“您的设备与此版本不兼容”&#xff0c;无法安装应用。 遇到问题的设备为Xiaomi Mi A3&#xff0c;查了下这台手机的基本信息&#xff0c;Android One系统&#xff0c;版本分为9.0、10.0、11.0。 二、 问题分析Google Play的过滤器 通常有以下5种…

【Nginx <末>】Nginx 基于 IP 地址的访问限制

目录 &#x1f44b;前言 &#x1f4eb;一、限制 IP 可以实现哪些功能 &#x1f440;二、 项目实现 2.1 访问控制实现 2.2 Nginx 配置中指定 IP 地址 &#x1f49e;️三、章末 &#x1f44b;前言 小伙伴们大家好&#xff0c;前面一段时间学习了 Nginx 的相关知识&#xff0c…

RT-DRET在实时目标检测上超越YOLO8

导读 目标检测作为计算机视觉的核心任务之一&#xff0c;其研究已经从基于CNN的架构发展到基于Transformer的架构&#xff0c;如DETR&#xff0c;后者通过简化流程实现端到端检测&#xff0c;消除了手工设计的组件。尽管如此&#xff0c;DETR的高计算成本限制了其在实时目标检测…

React useState基本类型变量的使用

在 React 中&#xff0c;useState 是一个 Hook&#xff0c;用于在函数组件中添加状态&#xff0c;它可以让函数组件拥有状态。基本使用方法如下&#xff1a; // App.jsx import React, { useState } from reactfunction App() {// 使用 useState 创建一个状态变量&#xff0c;初…

如何用Java实现SpringCloud Alibaba Sentinel的熔断功能?

在Java中使用Spring Cloud Alibaba Sentinel实现熔断功能的步骤如下&#xff1a; 添加依赖 在项目的pom.xml文件中添加Spring Cloud Alibaba Sentinel的依赖&#xff1a; <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud…

C语言——malloc和free用法和常见误区

最近写了个关于动态数组的代码&#xff0c;遇到了一个大坑&#xff0c;特此记录 先说结论&#xff1a; 1.利用malloc创建堆空间&#xff0c;大小最好设置大一点&#xff0c;不然后面存进去的值需要的空间过大会导致各种的堆、指针问题 2.只能使用realloc对已经创建的空间进行修…

没有电商经验的人去操作抖音小店,难度大不大?好操作吗?

大家好&#xff0c;我是电商小V 很多新手小伙伴想去操作抖音小店项目&#xff0c;咨询的最多的问题就是我没有电商运营的经验可以去操作吗&#xff1f; 当然是可以操作的&#xff0c;抖音小店项目对于新手来说是一个非常友好的项目&#xff0c;很多小伙伴都是感觉没有电商经验去…

C++——list的实现以及源码

前言&#xff1a; 最近学习了clist的实现&#xff0c;这让我对迭代器的理解又上升了一个新的高度&#xff0c;注意&#xff1a;代码里的list是放在一个叫zgw的命名空间里边&#xff0c;但是在实现list的代码中没有加namespace&#xff0c;这里给个注意&#xff0c;以后复习时能…

整理了10个靠谱且热门的赚钱软件,适合普通人长期做的赚钱副业

作为一名普通的上班族&#xff0c;我们每天都在辛勤工作&#xff0c;但工资的增长速度却如同蜗牛般缓慢。不过&#xff0c;别担心&#xff0c;信息时代总是带给我们无尽的惊喜&#xff01;今天&#xff0c;我将为大家推荐一些赚钱的宝藏软件&#xff0c;让你在闲暇之余轻松实现…

五分钟搭建一个Suno AI音乐站点

五分钟搭建一个Suno AI音乐站点 在这个数字化时代&#xff0c;人工智能技术正以惊人的速度改变着我们的生活方式和创造方式。音乐作为一种最直接、最感性的艺术形式&#xff0c;自然也成为了人工智能技术的应用场景之一。今天&#xff0c;我们将以Vue和Node.js为基础&#xff…

三十六计的笔记

系列文章目录 三十六计的笔记 文章目录 系列文章目录1、瞒天过海2、围魏救赵3、借刀杀人4、以逸待劳5、趁火打劫6、声东击西7、无中生有8、暗渡陈仓9、隔岸观火10、笑里藏刀11、李代桃僵12、顺手牵羊13、打草惊蛇14、借尸还魂15、调虎离山16、欲擒故纵17、抛砖引玉18、擒贼擒王…

牛客NC302 环形数组的连续子数组最大和【中等 动态规划 Java/Go/PHP/C++】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/e9f3282363844355aa51497c5410beee 思路 动态规划 两种情况&#xff08;首位相连的&#xff09;和首位不相连的 首尾相连的可以算最小的连续子数组得出&#xff0c;sum-就是。Java代码 import java.util.*;pub…

第20届文博会:“特别呈现”—周瑛瑾雷米·艾融双个展,著名美术评论家,批评家彭德教授对周瑛瑾作品进行评论

周瑛瑾不是学院派艺术家&#xff0c;但在彩墨画领域的天赋超出中国八大美院的同类型画家。相比具有批判意识的当代艺术&#xff0c;他的彩墨艺术如同我们这个苦难世界的创可贴和安慰剂。当我面对他的彩墨画&#xff0c;首先是惊艳&#xff0c;随之想到屈原的离骚&#xff0c;还…

slint esp32 tokio

源码&#xff1a;https://github.com/xiaguangbo/slint_esp32_tokio cpu 是 esp32c2&#xff0c;屏幕是 ili9341&#xff0c;触摸是 xpt2046&#xff0c;使用 spi 半双工 不使用DMA&#xff08;esp-rs还没支持&#xff09;&#xff0c;SPI 40M&#xff0c;240*320全屏刷新为1.5…