Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Delta's DROP TABLE support for corrupted tables #16651

Merged
merged 4 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/trino-spi/src/main/java/io/trino/spi/TrinoException.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,20 @@ public TrinoException(ErrorCodeSupplier errorCode, String message, Throwable cau
this(errorCode, Optional.empty(), message, cause);
}

/**
 * Creates an exception from an already-resolved {@link ErrorCode}.
 * Delegates to the constructor that also takes a location, passing an empty location.
 *
 * @param errorCode error code to attach to the exception
 * @param message detail message
 * @param cause underlying cause
 */
public TrinoException(ErrorCode errorCode, String message, Throwable cause)
{
this(errorCode, Optional.empty(), message, cause);
}

/**
 * Creates an exception from an {@link ErrorCodeSupplier} and an optional location.
 * The supplier is resolved with {@code toErrorCode()} and the result is handed
 * to the constructor that accepts an {@link ErrorCode} directly.
 *
 * @param errorCodeSupplier supplier of the error code
 * @param location optional location associated with the error
 * @param message detail message
 * @param cause underlying cause
 */
public TrinoException(ErrorCodeSupplier errorCodeSupplier, Optional<Location> location, String message, Throwable cause)
{
this(errorCodeSupplier.toErrorCode(), location, message, cause);
}

private TrinoException(ErrorCode errorCode, Optional<Location> location, String message, Throwable cause)
{
super(message, cause);
this.errorCode = errorCodeSupplier.toErrorCode();
this.errorCode = requireNonNull(errorCode, "errorCode is null");
this.location = requireNonNull(location, "location is null");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;

import static java.util.Objects.requireNonNull;

/**
 * Table handle that stands in for a Delta Lake table which could not be loaded.
 * It keeps the table name together with the exception raised while loading,
 * so the failure can be reported later through {@link #createException()}.
 *
 * @param schemaTableName name of the affected table, never null
 * @param originalException exception raised while loading the table, never null
 */
public record CorruptedDeltaLakeTableHandle(
        SchemaTableName schemaTableName,
        TrinoException originalException)
        implements ConnectorTableHandle
{
    /**
     * Rejects null components.
     */
    public CorruptedDeltaLakeTableHandle
    {
        requireNonNull(schemaTableName, "schemaTableName is null");
        requireNonNull(originalException, "originalException is null");
    }

    /**
     * Builds a fresh exception with the error code and message of the original one,
     * and with the original exception attached as its cause.
     *
     * @return a new {@link TrinoException} wrapping the original exception
     */
    public TrinoException createException()
    {
        // The original exception was raised elsewhere; a new instance is created so that
        // the reported stack trace matches the call site instead of confusing the reader.
        var errorCode = originalException.getErrorCode();
        String message = originalException.getMessage();
        return new TrinoException(errorCode, message, originalException);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
}

findepi marked this conversation as resolved.
Show resolved Hide resolved
@Override
public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
requireNonNull(tableName, "tableName is null");
if (!DeltaLakeTableName.isDataTable(tableName.getTableName())) {
Expand All @@ -415,13 +415,22 @@ public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTable
}

TableSnapshot tableSnapshot = metastore.getSnapshot(dataTableName, session);
Optional<MetadataEntry> metadata = metastore.getMetadata(tableSnapshot, session);
metadata.ifPresent(metadataEntry -> verifySupportedColumnMapping(getColumnMappingMode(metadataEntry)));
MetadataEntry metadataEntry;
try {
metadataEntry = metastore.getMetadata(tableSnapshot, session);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return new CorruptedDeltaLakeTableHandle(dataTableName, e);
}
throw e;
}
verifySupportedColumnMapping(getColumnMappingMode(metadataEntry));
return new DeltaLakeTableHandle(
dataTableName.getSchemaName(),
dataTableName.getTableName(),
metastore.getTableLocation(dataTableName),
metadata,
metadataEntry,
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
Expand All @@ -448,7 +457,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table;
DeltaLakeTableHandle tableHandle = checkValidTableHandle(table);
String location = metastore.getTableLocation(tableHandle.getSchemaTableName());
Map<String, String> columnComments = getColumnComments(tableHandle.getMetadataEntry());
Map<String, Boolean> columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry());
Expand Down Expand Up @@ -495,7 +504,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle table = checkValidTableHandle(tableHandle);
return getColumns(table.getMetadataEntry()).stream()
.collect(toImmutableMap(DeltaLakeColumnHandle::getName, identity()));
}
Expand Down Expand Up @@ -564,16 +573,14 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
return Stream.of(TableColumnsMetadata.forRedirectedTable(table));
}

