Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GlueHiveMetastore cleanup #18483

Merged
merged 7 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2225,9 +2225,7 @@ private void cleanupFailedWrite(ConnectorSession session, String tableLocation,
}
catch (Exception e) {
// Can be safely ignored since a VACUUM from DeltaLake will take care of such orphaned files
LOG.warn(e, "Failed cleanup of leftover files from failed write, files are: %s", dataFiles.stream()
.map(dataFileInfo -> appendPath(tableLocation, dataFileInfo.getPath()))
.collect(toImmutableList()));
LOG.warn(e, "Failed cleanup of leftover files from failed write, files are: %s", filesToDelete);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ public Set<HiveColumnStatisticType> getSupportedColumnStatistics(Type type)
return delegate.getSupportedColumnStatistics(type);
}

/**
 * Convenience overload that forwards to the three-argument
 * {@code getTableStatistics}, passing an empty {@code Optional} as the
 * {@code columns} argument.
 *
 * @param databaseName name of the database containing the table
 * @param tableName name of the table
 * @return the result of the three-argument overload
 */
public PartitionStatistics getTableStatistics(String databaseName, String tableName)
{
    Optional<Set<String>> columns = Optional.empty();
    return getTableStatistics(databaseName, tableName, columns);
}

public PartitionStatistics getTableStatistics(String databaseName, String tableName, Optional<Set<String>> columns)
{
Table table = getExistingTable(databaseName, tableName);
Expand Down Expand Up @@ -157,11 +152,6 @@ public Optional<List<SchemaTableName>> getAllTables()
return delegate.getAllTables();
}

/**
 * Forwards the lookup to {@code delegate.getTablesWithParameter} with the
 * same arguments and returns its result unchanged.
 *
 * @param databaseName database name passed through to the delegate
 * @param parameterKey parameter key passed through to the delegate
 * @param parameterValue parameter value passed through to the delegate
 * @return the list returned by the delegate
 */
public List<String> getTablesWithParameter(String databaseName, String parameterKey, String parameterValue)
{
    List<String> tableNames = delegate.getTablesWithParameter(databaseName, parameterKey, parameterValue);
    return tableNames;
}

public List<String> getAllViews(String databaseName)
{
return delegate.getAllViews(databaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,28 +430,7 @@ private void updatePartitionStatisticsBatch(Table table, Map<String, Function<Pa
@Override
public List<String> getAllTables(String databaseName)
{
try {
List<String> tableNames = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken,
stats.getGetTables())
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(tableFilter)
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return tableNames;
}
catch (EntityNotFoundException | AccessDeniedException e) {
// database does not exist or permission denied
return ImmutableList.of();
}
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
}
return getTableNames(databaseName, tableFilter);
}

@Override
Expand All @@ -461,15 +440,15 @@ public Optional<List<SchemaTableName>> getAllTables()
}

@Override
public synchronized List<String> getTablesWithParameter(String databaseName, String parameterKey, String parameterValue)
public List<String> getTablesWithParameter(String databaseName, String parameterKey, String parameterValue)
{
return getAllViews(databaseName, table -> parameterValue.equals(getTableParameters(table).get(parameterKey)));
return getTableNames(databaseName, table -> parameterValue.equals(getTableParameters(table).get(parameterKey)));
}

@Override
public List<String> getAllViews(String databaseName)
{
return getAllViews(databaseName, table -> true);
return getTableNames(databaseName, VIEWS_FILTER);
}

@Override
Expand All @@ -478,10 +457,10 @@ public Optional<List<SchemaTableName>> getAllViews()
return Optional.empty();
}

