29、Spark写数据到Hudi时,同步hive表的一些坑

1.hudi的同步hive表没有comment

原以为hudi同步的hive表是根据数据写入的dataframe的schema创建的。就和spark write hive时类似,查看源码后发现不是。

1.1 hudi同步hive的模式

HMS , JDBC , HIVESQL。我这儿常用的是HMS和JDBC
在这里插入图片描述
各个同步模式对应的执行器:
在这里插入图片描述

1.2 schema生成

我们可以看到schema生成的代码块。先从提交的commit中获取元数据信息,没有的话则从数据文件中获取schema。两种方式获取到的schema都是没有comment信息的。
org.apache.hudi.common.table.TableSchemaResolver#getTableParquetSchema
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  /**
   * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
   * commit. We will assume that the schema has not changed within a single atomic write.
   *
   * @return Parquet schema for this table
   * @throws Exception
   */
  private MessageType getTableParquetSchemaFromDataFile() throws Exception {
    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();

    try {
      switch (metaClient.getTableType()) {
        case COPY_ON_WRITE:
          // If this is COW, get the last commit and read the schema from a file written in the
          // last commit
          HoodieInstant lastCommit =
              activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
              .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
          String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
              .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit "
                  + lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :"
                  + commitMetadata));
          return readSchemaFromBaseFile(new Path(filePath));
        case MERGE_ON_READ:
          // If this is MOR, depending on whether the latest commit is a delta commit or
          // compaction commit
          // Get a datafile written and get the schema from that file
          Option<HoodieInstant> lastCompactionCommit =
              metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
          LOG.info("Found the last compaction commit as " + lastCompactionCommit);

          Option<HoodieInstant> lastDeltaCommit;
          if (lastCompactionCommit.isPresent()) {
            lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
                .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant();
          } else {
            lastDeltaCommit =
                metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
          }
          LOG.info("Found the last delta commit " + lastDeltaCommit);

          if (lastDeltaCommit.isPresent()) {
            HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
            // read from the log file wrote
            commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
                HoodieCommitMetadata.class);
            Pair<String, HoodieFileFormat> filePathWithFormat =
                commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
                    .filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
                    .map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
                      // No Log files in Delta-Commit. Check if there are any parquet files
                      return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
                          .filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
                          .findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() ->
                              new IllegalArgumentException("Could not find any data file written for commit "
                              + lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()
                              + ", CommitMetadata :" + commitMetadata));
                    });
            switch (filePathWithFormat.getRight()) {
              case HOODIE_LOG:
                return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft()));
              case PARQUET:
                return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft()));
              default:
                throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight()
                    + " for file " + filePathWithFormat.getLeft());
            }
          } else {
            return readSchemaFromLastCompaction(lastCompactionCommit);
          }
        default:
          LOG.error("Unknown table type " + metaClient.getTableType());
          throw new InvalidTableException(metaClient.getBasePath());
      }
    } catch (IOException e) {
      throw new HoodieException("Failed to read data schema", e);
    }
  }

1.3建表DDL

获取到schema后,我们再看建表行为。
org.apache.hudi.hive.ddl.DDLExecutor#createTable 定义了这个接口建表方法。有两个实现类,一个是
org.apache.hudi.hive.ddl.HMSDDLExecutor。另一个是 org.apache.hudi.hive.ddl.QueryBasedDDLExecutor
在这里插入图片描述在这里插入图片描述
首先,看org.apache.hudi.hive.ddl.HMSDDLExecutor#createTable方法:
ddl操作中使用的字段信息在HiveSchemaUtil.convertMapSchemaToHiveFieldSchema生成,可以直接在这个方法里看到字段的comment信息是直接写死为空字符串的。
在这里插入图片描述
在这里插入图片描述
再看,org.apache.hudi.hive.ddl.QueryBasedDDLExecutor#createTable方法。
方法里是通过HiveSchemaUtil.generateCreateDDL方法直接生成的ddl建表语句的。这个方法里generateSchemaString方法来生成字段信息的。在这个方法里,也是没有涉及comment信息的。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1.4结论

同步hive表是在 数据写入hudi目录后,根据目录里的schema来创建的hive表,所以创建的hive表没有带着dataframe的comment信息。需要手动执行修改字段comment。

2.追加comment

2.1.使用spark.sql的方式修改comment

用spark.sql()的方式执行 修改comment的sql语句,会调用hudi里的AlterHoodieTableChangeColumnCommand类。这个里面会比较schema,刷新sparksession里的catalog信息,会让任务hang住。(为什么hang住没去排查)大概操作就是写一个使用新的schema的空数据集到hudi来实现schema更新。
org.apache.spark.sql.hudi.command.AlterHoodieTableChangeColumnCommand。
在这里插入图片描述
在这里插入图片描述

2.2使用hive-sql的方式修改comment