// intentionally skip case when table snapshot is present but it lacks metadata portion
return metastore.getMetadata(metastore.getSnapshot(table, session), session).stream().map(metadata -> {
Map<String, String> columnComments = getColumnComments(metadata);
Map<String, Boolean> columnsNullability = getColumnsNullability(metadata);
Map<String, String> columnGenerations = getGeneratedColumnExpressions(metadata);
List<ColumnMetadata> columnMetadata = getColumns(metadata).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true), columnGenerations.get(column.getName())))
.collect(toImmutableList());
return TableColumnsMetadata.forTable(table, columnMetadata);
});
MetadataEntry metadata = metastore.getMetadata(metastore.getSnapshot(table, session), session);
Map<String, String> columnComments = getColumnComments(metadata);
Map<String, Boolean> columnsNullability = getColumnsNullability(metadata);
Map<String, String> columnGenerations = getGeneratedColumnExpressions(metadata);
List<ColumnMetadata> columnMetadata = getColumns(metadata).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true), columnGenerations.get(column.getName())))
.collect(toImmutableList());
return Stream.of(TableColumnsMetadata.forTable(table, columnMetadata));
}
catch (NotADeltaLakeTableException e) {
return Stream.empty();
Expand Down Expand Up @@ -603,10 +610,11 @@ private List<DeltaLakeColumnHandle> getColumns(MetadataEntry deltaMetadata)
@Override
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
if (!isTableStatisticsEnabled(session)) {
return TableStatistics.empty();
}
return metastore.getTableStatistics(session, (DeltaLakeTableHandle) tableHandle);
return metastore.getTableStatistics(session, handle);
}

@Override
Expand Down Expand Up @@ -1011,7 +1019,7 @@ private static boolean isCreatedBy(Table table, String queryId)
@Override
public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<String> comment)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
checkSupportedWriterVersion(session, handle.getSchemaTableName());

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
Expand Down Expand Up @@ -1096,7 +1104,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
checkSupportedWriterVersion(session, handle.getSchemaTableName());

if (!newColumnMetadata.isNullable() && !metastore.getValidDataFiles(handle.getSchemaTableName(), session).isEmpty()) {
Expand Down Expand Up @@ -1492,7 +1500,7 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
Map<String, Object> executeProperties,
RetryMode retryMode)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) connectorTableHandle;
DeltaLakeTableHandle tableHandle = checkValidTableHandle(connectorTableHandle);

DeltaLakeTableProcedureId procedureId;
try {
Expand Down Expand Up @@ -1766,18 +1774,25 @@ public Optional<Object> getInfo(ConnectorTableHandle table)
@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
SchemaTableName schemaTableName;
if (tableHandle instanceof CorruptedDeltaLakeTableHandle corruptedTableHandle) {
schemaTableName = corruptedTableHandle.schemaTableName();
}
else {
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
schemaTableName = handle.getSchemaTableName();
}

Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName())
.orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName()));
Table table = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));

metastore.dropTable(session, handle.getSchemaName(), handle.getTableName(), table.getTableType().equals(MANAGED_TABLE.toString()));
metastore.dropTable(session, schemaTableName.getSchemaName(), schemaTableName.getTableName(), table.getTableType().equals(MANAGED_TABLE.toString()));
}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName())
.orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName()));
if (table.getTableType().equals(MANAGED_TABLE.name()) && !allowManagedTableRename) {
Expand Down Expand Up @@ -1811,12 +1826,12 @@ private CommitInfoEntry getCommitInfoEntry(
@Override
public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties)
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
Set<String> unsupportedProperties = difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES);
if (!unsupportedProperties.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties));
}

DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle.getSchemaTableName());

long createdTime = Instant.now().toEpochMilli();
Expand Down Expand Up @@ -2030,7 +2045,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableName.getSchemaName(),
tableName.getTableName(),
tableHandle.getLocation(),
Optional.of(tableHandle.getMetadataEntry()),
tableHandle.getMetadataEntry(),
// Do not simplify the enforced constraint, the connector is guaranteeing the constraint will be applied as is.
// The unenforced constraint will still be checked by the engine.
tableHandle.getEnforcedPartitionConstraint()
Expand Down Expand Up @@ -2106,7 +2121,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
DeltaLakeSessionProperties.EXTENDED_STATISTICS_ENABLED));
}

DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
MetadataEntry metadata = handle.getMetadataEntry();

