Skip to content

Commit

Permalink
Handle corrupted Delta Lake tables with explicit table handle
Browse files Browse the repository at this point in the history
Previously, a corrupted table handle was represented with a
`DeltaLakeTableHandle` missing a `MetadataEntry`.  When drop support for
corrupted tables was implemented, a connector could have only one table
handle class.  This commit improves the distinction by introducing a
dedicated table handle class to represent corrupted tables. As a
necessity, this class is explicitly handled in multiple
`DeltaLakeMetadata` methods. This sets a viable example to follow for
implementing drop support for corrupted Iceberg tables as a follow-up.

Note: the `testGetInsertLayoutTableNotFound` test was removed instead of
being updated, since `ConnectorMetadata.getInsertLayout` cannot be
reached for a corrupted table, as getting the column list will fail earlier.
  • Loading branch information
findepi committed Mar 24, 2023
1 parent f76a348 commit d2534ab
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 98 deletions.
12 changes: 11 additions & 1 deletion core/trino-spi/src/main/java/io/trino/spi/TrinoException.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,20 @@ public TrinoException(ErrorCodeSupplier errorCode, String message, Throwable cau
this(errorCode, Optional.empty(), message, cause);
}

/**
 * Creates an exception from an already-resolved {@link ErrorCode}, with no source location.
 * Delegates to the location-taking constructor, passing {@code Optional.empty()} as the location.
 *
 * @param errorCode error code to attach to the exception
 * @param message detail message
 * @param cause underlying cause
 */
public TrinoException(ErrorCode errorCode, String message, Throwable cause)
{
this(errorCode, Optional.empty(), message, cause);
}

public TrinoException(ErrorCodeSupplier errorCodeSupplier, Optional<Location> location, String message, Throwable cause)
{
this(errorCodeSupplier.toErrorCode(), location, message, cause);
}

private TrinoException(ErrorCode errorCode, Optional<Location> location, String message, Throwable cause)
{
super(message, cause);
this.errorCode = errorCodeSupplier.toErrorCode();
this.errorCode = requireNonNull(errorCode, "errorCode is null");
this.location = requireNonNull(location, "location is null");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;

import static java.util.Objects.requireNonNull;

/**
 * Table handle representing a corrupted Delta Lake table.
 * <p>
 * It carries the table name together with the exception that was captured when the handle was created,
 * so that the failure can be re-raised later via {@link #createException()}.
 *
 * @param schemaTableName name of the table this handle refers to; must not be null
 * @param originalException the captured failure; must not be null
 */
public record CorruptedDeltaLakeTableHandle(
        SchemaTableName schemaTableName,
        TrinoException originalException)
        implements ConnectorTableHandle
{
    /**
     * Validates that both components are non-null.
     */
    public CorruptedDeltaLakeTableHandle
    {
        requireNonNull(schemaTableName, "schemaTableName is null");
        requireNonNull(originalException, "originalException is null");
    }

    /**
     * Builds a new exception with the error code and message of the stored one, keeping the stored
     * exception as the cause.
     *
     * @return a freshly constructed {@link TrinoException} wrapping the original failure
     */
    public TrinoException createException()
    {
        // The stored exception was raised somewhere else, so its stack trace does not match the current
        // call site. Wrapping it in a new exception avoids confusing whoever reads the trace.
        TrinoException cause = originalException;
        return new TrinoException(cause.getErrorCode(), cause.getMessage(), cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
}

@Override
public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
requireNonNull(tableName, "tableName is null");
if (!DeltaLakeTableName.isDataTable(tableName.getTableName())) {
Expand All @@ -415,13 +415,22 @@ public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTable
}

TableSnapshot tableSnapshot = metastore.getSnapshot(dataTableName, session);
Optional<MetadataEntry> metadata = metastore.getMetadata(tableSnapshot, session);
metadata.ifPresent(metadataEntry -> verifySupportedColumnMapping(getColumnMappingMode(metadataEntry)));
MetadataEntry metadataEntry;
try {
metadataEntry = metastore.getMetadata(tableSnapshot, session);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return new CorruptedDeltaLakeTableHandle(dataTableName, e);
}
throw e;
}
verifySupportedColumnMapping(getColumnMappingMode(metadataEntry));
return new DeltaLakeTableHandle(
dataTableName.getSchemaName(),
dataTableName.getTableName(),
metastore.getTableLocation(dataTableName),
metadata,
metadataEntry,
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
Expand All @@ -448,7 +457,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table;
DeltaLakeTableHandle tableHandle = checkValidTableHandle(table);
String location = metastore.getTableLocation(tableHandle.getSchemaTableName());
Map<String, String> columnComments = getColumnComments(tableHandle.getMetadataEntry());
Map<String, Boolean> columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry());
Expand Down Expand Up @@ -495,7 +504,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle table = checkValidTableHandle(tableHandle);
return getColumns(table.getMetadataEntry()).stream()
.collect(toImmutableMap(DeltaLakeColumnHandle::getName, identity()));
}
Expand Down Expand Up @@ -564,16 +573,14 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
return Stream.of(TableColumnsMetadata.forRedirectedTable(table));
}

