Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support drop iceberg table when metadata or snapshot is missing and refactor code for delta-lake #15065

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import io.trino.execution.warnings.WarningCollector;
import io.trino.metadata.Metadata;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.RedirectionAwareTableHandle;
import io.trino.security.AccessControl;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.sql.tree.DropTable;
import io.trino.sql.tree.Expression;

Expand Down Expand Up @@ -76,17 +76,17 @@ public ListenableFuture<Void> execute(
"Table '%s' does not exist, but a view with that name exists. Did you mean DROP VIEW %s?", originalTableName, originalTableName);
}

RedirectionAwareTableHandle redirectionAwareTableHandle = metadata.getRedirectionAwareTableHandle(session, originalTableName);
if (redirectionAwareTableHandle.getTableHandle().isEmpty()) {
QualifiedObjectName targetTable = metadata.getRedirectedTableName(session, originalTableName);

try {
accessControl.checkCanDropTable(session.toSecurityContext(), targetTable);
metadata.dropTable(session, targetTable);
}
catch (TableNotFoundException e) {
if (!statement.isExists()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", originalTableName);
}
return immediateVoidFuture();
}
QualifiedObjectName tableName = redirectionAwareTableHandle.getRedirectedTableName().orElse(originalTableName);
accessControl.checkCanDropTable(session.toSecurityContext(), tableName);

metadata.dropTable(session, redirectionAwareTableHandle.getTableHandle().get(), tableName.asCatalogSchemaTableName());

return immediateVoidFuture();
}
Expand Down
14 changes: 14 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,18 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
* Drops the specified table
*
* @throws RuntimeException if the table cannot be dropped or table handle is no longer valid
*
* @deprecated use {@link #dropTable(Session, QualifiedObjectName)}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This overload is a dead code now, no need for keeping it (as deprecated)

*/
void dropTable(Session session, TableHandle tableHandle, CatalogSchemaTableName tableName);

/**
* Drops the specified table
*
* @throws RuntimeException if the table cannot be dropped
*/
void dropTable(Session session, QualifiedObjectName tableName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want this method to replace the old one, void dropTable(Session session, TableHandle tableHandle).
So it would be good to

  • make ConnectorMetadata change backward compatibility
  • apply changes to DropTableTask

Doing this would reveal that the old drop handles redirections whereas the new drop-by-name cannot do that.
The MetadataManager could follow redirects as needed, but I am concerned this is not the right place to add that logic.

Also, decoupling ConnectorMetadata.redirectTable and ConnectorMetadata.dropTable calls makes it impossible to make a non-racy implementation.
I am leaving towards introducing a non-void return type here, so that drop-by-name can either declare success or return a redirection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martint wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks unresolved. @krvikash @findinpath do you have a plan here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps something like this (rough idea, not polished up)

DropResult dropTable(Session session, QualifiedObjectName tableName);
sealed class DropResult
  permits DropSuccess, DropRedirected
class DropSuccess {  .. }  // singleton (can this be an enum?)
records DropRedirected(CatalogSchemaTableName target) {}


/**
* Truncates the specified table
*/
Expand Down Expand Up @@ -708,6 +717,11 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName
*/
RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion);

/**
* Get the table name after performing redirection.
*/
QualifiedObjectName getRedirectedTableName(Session session, QualifiedObjectName tableName);

/**
* Returns true if the connector reports number of written bytes for an existing table. Otherwise, it returns false.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,20 @@ public void dropTable(Session session, TableHandle tableHandle, CatalogSchemaTab
}
}

@Override
public void dropTable(Session session, QualifiedObjectName tableName)
{
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, tableName.getCatalogName());
ConnectorSession connectorSession = session.toConnectorSession(catalogMetadata.getCatalogHandle());
ConnectorMetadata metadata = catalogMetadata.getMetadata(session);

metadata.dropTable(connectorSession, tableName.asSchemaTableName());

if (catalogMetadata.getSecurityManagement() == SYSTEM) {
systemSecurityMetadata.tableDropped(session, tableName.asCatalogSchemaTableName());
findepi marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public void truncateTable(Session session, TableHandle tableHandle)
{
Expand Down Expand Up @@ -1486,7 +1500,8 @@ public Optional<TableScanRedirectApplicationResult> applyTableScanRedirect(Sessi
return metadata.applyTableScanRedirect(connectorSession, tableHandle.getConnectorHandle());
}

private QualifiedObjectName getRedirectedTableName(Session session, QualifiedObjectName originalTableName)
@Override
public QualifiedObjectName getRedirectedTableName(Session session, QualifiedObjectName originalTableName)
{
requireNonNull(session, "session is null");
requireNonNull(originalTableName, "originalTableName is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,13 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
tables.remove(getTableName(tableHandle));
dropTable(session, getTableName(tableHandle));
}

@Override
public void dropTable(ConnectorSession session, SchemaTableName tableName)
{
tables.remove(tableName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,9 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) {}

@Override
public void dropTable(ConnectorSession session, SchemaTableName tableName) {}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.metadata.MaterializedViewPropertyManager;
import io.trino.metadata.MetadataManager;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.QualifiedTablePrefix;
import io.trino.metadata.ResolvedFunction;
import io.trino.metadata.TableHandle;
import io.trino.metadata.TableMetadata;
Expand All @@ -43,6 +44,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.MaterializedViewNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.connector.TestingColumnHandle;
import io.trino.spi.function.OperatorType;
import io.trino.spi.resourcegroups.ResourceGroupId;
Expand Down Expand Up @@ -318,6 +320,15 @@ public void dropTable(Session session, TableHandle tableHandle, CatalogSchemaTab
tables.remove(tableName.getSchemaTableName());
}

@Override
public void dropTable(Session session, QualifiedObjectName tableName)
{
SchemaTableName schemaTableName = tableName.asSchemaTableName();
if (tables.remove(schemaTableName) == null) {
throw new TableNotFoundException(schemaTableName);
}
}

@Override
public void renameTable(Session session, TableHandle tableHandle, CatalogSchemaTableName currentTableName, QualifiedObjectName newTableName)
{
Expand Down Expand Up @@ -380,6 +391,18 @@ private SchemaTableName getTableName(TableHandle tableHandle)
return ((TestingTableHandle) tableHandle.getConnectorHandle()).getTableName();
}

@Override
public List<QualifiedObjectName> listTables(Session session, QualifiedTablePrefix prefix)
{
return tables.keySet().stream().map(table -> QualifiedObjectName.convertFromSchemaTableName(TEST_CATALOG_NAME).apply(table)).toList();
}

@Override
public QualifiedObjectName getRedirectedTableName(Session session, QualifiedObjectName tableName)
{
return tableName;
}

@Override
public Map<String, ColumnHandle> getColumnHandles(Session session, TableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,12 @@ public void dropTable(Session session, TableHandle tableHandle, CatalogSchemaTab
throw new UnsupportedOperationException();
}

@Override
public void dropTable(Session session, QualifiedObjectName tableName)
{
throw new UnsupportedOperationException();
}

@Override
public void truncateTable(Session session, TableHandle tableHandle)
{
Expand Down Expand Up @@ -878,6 +884,12 @@ public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session sessio
throw new UnsupportedOperationException();
}

@Override
public QualifiedObjectName getRedirectedTableName(Session session, QualifiedObjectName tableName)
{
throw new UnsupportedOperationException();
}

@Override
public boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ public void dropTable(Session session, TableHandle tableHandle, CatalogSchemaTab
delegate.dropTable(session, tableHandle, tableName);
}

@Override
public void dropTable(Session session, QualifiedObjectName tableName)
{
delegate.dropTable(session, tableName);
}

@Override
public void truncateTable(Session session, TableHandle tableHandle)
{
Expand Down Expand Up @@ -857,6 +863,12 @@ public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session sessio
return delegate.getRedirectionAwareTableHandle(session, tableName, startVersion, endVersion);
}

@Override
public QualifiedObjectName getRedirectedTableName(Session session, QualifiedObjectName tableName)
{
return delegate.getRedirectedTableName(session, tableName);
}

@Override
public boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,24 @@ default void createTable(ConnectorSession session, ConnectorTableMetadata tableM
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables");
}

/**
* Drops the specified table
*
* @throws RuntimeException if the table cannot be dropped
*/
default void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
dropTable(session, getTableHandle(session, schemaTableName));
findepi marked this conversation as resolved.
Show resolved Hide resolved
}