Optional<Instant> filesModifiedAfterFromProperties = getFilesModifiedAfterProperty(analyzeProperties);
Expand Down Expand Up @@ -2157,7 +2172,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
handle.getSchemaTableName().getSchemaName(),
handle.getSchemaTableName().getTableName(),
handle.getLocation(),
Optional.of(metadata),
metadata,
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
Expand Down Expand Up @@ -2439,7 +2454,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema

// Only when dealing with an actual system table proceed to retrieve the table handle
String name = DeltaLakeTableName.tableNameFrom(tableName.getTableName());
DeltaLakeTableHandle tableHandle;
ConnectorTableHandle tableHandle;
try {
tableHandle = getTableHandle(session, new SchemaTableName(tableName.getSchemaName(), name));
}
Expand All @@ -2450,6 +2465,9 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
if (tableHandle == null) {
return Optional.empty();
}
if (tableHandle instanceof CorruptedDeltaLakeTableHandle) {
return Optional.empty();
}

Optional<DeltaLakeTableType> tableType = DeltaLakeTableName.tableTypeFrom(tableName.getTableName());
if (tableType.isEmpty()) {
Expand All @@ -2461,7 +2479,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case DATA -> Optional.empty(); // Handled above
case HISTORY -> Optional.of(new DeltaLakeHistoryTable(
systemTableName,
getCommitInfoEntries(tableHandle.getSchemaTableName(), session),
getCommitInfoEntries(((DeltaLakeTableHandle) tableHandle).getSchemaTableName(), session),
typeManager));
};
}
Expand Down Expand Up @@ -2571,6 +2589,15 @@ private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @N
.build();
}

/**
 * Narrows a generic table handle to a {@link DeltaLakeTableHandle}.
 *
 * @param tableHandle handle to check, must not be null
 * @return the same handle cast to {@link DeltaLakeTableHandle}
 * @throws TrinoException the exception produced by {@code createException()} when the
 *         handle is a {@link CorruptedDeltaLakeTableHandle}
 */
public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle tableHandle)
{
    requireNonNull(tableHandle, "tableHandle is null");
    if (!(tableHandle instanceof CorruptedDeltaLakeTableHandle)) {
        // Any handle that is not the corrupted marker is cast as-is.
        return (DeltaLakeTableHandle) tableHandle;
    }
    throw ((CorruptedDeltaLakeTableHandle) tableHandle).createException();
}

public static TupleDomain<DeltaLakeColumnHandle> createStatisticsPredicate(
AddFileEntry addFileEntry,
List<DeltaLakeColumnMetadata> schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -30,7 +29,6 @@
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeTableHandle.WriteType.UPDATE;
import static java.util.Objects.requireNonNull;

Expand All @@ -47,7 +45,7 @@ public enum WriteType
private final String schemaName;
private final String tableName;
private final String location;
private final Optional<MetadataEntry> metadataEntry;
private final MetadataEntry metadataEntry;
private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
Expand All @@ -72,7 +70,7 @@ public DeltaLakeTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") Optional<MetadataEntry> metadataEntry,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
@JsonProperty("writeType") Optional<WriteType> writeType,
Expand Down Expand Up @@ -105,7 +103,7 @@ public DeltaLakeTableHandle(
String schemaName,
String tableName,
String location,
Optional<MetadataEntry> metadataEntry,
MetadataEntry metadataEntry,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<WriteType> writeType,
Expand Down Expand Up @@ -140,19 +138,19 @@ public DeltaLakeTableHandle(
public DeltaLakeTableHandle withProjectedColumns(Set<ColumnHandle> projectedColumns)
{
return new DeltaLakeTableHandle(
getSchemaName(),
getTableName(),
getLocation(),
Optional.of(getMetadataEntry()),
getEnforcedPartitionConstraint(),
getNonPartitionConstraint(),
getWriteType(),
schemaName,
tableName,
location,
metadataEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
writeType,
Optional.of(projectedColumns),
getUpdatedColumns(),
getUpdateRowIdColumns(),
getAnalyzeHandle(),
getReadVersion(),
isRetriesEnabled());
updatedColumns,
updateRowIdColumns,
analyzeHandle,
readVersion,
retriesEnabled);
}

public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
Expand Down Expand Up @@ -196,7 +194,7 @@ public String getLocation()
@JsonProperty
public MetadataEntry getMetadataEntry()
{
return metadataEntry.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableName));
return metadataEntry;
}

@JsonProperty
Expand Down
Loading