Analyzing Hive's TBLPROPERTIES, Starting from 'discover.partitions'='true'
Preface
First, create two tables in Hive 3.1.2:
show databases ;
use db_lzy;
show tables ;
create external table if not exists test_external_20230502(
id int,
comment1 string,
comment2 string
)
stored as parquet
;
create external table if not exists test_external_par_20230502(
id int,
comment1 string,
comment2 string
)
partitioned by (
dt string
)
stored as parquet
;
Then view the generated CREATE TABLE statements:
show create table test_external_20230502;
show create table test_external_par_20230502;
The output is:
hive (db_lzy)> show create table test_external_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_20230502`(
`id` int,
`comment1` string,
`comment2` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_20230502'
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1683028671')
Time taken: 0.181 seconds, Fetched: 15 row(s)
hive (db_lzy)>
> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
`id` int,
`comment1` string,
`comment2` string)
PARTITIONED BY (
`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1683028770')
Time taken: 0.121 seconds, Fetched: 17 row(s)
hive (db_lzy)>
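Both tables have a TBLPROPERTIES clause, i.e. table properties. None of this was specified in the DDL, so it was clearly generated automatically.
When a table is created on CDP, one more table property is sometimes added by default: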
'discover.partitions'='true'
This table property enables automatic partition discovery. The rest of this post starts from this property and analyzes Hive's TBLPROPERTIES.
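To compare the properties of several tables quickly, the TBLPROPERTIES clause can be pulled out of the SHOW CREATE TABLE output. The following is a minimal sketch, not part of Hive: the class name TblPropertiesParser is made up for illustration, and it assumes that keys and values are single-quoted and contain no embedded single quotes, as in the output above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not a Hive class.
class TblPropertiesParser {
    // Matches one 'key'='value' pair; assumes neither side contains a single quote.
    private static final Pattern PAIR = Pattern.compile("'([^']*)'\\s*=\\s*'([^']*)'");

    /** Extracts the key/value pairs that follow the TBLPROPERTIES keyword in a DDL string. */
    static Map<String, String> parse(String ddl) {
        Map<String, String> props = new LinkedHashMap<>();
        int start = ddl.indexOf("TBLPROPERTIES");
        if (start < 0) {
            return props; // the DDL has no TBLPROPERTIES clause
        }
        // Only scan the text after the keyword, so LOCATION '...' and SERDE names are ignored.
        Matcher m = PAIR.matcher(ddl.substring(start));
        while (m.find()) {
            props.put(m.group(1), m.group(2));
        }
        return props;
    }

    public static void main(String[] args) {
        String ddl = "CREATE EXTERNAL TABLE `t`(`id` int)\n"
                + "TBLPROPERTIES (\n"
                + "'bucketing_version'='2',\n"
                + "'transient_lastDdlTime'='1683028671')";
        System.out.println(parse(ddl));
    }
}
```

Checking whether a table has partition discovery switched on then comes down to "true".equalsIgnoreCase(parse(ddl).get("discover.partitions")).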
Adding a table property via JDBC
First, write a simple JDBC program:
package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @program: zhiyong_study
 * @description: Test Hive's TBLPROPERTIES
 * @author: zhiyong
 * @create: 2023-05-02 19:31
 **/
public class TblpropertiesTest1 {
    public static void main(String[] args) throws Exception {
        final String HIVE_JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
        final String HIVE_HOST = "192.168.88.101";
        final String HIVE_PORT = "10000";
        final String HIVE_USER = "root";
        final String HIVE_PASSWORD = "123456";
        String HIVE_URL = "jdbc:hive2://" + HIVE_HOST + ":" + HIVE_PORT + "/default";
        String sql_Alter_TBLPROPERTIES = "alter table db_lzy.test_external_par_20230502 set TBLPROPERTIES('zhiyong_key1' = 'zhiyong_value1')";

        // Load the Hive JDBC driver (hive-jdbc must be on the classpath)
        Class.forName(HIVE_JDBC_DRIVER);
        // try-with-resources closes the statement and the connection even if execute() throws
        try (Connection conn = DriverManager.getConnection(HIVE_URL, HIVE_USER, HIVE_PASSWORD);
             Statement stmt = conn.createStatement()) {
            System.out.println("Altering table properties:");
            stmt.execute(sql_Alter_TBLPROPERTIES);
        }
    }
}
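Stepping through with breakpoints and inspecting the call stack shows the call going from HiveConnection to TCLIService, TBinaryProtocol, and TSocket. The call path is essentially the same as the one analyzed earlier in:
https://lizhiyong.blog.csdn.net/article/details/129742904
Underneath, everything goes through the Thrift protocol. So nothing more can be learned from the language-agnostic SQL client side. The more useful source code is on the server side, which we leave aside for now.
At this point: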
hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
`id` int,
`comment1` string,
`comment2` string)
PARTITIONED BY (
`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
'bucketing_version'='2',
'last_modified_by'='hadoop',
'last_modified_time'='1683030293',
'transient_lastDdlTime'='1683030293',
'zhiyong_key1'='zhiyong_value1')
Time taken: 0.155 seconds, Fetched: 20 row(s)
hive (db_lzy)>
The key/value pair has been written into the table properties. Next, query the MetaStore's backing database:
show databases ;
use db_hive_metastore;
show tables ;
select * from TBLS;
Result:
From the table name, the table whose properties were just modified is the one with TBL_ID=17.
select * from TABLE_PARAMS
where TBL_ID=17;
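Result:
Clearly, the table properties set this way are recorded in the TABLE_PARAMS table of the MetaStore metadata database [the author's Apache 3.1.2 setup stores it in MySQL].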
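What TABLE_PARAMS does can be modeled in a few lines. The following is a simplified in-memory sketch, not Hive code: the class TableParamsSketch and its methods are invented for illustration, and only the three columns TBL_ID, PARAM_KEY, and PARAM_VALUE are modeled. It shows that SET TBLPROPERTIES behaves like an upsert keyed on (TBL_ID, PARAM_KEY).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the TABLE_PARAMS metadata table.
class TableParamsSketch {
    /** One row of TABLE_PARAMS: (TBL_ID, PARAM_KEY, PARAM_VALUE). */
    static final class Row {
        final long tblId;
        final String paramKey;
        String paramValue;

        Row(long tblId, String paramKey, String paramValue) {
            this.tblId = tblId;
            this.paramKey = paramKey;
            this.paramValue = paramValue;
        }
    }

    private final List<Row> rows = new ArrayList<>();

    /** Mimics SET TBLPROPERTIES: update the row for (tblId, key) if present, else insert it. */
    void setProperty(long tblId, String key, String value) {
        for (Row r : rows) {
            if (r.tblId == tblId && r.paramKey.equals(key)) {
                r.paramValue = value;
                return;
            }
        }
        rows.add(new Row(tblId, key, value));
    }

    /** Mimics: select PARAM_KEY, PARAM_VALUE from TABLE_PARAMS where TBL_ID = ? */
    Map<String, String> propertiesOf(long tblId) {
        Map<String, String> props = new TreeMap<>();
        for (Row r : rows) {
            if (r.tblId == tblId) {
                props.put(r.paramKey, r.paramValue);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        TableParamsSketch store = new TableParamsSketch();
        store.setProperty(17, "EXTERNAL", "TRUE");
        store.setProperty(17, "zhiyong_key1", "zhiyong_value1");
        System.out.println(store.propertiesOf(17));
    }
}
```

Because the table is just key/value rows per TBL_ID, any component can add its own keys without a schema change.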
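This table also records whether a table is external. That is why, before dropping a table together with its data, SQL Boys sometimes first run:
alter table db_name.table_name set TBLPROPERTIES ('EXTERNAL'='FALSE');
Oddly, the metadata table has no entry for the property set by:
alter table db_name.table_name set TBLPROPERTIES ("external.table.purge"="true");
However, HiveStrictManagedMigration.java contains: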
boolean migrateToExternalTable(Table tableObj, TableType tableType) throws HiveException {
String msg;
switch (tableType) {
case MANAGED_TABLE:
if (AcidUtils.isTransactionalTable(tableObj)) {
msg = createExternalConversionExcuse(tableObj,
"Table is a transactional table");
LOG.debug(msg);
return false;
}
LOG.info("Converting {} to external table ...", getQualifiedName(tableObj));
if (!runOptions.dryRun) {
String command = String.format(
"ALTER TABLE %s SET TBLPROPERTIES ('EXTERNAL'='TRUE', 'external.table.purge'='true')",
getQualifiedName(tableObj));
runHiveCommand(command);
}
return true;
case EXTERNAL_TABLE:
msg = createExternalConversionExcuse(tableObj,
"Table is already an external table");
LOG.debug(msg);
break;
default: // VIEW/MATERIALIZED_VIEW
msg = createExternalConversionExcuse(tableObj,
"Table type " + tableType + " cannot be converted");
LOG.debug(msg);
break;
}
return false;
}
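This code refers to the property, which indicates that it is valid in Hive 3.1.2. It also shows what the property is for: the migration sets 'external.table.purge'='true' together with 'EXTERNAL'='TRUE', so the converted external table still owns its files and dropping it also purges the data.
Now manually insert one more row into TABLE_PARAMS:
At this point: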
hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
`id` int,
`comment1` string,
`comment2` string)
PARTITIONED BY (
`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
'bucketing_version'='2',
'last_modified_by'='hadoop',
'last_modified_time'='1683030293',
'transient_lastDdlTime'='1683030293',
'zhiyong_key1'='zhiyong_value1',
'zhiyong_key2'='zhiyong_value2')
Time taken: 0.098 seconds, Fetched: 21 row(s)
hive (db_lzy)>
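The DDL fetched again already contains this property.
This explains why Spark recreates the table when writing in Overwrite mode and adds a big pile of spark.sql properties. It also explains why Iceberg and Hudi choose Hive's MetaStore for metadata storage, and why Flink uses Hive's MetaStore as a Catalog.
They simply make use of what is already there: treating this table as a key-value store makes it possible to keep all kinds of table properties, and both reads and writes are easy. Hive's MetaStore can be reused directly, and parsing the metadata of other tables is easy too. Whether through Thrift RPC or through JDBC reads of the metadata tables, the many components of the Hive ecosystem can plug in seamlessly. This may be why, in the cloud-native era, Hadoop and Yarn are getting beaten by OSS and K8S while Hive still stands firm. Storage, compute, and resource scheduling can all be replaced by something else, but the MetaStore, which maps files to tables, has no good substitute for the time being.
Searching on GitHub
The author had already come across this property when creating tables on CDP 7.1.5: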
'discover.partitions'='true'
So it certainly did not come out of nowhere. Since it cannot be found in Apache 3.1.2, the next step is to check on GitHub whether another version has it.
The PartitionManagementTask class
At: https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
we can see:
package org.apache.hadoop.hive.metastore;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.TimeValidator;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Partition management task is primarily responsible for partition retention and discovery based on table properties.
*
* Partition Retention - If "partition.retention.period" table property is set with retention interval, when this
* metastore task runs periodically, it will drop partitions with age (creation time) greater than retention period.
* Dropping partitions after retention period will also delete the data in that partition.
*
* Partition Discovery - If "discover.partitions" table property is set, this metastore task monitors table location
* for newly added partition directories and create partition objects if it does not exist. Also, if partition object
* exist and if corresponding directory does not exists under table location then the partition object will be dropped.
*
*/
public class PartitionManagementTask implements MetastoreTaskThread {
private static final Logger LOG = LoggerFactory.getLogger(PartitionManagementTask.class);
public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
private static final Lock lock = new ReentrantLock();
// these are just for testing
private static int completedAttempts;
private static int skippedAttempts;
private Configuration conf;
@Override
public long runFrequency(TimeUnit unit) {
return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, unit);
}
@Override
public void setConf(Configuration configuration) {
// we modify conf in setupConf(), so we make a copy
conf = new Configuration(configuration);
}
@Override
public Configuration getConf() {
return conf;
}
private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
return params != null && params.containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true");
}
@Override
public void run() {
if (lock.tryLock()) {
skippedAttempts = 0;
String qualifiedTableName = null;
IMetaStoreClient msc = null;
try {
msc = new HiveMetaStoreClient(conf);
String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
Set<String> tableTypesSet = new HashSet<>();
for (String type : tableTypes.split(",")) {
try {
tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
} catch (IllegalArgumentException e) {
// ignore
LOG.warn("Unknown table type: {}", type);
}
}
// if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
// specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
if (tableTypesSet.isEmpty()) {
LOG.info("Skipping partition management as no table types specified");
return;
}
StringBuilder filterBuilder = new StringBuilder()
.append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS)
.append("discover__partitions").append(" like \"true\" ");
boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name());
boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name());
if (!managed && external) {
// only for external tables
filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
.append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" ");
} else if (managed && !external) {
// only for managed tables
filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
.append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" ");
}
if (!tablePattern.trim().isEmpty()) {
filterBuilder.append(" and ")
.append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)
.append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\"");
}
List<String> databases = msc.getDatabases(catalogName, dbPattern);
List<TableName> candidates = new ArrayList<>();
for (String db : databases) {
Database database = msc.getDatabase(catalogName, db);
if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
LOG.debug("Skipping table under database: {}", db);
continue;
}
if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
LOG.info("Skipping table belongs to database {} being failed over.", db);
continue;
}
List<String> tablesNames = msc.listTableNamesByFilter(catalogName, db,
filterBuilder.toString(), -1);
tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
}
if (candidates.isEmpty()) {
LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);
return;
}
// TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
// will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also
// defeats the purpose of thread pooled msck repair.
int threadPoolSize = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE);
final ExecutorService executorService = Executors
.newFixedThreadPool(Math.min(candidates.size(), threadPoolSize),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
CountDownLatch countDownLatch = new CountDownLatch(candidates.size());
LOG.info("Found {} candidate tables for partition discovery", candidates.size());
setupMsckPathInvalidation();
Configuration msckConf = Msck.getMsckConf(conf);
for (TableName table : candidates) {
// this always runs in 'sync' mode where partitions can be added and dropped
MsckInfo msckInfo = new MsckInfo(table.getCat(), table.getDb(), table.getTable(),
null, null, true, true, true, -1);
executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch));
}
countDownLatch.await();
executorService.shutdownNow();
} catch (Exception e) {
LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
} finally {
if (msc != null) {
msc.close();
}
lock.unlock();
}
completedAttempts++;
} else {
skippedAttempts++;
LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
}
}
public static long getRetentionPeriodInSeconds(final Table table) {
String retentionPeriod;
long retentionSeconds = -1;
if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
retentionPeriod = table.getParameters().get(PARTITION_RETENTION_PERIOD_TBLPROPERTY);
if (retentionPeriod.isEmpty()) {
LOG.warn("'{}' table property is defined but empty. Skipping retention period..",
PARTITION_RETENTION_PERIOD_TBLPROPERTY);
} else {
try {
TimeValidator timeValidator = new TimeValidator(TimeUnit.SECONDS);
timeValidator.validate(retentionPeriod);
retentionSeconds = MetastoreConf.convertTimeStr(retentionPeriod, TimeUnit.SECONDS, TimeUnit.SECONDS);
} catch (IllegalArgumentException e) {
LOG.warn("'{}' retentionPeriod value is invalid. Skipping retention period..", retentionPeriod);
// will return -1
}
}
}
return retentionSeconds;
}
private void setupMsckPathInvalidation() {
// if invalid partition directory appears, we just skip and move on. We don't want partition management to throw
// when invalid path is encountered as these are background threads. We just want to skip and move on. Users will
// have to fix the invalid paths via external means.
conf.set(MetastoreConf.ConfVars.MSCK_PATH_VALIDATION.getVarname(), "skip");
}
private static class MsckThread implements Runnable {
private MsckInfo msckInfo;
private Configuration conf;
private String qualifiedTableName;
private CountDownLatch countDownLatch;
MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) {
this.msckInfo = msckInfo;
this.conf = conf;
this.qualifiedTableName = qualifiedTableName;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Msck msck = new Msck( true, true);
msck.init(conf);
msck.repair(msckInfo);
} catch (Exception e) {
LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
} finally {
// there is no recovery from exception, so we always count down and retry in next attempt
countDownLatch.countDown();
}
}
}
@VisibleForTesting
public static int getSkippedAttempts() {
return skippedAttempts;
}
@VisibleForTesting
public static int getCompletedAttempts() {
return completedAttempts;
}
}
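Persistence pays off: the table property finally turns up in this class. The task is clearly a background thread that periodically synchronizes the MetaStore for Hive tables. It drops stale partitions [the partition object exists in the MetaStore, but its directory no longer exists under the table location] and adds missing ones [the directory exists under the table location, but the MetaStore has no partition object for it].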
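The sync semantics described in the class Javadoc boil down to two set differences. Here is a minimal sketch under simplifying assumptions: partitions are represented as plain strings such as dt=20230501, and the class PartitionSyncSketch is invented for illustration. The real work is done by Hive's Msck class, which also validates paths and handles multi-level partitions.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of "sync" mode, not Hive's Msck implementation.
class PartitionSyncSketch {
    /** Directories under the table location that the metastore does not know: partitions to add. */
    static Set<String> toAdd(Set<String> dirsOnFileSystem, Set<String> partitionsInMetastore) {
        Set<String> result = new TreeSet<>(dirsOnFileSystem);
        result.removeAll(partitionsInMetastore);
        return result;
    }

    /** Partitions in the metastore whose directory is gone: partitions to drop. */
    static Set<String> toDrop(Set<String> dirsOnFileSystem, Set<String> partitionsInMetastore) {
        Set<String> result = new TreeSet<>(partitionsInMetastore);
        result.removeAll(dirsOnFileSystem);
        return result;
    }

    public static void main(String[] args) {
        Set<String> fs = new TreeSet<>(Arrays.asList("dt=20230501", "dt=20230502"));
        Set<String> hms = new TreeSet<>(Arrays.asList("dt=20230430", "dt=20230501"));
        System.out.println("add:  " + toAdd(fs, hms));
        System.out.println("drop: " + toDrop(fs, hms));
    }
}
```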
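Once the task has acquired the lock (a non-blocking tryLock on a static ReentrantLock), it runs MSCK repair for each candidate table; if the lock is held by another run, the attempt is skipped.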
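The locking pattern in run() can be isolated as follows. This is a stripped-down sketch rather than the Hive class: SkipIfBusyTask, demo(), and awaitQuietly() are names made up here, and the actual work is replaced by an arbitrary Runnable. The point is that tryLock() never blocks, so an overlapping attempt is counted as skipped instead of queuing behind the running one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical reduction of the tryLock pattern used by PartitionManagementTask.run().
class SkipIfBusyTask implements Runnable {
    private static final Lock LOCK = new ReentrantLock();
    static volatile int completedAttempts;
    static volatile int skippedAttempts;

    private final Runnable work;

    SkipIfBusyTask(Runnable work) {
        this.work = work;
    }

    @Override
    public void run() {
        if (LOCK.tryLock()) {      // returns false at once if another thread holds the lock
            try {
                skippedAttempts = 0;
                work.run();
            } finally {
                LOCK.unlock();     // always release, even if the work throws
            }
            completedAttempts++;
        } else {
            skippedAttempts++;     // do not wait for the running attempt, just skip this round
        }
    }

    /** Starts one slow attempt, fires a second one while it runs, returns {completed, skipped}. */
    static int[] demo() {
        completedAttempts = 0;
        skippedAttempts = 0;
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread slow = new Thread(new SkipIfBusyTask(() -> {
            started.countDown();
            awaitQuietly(release);
        }));
        slow.start();
        awaitQuietly(started);               // the slow attempt now holds the lock
        new SkipIfBusyTask(() -> { }).run(); // this overlapping attempt is skipped
        int skipped = skippedAttempts;
        release.countDown();
        try {
            slow.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completedAttempts, skipped};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling SkipIfBusyTask.demo() yields one completed and one skipped attempt, which mirrors the "Lock is held by some other partition discovery task" log line in the original.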
The refresh frequency returned by the runFrequency method is clearly the interval at which the thread pool triggers the task.
package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import java.util.concurrent.TimeUnit;
/**
* Any task that will run as a separate thread in the metastore should implement this
* interface.
*/
public interface MetastoreTaskThread extends Configurable, Runnable {
/**
* Get the frequency at which the thread should be scheduled in the thread pool. You must call
* {@link #setConf(Configuration)} before calling this method.
* @param unit TimeUnit to express the frequency in.
* @return frequency
*/
long runFrequency(TimeUnit unit);
}
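Unfortunately, this class does not exist yet in Hive 3.1.2:
so the corresponding configuration item cannot be found in the configuration class either:
A newer version on GitHub is needed once again.
The MetastoreConf class
At: https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
we can see: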
// Partition management task params
PARTITION_MANAGEMENT_TASK_FREQUENCY("metastore.partition.management.task.frequency",
"metastore.partition.management.task.frequency",
6, TimeUnit.HOURS, "Frequency at which timer task runs to do automatic partition management for tables\n" +
"with table property 'discover.partitions'='true'. Partition management include 2 pieces. One is partition\n" +
"discovery and other is partition retention period. When 'discover.partitions'='true' is set, partition\n" +
"management will look for partitions in table location and add partitions objects for it in metastore.\n" +
"Similarly if partition object exists in metastore and partition location does not exist, partition object\n" +
"will be dropped. The second piece in partition management is retention period. When 'discover.partition'\n" +
"is set to true and if 'partition.retention.period' table property is defined, partitions that are older\n" +
"than the specified retention period will be automatically dropped from metastore along with the data."),
Clearly this configuration item is not in Hive 3.1.2 either, only in newer versions. And the default value is 6 hours?
HiveMetaStore
In HiveMetaStore:
private static void startRemoteOnlyTasks(Configuration conf) throws Exception {
if (MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
ThreadPool.initialize(conf);
Collection<String> taskNames = MetastoreConf.getStringCollection(conf, ConfVars.TASK_THREADS_REMOTE_ONLY);
Iterator var2 = taskNames.iterator();
while(var2.hasNext()) {
String taskName = (String)var2.next();
MetastoreTaskThread task = (MetastoreTaskThread)JavaUtils.newInstance(JavaUtils.getClass(taskName, MetastoreTaskThread.class));
task.setConf(conf);
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
}
}
}
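These tasks are scheduled on the thread pool at the frequency each one reports, expressed in milliseconds. Note that the frequency is also passed as the initial delay, so the first run happens one full period after startup.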
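The scheduling call can be tried out with the JDK alone. Below is a small sketch, with FixedRateSketch and its two methods invented for illustration: frequencyMillis does the unit conversion that runFrequency(TimeUnit.MILLISECONDS) implies, and runTimes schedules a task with the same scheduleAtFixedRate(task, freq, freq, MILLISECONDS) shape used in startRemoteOnlyTasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the scheduling pattern, independent of Hive's ThreadPool class.
class FixedRateSketch {
    /** Converts a configured frequency into milliseconds. */
    static long frequencyMillis(long value, TimeUnit unit) {
        return TimeUnit.MILLISECONDS.convert(value, unit);
    }

    /**
     * Schedules a task with initial delay == period, as startRemoteOnlyTasks does,
     * and reports whether it ran at least `times` times within the timeout.
     */
    static boolean runTimes(int times, long freqMillis, long timeoutMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(times);
        try {
            pool.scheduleAtFixedRate(latch::countDown, freqMillis, freqMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // stop the periodic task once the demo is over
        }
    }

    public static void main(String[] args) {
        System.out.println("6 hours in ms: " + frequencyMillis(6, TimeUnit.HOURS));
        System.out.println("ran 3 times: " + runTimes(3, 20, 5000));
    }
}
```

With a 6 hour default, a freshly started metastore would therefore wait 21,600,000 ms before the first discovery run.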
Meanwhile, the switch that guards this method:
COMPACTOR_INITIATOR_ON("metastore.compactor.initiator.on", "hive.compactor.initiator.on", false, "Whether to run the initiator and cleaner threads on this metastore instance or not.\nSet this to true on one instance of the Thrift metastore service as part of turning\non Hive transactions. For a complete list of parameters required for turning on\ntransactions, see hive.txn.manager."),
is in turn related to:
/**
* An interface that allows Hive to manage transactions. All classes
* implementing this should extend {@link HiveTxnManagerImpl} rather than
* implementing this directly.
*/
public interface HiveTxnManager {
}
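This is clearly the interface that underpins Hive transactions.
So far, we know that Hive keeps some code-level configuration in the MetastoreConf class [held in heap memory; some values can be overridden with set inside a session], and persists some key-value configuration in the metadata table TABLE_PARAMS. The MetaStore daemon can then look up values by key from both stores and carry out its transaction-related work accordingly.
A property that CDP 7.1.5 [or an even earlier release] already supports only arrives on the Apache side with the 4.0 alpha. The free edition always trails the enterprise edition that customers pay protection money for, in stability as well as in features. Kylin 4.0 enterprise features showing up late in the open-source 5.0 is the same story: dessert is served to paying customers first, and the leftovers are offered to the freeloaders. Still, free is free, so no complaints.
Using automatic partition discovery
Apache version
According to the official Apache Hive documentation: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-DiscoverPartitions
this is a Hive 4.0 feature.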
Discover Partitions
Table property “discover.partitions” can now be specified to control automatic discovery and synchronization of partition metadata in Hive Metastore.
When Hive Metastore Service (HMS) is started in remote service mode, a background thread (PartitionManagementTask) gets scheduled periodically every 300s (configurable via metastore.partition.management.task.frequency config) that looks for tables with “discover.partitions” table property set to true and performs MSCK REPAIR in sync mode. If the table is a transactional table, then Exclusive Lock is obtained for that table before performing MSCK REPAIR. With this table property, “MSCK REPAIR TABLE table_name SYNC PARTITIONS” is no longer required to be run manually.
Version information
As of Hive 4.0.0 (HIVE-20707).
The corresponding issue:
https://issues.apache.org/jira/browse/HIVE-20707
Partition Retention
Table property “partition.retention.period” can now be specified for partitioned tables with a retention interval. When a retention interval is specified, the background thread running in HMS (refer Discover Partitions section), will check the age (creation time) of the partition and if the partition’s age is older than the retention period, it will be dropped. Dropping partitions after retention period will also delete the data in that partition. For example, if an external partitioned table with ‘date’ partition is created with table properties “discover.partitions”=“true” and “partition.retention.period”=“7d” then only the partitions created in last 7 days are retained.
Version information
As of Hive 4.0.0 (HIVE-20707).
This second feature is partition retention.
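The retention rule quoted above is a simple age comparison. A minimal sketch, assuming creation times are available as epoch seconds per partition; RetentionSketch and expired() are hypothetical names, and -1 is treated as "no valid retention period", in line with what getRetentionPeriodInSeconds returns for a missing or invalid value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the retention check, not Hive's implementation.
class RetentionSketch {
    /** Returns the partitions whose age (now - creation time) exceeds the retention period. */
    static List<String> expired(Map<String, Long> createTimeSec, long nowSec, long retentionSeconds) {
        List<String> result = new ArrayList<>();
        if (retentionSeconds < 0) {
            return result; // -1 means no usable retention period, so nothing is dropped
        }
        for (Map.Entry<String, Long> e : createTimeSec.entrySet()) {
            if (nowSec - e.getValue() > retentionSeconds) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000;
        long day = 24 * 60 * 60;
        Map<String, Long> parts = new LinkedHashMap<>();
        parts.put("dt=old", now - 8 * day);
        parts.put("dt=new", now - 6 * day);
        // With 'partition.retention.period'='7d', only dt=old is past retention.
        System.out.println(expired(parts, now, 7 * day));
    }
}
```

Since dropping an expired partition also deletes its data, it is worth double-checking the retention value before enabling this on a table.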
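CDP version
So for now this feature is really only usable on CDP. Apache Hive 4.0 is still alpha, and probably few people will use it any time soon.
For Hive on CDP, refer to the official documentation: https://docs.cloudera.com/runtime/7.0.3/using-hiveql/topics/hive-automate-msck.html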
Automated partition discovery and repair is useful for processing log data, and other data, in Spark and Hive catalogs. You learn how to set the partition discovery parameter to suit your use case. An aggressive partition discovery and repair configuration can delay the upgrade process.
Apache Hive can automatically and periodically discover discrepancies in partition metadata in the Hive metastore and in corresponding directories, or objects, on the file system. After discovering discrepancies, Hive performs synchronization.
The discover.partitions table property enables and disables synchronization of the file system with partitions. In external partitioned tables, this property is enabled (true) by default when you create the table. To a legacy external table (created using a version of Hive that does not support this feature), you need to add discover.partitions to the table properties to enable partition discovery.
By default, the discovery and synchronization of partitions occurs every 5 minutes. This is too often if you are upgrading and can result in the Hive DB being queried every few milliseconds, leading to performance degradation. During upgrading the high frequency of batch routines dictates running discovery and synchronization infrequently, perhaps hourly or even daily. You can configure the frequency as shown in this task.
1. Assuming you have an external table created using a version of Hive that does not support partition discovery, enable partition discovery for the table.
ALTER TABLE exttbl SET TBLPROPERTIES ('discover.partitions' = 'true');
2. In Cloudera Manager, click Clusters > Hive > Configuration, search for Hive Server Advanced Configuration Snippet (Safety Valve) for hive-site.xml.
3. Add the following property and value to hive-site.xml:
Property: metastore.partition.management.task.frequency
Value: 600
This action sets synchronization of partitions to occur every 10 minutes expressed in seconds. If you are upgrading, consider running discovery and synchronization once every 24 hours by setting the value to 86,400 seconds.
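In other words, on CDP, automatic partition discovery is enabled by default for external partitioned tables and is triggered every 5 minutes by default. The value here is given in seconds. The feature has been available since at least CDP 7.0.3.
Usage
If the defaults do not match expectations, they can be changed manually:
ALTER TABLE table_name SET TBLPROPERTIES ('discover.partitions'='true');
ALTER TABLE table_name SET TBLPROPERTIES ('metastore.partition.management.task.frequency' = '600');
The second statement persists the value to the metadata table, which differs from the Apache side, where it lives in the in-heap configuration object. (The Cloudera steps quoted above set it in hive-site.xml instead, and runFrequency in the Apache source reads it from the Configuration, so a table-level value should be verified before relying on it.)
However, according to: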
/**
* Get the variable as a long indicating a period of time
* @param conf configuration to retrieve it from
* @param var variable to retrieve
* @param outUnit Timeout to return value in
* @return value, or default value if value not in config file
*/
public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
assert var.defaultVal.getClass() == TimeValue.class;
String val = conf.get(var.varname);
if (val == null) {
// Look for it under the old Hive name
val = conf.get(var.hiveName);
}
if (val != null) {
return convertTimeStr(val, ((TimeValue)var.defaultVal).unit, outUnit);
} else {
return outUnit.convert(((TimeValue)var.defaultVal).val, ((TimeValue)var.defaultVal).unit);
}
}
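the value is looked up in the Configuration under the new name first, then under the old Hive name, and only then falls back to the hard-coded default, so there is nothing wrong with that.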
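The lookup order in getTimeVar can be reproduced with a plain Map standing in for Hadoop's Configuration. This is a simplified sketch: TimeVarLookupSketch is a made-up class, the two property names in main are hypothetical, and convertTimeStr here only understands the suffixes ms, s, m, h, and d, which is a subset of what MetastoreConf accepts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for MetastoreConf.getTimeVar, using a Map instead of Configuration.
class TimeVarLookupSketch {
    /** Parses "600", "600s", "10m", "6h", "7d"; a bare number is taken in defaultUnit. */
    static long convertTimeStr(String val, TimeUnit defaultUnit, TimeUnit outUnit) {
        String s = val.trim().toLowerCase();
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        long number = Long.parseLong(s.substring(0, i));
        String suffix = s.substring(i).trim();
        TimeUnit unit;
        switch (suffix) {
            case "":   unit = defaultUnit; break;
            case "ms": unit = TimeUnit.MILLISECONDS; break;
            case "s":  unit = TimeUnit.SECONDS; break;
            case "m":  unit = TimeUnit.MINUTES; break;
            case "h":  unit = TimeUnit.HOURS; break;
            case "d":  unit = TimeUnit.DAYS; break;
            default:   throw new IllegalArgumentException("Unsupported time suffix: " + suffix);
        }
        return outUnit.convert(number, unit);
    }

    /** New name first, then the old Hive name, then the default value. */
    static long getTimeVar(Map<String, String> conf, String varname, String hiveName,
                           long defaultVal, TimeUnit defaultUnit, TimeUnit outUnit) {
        String val = conf.get(varname);
        if (val == null) {
            val = conf.get(hiveName); // look under the old Hive name
        }
        if (val != null) {
            return convertTimeStr(val, defaultUnit, outUnit);
        }
        return outUnit.convert(defaultVal, defaultUnit);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Hypothetical property names, used only for this demo.
        System.out.println(getTimeVar(conf, "metastore.example.frequency", "hive.example.frequency",
                6, TimeUnit.HOURS, TimeUnit.SECONDS));
        conf.put("hive.example.frequency", "10m");
        System.out.println(getTimeVar(conf, "metastore.example.frequency", "hive.example.frequency",
                6, TimeUnit.HOURS, TimeUnit.SECONDS));
    }
}
```

One detail that follows from the quoted source: a bare number is interpreted in the unit of the default value, so the same "600" can mean different durations depending on whether the default is declared in seconds or in hours. Adding an explicit suffix such as 600s avoids the ambiguity.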
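Other issues
Automatic discovery runs every 5 minutes, which is only near real time. If one job finishes and the next one is triggered immediately, then for those 5 minutes [or even longer] a Hive on Tez job cannot read the new partition! Spark or Flink reading the Parquet files directly is a different matter.
Hence: https://lizhiyong.blog.csdn.net/article/details/127680034
When the author was doing platform development, an extra msck or alter table add partition step was still needed after each write. Only once that command has also finished can eventual consistency be guaranteed!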
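For a pipeline that cannot wait for the background task, the extra step can be generated right after the write finishes. A minimal sketch that only builds the HiveQL text; PartitionRefreshSql and its methods are hypothetical helpers, and running the statement over JDBC is left out so that the example has no Hive dependency.

```java
// Hypothetical helper that builds the statements to run after a partition has been written.
class PartitionRefreshSql {
    /** Builds an idempotent ADD PARTITION statement for a single string partition column. */
    static String addPartition(String qualifiedTable, String partitionColumn, String partitionValue) {
        if (partitionValue.indexOf('\'') >= 0) {
            // Keep the sketch simple: refuse values that would need escaping inside the literal.
            throw new IllegalArgumentException("Partition value must not contain a single quote");
        }
        return "ALTER TABLE " + qualifiedTable
                + " ADD IF NOT EXISTS PARTITION (" + partitionColumn + "='" + partitionValue + "')";
    }

    /** Builds the heavier alternative that rescans the whole table location. */
    static String msckRepair(String qualifiedTable) {
        return "MSCK REPAIR TABLE " + qualifiedTable;
    }

    public static void main(String[] args) {
        System.out.println(addPartition("db_lzy.test_external_par_20230502", "dt", "20230502"));
        System.out.println(msckRepair("db_lzy.test_external_par_20230502"));
    }
}
```

When the writer already knows which partition it produced, ADD IF NOT EXISTS PARTITION is usually the cheaper choice, since MSCK has to list every directory under the table location.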
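Summary
After a flurry of digging, the author located the metadata stored in Hive's MetaStore and worked out the mechanism behind table-level configuration. Along the way, the feature lag of the free edition also became apparent. All in all, a worthwhile exercise.
Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/130468723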