博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。 |
1. 多表公共配置和差异配置的两种处理方式
由于 Hudi 的 HoodieMultiTableStreamer
/ HoodieMultiTableDeltaStreamer
是一次处理多张 Hudi 表的写入,这些表既会有如 hoodie.deltastreamer.source.kafka.value.deserializer.class
这样相同的公共配置,也会有如 hoodie.datasource.write.recordkey.field
这样每张表每张表都不同的个性化配置,为此,HoodieMultiTableStreamer
/ HoodieMultiTableDeltaStreamer
给出的解决方案是:将公共配置提取到一个配置文件,将每张表的个性化配置放置到多个对应文件中,至于如何将每张表的表名和它的配置文件映射起来,Hudi 提供两种方案:
方式一:
在公共配置文件中通过 hoodie.deltastreamer.ingestion.<db>.<table>.configFile
显式指定 <db>.<table>
对应的配置文件,以下是一个示例:
hoodie.deltastreamer.ingestion.tablesToBeIngested=db1.table1,db2.table2
hoodie.deltastreamer.ingestion.db1.table1.configFile=/tmp/config_table1.properties
hoodie.deltastreamer.ingestion.db2.table2.configFile=/tmp/config_table2.properties
方式二:
将所有表的配置文件统一放置到一个文件夹,并按照 <database>_<table>_config.properties
形式统一命名,通过 --config-folder
参数指明文件夹的路径后,Hudi 就能根据文件名自动映射到对应表,不必再向方式一那样显式配置。这是使用了“约定大约配置”的处理方式,方式二更加简洁,是首选的配置方式,我们接下来就详细介绍一下。
2. 首选方式:使用约定的多表文件命名规则简化配置
这一配置方式可简述为:将所有表的配置文件统一放置到一个文件夹下,并按照 <database>_<table>_config.properties
形式统一命名,同时,在公共配置文件中通过 hoodie.deltastreamer.ingestion.tablesToBeIngested
配置项以 <db1>.<table1>,<db2>.<table2>,...
的形式列出所有表,最后,在命令行中通过参数 --config-folder
指明文件夹的路径,这样 Hudi 就能根据约定的命名规则找到每张表的对应配置文件,那就不必再通过 hoodie.streamer.ingestion.<database>.<table>.configFile
显式地逐一配置。以下是一个示例:
1. common.properties
hoodie.deltastreamer.ingestion.tablesToBeIngested=db1.table1,db2.table2
2. config folder 目录结构
/tmp
├── db1_table1_config.properties
├── db2_table2_config.properties
3. 作业提交命令
spark-submit \
...
--props file://common.properties \
--config-folder file://tmp \
...
3. 启用 Schema Registry 时多个 Topic 的 Schema URL 的配置方法
另一个涉及多表特化配置的地方是在 HoodieMultiTableStreamer 摄取 Debezium CDC 数据写入 Hudi 表时,由于 Hudi 的 Streamer 在处理 Debezium CDC 时强依赖 Confluent Schema Registry,在摄取每一张表对应的 Topic 时都需要指定 Topic 的 Schema Url,为了避免大量的手动配置,HoodieMultiTableStreamer 再次使用了“约定大约配置”的处理方式,它通过hoodie.streamer.schemaprovider.registry.baseUrl
指定 url 的 base 部分,通过 hoodie.streamer.schemaprovider.registry.urlSuffix
指定 url 的后缀部分,中间部分是 Topic 的名称,由 Hudi 自动拼接,这样动态地获得了每张表对应 Topic 的 Schema Url。
4. 重点参数
我们上面提到的几个重点参数再集中梳理一下:
4.1 命令行中的重要参数
--base-path-prefix
指定摄取数据后 Hudi 数据集存放的 base 目录,数据集将按照:<base-path-prefix>/<database>/<table>
格式存放--config-folder
在HoodieMultiTableStreamer
下专门用于指定存放所有表配置文件的路径,配置约定的文件命名 pattern:<database>_<table>_config.properties
,Hudi 就能自动找到每张表的配置文件,那不必再通过hoodie.streamer.ingestion.<database>.<table>.configFile
单独配置
4.2 配置文件中的重要参数
-
hoodie.streamer.ingestion.tablesToBeIngested
:需要被实时摄取并同步的表,单表使用<database>.<table>
形式,多表用逗号分隔,例如:db1.table1,db1.table2
-
hoodie.streamer.ingestion.<database>.<table>.configFile
:每张表需要提供的 Hudi 配置文件的存放路径。由于数据表可能非常多,逐一配置所有的表非常繁琐,因此 Hudi Streamer 提供一种文件命名模式:<database>_<table>_config.properties
,只要我们将对应表的配置文件以此模式命名并放置于--config-folder
配置的文件夹下,Hudi 就能自动映射为对应表的配置,不必再显式地配置这一项! -
hoodie.streamer.schemaprovider.registry.url
是给单表(HoodieStreamer)用的 -
hoodie.streamer.schemaprovider.registry.baseUrl
+hoodie.streamer.schemaprovider.registry.urlSuffix
联合起来给多表 用的!!
5. 完整示例
最后,我们引用《CDC 数据入湖方案:Kafka Connect > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》一文第 6 节给出一个完整示例作为一个参考:
tee global-config.properties << EOF
# deltastreamer props
hoodie.deltastreamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter
hoodie.deltastreamer.ingestion.tablesToBeIngested=inventory.orders
hoodie.deltastreamer.schemaprovider.class=org.apache.hudi.utilities.schema.SchemaRegistryProvider
hoodie.deltastreamer.schemaprovider.registry.baseUrl=${SCHEMA_REGISTRY_URL}/subjects/
hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
# kafka props
bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS
auto.offset.reset=earliest
# schema registry props
schema.registry.url=http://10.0.13.30:8085
EOF
tee inventory_orders_config.properties << EOF
include=global-config.properties
hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders
hoodie.datasource.write.recordkey.field=order_number
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.hive_sync.database=inventory
hoodie.datasource.hive_sync.table=orders
hoodie.datasource.hive_sync.partition_fields=order_date
EOF
aws s3 rm --recursive $APP_S3_HOME/inventory_orders
spark-submit \
--master yarn \
--deploy-mode client \
--jars /usr/lib/spark/connector/lib/spark-avro.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer \
/usr/lib/hudi/hudi-utilities-bundle.jar \
--props file://$HOME/global-config.properties \
--table-type COPY_ON_WRITE \
--op UPSERT \
--config-folder file://$HOME \
--base-path-prefix $APP_S3_HOME \
--target-table inventory.orders \
--continuous \
--min-sync-interval-seconds 60 \
--source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \
--payload-class org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
关联阅读:
-
《CDC 数据入湖方案:Kafka Connect > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》
-
《CDC 数据入湖方案:Flink CDC > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》