Analyzing Hive's TBLPROPERTIES Starting from 'discover.partitions'='true'

Preface

On Hive 3.1.2, first create the tables:

show databases ;

use db_lzy;

show tables ;

create external table if not exists test_external_20230502(
    id int,
    comment1 string,
    comment2 string
)
stored as parquet 
;

create external table if not exists test_external_par_20230502(
    id int,
    comment1 string,
    comment2 string
)
partitioned by (
    dt string
    )
stored as parquet
;

Then view the generated CREATE TABLE statements:

show create table test_external_20230502;
show create table test_external_par_20230502;

The output:

hive (db_lzy)> show create table test_external_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transient_lastDdlTime'='1683028671')
Time taken: 0.181 seconds, Fetched: 15 row(s)
hive (db_lzy)>
             > show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transient_lastDdlTime'='1683028770')
Time taken: 0.121 seconds, Fetched: 17 row(s)
hive (db_lzy)>

Both tables carry TBLPROPERTIES entries that were never specified in the DDL, so they were clearly generated automatically.

When creating tables on CDP, one more table property is sometimes added by default:

'discover.partitions'='true'

This property controls automatic partition discovery for the table. Starting from it, let's analyze Hive's TBLPROPERTIES.

Adding a Table Property via JDBC

First, a simple JDBC program:

package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * @program: zhiyong_study
 * @description: Test Hive's TblProperties
 * @author: zhiyong
 * @create: 2023-05-02 19:31
 **/
public class TblpropertiesTest1 {
    public static void main(String[] args) throws Exception{
        final String HIVE_JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
        final String HIVE_HOST = "192.168.88.101";
        final String HIVE_PORT = "10000";
        final String HIVE_USER = "root";
        final String HIVE_PASSWORD = "123456";

        String HIVE_URL = "jdbc:hive2://" + HIVE_HOST + ":" + HIVE_PORT + "/default";

        Connection conn = null;
        Statement stmt = null;
        ResultSet resultSet = null;

        String sql_Alter_TBLPROPERTIES = "alter table db_lzy.test_external_par_20230502 set TBLPROPERTIES('zhiyong_key1' = 'zhiyong_value1')";


        Class.forName(HIVE_JDBC_DRIVER);
        conn = DriverManager.getConnection(HIVE_URL, HIVE_USER, HIVE_PASSWORD);
        stmt = conn.createStatement();

        System.out.println("修改表属性:");
        stmt.execute(sql_Alter_TBLPROPERTIES);

        stmt.close();
        conn.close();
    }
}

Setting breakpoints and walking the call stack shows the call path HiveConnection → TCLIService → TBinaryProtocol → TSocket. The invocation process is essentially the same as the one analyzed earlier in:

https://lizhiyong.blog.csdn.net/article/details/129742904

The bottom layer simply speaks the Thrift protocol, so nothing more can be gleaned from the language-agnostic SQL-client side. The more informative source code lives on the server side; we'll set that aside for now.

At this point:

hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'last_modified_by'='hadoop',
  'last_modified_time'='1683030293',
  'transient_lastDdlTime'='1683030293',
  'zhiyong_key1'='zhiyong_value1')
Time taken: 0.155 seconds, Fetched: 20 row(s)
hive (db_lzy)>

The property has indeed been written into the table's TBLPROPERTIES.

show databases ;
use db_hive_metastore;
show tables ;
select * from TBLS;

From the table names in the output (screenshot omitted), the table whose properties we just modified is the one with TBL_ID=17.

select * from TABLE_PARAMS
where TBL_ID=17;

The result (screenshot omitted) makes it clear that properties set this way are recorded in the TABLE_PARAMS table of the MetaStore database (with the author's Apache 3.1.2 deployment, that database lives in MySQL).
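As a minimal sketch (the MySQL host, port and credentials are assumptions matching this demo environment), the same lookup can be scripted from Java by joining TBLS with TABLE_PARAMS:

package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Minimal sketch: read a table's TBLPROPERTIES straight out of the
 * MySQL-backed MetaStore by joining TBLS with TABLE_PARAMS.
 * Host, port and credentials below are assumptions for the demo cluster.
 */
public class MetastoreParamsQuery {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://192.168.88.101:3306/db_hive_metastore";
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "select p.PARAM_KEY, p.PARAM_VALUE"
                     + " from TBLS t join TABLE_PARAMS p on t.TBL_ID = p.TBL_ID"
                     + " where t.TBL_NAME = 'test_external_par_20230502'")) {
            while (rs.next()) {
                // Each row is one key-value pair of the table's TBLPROPERTIES.
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}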

Notice that this table also records whether a table is external. That is why, when deleting a table's data, SQL folks sometimes first run:

alter table db_name.table_name set TBLPROPERTIES ('EXTERNAL'='FALSE');

What's odd is that the metadata table contained no entry corresponding to the property set by:

alter table db_name.table_name set TBLPROPERTIES ('external.table.purge'='true');

However, in HiveStrictManagedMigration.java:

boolean migrateToExternalTable(Table tableObj, TableType tableType) throws HiveException {
  String msg;
  switch (tableType) {
  case MANAGED_TABLE:
    if (AcidUtils.isTransactionalTable(tableObj)) {
      msg = createExternalConversionExcuse(tableObj,
          "Table is a transactional table");
      LOG.debug(msg);
      return false;
    }
    LOG.info("Converting {} to external table ...", getQualifiedName(tableObj));
    if (!runOptions.dryRun) {
      String command = String.format(
          "ALTER TABLE %s SET TBLPROPERTIES ('EXTERNAL'='TRUE', 'external.table.purge'='true')",
          getQualifiedName(tableObj));
      runHiveCommand(command);
    }
    return true;
  case EXTERNAL_TABLE:
    msg = createExternalConversionExcuse(tableObj,
        "Table is already an external table");
    LOG.debug(msg);
    break;
  default: // VIEW/MATERIALIZED_VIEW
    msg = createExternalConversionExcuse(tableObj,
        "Table type " + tableType + " cannot be converted");
    LOG.debug(msg);
    break;
  }
  return false;
}

we do find code that uses exactly this property, which shows it is effective in Hive 3.1.2. It also shows what purge does: with 'external.table.purge'='true', dropping the external table purges the files under it as well...

Now manually insert one more row into TABLE_PARAMS (screenshot omitted).

At this point:

hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'last_modified_by'='hadoop',
  'last_modified_time'='1683030293',
  'transient_lastDdlTime'='1683030293',
  'zhiyong_key1'='zhiyong_value1',
  'zhiyong_key2'='zhiyong_value2')
Time taken: 0.098 seconds, Fetched: 21 row(s)
hive (db_lzy)>

The freshly retrieved CREATE TABLE DDL already includes the hand-inserted property...

At this point it becomes clear why Spark recreates the table when writing in Overwrite mode and tacks on a big pile of spark.sql-related properties... why Iceberg and Hudi picked Hive's MetaStore for metadata storage... and why Flink uses Hive's MetaStore as its Catalog...

Using this table as an ad-hoc key-value store makes it easy to persist all sorts of table properties, with convenient reads and writes. Hive's MetaStore can be reused directly, and parsing other tables' metadata is just as convenient: RPC over Thrift and JDBC reads of the metadata tables both plug seamlessly into the many components of the Hive ecosystem. This may be why, in the cloud-native era, Hadoop and YARN are being beaten up by OSS and K8S while Hive still stands tall: storage, compute, and resource scheduling can all be replaced, but a good substitute for the MetaStore's file-to-table mapping has yet to appear.
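This is easy to verify: below is a minimal sketch (the metastore URI is an assumption for this demo cluster) that reads a table's key-value properties over Thrift with HiveMetaStoreClient, the same route the Spark/Flink/Iceberg-style integrations take:

package com.zhiyong;

import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

/**
 * Minimal sketch: fetch a table's parameters from the MetaStore over Thrift.
 * The metastore URI is an assumption; point it at your own HMS instance.
 */
public class MetastoreKvDemo {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        conf.set("hive.metastore.uris", "thrift://192.168.88.101:9083");

        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        try {
            Table table = client.getTable("db_lzy", "test_external_par_20230502");
            // getParameters() is exactly the KV map persisted in TABLE_PARAMS.
            Map<String, String> params = table.getParameters();
            params.forEach((k, v) -> System.out.println(k + " = " + v));
        } finally {
            client.close();
        }
    }
}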

Searching on GitHub

The author had already run into this property when creating tables on CDP 7.1.5:

'discover.partitions'='true'

so it certainly did not appear out of thin air... Since it cannot be found in Apache 3.1.2, let's check on GitHub whether other versions have it...

The PartitionManagementTask class

At:

https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java

we can see:

package org.apache.hadoop.hive.metastore;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.TimeValidator;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Partition management task is primarily responsible for partition retention and discovery based on table properties.
 *
 * Partition Retention - If "partition.retention.period" table property is set with retention interval, when this
 * metastore task runs periodically, it will drop partitions with age (creation time) greater than retention period.
 * Dropping partitions after retention period will also delete the data in that partition.
 *
 * Partition Discovery - If "discover.partitions" table property is set, this metastore task monitors table location
 * for newly added partition directories and create partition objects if it does not exist. Also, if partition object
 * exist and if corresponding directory does not exists under table location then the partition object will be dropped.
 *
 */
public class PartitionManagementTask implements MetastoreTaskThread {
  private static final Logger LOG = LoggerFactory.getLogger(PartitionManagementTask.class);
  public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
  public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
  private static final Lock lock = new ReentrantLock();
  // these are just for testing
  private static int completedAttempts;
  private static int skippedAttempts;

  private Configuration conf;

  @Override
  public long runFrequency(TimeUnit unit) {
    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, unit);
  }

  @Override
  public void setConf(Configuration configuration) {
    // we modify conf in setupConf(), so we make a copy
    conf = new Configuration(configuration);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
    return params != null && params.containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
            params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true");
  }

  @Override
  public void run() {
    if (lock.tryLock()) {
      skippedAttempts = 0;
      String qualifiedTableName = null;
      IMetaStoreClient msc = null;
      try {
        msc = new HiveMetaStoreClient(conf);
        String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
        String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
        String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
        String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
        Set<String> tableTypesSet = new HashSet<>();
        for (String type : tableTypes.split(",")) {
          try {
            tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
          } catch (IllegalArgumentException e) {
            // ignore
            LOG.warn("Unknown table type: {}", type);
          }
        }
        // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
        // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
        if (tableTypesSet.isEmpty()) {
          LOG.info("Skipping partition management as no table types specified");
          return;
        }

        StringBuilder filterBuilder = new StringBuilder()
            .append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS)
            .append("discover__partitions").append(" like \"true\" ");
        boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name());
        boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name());
        if (!managed && external) {
          // only for external tables
          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
              .append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" ");
        } else if (managed && !external) {
          // only for managed tables
          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
              .append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" ");
        }
        if (!tablePattern.trim().isEmpty()) {
          filterBuilder.append(" and ")
              .append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)
              .append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\"");
        }

        List<String> databases = msc.getDatabases(catalogName, dbPattern);
        List<TableName> candidates = new ArrayList<>();
        for (String db : databases) {
          Database database = msc.getDatabase(catalogName, db);
          if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
            LOG.debug("Skipping table under database: {}", db);
            continue;
          }
          if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
            LOG.info("Skipping table belongs to database {} being failed over.", db);
            continue;
          }
          List<String> tablesNames = msc.listTableNamesByFilter(catalogName, db,
              filterBuilder.toString(), -1);
          tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
        }

        if (candidates.isEmpty()) {
          LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);
          return;
        }

        // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
        // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also
        // defeats the purpose of thread pooled msck repair.
        int threadPoolSize = MetastoreConf.getIntVar(conf,
          MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE);
        final ExecutorService executorService = Executors
          .newFixedThreadPool(Math.min(candidates.size(), threadPoolSize),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
        CountDownLatch countDownLatch = new CountDownLatch(candidates.size());
        LOG.info("Found {} candidate tables for partition discovery", candidates.size());
        setupMsckPathInvalidation();
        Configuration msckConf = Msck.getMsckConf(conf);
        for (TableName table : candidates) {
          // this always runs in 'sync' mode where partitions can be added and dropped
          MsckInfo msckInfo = new MsckInfo(table.getCat(), table.getDb(), table.getTable(),
            null, null, true, true, true, -1);
          executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch));
        }
        countDownLatch.await();
        executorService.shutdownNow();
      } catch (Exception e) {
        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
      } finally {
        if (msc != null) {
          msc.close();
        }
        lock.unlock();
      }
      completedAttempts++;
    } else {
      skippedAttempts++;
      LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
    }
  }

  public static long getRetentionPeriodInSeconds(final Table table) {
    String retentionPeriod;
    long retentionSeconds = -1;
    if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
      retentionPeriod = table.getParameters().get(PARTITION_RETENTION_PERIOD_TBLPROPERTY);
      if (retentionPeriod.isEmpty()) {
        LOG.warn("'{}' table property is defined but empty. Skipping retention period..",
          PARTITION_RETENTION_PERIOD_TBLPROPERTY);
      } else {
        try {
          TimeValidator timeValidator = new TimeValidator(TimeUnit.SECONDS);
          timeValidator.validate(retentionPeriod);
          retentionSeconds = MetastoreConf.convertTimeStr(retentionPeriod, TimeUnit.SECONDS, TimeUnit.SECONDS);
        } catch (IllegalArgumentException e) {
          LOG.warn("'{}' retentionPeriod value is invalid. Skipping retention period..", retentionPeriod);
          // will return -1
        }
      }
    }
    return retentionSeconds;
  }

  private void setupMsckPathInvalidation() {
    // if invalid partition directory appears, we just skip and move on. We don't want partition management to throw
    // when invalid path is encountered as these are background threads. We just want to skip and move on. Users will
    // have to fix the invalid paths via external means.
    conf.set(MetastoreConf.ConfVars.MSCK_PATH_VALIDATION.getVarname(), "skip");
  }

  private static class MsckThread implements Runnable {
    private MsckInfo msckInfo;
    private Configuration conf;
    private String qualifiedTableName;
    private CountDownLatch countDownLatch;

    MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) {
      this.msckInfo = msckInfo;
      this.conf = conf;
      this.qualifiedTableName = qualifiedTableName;
      this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
      try {
        Msck msck = new Msck( true, true);
        msck.init(conf);
        msck.repair(msckInfo);
      } catch (Exception e) {
        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
      } finally {
        // there is no recovery from exception, so we always count down and retry in next attempt
        countDownLatch.countDown();
      }
    }
  }

  @VisibleForTesting
  public static int getSkippedAttempts() {
    return skippedAttempts;
  }

  @VisibleForTesting
  public static int getCompletedAttempts() {
    return completedAttempts;
  }
}

Persistence pays off: this class finally contains the table property. The task is clearly a daemon that periodically reconciles Hive tables against the MetaStore, automatically dropping stale partitions (ones registered in the MetaStore whose directories no longer exist under the table location) and adding missing ones (directories and files present on HDFS but not yet registered in the MetaStore).

Once it grabs the lock, it runs the MSCK command.

The refresh frequency obtained in the runFrequency method is evidently the rate at which the thread pool fires the task...

package org.apache.hadoop.hive.metastore;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * Any task that will run as a separate thread in the metastore should implement this
 * interface.
 */
public interface MetastoreTaskThread extends Configurable, Runnable {

  /**
   * Get the frequency at which the thread should be scheduled in the thread pool.  You must call
   * {@link #setConf(Configuration)} before calling this method.
   * @param unit TimeUnit to express the frequency in.
   * @return frequency
   */
  long runFrequency(TimeUnit unit);
}
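To make the contract concrete, here is a toy implementation (a made-up example, not part of Hive; the class name and the 5-minute period are invented for illustration). The scheduler asks runFrequency() for the period once, then fires run() at that fixed rate:

package com.zhiyong;

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;

/**
 * Toy MetastoreTaskThread implementation (illustration only, not part of Hive).
 * The metastore's scheduler calls runFrequency() once to get the period,
 * then invokes run() at that fixed rate on its thread pool.
 */
public class HeartbeatLogTask implements MetastoreTaskThread {
    private Configuration conf;

    @Override
    public long runFrequency(TimeUnit unit) {
        // Fixed 5-minute period for the demo; real tasks read it from MetastoreConf.
        return unit.convert(5, TimeUnit.MINUTES);
    }

    @Override
    public void setConf(Configuration configuration) {
        // Copy, mirroring PartitionManagementTask's caution about mutating conf.
        this.conf = new Configuration(configuration);
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void run() {
        System.out.println("metastore background task fired");
    }
}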

Unfortunately, Hive 3.1.2 does not have this class yet (screenshot omitted), and accordingly its configuration class has no matching config entry either (screenshot omitted). We still need to look at a newer version on GitHub...

The MetastoreConf class

At:

https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java

we can see:

// Partition management task params
    PARTITION_MANAGEMENT_TASK_FREQUENCY("metastore.partition.management.task.frequency",
      "metastore.partition.management.task.frequency",
      6, TimeUnit.HOURS, "Frequency at which timer task runs to do automatic partition management for tables\n" +
      "with table property 'discover.partitions'='true'. Partition management include 2 pieces. One is partition\n" +
      "discovery and other is partition retention period. When 'discover.partitions'='true' is set, partition\n" +
      "management will look for partitions in table location and add partitions objects for it in metastore.\n" +
      "Similarly if partition object exists in metastore and partition location does not exist, partition object\n" +
      "will be dropped. The second piece in partition management is retention period. When 'discover.partition'\n" +
      "is set to true and if 'partition.retention.period' table property is defined, partitions that are older\n" +
      "than the specified retention period will be automatically dropped from metastore along with the data."),

Clearly this config entry is not in Hive 3.1.2 either; it only appears in newer versions!!! Also, the default value is 6 hours???

HiveMetaStore

Inside HiveMetaStore:

private static void startRemoteOnlyTasks(Configuration conf) throws Exception {
    if (MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
        ThreadPool.initialize(conf);
        Collection<String> taskNames = MetastoreConf.getStringCollection(conf, ConfVars.TASK_THREADS_REMOTE_ONLY);
        Iterator var2 = taskNames.iterator();

        while(var2.hasNext()) {
            String taskName = (String)var2.next();
            MetastoreTaskThread task = (MetastoreTaskThread)JavaUtils.newInstance(JavaUtils.getClass(taskName, MetastoreTaskThread.class));
            task.setConf(conf);
            long freq = task.runFrequency(TimeUnit.MILLISECONDS);
            ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
        }

    }
}

We can see that the thread pool schedules these tasks at the configured frequency, expressed in milliseconds...

COMPACTOR_INITIATOR_ON("metastore.compactor.initiator.on", "hive.compactor.initiator.on", false, "Whether to run the initiator and cleaner threads on this metastore instance or not.\nSet this to true on one instance of the Thrift metastore service as part of turning\non Hive transactions. For a complete list of parameters required for turning on\ntransactions, see hive.txn.manager."),

This in turn is related to:

/**
 * An interface that allows Hive to manage transactions.  All classes
 * implementing this should extend {@link HiveTxnManagerImpl} rather than
 * implementing this directly.
 */
public interface HiveTxnManager {
}

Clearly, this is the interface that safeguards Hive transactions...

At this point we know that Hive keeps some code-level configuration in the MetastoreConf class (in heap memory; parts of it can be overridden with set inside a session) and persists other key-value pairs in the TABLE_PARAMS metadata table. From these two stores, Hive's MetaStore daemons can fetch a value by key and then carry out the corresponding operations, transactions included.

A property that CDP 7.1.5 (or even earlier releases) already shipped needs the 4.0 alpha on the Apache side... The freeloader edition always trails the enterprise edition that pays the protection fee, in stability and in features... Kylin 4.0's enterprise features arriving belatedly in the open-source 5.0 is the same story... Dessert gets served to paying customers first; the leftovers are handed to freeloaders for a free taste... Then again, freeloading at all is already a good deal; what more could one ask for...

Using Automatic Partition Discovery

Apache releases

According to the official Apache Hive documentation:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-DiscoverPartitions

this is a Hive 4.0 feature:

Discover Partitions

Table property “discover.partitions” can now be specified to control automatic discovery and synchronization of partition metadata in Hive Metastore.

When Hive Metastore Service (HMS) is started in remote service mode, a background thread (PartitionManagementTask) gets scheduled periodically every 300s (configurable via metastore.partition.management.task.frequency config) that looks for tables with “discover.partitions” table property set to true and performs MSCK REPAIR in sync mode. If the table is a transactional table, then Exclusive Lock is obtained for that table before performing MSCK REPAIR. With this table property, “MSCK REPAIR TABLE table_name SYNC PARTITIONS” is no longer required to be run manually.

Version information

As of Hive 4.0.0 (HIVE-20707).

It corresponds to this issue:

https://issues.apache.org/jira/browse/HIVE-20707

Partition Retention

Table property “partition.retention.period” can now be specified for partitioned tables with a retention interval. When a retention interval is specified, the background thread running in HMS (refer Discover Partitions section), will check the age (creation time) of the partition and if the partition’s age is older than the retention period, it will be dropped. Dropping partitions after retention period will also delete the data in that partition. For example, if an external partitioned table with ‘date’ partition is created with table properties “discover.partitions”=“true” and “partition.retention.period”=“7d” then only the partitions created in last 7 days are retained.

Version information

As of Hive 4.0.0 (HIVE-20707).

And this second feature is partition retention...
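As a small sketch (reusing the assumed host and credentials of the earlier JDBC demo; it only takes effect on a Hive that actually ships PartitionManagementTask, i.e. Hive 4.0 or a CDP release), the two properties from the quoted example can be set like this:

package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * Minimal sketch: enable partition discovery plus a 7-day retention window.
 * Host and credentials are assumptions matching the earlier JDBC demo; the
 * properties only have an effect where PartitionManagementTask exists.
 */
public class EnableDiscoveryAndRetention {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://192.168.88.101:10000/default", "root", "123456");
             Statement stmt = conn.createStatement()) {
            // Partitions older than 7 days would then be dropped with their data.
            stmt.execute("alter table db_lzy.test_external_par_20230502 set TBLPROPERTIES("
                    + "'discover.partitions'='true', 'partition.retention.period'='7d')");
        }
    }
}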

CDP releases

So for the time being this feature is really only usable on CDP... Hardly anyone will be running the alpha of Apache Hive 4.0 any time soon.

When using Hive on CDP, the official documentation (https://docs.cloudera.com/runtime/7.0.3/using-hiveql/topics/hive-automate-msck.html) says:

Automated partition discovery and repair is useful for processing log data, and other data, in Spark and Hive catalogs. You learn how to set the partition discovery parameter to suit your use case. An aggressive partition discovery and repair configuration can delay the upgrade process.

Apache Hive can automatically and periodically discover discrepancies in partition metadata in the Hive metastore and in corresponding directories, or objects, on the file system. After discovering discrepancies, Hive performs synchronization.

The discover.partitions table property enables and disables synchronization of the file system with partitions. In external partitioned tables, this property is enabled (true) by default when you create the table. To a legacy external table (created using a version of Hive that does not support this feature), you need to add discover.partitions to the table properties to enable partition discovery.

By default, the discovery and synchronization of partitions occurs every 5 minutes. This is too often if you are upgrading and can result in the Hive DB being queried every few milliseconds, leading to performance degradation. During upgrading the high frequency of batch routines dictates running discovery and synchronization infrequently, perhaps hourly or even daily. You can configure the frequency as shown in this task.

  1. Assuming you have an external table created using a version of Hive that does not support partition discovery, enable partition discovery for the table.

    ALTER TABLE exttbl SET TBLPROPERTIES ('discover.partitions' = 'true');
    
  2. In Cloudera Manager, click Clusters > Hive > Configuration, search for Hive Server Advanced Configuration Snippet (Safety Valve) for hive-site.xml.

  3. Add the following property and value to hive-site.xml: Property: metastore.partition.management.task.frequency Value: 600.

    This action sets synchronization of partitions to occur every 10 minutes expressed in seconds. If you are upgrading, consider running discovery and synchronization once every 24 hours by setting the value to 86,400 seconds.

In other words, CDP's Hive enables automatic partition discovery by default for external partitioned tables, firing every 5 minutes by default, and the frequency is configured in seconds. It has been available since at least CDP 7.0.3...

Usage

If the defaults don't fit your needs, they can be changed manually:

ALTER TABLE table_name SET TBLPROPERTIES ('discover.partitions'='true');
ALTER TABLE table_name SET TBLPROPERTIES ('metastore.partition.management.task.frequency'='600');

And this... gets persisted to the metadata tables, which again differs from Apache's in-heap configuration objects...

That said, judging from:

/**
 * Get the variable as a long indicating a period of time
 * @param conf configuration to retrieve it from
 * @param var variable to retrieve
 * @param outUnit Timeout to return value in
 * @return value, or default value if value not in config file
 */
public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
  assert var.defaultVal.getClass() == TimeValue.class;
  String val = conf.get(var.varname);

  if (val == null) {
    // Look for it under the old Hive name
    val = conf.get(var.hiveName);
  }

  if (val != null) {
    return convertTimeStr(val, ((TimeValue)var.defaultVal).unit, outUnit);
  } else {
    return outUnit.convert(((TimeValue)var.defaultVal).val, ((TimeValue)var.defaultVal).unit);
  }
}

if the value is not in heap memory, it falls back to the configuration, so there's nothing wrong with that approach...

Other Issues

Automatic discovery runs every 5 minutes, i.e. near-real-time. If one job finishes and the next one is launched immediately, then during those 5 minutes (or longer) a Hive on Tez job cannot see the new partitions!!! Spark or Flink reading the Parquet files directly is a different matter...

Hence, as described in https://lizhiyong.blog.csdn.net/article/details/127680034:

back when the author was doing platform development, an extra msck or alter table add partition had to be appended; only once that command had also finished could eventual consistency be guaranteed!!!
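A minimal sketch of that workaround (same assumed connection details as before; the dt value is hypothetical): right after the upstream job finishes, force the metadata sync instead of waiting for the discovery cycle:

package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * Minimal sketch: synchronize partition metadata immediately after a job
 * writes a new partition directory, so the next Hive on Tez job can see it.
 * Connection details are assumptions; the dt value is hypothetical.
 */
public class SyncPartitionsAfterJob {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://192.168.88.101:10000/default", "root", "123456");
             Statement stmt = conn.createStatement()) {
            // Option 1: repair everything in sync mode (adds and drops partitions).
            stmt.execute("msck repair table db_lzy.test_external_par_20230502 sync partitions");
            // Option 2: pinpoint just the partition that was written.
            stmt.execute("alter table db_lzy.test_external_par_20230502"
                    + " add if not exists partition (dt='20230502')");
        }
    }
}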


Summary

After a flurry of digging, the author tracked down the metadata that Hive's MetaStore stores and worked out how table properties drive its behavior. Along the way, the feature lag of the freeloader edition also became apparent... a worthwhile exercise all the same...

Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/130468723