private List<String> getAllViews(String databaseName, Predicate<com.amazonaws.services.glue.model.Table> additionalFilter)
private List<String> getTableNames(String databaseName, Predicate<com.amazonaws.services.glue.model.Table> filter)
{
try {
List<String> views = getPaginatedResults(
List<String> tableNames = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withDatabaseName(databaseName),
Expand All @@ -490,10 +469,10 @@ private List<String> getAllViews(String databaseName, Predicate<com.amazonaws.se
stats.getGetTables())
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(VIEWS_FILTER.and(additionalFilter))
.filter(filter)
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return views;
return tableNames;
}
catch (EntityNotFoundException | AccessDeniedException e) {
// database does not exist or permission denied
Expand Down Expand Up @@ -531,7 +510,6 @@ public void createDatabase(Database database)
}
}

// TODO: respect deleteData
@Override
public void dropDatabase(String databaseName, boolean deleteData)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3332,7 +3332,7 @@ public void testUpdateTableColumnStatisticsEmptyOptionalFields()
protected void testUpdateTableStatistics(SchemaTableName tableName, PartitionStatistics initialStatistics, PartitionStatistics... statistics)
{
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient());
assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(initialStatistics);

AtomicReference<PartitionStatistics> expectedStatistics = new AtomicReference<>(initialStatistics);
Expand All @@ -3341,20 +3341,20 @@ protected void testUpdateTableStatistics(SchemaTableName tableName, PartitionSta
assertThat(actualStatistics).isEqualTo(expectedStatistics.get());
return partitionStatistics;
});
assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(partitionStatistics);
expectedStatistics.set(partitionStatistics);
}

assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(expectedStatistics.get());

metastoreClient.updateTableStatistics(tableName.getSchemaName(), tableName.getTableName(), NO_ACID_TRANSACTION, actualStatistics -> {
assertThat(actualStatistics).isEqualTo(expectedStatistics.get());
return initialStatistics;
});

assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(initialStatistics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ public void testStatisticsLongColumnNames()

doCreateEmptyTable(tableName, ORC, columns);

assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(ZERO_TABLE_STATISTICS);
testUpdateTableStatistics(tableName, ZERO_TABLE_STATISTICS, partitionStatistics);
}
Expand Down Expand Up @@ -1218,26 +1218,26 @@ public void testStatisticsColumnModification()
return partitionStatistics;
});

assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(partitionStatistics);

metastore.renameColumn(tableName.getSchemaName(), tableName.getTableName(), "column1", "column4");
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(new PartitionStatistics(
HIVE_BASIC_STATISTICS,
Map.of("column2", INTEGER_COLUMN_STATISTICS)));

metastore.dropColumn(tableName.getSchemaName(), tableName.getTableName(), "column2");
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(new PartitionStatistics(HIVE_BASIC_STATISTICS, Map.of()));

metastore.addColumn(tableName.getSchemaName(), tableName.getTableName(), "column5", HiveType.HIVE_INT, "comment");
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(new PartitionStatistics(HIVE_BASIC_STATISTICS, Map.of()));

// TODO: column1 stats should be removed on column delete. However this is tricky since stats can be stored in multiple partitions.
metastore.renameColumn(tableName.getSchemaName(), tableName.getTableName(), "column4", "column1");
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(new PartitionStatistics(
HIVE_BASIC_STATISTICS,
Map.of("column1", INTEGER_COLUMN_STATISTICS)));
Expand Down Expand Up @@ -1273,21 +1273,21 @@ public void testStatisticsPartitionedTableColumnModification()

assertThat(metastoreClient.getStats().getBatchUpdatePartition().getTime().getAllTime().getCount()).isEqualTo(countBefore + 1);
PartitionStatistics tableStatistics = new PartitionStatistics(createEmptyStatistics(), Map.of());
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(tableStatistics);
assertThat(metastore.getPartitionStatistics(tableName.getSchemaName(), tableName.getTableName(), Set.of("ds=2016-01-01")))
.isEqualTo(Map.of("ds=2016-01-01", partitionStatistics));

// renaming table column does not rename partition columns
metastore.renameColumn(tableName.getSchemaName(), tableName.getTableName(), "column1", "column4");
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(tableStatistics);
assertThat(metastore.getPartitionStatistics(tableName.getSchemaName(), tableName.getTableName(), Set.of("ds=2016-01-01")))
.isEqualTo(Map.of("ds=2016-01-01", partitionStatistics));

// dropping table column does not drop partition columns
metastore.dropColumn(tableName.getSchemaName(), tableName.getTableName(), "column2");
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(tableStatistics);
assertThat(metastore.getPartitionStatistics(tableName.getSchemaName(), tableName.getTableName(), Set.of("ds=2016-01-01")))
.isEqualTo(Map.of("ds=2016-01-01", partitionStatistics));
Expand Down Expand Up @@ -1334,7 +1334,7 @@ public void testInvalidColumnStatisticsMetadata()
.withDatabaseName(tableName.getSchemaName())
.withTableInput(tableInput));

assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(partitionStatistics);
}
finally {
Expand Down