// intentionally skip case when table snapshot is present but it lacks metadata portion
return metastore.getMetadata(metastore.getSnapshot(table, session), session).stream().map(metadata -> {
Map<String, String> columnComments = getColumnComments(metadata);
Map<String, Boolean> columnsNullability = getColumnsNullability(metadata);
Map<String, String> columnGenerations = getGeneratedColumnExpressions(metadata);
List<ColumnMetadata> columnMetadata = getColumns(metadata).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true), columnGenerations.get(column.getName())))
.collect(toImmutableList());
return TableColumnsMetadata.forTable(table, columnMetadata);
});
MetadataEntry metadata = metastore.getMetadata(metastore.getSnapshot(table, session), session);
Map<String, String> columnComments = getColumnComments(metadata);
Map<String, Boolean> columnsNullability = getColumnsNullability(metadata);
Map<String, String> columnGenerations = getGeneratedColumnExpressions(metadata);
List<ColumnMetadata> columnMetadata = getColumns(metadata).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true), columnGenerations.get(column.getName())))
.collect(toImmutableList());
return Stream.of(TableColumnsMetadata.forTable(table, columnMetadata));
}
catch (NotADeltaLakeTableException e) {
return Stream.empty();
Expand Down Expand Up @@ -603,10 +610,11 @@ private List<DeltaLakeColumnHandle> getColumns(MetadataEntry deltaMetadata)
@Override
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
if (!isTableStatisticsEnabled(session)) {
return TableStatistics.empty();
}
return metastore.getTableStatistics(session, (DeltaLakeTableHandle) tableHandle);
return metastore.getTableStatistics(session, handle);
}

@Override
Expand Down Expand Up @@ -1011,7 +1019,7 @@ private static boolean isCreatedBy(Table table, String queryId)
@Override
public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<String> comment)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
checkSupportedWriterVersion(session, handle.getSchemaTableName());

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
Expand Down Expand Up @@ -1096,7 +1104,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
checkSupportedWriterVersion(session, handle.getSchemaTableName());

if (!newColumnMetadata.isNullable() && !metastore.getValidDataFiles(handle.getSchemaTableName(), session).isEmpty()) {
Expand Down Expand Up @@ -1492,7 +1500,7 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
Map<String, Object> executeProperties,
RetryMode retryMode)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) connectorTableHandle;
DeltaLakeTableHandle tableHandle = checkValidTableHandle(connectorTableHandle);

DeltaLakeTableProcedureId procedureId;
try {
Expand Down Expand Up @@ -1766,18 +1774,25 @@ public Optional<Object> getInfo(ConnectorTableHandle table)
@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
SchemaTableName schemaTableName;
if (tableHandle instanceof CorruptedDeltaLakeTableHandle corruptedTableHandle) {
schemaTableName = corruptedTableHandle.schemaTableName();
}
else {
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
schemaTableName = handle.getSchemaTableName();
}

Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName())
.orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName()));
Table table = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));

metastore.dropTable(session, handle.getSchemaName(), handle.getTableName(), table.getTableType().equals(MANAGED_TABLE.toString()));
metastore.dropTable(session, schemaTableName.getSchemaName(), schemaTableName.getTableName(), table.getTableType().equals(MANAGED_TABLE.toString()));
}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName())
.orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName()));
if (table.getTableType().equals(MANAGED_TABLE.name()) && !allowManagedTableRename) {
Expand Down Expand Up @@ -1811,12 +1826,12 @@ private CommitInfoEntry getCommitInfoEntry(
@Override
public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties)
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
Set<String> unsupportedProperties = difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES);
if (!unsupportedProperties.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties));
}

DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle.getSchemaTableName());

