Skip to content

Commit

Permalink
Batch the metastore call used to retrieve the partitions by names
Browse files Browse the repository at this point in the history
  • Loading branch information
findinpath authored and findepi committed May 25, 2022
1 parent db099cf commit c58d9ea
Showing 1 changed file with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.PartitionStatistics;
Expand Down Expand Up @@ -67,6 +68,8 @@ public enum SyncMode
ADD, DROP, FULL
}

private static final int BATCH_GET_PARTITIONS_BY_NAMES_MAX_PAGE_SIZE = 1000;

private static final MethodHandle SYNC_PARTITION_METADATA = methodHandle(
SyncPartitionMetadataProcedure.class,
"syncPartitionMetadata",
Expand Down Expand Up @@ -139,11 +142,7 @@ private void doSyncPartitionMetadata(ConnectorSession session, ConnectorAccessCo
FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation);
List<String> partitionsNamesInMetastore = metastore.getPartitionNames(schemaName, tableName)
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
List<String> partitionsInMetastore = metastore.getPartitionsByNames(schemaName, tableName, partitionsNamesInMetastore).values().stream()
.filter(Optional::isPresent).map(Optional::get)
.map(partition -> new Path(partition.getStorage().getLocation()).toUri())
.map(uri -> tableLocation.toUri().relativize(uri).getPath())
.collect(toImmutableList());
List<String> partitionsInMetastore = getPartitionsInMetastore(schemaTableName, tableLocation, partitionsNamesInMetastore, metastore);
List<String> partitionsInFileSystem = listDirectory(fileSystem, fileSystem.getFileStatus(tableLocation), table.getPartitionColumns(), table.getPartitionColumns().size(), caseSensitive).stream()
.map(fileStatus -> fileStatus.getPath().toUri())
.map(uri -> tableLocation.toUri().relativize(uri).getPath())
Expand All @@ -161,6 +160,19 @@ private void doSyncPartitionMetadata(ConnectorSession session, ConnectorAccessCo
syncPartitions(partitionsToAdd, partitionsToDrop, syncMode, metastore, session, table);
}

private List<String> getPartitionsInMetastore(SchemaTableName schemaTableName, Path tableLocation, List<String> partitionsNames, SemiTransactionalHiveMetastore metastore)
{
ImmutableList.Builder<String> partitionsInMetastoreBuilder = ImmutableList.builderWithExpectedSize(partitionsNames.size());
for (List<String> partitionsNamesBatch : Lists.partition(partitionsNames, BATCH_GET_PARTITIONS_BY_NAMES_MAX_PAGE_SIZE)) {
metastore.getPartitionsByNames(schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionsNamesBatch).values().stream()
.filter(Optional::isPresent).map(Optional::get)
.map(partition -> new Path(partition.getStorage().getLocation()).toUri())
.map(uri -> tableLocation.toUri().relativize(uri).getPath())
.forEach(partitionsInMetastoreBuilder::add);
}
return partitionsInMetastoreBuilder.build();
}

private static List<FileStatus> listDirectory(FileSystem fileSystem, FileStatus current, List<Column> partitionColumns, int depth, boolean caseSensitive)
{
if (depth == 0) {
Expand Down

0 comments on commit c58d9ea

Please sign in to comment.