Skip to content

Commit

Permalink
[HUDI-4582] Support batch synchronization of partition to HMS to avoid timeout (apache#6347)

Browse files Browse the repository at this point in the history


Co-authored-by: xxhua <xxhua@freewheel.tv>
  • Loading branch information
2 people authored and fengjian committed Apr 5, 2023
1 parent a3c3934 commit cebe0d1
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -69,6 +70,7 @@ public static String getBucketSpec(String bucketCols, int bucketNum) {

  /**
   * Builds the sync config from raw properties only, then validates them.
   *
   * @param props hive-sync properties; batch-sync partition count is checked by
   *              {@link #validateParameters()} and must be positive
   */
  public HiveSyncConfig(Properties props) {
    super(props);
    validateParameters(); // fail fast on invalid batch-sync settings before any HMS calls
  }

public HiveSyncConfig(Properties props, Configuration hadoopConf) {
Expand All @@ -78,6 +80,7 @@ public HiveSyncConfig(Properties props, Configuration hadoopConf) {
// HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
hiveConf.addResource(getHadoopFileSystem().getConf());
setHadoopConf(hiveConf);
validateParameters();
}

public HiveConf getHiveConf() {
Expand Down Expand Up @@ -171,4 +174,8 @@ public TypedProperties toProps() {
return props;
}
}

public void validateParameters() {
ValidationUtils.checkArgument(getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) > 0, "batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
Expand Down Expand Up @@ -47,12 +48,14 @@
import org.apache.parquet.schema.MessageType;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
Expand Down Expand Up @@ -192,18 +195,23 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
try {
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
List<Partition> partitionList = partitionsToAdd.stream().map(partition -> {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
partitionSd.setOutputFormat(sd.getOutputFormat());
partitionSd.setSerdeInfo(sd.getSerdeInfo());
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.getString(META_SYNC_BASE_PATH), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
partitionSd.setLocation(fullPartitionPath);
return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
}).collect(Collectors.toList());
client.add_partitions(partitionList, true, false);
int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
for (List<String> batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) {
List<Partition> partitionList = new ArrayList<>();
batch.forEach(x -> {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
partitionSd.setOutputFormat(sd.getOutputFormat());
partitionSd.setSerdeInfo(sd.getSerdeInfo());
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.getString(META_SYNC_BASE_PATH), x).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x);
partitionSd.setLocation(fullPartitionPath);
partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null));
});
client.add_partitions(partitionList, true, false);
LOG.info("HMSDDLExecutor add a batch partitions done: " + partitionList.size());
}
} catch (TException e) {
LOG.error(databaseName + "." + tableName + " add partition failed", e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDro
}

private List<String> constructDropPartitions(String tableName, List<String> partitions) {
if (config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) <= 0) {
throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
}
List<String> result = new ArrayList<>();
int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
StringBuilder alterSQL = getAlterTableDropPrefix(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,6 @@ public void updateTableComments(String tableName, Map<String, Pair<String, Strin
}

private List<String> constructAddPartitions(String tableName, List<String> partitions) {
if (config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) <= 0) {
throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
}
List<String> result = new ArrayList<>();
int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
StringBuilder alterSQL = getAlterTablePrefix(tableName);
Expand Down

0 comments on commit cebe0d1

Please sign in to comment.