long createdTime = Instant.now().toEpochMilli();
Expand Down Expand Up @@ -2030,7 +2045,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableName.getSchemaName(),
tableName.getTableName(),
tableHandle.getLocation(),
Optional.of(tableHandle.getMetadataEntry()),
tableHandle.getMetadataEntry(),
// Do not simplify the enforced constraint, the connector is guaranteeing the constraint will be applied as is.
// The unenforced constraint will still be checked by the engine.
tableHandle.getEnforcedPartitionConstraint()
Expand Down Expand Up @@ -2106,7 +2121,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
DeltaLakeSessionProperties.EXTENDED_STATISTICS_ENABLED));
}

DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
MetadataEntry metadata = handle.getMetadataEntry();

Optional<Instant> filesModifiedAfterFromProperties = getFilesModifiedAfterProperty(analyzeProperties);
Expand Down Expand Up @@ -2157,7 +2172,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
handle.getSchemaTableName().getSchemaName(),
handle.getSchemaTableName().getTableName(),
handle.getLocation(),
Optional.of(metadata),
metadata,
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
Expand Down Expand Up @@ -2439,7 +2454,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema

// Only when dealing with an actual system table proceed to retrieve the table handle
String name = DeltaLakeTableName.tableNameFrom(tableName.getTableName());
DeltaLakeTableHandle tableHandle;
ConnectorTableHandle tableHandle;
try {
tableHandle = getTableHandle(session, new SchemaTableName(tableName.getSchemaName(), name));
}
Expand All @@ -2450,6 +2465,9 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
if (tableHandle == null) {
return Optional.empty();
}
if (tableHandle instanceof CorruptedDeltaLakeTableHandle) {
return Optional.empty();
}

Optional<DeltaLakeTableType> tableType = DeltaLakeTableName.tableTypeFrom(tableName.getTableName());
if (tableType.isEmpty()) {
Expand All @@ -2460,7 +2478,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case DATA -> throw new VerifyException("Unexpected DATA table type"); // Handled above.
case HISTORY -> Optional.of(new DeltaLakeHistoryTable(
systemTableName,
getCommitInfoEntries(tableHandle.getSchemaTableName(), session),
getCommitInfoEntries(((DeltaLakeTableHandle) tableHandle).getSchemaTableName(), session),
typeManager));
};
}
Expand Down Expand Up @@ -2570,6 +2588,15 @@ private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @N
.build();
}

/**
 * Narrows a generic table handle to a {@link DeltaLakeTableHandle}.
 *
 * @param tableHandle handle to check; must not be null
 * @return the same handle, cast to {@link DeltaLakeTableHandle}
 * @throws NullPointerException if {@code tableHandle} is null
 * @throws TrinoException the exception produced by {@link CorruptedDeltaLakeTableHandle#createException()}
 *         when the handle is a {@link CorruptedDeltaLakeTableHandle}
 */
public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle tableHandle)
{
    requireNonNull(tableHandle, "tableHandle is null");
    if (!(tableHandle instanceof CorruptedDeltaLakeTableHandle corrupted)) {
        // Any handle other than the corrupted marker is treated as a regular Delta Lake handle.
        return (DeltaLakeTableHandle) tableHandle;
    }
    throw corrupted.createException();
}

public static TupleDomain<DeltaLakeColumnHandle> createStatisticsPredicate(
AddFileEntry addFileEntry,
List<DeltaLakeColumnMetadata> schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -30,7 +29,6 @@
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeTableHandle.WriteType.UPDATE;
import static java.util.Objects.requireNonNull;

Expand All @@ -47,7 +45,7 @@ public enum WriteType
private final String schemaName;
private final String tableName;
private final String location;
private final Optional<MetadataEntry> metadataEntry;
private final MetadataEntry metadataEntry;
private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
Expand All @@ -72,7 +70,7 @@ public DeltaLakeTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") Optional<MetadataEntry> metadataEntry,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
@JsonProperty("writeType") Optional<WriteType> writeType,
Expand Down Expand Up @@ -105,7 +103,7 @@ public DeltaLakeTableHandle(
String schemaName,
String tableName,
String location,
Optional<MetadataEntry> metadataEntry,
MetadataEntry metadataEntry,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<WriteType> writeType,
Expand Down Expand Up @@ -196,7 +194,7 @@ public String getLocation()
@JsonProperty
public MetadataEntry getMetadataEntry()
{
return metadataEntry.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + getSchemaTableName()));
return metadataEntry;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface DeltaLakeMetastore

void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to);

Optional<MetadataEntry> getMetadata(TableSnapshot tableSnapshot, ConnectorSession session);
MetadataEntry getMetadata(TableSnapshot tableSnapshot, ConnectorSession session);

ProtocolEntry getProtocol(ConnectorSession session, TableSnapshot table);

Expand Down
Loading

0 comments on commit d2534ab

Please sign in to comment.