diff --git a/core/trino-spi/src/main/java/io/trino/spi/TrinoException.java b/core/trino-spi/src/main/java/io/trino/spi/TrinoException.java index f2acde8da68b..bf3542abca6b 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/TrinoException.java +++ b/core/trino-spi/src/main/java/io/trino/spi/TrinoException.java @@ -39,10 +39,20 @@ public TrinoException(ErrorCodeSupplier errorCode, String message, Throwable cau this(errorCode, Optional.empty(), message, cause); } + public TrinoException(ErrorCode errorCode, String message, Throwable cause) + { + this(errorCode, Optional.empty(), message, cause); + } + public TrinoException(ErrorCodeSupplier errorCodeSupplier, Optional location, String message, Throwable cause) + { + this(errorCodeSupplier.toErrorCode(), location, message, cause); + } + + private TrinoException(ErrorCode errorCode, Optional location, String message, Throwable cause) { super(message, cause); - this.errorCode = errorCodeSupplier.toErrorCode(); + this.errorCode = requireNonNull(errorCode, "errorCode is null"); this.location = requireNonNull(location, "location is null"); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java new file mode 100644 index 000000000000..8b1821bbba84 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.SchemaTableName; + +import static java.util.Objects.requireNonNull; + +public record CorruptedDeltaLakeTableHandle( + SchemaTableName schemaTableName, + TrinoException originalException) + implements ConnectorTableHandle +{ + public CorruptedDeltaLakeTableHandle + { + requireNonNull(schemaTableName, "schemaTableName is null"); + requireNonNull(originalException, "originalException is null"); + } + + public TrinoException createException() + { + // Original exception originates from a different place. Create a new exception not to confuse reader with a stacktrace not matching call site. + return new TrinoException(originalException.getErrorCode(), originalException.getMessage(), originalException); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 213727b6c4bd..251d4bc66f03 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -400,7 +400,7 @@ public Optional redirectTable(ConnectorSession session, } @Override - public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { requireNonNull(tableName, "tableName is null"); if (!DeltaLakeTableName.isDataTable(tableName.getTableName())) { @@ -415,13 +415,22 @@ public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTable } TableSnapshot tableSnapshot = metastore.getSnapshot(dataTableName, session); - Optional metadata = metastore.getMetadata(tableSnapshot, session); - metadata.ifPresent(metadataEntry -> verifySupportedColumnMapping(getColumnMappingMode(metadataEntry))); + MetadataEntry metadataEntry; + try { + metadataEntry = metastore.getMetadata(tableSnapshot, session); + } + catch (TrinoException e) { + if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) { + return new CorruptedDeltaLakeTableHandle(dataTableName, e); + } + throw e; + } + verifySupportedColumnMapping(getColumnMappingMode(metadataEntry)); return new DeltaLakeTableHandle( dataTableName.getSchemaName(), dataTableName.getTableName(), metastore.getTableLocation(dataTableName), - metadata, + metadataEntry, TupleDomain.all(), TupleDomain.all(), Optional.empty(), @@ -448,7 +457,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { - DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table; + DeltaLakeTableHandle tableHandle = checkValidTableHandle(table); String location = metastore.getTableLocation(tableHandle.getSchemaTableName()); Map columnComments = getColumnComments(tableHandle.getMetadataEntry()); Map columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry()); @@ -495,7 +504,7 @@ public List listTables(ConnectorSession session, Optional getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { - DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle; + DeltaLakeTableHandle table = checkValidTableHandle(tableHandle); return getColumns(table.getMetadataEntry()).stream() .collect(toImmutableMap(DeltaLakeColumnHandle::getName, identity())); } @@ -564,16 +573,14 @@ public Iterator streamTableColumns(ConnectorSession sessio return Stream.of(TableColumnsMetadata.forRedirectedTable(table)); } - // intentionally skip case when table snapshot is present but it lacks metadata portion - return metastore.getMetadata(metastore.getSnapshot(table, session), session).stream().map(metadata -> { - Map columnComments = getColumnComments(metadata); - Map columnsNullability = getColumnsNullability(metadata); - Map columnGenerations = getGeneratedColumnExpressions(metadata); - List columnMetadata = getColumns(metadata).stream() - .map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true), columnGenerations.get(column.getName()))) - .collect(toImmutableList()); - return TableColumnsMetadata.forTable(table, columnMetadata); - }); + MetadataEntry metadata = metastore.getMetadata(metastore.getSnapshot(table, session), session); + Map columnComments = getColumnComments(metadata); + Map columnsNullability = getColumnsNullability(metadata); + Map columnGenerations = getGeneratedColumnExpressions(metadata); + List columnMetadata = getColumns(metadata).stream() + .map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true), columnGenerations.get(column.getName()))) + .collect(toImmutableList()); + return Stream.of(TableColumnsMetadata.forTable(table, columnMetadata)); } catch (NotADeltaLakeTableException e) { return Stream.empty(); @@ -603,10 +610,11 @@ private List getColumns(MetadataEntry deltaMetadata) @Override public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle) { + DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); if (!isTableStatisticsEnabled(session)) { return TableStatistics.empty(); } - return metastore.getTableStatistics(session, (DeltaLakeTableHandle) tableHandle); + return metastore.getTableStatistics(session, handle); } @Override @@ -1011,7 +1019,7 @@ private static boolean isCreatedBy(Table table, String queryId) @Override public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional comment) { - DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; + DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); checkSupportedWriterVersion(session, handle.getSchemaTableName()); ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); @@ -1096,7 +1104,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl @Override public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata) { - DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; + DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); checkSupportedWriterVersion(session, handle.getSchemaTableName()); if (!newColumnMetadata.isNullable() && !metastore.getValidDataFiles(handle.getSchemaTableName(), session).isEmpty()) { @@ -1492,7 +1500,7 @@ public Optional getTableHandleForExecute( Map executeProperties, RetryMode retryMode) { - DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) connectorTableHandle; + DeltaLakeTableHandle tableHandle = checkValidTableHandle(connectorTableHandle); DeltaLakeTableProcedureId procedureId; try { @@ -1766,18 +1774,25 @@ public Optional getInfo(ConnectorTableHandle table) @Override public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { - DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; + SchemaTableName schemaTableName; + if (tableHandle instanceof CorruptedDeltaLakeTableHandle corruptedTableHandle) { + schemaTableName = corruptedTableHandle.schemaTableName(); + } + else { + DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; + schemaTableName = handle.getSchemaTableName(); + } - Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName()) - .orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName())); + Table table = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(schemaTableName)); - metastore.dropTable(session, handle.getSchemaName(), handle.getTableName(), table.getTableType().equals(MANAGED_TABLE.toString())); + metastore.dropTable(session, schemaTableName.getSchemaName(), schemaTableName.getTableName(), table.getTableType().equals(MANAGED_TABLE.toString())); } @Override public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName) { - DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; + DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName()) .orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName())); if (table.getTableType().equals(MANAGED_TABLE.name()) && !allowManagedTableRename) { @@ -1811,12 +1826,12 @@ private CommitInfoEntry getCommitInfoEntry( @Override public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map> properties) { + DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); Set unsupportedProperties = difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES); if (!unsupportedProperties.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties)); } - DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle.getSchemaTableName()); long createdTime = Instant.now().toEpochMilli(); @@ -2030,7 +2045,7 @@ public Optional> applyFilter(C tableName.getSchemaName(), tableName.getTableName(), tableHandle.getLocation(), - Optional.of(tableHandle.getMetadataEntry()), + tableHandle.getMetadataEntry(), // Do not simplify the enforced constraint, the connector is guaranteeing the constraint will be applied as is. // The unenforced constraint will still be checked by the engine. tableHandle.getEnforcedPartitionConstraint() @@ -2106,7 +2121,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession DeltaLakeSessionProperties.EXTENDED_STATISTICS_ENABLED)); } - DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; + DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); MetadataEntry metadata = handle.getMetadataEntry(); Optional filesModifiedAfterFromProperties = getFilesModifiedAfterProperty(analyzeProperties); @@ -2157,7 +2172,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession handle.getSchemaTableName().getSchemaName(), handle.getSchemaTableName().getTableName(), handle.getLocation(), - Optional.of(metadata), + metadata, TupleDomain.all(), TupleDomain.all(), Optional.empty(), @@ -2439,7 +2454,7 @@ private Optional getRawSystemTable(ConnectorSession session, Schema // Only when dealing with an actual system table proceed to retrieve the table handle String name = DeltaLakeTableName.tableNameFrom(tableName.getTableName()); - DeltaLakeTableHandle tableHandle; + ConnectorTableHandle tableHandle; try { tableHandle = getTableHandle(session, new SchemaTableName(tableName.getSchemaName(), name)); } @@ -2450,6 +2465,9 @@ private Optional getRawSystemTable(ConnectorSession session, Schema if (tableHandle == null) { return Optional.empty(); } + if (tableHandle instanceof CorruptedDeltaLakeTableHandle) { + return Optional.empty(); + } Optional tableType = DeltaLakeTableName.tableTypeFrom(tableName.getTableName()); if (tableType.isEmpty()) { @@ -2461,7 +2479,7 @@ private Optional getRawSystemTable(ConnectorSession session, Schema case DATA -> Optional.empty(); // Handled above case HISTORY -> Optional.of(new DeltaLakeHistoryTable( systemTableName, - getCommitInfoEntries(tableHandle.getSchemaTableName(), session), + getCommitInfoEntries(((DeltaLakeTableHandle) tableHandle).getSchemaTableName(), session), typeManager)); }; } @@ -2571,6 +2589,15 @@ private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @N .build(); } + public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle tableHandle) + { + requireNonNull(tableHandle, "tableHandle is null"); + if (tableHandle instanceof CorruptedDeltaLakeTableHandle corruptedTableHandle) { + throw corruptedTableHandle.createException(); + } + return ((DeltaLakeTableHandle) tableHandle); + } + public static TupleDomain createStatisticsPredicate( AddFileEntry addFileEntry, List schema, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java index 82ec0f446af4..56e13d4f79ae 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.airlift.units.DataSize; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; -import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; @@ -30,7 +29,6 @@ import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; -import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeTableHandle.WriteType.UPDATE; import static java.util.Objects.requireNonNull; @@ -47,7 +45,7 @@ public enum WriteType private final String schemaName; private final String tableName; private final String location; - private final Optional metadataEntry; + private final MetadataEntry metadataEntry; private final TupleDomain enforcedPartitionConstraint; private final TupleDomain nonPartitionConstraint; private final Optional writeType; @@ -72,7 +70,7 @@ public DeltaLakeTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("location") String location, - @JsonProperty("metadataEntry") Optional metadataEntry, + @JsonProperty("metadataEntry") MetadataEntry metadataEntry, @JsonProperty("enforcedPartitionConstraint") TupleDomain enforcedPartitionConstraint, @JsonProperty("nonPartitionConstraint") TupleDomain nonPartitionConstraint, @JsonProperty("writeType") Optional writeType, @@ -105,7 +103,7 @@ public DeltaLakeTableHandle( String schemaName, String tableName, String location, - Optional metadataEntry, + MetadataEntry metadataEntry, TupleDomain enforcedPartitionConstraint, TupleDomain nonPartitionConstraint, Optional writeType, @@ -140,19 +138,19 @@ public DeltaLakeTableHandle( public DeltaLakeTableHandle withProjectedColumns(Set projectedColumns) { return new DeltaLakeTableHandle( - getSchemaName(), - getTableName(), - getLocation(), - Optional.of(getMetadataEntry()), - getEnforcedPartitionConstraint(), - getNonPartitionConstraint(), - getWriteType(), + schemaName, + tableName, + location, + metadataEntry, + enforcedPartitionConstraint, + nonPartitionConstraint, + writeType, Optional.of(projectedColumns), - getUpdatedColumns(), - getUpdateRowIdColumns(), - getAnalyzeHandle(), - getReadVersion(), - isRetriesEnabled()); + updatedColumns, + updateRowIdColumns, + analyzeHandle, + readVersion, + retriesEnabled); } public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize) @@ -196,7 +194,7 @@ public String getLocation() @JsonProperty public MetadataEntry getMetadataEntry() { - return metadataEntry.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableName)); + return metadataEntry; } @JsonProperty diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java index 68beb6b73650..29abd3174dbb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java @@ -49,7 +49,7 @@ public interface DeltaLakeMetastore void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to); - Optional getMetadata(TableSnapshot tableSnapshot, ConnectorSession session); + MetadataEntry getMetadata(TableSnapshot tableSnapshot, ConnectorSession session); ProtocolEntry getProtocol(ConnectorSession session, TableSnapshot table); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java index f4438d68b456..0bde3bb82b7e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java @@ -156,10 +156,7 @@ public void createTable(ConnectorSession session, Table table, PrincipalPrivileg transactionLogAccess.invalidateCaches(tableLocation); try { TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(table.getSchemaTableName(), tableLocation, session); - Optional maybeMetadata = transactionLogAccess.getMetadataEntry(tableSnapshot, session); - if (maybeMetadata.isEmpty()) { - throw new TrinoException(DELTA_LAKE_INVALID_TABLE, "Provided location did not contain a valid Delta Lake table: " + tableLocation); - } + transactionLogAccess.getMetadataEntry(tableSnapshot, session); // verify metadata exists } catch (IOException | RuntimeException e) { throw new TrinoException(DELTA_LAKE_INVALID_TABLE, "Failed to access table location: " + tableLocation, e); @@ -191,7 +188,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa } @Override - public Optional getMetadata(TableSnapshot tableSnapshot, ConnectorSession session) + public MetadataEntry getMetadata(TableSnapshot tableSnapshot, ConnectorSession session) { return transactionLogAccess.getMetadataEntry(tableSnapshot, session); } @@ -249,8 +246,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab double numRecords = 0L; - MetadataEntry metadata = transactionLogAccess.getMetadataEntry(tableSnapshot, session) - .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableHandle.getTableName())); + MetadataEntry metadata = transactionLogAccess.getMetadataEntry(tableSnapshot, session); List columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, typeManager); List columns = columnMetadata.stream() .map(columnMeta -> new DeltaLakeColumnHandle( diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index 7358d21ae328..cd62e7010675 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -35,6 +35,7 @@ import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.connector.ConnectorAccessControl; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.procedure.Procedure; import io.trino.spi.procedure.Procedure.Argument; @@ -55,6 +56,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; @@ -128,7 +130,7 @@ public void vacuum( } catch (Exception e) { // This is not categorized as TrinoException. All possible external failures should be handled explicitly. - throw new RuntimeException(format("Failure when vacuuming %s.%s with retention %s", schema, table, retention), e); + throw new RuntimeException(format("Failure when vacuuming %s.%s with retention %s: %s", schema, table, retention, e), e); } } @@ -162,8 +164,9 @@ private void doVacuum( DeltaLakeMetadata metadata = metadataFactory.create(session.getIdentity()); SchemaTableName tableName = new SchemaTableName(schema, table); - DeltaLakeTableHandle handle = metadata.getTableHandle(session, tableName); - checkProcedureArgument(handle != null, "Table '%s' does not exist", tableName); + ConnectorTableHandle connectorTableHandle = metadata.getTableHandle(session, tableName); + checkProcedureArgument(connectorTableHandle != null, "Table '%s' does not exist", tableName); + DeltaLakeTableHandle handle = checkValidTableHandle(connectorTableHandle); accessControl.checkCanInsertIntoTable(null, tableName); accessControl.checkCanDeleteFromTable(null, tableName); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index 77b7ee15a151..75ff341195b5 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -187,7 +187,7 @@ public void invalidateCaches(String tableLocation) activeDataFileCache.invalidate(tableLocation); } - public Optional getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSession session) + public MetadataEntry getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSession session) { if (tableSnapshot.getCachedMetadata().isEmpty()) { try (Stream metadataEntries = getEntries( @@ -201,7 +201,8 @@ public Optional getMetadataEntry(TableSnapshot tableSnapshot, Con tableSnapshot.setCachedMetadata(metadataEntries.reduce((first, second) -> second)); } } - return tableSnapshot.getCachedMetadata(); + return tableSnapshot.getCachedMetadata() + .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable())); } public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java index 45962593ace6..49fcd17ff41c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java @@ -107,7 +107,7 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) // so we can read add entries below this should be reworked so we pass metadata entry explicitly to getCheckpointTransactionLogEntries, // and we should get rid of `setCachedMetadata` in TableSnapshot to make it immutable. // Also more proper would be to use metadata entry obtained above in snapshot.getCheckpointTransactionLogEntries to read other checkpoint entries, but using newer one should not do harm. - checkState(transactionLogAccess.getMetadataEntry(snapshot, session).isPresent(), "metadata entry in snapshot null"); + transactionLogAccess.getMetadataEntry(snapshot, session); // register metadata entry in writer checkState(checkpointMetadataLogEntry.get().getMetaData() != null, "metaData not present in log entry"); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index af63d28b2c53..3a621eb97f6c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -23,7 +23,6 @@ import org.testng.annotations.Test; import java.io.IOException; -import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; @@ -116,8 +115,8 @@ public void testNoColumnStats() } @Test - public void testDropTableBadLocation() - throws IOException, URISyntaxException + public void testCorruptedTableLocation() + throws Exception { // create a bad_person table which is based on person table in temporary location String tableName = "bad_person"; @@ -129,7 +128,40 @@ public void testDropTableBadLocation() // break the table by deleting all its files including transaction log deleteRecursively(tableLocation, ALLOW_INSECURE); - // try to drop table + // Assert queries fail cleanly + assertQueryFails("TABLE " + tableName, "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("SELECT * FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("SELECT 1 FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("SHOW CREATE TABLE " + tableName, "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("CREATE TABLE a_new_table (LIKE " + tableName + " EXCLUDING PROPERTIES)", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("DESCRIBE " + tableName, "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("SHOW COLUMNS FROM " + tableName, "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("SHOW STATS FOR " + tableName, "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("ANALYZE " + tableName, "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("ALTER TABLE " + tableName + " EXECUTE optimize", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("ALTER TABLE " + tableName + " EXECUTE vacuum", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("ALTER TABLE " + tableName + " RENAME TO bad_person_some_new_name", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("ALTER TABLE " + tableName + " ADD COLUMN foo int", "Metadata not found in transaction log for tpch.bad_person"); + // TODO (https://github.com/trinodb/trino/issues/16248) ADD field + assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN foo", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN foo.bar", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("INSERT INTO " + tableName + " VALUES (NULL)", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("UPDATE " + tableName + " SET foo = 'bar'", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("DELETE FROM " + tableName, "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("MERGE INTO " + tableName + " USING (SELECT 1 a) input ON true WHEN MATCHED THEN DELETE", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("TRUNCATE TABLE " + tableName, "This connector does not support truncating tables"); + assertQueryFails("COMMENT ON TABLE " + tableName + " IS NULL", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("COMMENT ON COLUMN " + tableName + ".foo IS NULL", "Metadata not found in transaction log for tpch.bad_person"); + assertQueryFails("CALL system.vacuum(CURRENT_SCHEMA, '" + tableName + "', '7d')", "Metadata not found in transaction log for tpch.bad_person"); + assertQuerySucceeds("CALL system.drop_extended_stats(CURRENT_SCHEMA, '" + tableName + "')"); + + // Avoid failing metadata queries + assertQuery("SHOW TABLES LIKE 'bad\\_perso_' ESCAPE '\\'", "VALUES 'bad_person'"); + assertQueryReturnsEmptyResult("SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA AND table_name LIKE 'bad\\_perso_' ESCAPE '\\'"); + assertQueryReturnsEmptyResult("SELECT column_name, data_type FROM system.jdbc.columns WHERE table_cat = CURRENT_CATALOG AND table_schem = CURRENT_SCHEMA AND table_name LIKE 'bad\\_perso_' ESCAPE '\\'"); + + // DROP TABLE should succeed so that users can remove their corrupted table getQueryRunner().execute("DROP TABLE " + tableName); assertFalse(getQueryRunner().tableExists(getSession(), tableName)); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java index dc221fde2597..9bfe92f321de 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java @@ -313,32 +313,6 @@ public void testGetInsertLayoutTableUnpartitioned() .isNotPresent(); } - @Test - public void testGetInsertLayoutTableNotFound() - { - SchemaTableName schemaTableName = newMockSchemaTableName(); - - DeltaLakeTableHandle missingTableHandle = new DeltaLakeTableHandle( - schemaTableName.getSchemaName(), - schemaTableName.getTableName(), - getTableLocation(schemaTableName), - Optional.empty(), - TupleDomain.none(), - TupleDomain.none(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - 0, - false); - - assertThatThrownBy(() -> deltaLakeMetadataFactory.create(SESSION.getIdentity()) - .getInsertLayout(SESSION, missingTableHandle)) - .isInstanceOf(TrinoException.class) - .hasMessage("Metadata not found in transaction log for " + schemaTableName.getTableName()); - } - @DataProvider public Object[][] testApplyProjectionProvider() { @@ -449,7 +423,7 @@ public void testGetInputInfoForPartitionedTable() ImmutableList.of(BIGINT_COLUMN_1, BIGINT_COLUMN_2), ImmutableList.of(BIGINT_COLUMN_1)); deltaLakeMetadata.createTable(SESSION, tableMetadata, false); - DeltaLakeTableHandle tableHandle = deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable()); + DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable()); assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(true))); } @@ -461,7 +435,7 @@ public void testGetInputInfoForUnPartitionedTable() ImmutableList.of(BIGINT_COLUMN_1, BIGINT_COLUMN_2), ImmutableList.of()); deltaLakeMetadata.createTable(SESSION, tableMetadata, false); - DeltaLakeTableHandle tableHandle = deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable()); + DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable()); assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(false))); } @@ -505,9 +479,9 @@ private static List createNewColumnAssignments(Map createMetadataEntry() + private static MetadataEntry createMetadataEntry() { - return Optional.of(new MetadataEntry( + return new MetadataEntry( "test_id", "test_name", "test_description", @@ -515,7 +489,7 @@ private static Optional createMetadataEntry() "test_schema", ImmutableList.of("test_partition_column"), ImmutableMap.of("test_configuration_key", "test_configuration_value"), - 1)); + 1); } private String getTableLocation(SchemaTableName schemaTableName) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index 5d0f8e8f0d48..1aff34e3a8d9 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -68,7 +68,7 @@ public class TestDeltaLakeSplitManager "schema", "table", "location", - Optional.of(metadataEntry), + metadataEntry, TupleDomain.all(), TupleDomain.all(), Optional.empty(), @@ -273,7 +273,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa } @Override - public Optional getMetadata(TableSnapshot tableSnapshot, ConnectorSession session) + public MetadataEntry getMetadata(TableSnapshot tableSnapshot, ConnectorSession session) { throw new UnsupportedOperationException("Unimplemented"); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 000620b55783..4757f8c45a2a 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -137,7 +137,7 @@ private void setupTransactionLogAccess(String tableName, String tableLocation, D "schema", tableName, "location", - Optional.empty(), // ignored + new MetadataEntry("id", "test", "description", null, "", ImmutableList.of(), ImmutableMap.of(), 0), TupleDomain.none(), TupleDomain.none(), Optional.empty(), @@ -157,7 +157,7 @@ public void testGetMetadataEntry() { setupTransactionLogAccess("person"); - MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION).get(); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); assertEquals(metadataEntry.getCreatedTime(), 1579190100722L); assertEquals(metadataEntry.getId(), "b6aeffad-da73-4dde-b68e-937e468b1fdf"); @@ -176,7 +176,7 @@ public void testGetMetadataEntryUppercase() throws Exception { setupTransactionLogAccess("uppercase_columns"); - MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION).get(); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); assertThat(metadataEntry.getOriginalPartitionColumns()).containsOnly("ALA"); assertThat(metadataEntry.getCanonicalPartitionColumns()).containsOnly("ala"); assertEquals(tableSnapshot.getCachedMetadata(), Optional.of(metadataEntry)); @@ -343,7 +343,7 @@ public void testAllGetMetadataEntry(String tableName) setupTransactionLogAccess(tableName); transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION).get(); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); assertThat(metadataEntry.getOriginalPartitionColumns()).containsOnly("age"); @@ -599,7 +599,7 @@ public void testSnapshotsAreConsistent() } assertEquals(expectedDataFiles.size(), dataFilesWithFixedVersion.size()); - List columns = extractColumnMetadata(transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION).get(), TESTING_TYPE_MANAGER); + List columns = extractColumnMetadata(transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION), TESTING_TYPE_MANAGER); for (int i = 0; i < expectedDataFiles.size(); i++) { AddFileEntry expected = expectedDataFiles.get(i); AddFileEntry actual = dataFilesWithFixedVersion.get(i); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java index 41887cbfc7c9..4c71580fb6f5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java @@ -151,7 +151,7 @@ private DeltaLakeTableHandle registerTable(String tableName, String directoryNam "db_name", tableName, "location", - Optional.of(new MetadataEntry("id", "test", "description", null, "", ImmutableList.of(), ImmutableMap.of(), 0)), + new MetadataEntry("id", "test", "description", null, "", ImmutableList.of(), ImmutableMap.of(), 0), TupleDomain.all(), TupleDomain.all(), Optional.empty(), @@ -281,7 +281,7 @@ public void testStatisticsMultipleFiles() tableHandle.getSchemaName(), tableHandle.getTableName(), tableHandle.getLocation(), - Optional.of(tableHandle.getMetadataEntry()), + tableHandle.getMetadataEntry(), TupleDomain.all(), TupleDomain.withColumnDomains(ImmutableMap.of((DeltaLakeColumnHandle) COLUMN_HANDLE, Domain.singleValue(DOUBLE, 42.0))), tableHandle.getWriteType(), @@ -305,7 +305,7 @@ public void testStatisticsNoRecords() tableHandle.getSchemaName(), tableHandle.getTableName(), tableHandle.getLocation(), - Optional.of(tableHandle.getMetadataEntry()), + tableHandle.getMetadataEntry(), TupleDomain.none(), TupleDomain.all(), tableHandle.getWriteType(), @@ -319,7 +319,7 @@ public void testStatisticsNoRecords() tableHandle.getSchemaName(), tableHandle.getTableName(), tableHandle.getLocation(), - Optional.of(tableHandle.getMetadataEntry()), + tableHandle.getMetadataEntry(), TupleDomain.all(), TupleDomain.none(), tableHandle.getWriteType(),