用hive-jdbc的方式执行修改sql语句。这个方式不会更新hive表里的 TBLPROPERTIES 的 'spark.sql.sources.schema.part.0’信息。
使用dataframe的schame.tojson ,去修改 ‘spark.sql.sources.schema.part.0’ 信息

  /**
   * 将 dataframe 中的comment加到 hudi的hive表中
   *
   * @param df      dataframe
   * @param dbTable hive表
   * @param spark   spark session
   */
  def addCommentForSyncHive(df: DataFrame, dbTable: String, spark: SparkSession, writeOptions: mutable.Map[String, String]): Unit = {
    val comment: Map[String, String] = df.schema.map(sf => (sf.name, sf.getComment().getOrElse(""))).toMap
    info(s"数据集的字段名->备注为:\n${comment.mkString("\n")}")


    val jdbcUrlOption = writeOptions.get(DataSourceWriteOptions.HIVE_URL.key())
    val jdbcUserOption = writeOptions.get(DataSourceWriteOptions.HIVE_USER.key())
    val jdbcPassOption = writeOptions.get(DataSourceWriteOptions.HIVE_PASS.key())
    assert(jdbcUrlOption.isDefined, s"${DataSourceWriteOptions.HIVE_URL.key()} 必须被指定")

    val connection = DbUtil.createHiveConnection(
      jdbcUrlOption.get, jdbcUserOption.getOrElse(""), jdbcPassOption.getOrElse("")
    )
    val stmt = connection.createStatement()
    //需要手动更新hive表中的spark.sql.sources.schema.part.0信息
    stmt.execute(s"ALTER TABLE $dbTable SET TBLPROPERTIES ('spark.sql.sources.schema.part.0' = '${df.schema.json}')")

    // 获取表字段和类型
    val tableSchema = spark.sql(s"DESCRIBE $dbTable")
      .select("col_name", "data_type")
      .collect()
      .map(row => (row.getString(0), row.getString(1)))

    tableSchema.foreach { case (column, dataType) =>
      if (comment.contains(column) && !Seq("ym", "ymd").contains(column)) {
        val newComment = comment.getOrElse(column, "")
        val sql = s"""ALTER TABLE $dbTable CHANGE COLUMN $column $column $dataType COMMENT '$newComment'"""
        info(s"添加备注执行sql:$sql")
        try {
          stmt.execute(sql)
        } catch {
          case e:Throwable =>
            warn("添加备注sql执行失败")
        }
      }
    }
    stmt.close()
    connection.close()
  }

修改’spark.sql.sources.schema.part.0’时,因为schema带有备注,会很长,导致超过hive表元数据mysql表字段的长度限制。去mysql中修改这个长度限制(table_params表PARAM_VALUE字段)。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/954996.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

wordpress zibll 2025款新页脚-6ke论坛

演示地址&#xff1a;6KE论坛-综合开放交流论坛 [hidecontent type"reply" desc"隐藏内容&#xff1a;评论后查看"] [/hidecontent]

Freeswitch使用media_bug能力实现回铃音检测

利用freeswitch的media bug能力来在智能外呼时通过websocket对接智能中心的声音检测接口&#xff0c;来实现回铃音检测&#xff0c;来判断用户当前是否已响应&#xff0c;拒接&#xff0c;关机等。 1.回铃音处理流程 2.模块源码目录结构 首先新建一个freeswitch的源码的src/a…

基于SpringBoot的企业级工位管理系统【源码+文档+部署讲解】

系统介绍 基于SpringBootVue实现的企业级工位管理系统采用前后端分离架构方式&#xff0c;系统设计了管理员、员工两种角色&#xff0c;系统实现了用户登录与注册、个人中心、员工管理、部门信息管理、工位信息管理、使用情况管理、工位分配管理等功能。 技术选型 开发工具&…

keepalived双机热备(LVS+keepalived)实验笔记

目录 前提准备&#xff1a; keepalived1&#xff1a; keepalived2&#xff1a; web1&#xff1a; web2&#xff1a; keepalived介绍 功能特点 工作原理 应用场景 前提准备&#xff1a; 准备4台centos&#xff0c;其中两台为keepalived&#xff0c;两台为webkeepalive…

【Linux】12.Linux进程概念(1)

文章目录 1. 冯诺依曼体系结构2. 操作系统(Operator System)概念设计OS的目的胆小的操作系统定位如何理解 "管理"总结 3. 进程基本概念task_struct-PCB的一种task_ struct内容分类组织进程查看进程通过系统调用获取进程标示符通过系统调用创建进程-fork初识 1. 冯诺依…

LabVIEW 程序中的 R6025 错误

R6025错误 通常是 运行时库 错误&#xff0c;特别是与 C 运行时库 相关。这种错误通常会在程序运行时出现&#xff0c;尤其是在使用 C 编译的程序或依赖 C 运行时库的程序时。 ​ 可能的原因&#xff1a; 内存访问冲突&#xff1a; R6025 错误通常是由于程序在运行时访问无效内…

03JavaWeb——Ajax-Vue-Element(项目实战)

1 Ajax 1.1 Ajax介绍 1.1.1 Ajax概述 我们前端页面中的数据&#xff0c;如下图所示的表格中的学生信息&#xff0c;应该来自于后台&#xff0c;那么我们的后台和前端是互不影响的2个程序&#xff0c;那么我们前端应该如何从后台获取数据呢&#xff1f;因为是2个程序&#xf…