krvikash marked this conversation as resolved.
Show resolved Hide resolved
/**
* Drops the specified table
*
* @throws RuntimeException if the table cannot be dropped or table handle is no longer valid
*
* @deprecated use {@link #dropTable(ConnectorSession, SchemaTableName)}
*/
@Deprecated
default void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support dropping tables");
krvikash marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
}
}

@Override
public void dropTable(ConnectorSession session, SchemaTableName tableName)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.dropTable(session, tableName);
}
}

@Override
public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_TABLE;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCollectExtendedStatisticsColumnStatisticsOnWrite;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
Expand Down Expand Up @@ -415,8 +416,10 @@ public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTable
}

TableSnapshot tableSnapshot = metastore.getSnapshot(dataTableName, session);
Optional<MetadataEntry> metadata = metastore.getMetadata(tableSnapshot, session);
metadata.ifPresent(metadataEntry -> verifySupportedColumnMapping(getColumnMappingMode(metadataEntry)));
MetadataEntry metadata = metastore.getMetadata(tableSnapshot, session)
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_TABLE, "Metadata not found in transaction log for table " + tableName));

verifySupportedColumnMapping(getColumnMappingMode(metadata));
return new DeltaLakeTableHandle(
dataTableName.getSchemaName(),
dataTableName.getTableName(),
Expand Down Expand Up @@ -1766,12 +1769,16 @@ public Optional<Object> getInfo(ConnectorTableHandle table)
@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
dropTable(session, ((DeltaLakeTableHandle) tableHandle).getSchemaTableName());
}

Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName())
.orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName()));
@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
Table table = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));

