Skip to content

Commit

Permalink
Evict session level cache when dropping Iceberg table
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Nov 16, 2023
1 parent 5a4e39e commit 4f3d778
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,15 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT
else {
icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit();
}
invalidateTableCache(schemaTableName);
}

@Override
public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional<String> comment)
{
Table icebergTable = loadTable(session, schemaTableName);
icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit();
invalidateTableCache(schemaTableName);
}

@Override
Expand Down Expand Up @@ -471,6 +473,8 @@ protected Map<String, String> createMaterializedViewProperties(ConnectorSession
.buildOrThrow();
}

protected abstract void invalidateTableCache(SchemaTableName schemaTableName);

protected static class MaterializedViewMayBeBeingRemovedException
extends RuntimeException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
LOG.warn(e, "Failed to delete table data referenced by metadata");
}
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
invalidateTableCache(schemaTableName);
}

@Override
Expand All @@ -689,6 +690,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT
}
String tableLocation = metadataLocation.replaceFirst("/metadata/[^/]*$", "");
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation);
invalidateTableCache(schemaTableName);
}

@Override
Expand Down Expand Up @@ -752,6 +754,7 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableN
public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName)
{
dropTableFromMetastore(session, schemaTableName);
invalidateTableCache(schemaTableName);
}

private com.amazonaws.services.glue.model.Table dropTableFromMetastore(ConnectorSession session, SchemaTableName schemaTableName)
Expand Down Expand Up @@ -796,6 +799,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
createTable(to.getSchemaName(), tableInput);
newTableCreated = true;
deleteTable(from.getSchemaName(), from.getTableName());
invalidateTableCache(from);
}
catch (RuntimeException e) {
if (newTableCreated) {
Expand Down Expand Up @@ -1486,6 +1490,12 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
return Optional.empty();
}

@Override
protected void invalidateTableCache(SchemaTableName schemaTableName)
{
tableMetadataCache.remove(schemaTableName);
}

com.amazonaws.services.glue.model.Table getTable(SchemaTableName tableName, boolean invalidateCaches)
{
if (invalidateCaches) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ private static Optional<String> getQueryId(io.trino.plugin.hive.metastore.Table
public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName)
{
dropTableFromMetastore(schemaTableName);
invalidateTableCache(schemaTableName);
}

@Override
Expand Down Expand Up @@ -421,13 +422,15 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
log.warn(e, "Failed to delete table data referenced by metadata");
}
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, metastoreTable.getStorage().getLocation());
invalidateTableCache(schemaTableName);
}

@Override
public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaTableName)
{
io.trino.plugin.hive.metastore.Table table = dropTableFromMetastore(schemaTableName);
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.getStorage().getLocation());
invalidateTableCache(schemaTableName);
}

private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableName schemaTableName)
Expand All @@ -449,6 +452,7 @@ private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableN
public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to)
{
metastore.renameTable(from.getSchemaName(), from.getTableName(), to.getSchemaName(), to.getTableName());
invalidateTableCache(from);
}

@Override
Expand Down Expand Up @@ -887,4 +891,10 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
}
return Optional.empty();
}

@Override
protected void invalidateTableCache(SchemaTableName schemaTableName)
{
tableMetadataCache.remove(schemaTableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
LOG.warn(e, "Failed to delete table data referenced by metadata");
}
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
invalidateTableCache(schemaTableName);
}

@Override
Expand All @@ -287,6 +288,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT
}
String tableLocation = metadataLocation.get().replaceFirst("/metadata/[^/]*$", "");
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation);
invalidateTableCache(schemaTableName);
}

@Override
Expand All @@ -298,6 +300,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, "Failed to rename table from %s to %s".formatted(from, to), e);
}
invalidateTableCache(from);
}

@Override
Expand Down Expand Up @@ -447,6 +450,12 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
return Optional.empty();
}

@Override
protected void invalidateTableCache(SchemaTableName schemaTableName)
{
tableMetadataCache.remove(schemaTableName);
}

private static TableIdentifier toIdentifier(SchemaTableName table)
{
return TableIdentifier.of(table.getSchemaName(), table.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
validateTableCanBeDropped(table);
nessieClient.dropTable(toIdentifier(schemaTableName), true);
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
invalidateTableCache(schemaTableName);
}

@Override
Expand All @@ -230,6 +231,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT
public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to)
{
nessieClient.renameTable(toIdentifier(from), toIdentifier(to));
invalidateTableCache(from);
}

@Override
Expand Down Expand Up @@ -419,4 +421,10 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
{
return Optional.empty();
}

@Override
protected void invalidateTableCache(SchemaTableName schemaTableName)
{
tableMetadataCache.remove(schemaTableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
if (!restSessionCatalog.dropTable(convert(session), toIdentifier(tableName))) {
throw new TableNotFoundException(tableName);
}
invalidateTableCache(tableName);
}

@Override
Expand All @@ -274,6 +275,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
if (!restSessionCatalog.purgeTable(convert(session), toIdentifier(schemaTableName))) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop table: %s", schemaTableName));
}
invalidateTableCache(schemaTableName);
}

@Override
Expand All @@ -293,6 +295,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
catch (RESTException e) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to rename table %s to %s", from, to), e);
}
invalidateTableCache(from);
}

@Override
Expand Down Expand Up @@ -331,6 +334,7 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT
else {
icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit();
}
invalidateTableCache(schemaTableName);
}

@Override
Expand Down Expand Up @@ -508,6 +512,11 @@ private SessionCatalog.SessionContext convert(ConnectorSession session)
};
}

private void invalidateTableCache(SchemaTableName schemaTableName)
{
tableCache.remove(schemaTableName);
}

private static TableIdentifier toIdentifier(SchemaTableName schemaTableName)
{
return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
Expand Down

0 comments on commit 4f3d778

Please sign in to comment.