2024 京东零售技术年度总结

每一次回望&#xff0c;都为了更好地前行。 2024 年&#xff0c;京东零售技术在全面助力业务发展的同时&#xff0c;在大模型应用、智能供应链、端技术、XR 体验等多个方向深入探索。京东 APP 完成阶段性重要改版&#xff0c;打造“又好又便宜”的优质体验&#xff1b;国补专区…

Apache搭建https服务器

Apache搭建https服务器 REF: 使用OpenSSL自建一个HTTPS服务

XML在线格式化 - 加菲工具

XML在线格式化 打开网站 加菲工具 选择“XML 在线格式化” 输入XML&#xff0c;点击左上角的“格式化”按钮 得到格式化后的结果

BO-SVM贝叶斯算法优化支持向量机的数据多变量时间序列预测

BO-SVM贝叶斯算法优化支持向量机的数据多变量时间序列预测 目录 BO-SVM贝叶斯算法优化支持向量机的数据多变量时间序列预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab基于BO-SVR贝叶斯算法优化支持向量机的数据多变量时间序列预测&#xff0c;加入5折交叉验…

flutter R库对图片资源进行自动管理

项目中对资源的使用是开发过程中再常见不过的一环。 一般我们在将资源导入到项目中后,会通过资源名称来访问。 但在很多情况下由于我们疏忽输入错了资源名称,从而导致资源无法访问。 所以,急需解决两个问题: 资源编译期可检查可方便预览资源安装相关插件 在vscode中安装两…

【鱼皮大佬API开放平台项目】Spring Cloud Gateway HTTPS 配置问题解决方案总结

问题背景 项目架构为前后端分离的微服务架构&#xff1a; 前端部署在 8000 端口API 网关部署在 9000 端口后端服务包括&#xff1a; api-backend (9001端口)api-interface (9002端口) 初始状态&#xff1a; 前端已配置 HTTPS&#xff08;端口 8000&#xff09;后端服务未配…

Windows远程桌面网关出现重大漏洞

微软披露了其Windows远程桌面网关&#xff08;RD Gateway&#xff09;中的一个重大漏洞&#xff0c;该漏洞可能允许攻击者利用竞争条件&#xff0c;导致拒绝服务&#xff08;DoS&#xff09;攻击。该漏洞被标识为CVE-2025-21225&#xff0c;已在2025年1月的补丁星期二更新中得到…

‌如何有效学习PyTorch:从基础到实践的全面指南‌

随着人工智能和深度学习技术的飞速发展&#xff0c;PyTorch作为当前最流行的深度学习框架之一&#xff0c;凭借其动态计算图、灵活的编程模型以及强大的社区支持&#xff0c;在学术界和工业界均得到了广泛应用。本文旨在为初学者和有一定基础的读者提供一套系统、全面的PyTorch…

2Spark Core

2Spark Core 1.RDD 详解1) 为什么要有 RDD?2) RDD 是什么?3) RDD 主要属性 2.RDD-API1) RDD 的创建方式2) RDD 的算子分类3) Transformation 转换算子4) Action 动作算子 3. RDD 的持久化/缓存4. RDD 容错机制 Checkpoint5. RDD 依赖关系1) 宽窄依赖2) 为什么要设计宽窄依赖 …

视频超分(VSR)论文阅读记录/idea积累(一)

STAR: Spatial-Temporal Augmentation with Text-to-Video Models for Real-World Video Super-Resolution 关键词: text-to-video (T2V) Local Information Enhancement Module (LIEM) Dynamic Frequency (DF) 引言: VSR: 传统VSR分两大类recurrent-based和sliding-wind…

MySQL8数据库全攻略:版本特性、下载、安装、卸载与管理工具详解

大家好&#xff0c;我是袁庭新。 MySQL作为企业项目中的主流数据库&#xff0c;其5.x和8.x版本尤为常用。本文将详细介绍MySQL 8.x的特性、下载、安装、服务管理、卸载及管理工具&#xff0c;旨在帮助用户更好地掌握和使用MySQL数据库。 1.MySQL版本及下载 企业项目中使用的…

Docker安装PostGreSQL docker安装PostGreSQL 完整详细教程

Docker安装PostGreSQL docker安装PostGreSQL 完整详细教程 Docker常用命令大全Docker 运行命令生成Docker 上安装 PostGreSQL 14.15 的步骤&#xff1a;1、拉取 PostGreSQL 14.15 镜像2、创建并运行容器3、测试连接4、设置所有IP都可以运行连接进入容器内 修改配置文件关闭容器…

Elasticsearch:Jira 连接器教程第一部分

作者&#xff1a;来自 Elastic Gustavo Llermaly 将我们的 Jira 内容索引到 Elaasticsearch 中以创建统一的数据源并使用文档级别安全性进行搜索。 在本文中&#xff0c;我们将回顾 Elastic Jira 原生连接器的一个用例。我们将使用一个模拟项目&#xff0c;其中一家银行正在开发…