metastore.dropTable(session, handle.getSchemaName(), handle.getTableName(), table.getTableType().equals(MANAGED_TABLE.toString()));
metastore.dropTable(session, schemaTableName.getSchemaName(), schemaTableName.getTableName(), table.getTableType().equals(MANAGED_TABLE.toString()));
}

@Override
Expand Down Expand Up @@ -2030,7 +2037,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableName.getSchemaName(),
tableName.getTableName(),
tableHandle.getLocation(),
Optional.of(tableHandle.getMetadataEntry()),
tableHandle.getMetadataEntry(),
// Do not simplify the enforced constraint, the connector is guaranteeing the constraint will be applied as is.
// The unenforced constraint will still be checked by the engine.
tableHandle.getEnforcedPartitionConstraint()
Expand Down Expand Up @@ -2157,7 +2164,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
handle.getSchemaTableName().getSchemaName(),
handle.getSchemaTableName().getTableName(),
handle.getLocation(),
Optional.of(metadata),
metadata,
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -30,7 +29,6 @@
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeTableHandle.WriteType.UPDATE;
import static java.util.Objects.requireNonNull;

Expand All @@ -47,7 +45,7 @@ public enum WriteType
private final String schemaName;
private final String tableName;
private final String location;
private final Optional<MetadataEntry> metadataEntry;
private final MetadataEntry metadataEntry;
private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
Expand All @@ -72,7 +70,7 @@ public DeltaLakeTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") Optional<MetadataEntry> metadataEntry,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
@JsonProperty("writeType") Optional<WriteType> writeType,
Expand Down Expand Up @@ -105,7 +103,7 @@ public DeltaLakeTableHandle(
String schemaName,
String tableName,
String location,
Optional<MetadataEntry> metadataEntry,
MetadataEntry metadataEntry,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<WriteType> writeType,
Expand Down Expand Up @@ -143,7 +141,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<ColumnHandle> projectedColu
getSchemaName(),
getTableName(),
getLocation(),
Optional.of(getMetadataEntry()),
getMetadataEntry(),
getEnforcedPartitionConstraint(),
getNonPartitionConstraint(),
getWriteType(),
Expand Down Expand Up @@ -196,7 +194,7 @@ public String getLocation()
@JsonProperty
public MetadataEntry getMetadataEntry()
{
return metadataEntry.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableName));
return metadataEntry;
}

@JsonProperty
Expand Down
Loading