Support Iceberg's DROP TABLE for corrupted tables #16674

Merged
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;

import static java.util.Objects.requireNonNull;

public record CorruptedIcebergTableHandle(SchemaTableName schemaTableName, TrinoException originalException)
implements ConnectorTableHandle
{
public CorruptedIcebergTableHandle
{
requireNonNull(schemaTableName, "schemaTableName is null");
requireNonNull(originalException, "originalException is null");
}

public TrinoException createException()
{
// The original exception originates from a different call site. Wrap it in a new exception so the reader is not confused by a stack trace that does not match where this is thrown.
return new TrinoException(originalException.getErrorCode(), originalException.getMessage(), originalException);
}
}
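The handle carries the load-time failure so later operations can surface it lazily. A minimal sketch of the intended caller pattern, mirroring the checkValidTableHandle helper added further down (the metadata, session, and tableName variables here are illustrative):

    ConnectorTableHandle handle = metadata.getTableHandle(session, tableName, Optional.empty(), Optional.empty());
    if (handle instanceof CorruptedIcebergTableHandle corrupted) {
        // Every operation except DROP TABLE rethrows the original failure,
        // with a stack trace anchored at the current call site.
        throw corrupted.createException();
    }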
@@ -39,6 +39,7 @@ public enum IcebergErrorCode
ICEBERG_COMMIT_ERROR(12, EXTERNAL),
ICEBERG_CATALOG_ERROR(13, EXTERNAL),
ICEBERG_WRITER_CLOSE_ERROR(14, EXTERNAL),
ICEBERG_MISSING_METADATA(15, EXTERNAL),
/**/;

private final ErrorCode errorCode;
@@ -198,6 +198,7 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
@@ -348,7 +349,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
}

@Override
public IcebergTableHandle getTableHandle(
public ConnectorTableHandle getTableHandle(
ConnectorSession session,
SchemaTableName tableName,
Optional<ConnectorTableVersion> startVersion,
@@ -370,6 +371,12 @@ public IcebergTableHandle getTableHandle(
catch (TableNotFoundException e) {
return null;
}
catch (TrinoException e) {
if (e.getErrorCode().equals(ICEBERG_MISSING_METADATA.toErrorCode())) {
return new CorruptedIcebergTableHandle(tableName, e);
}
throw e;
}

Optional<Long> tableSnapshotId;
Schema tableSchema;
@@ -568,7 +575,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
IcebergTableHandle tableHandle = (IcebergTableHandle) table;
IcebergTableHandle tableHandle = checkValidTableHandle(table);
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
List<ColumnMetadata> columns = getColumnMetadatas(SchemaParser.fromJson(tableHandle.getTableSchemaJson()));
return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columns, getIcebergTableProperties(icebergTable), getTableComment(icebergTable));
@@ -583,7 +590,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
IcebergTableHandle table = checkValidTableHandle(tableHandle);
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (IcebergColumnHandle columnHandle : getColumns(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager)) {
columnHandles.put(columnHandle.getName(), columnHandle);
@@ -683,7 +690,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
@Override
public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<String> comment)
{
catalog.updateTableComment(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), comment);
IcebergTableHandle handle = checkValidTableHandle(tableHandle);
catalog.updateTableComment(session, handle.getSchemaTableName(), comment);
}

@Override
@@ -1486,19 +1494,25 @@ public Optional<Object> getInfo(ConnectorTableHandle tableHandle)
@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
catalog.dropTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
if (tableHandle instanceof CorruptedIcebergTableHandle corruptedTableHandle) {
catalog.dropCorruptedTable(session, corruptedTableHandle.schemaTableName());
}
else {
catalog.dropTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
}
}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTable)
{
catalog.renameTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), newTable);
IcebergTableHandle handle = checkValidTableHandle(tableHandle);
catalog.renameTable(session, handle.getSchemaTableName(), newTable);
}

@Override
public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
IcebergTableHandle table = checkValidTableHandle(tableHandle);
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

Set<String> unsupportedProperties = difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES);
@@ -1778,12 +1792,12 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata)
return TableStatisticsMetadata.empty();
}

IcebergTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty());
ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty());
if (tableHandle == null) {
// Assume new table (CTAS), collect all stats possible
return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
}
TableStatistics tableStatistics = getTableStatistics(session, tableHandle);
TableStatistics tableStatistics = getTableStatistics(session, checkValidTableHandle(tableHandle));
if (tableStatistics.getRowCount().getValue() == 0.0) {
// Table has no data (empty, or wiped out). Collect all stats possible
return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
@@ -1798,13 +1812,13 @@
@Override
public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Object> analyzeProperties)
{
IcebergTableHandle handle = checkValidTableHandle(tableHandle);
if (!isExtendedStatisticsEnabled(session)) {
throw new TrinoException(NOT_SUPPORTED, "Analyze is not enabled. You can enable analyze using %s config or %s catalog session property".formatted(
IcebergConfig.EXTENDED_STATISTICS_CONFIG,
IcebergSessionProperties.EXTENDED_STATISTICS_ENABLED));
}

IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
checkArgument(handle.getTableType() == DATA, "Cannot analyze non-DATA table: %s", handle.getTableType());

if (handle.getSnapshotId().isEmpty()) {
@@ -2683,10 +2697,10 @@ else if (strings.size() != 2) {
String schema = strings.get(0);
String name = strings.get(1);
SchemaTableName schemaTableName = new SchemaTableName(schema, name);
IcebergTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());
ConnectorTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());

if (tableHandle == null) {
// Base table is gone
if (tableHandle == null || tableHandle instanceof CorruptedIcebergTableHandle) {
// Base table is gone or table is corrupted
return new MaterializedViewFreshness(STALE, Optional.empty());
}
Optional<Long> snapshotAtRefresh;
@@ -2696,7 +2710,7 @@ else if (strings.size() != 2) {
else {
snapshotAtRefresh = Optional.of(Long.parseLong(value));
}
TableChangeInfo tableChangeInfo = getTableChangeInfo(session, tableHandle, snapshotAtRefresh);
TableChangeInfo tableChangeInfo = getTableChangeInfo(session, (IcebergTableHandle) tableHandle, snapshotAtRefresh);
if (tableChangeInfo instanceof NoTableChange) {
// Fresh
}
@@ -2807,6 +2821,15 @@ private void beginTransaction(Table icebergTable)
transaction = icebergTable.newTransaction();
}

private static IcebergTableHandle checkValidTableHandle(ConnectorTableHandle tableHandle)
{
requireNonNull(tableHandle, "tableHandle is null");
if (tableHandle instanceof CorruptedIcebergTableHandle corruptedTableHandle) {
throw corruptedTableHandle.createException();
}
return ((IcebergTableHandle) tableHandle);
}

private sealed interface TableChangeInfo
permits NoTableChange, FirstChangeSnapshot, UnknownTableChange {}

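Taken together, these changes make DROP TABLE the only operation that accepts a CorruptedIcebergTableHandle; everything else rethrows the original failure via checkValidTableHandle. One way to exercise this end to end is a test that corrupts a table's metadata and verifies that reads fail while DROP TABLE succeeds. A hedged sketch, assuming Trino's query-runner test helpers (assertUpdate, assertQueryFails) and a hypothetical metadataDirectoryOf helper that is not part of this PR:

    // Hypothetical test sketch; metadataDirectoryOf() is an assumed helper.
    assertUpdate("CREATE TABLE corrupted_table (x integer)");
    deleteRecursively(metadataDirectoryOf("corrupted_table"));
    // Plain reads surface the ICEBERG_MISSING_METADATA failure...
    assertQueryFails("SELECT * FROM corrupted_table", ".*Metadata not found in metadata location.*");
    // ...but DROP TABLE now succeeds instead of failing on metadata load.
    assertUpdate("DROP TABLE corrupted_table");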
@@ -18,6 +18,7 @@
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.plugin.iceberg.util.HiveSchemaUtil;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.TableMetadata;
@@ -44,6 +45,7 @@
import static io.trino.plugin.hive.util.HiveClassNames.FILE_INPUT_FORMAT_CLASS;
import static io.trino.plugin.hive.util.HiveClassNames.FILE_OUTPUT_FORMAT_CLASS;
import static io.trino.plugin.hive.util.HiveClassNames.LAZY_SIMPLE_SERDE_CLASS;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA;
import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME;
import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation;
import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
@@ -227,13 +229,22 @@ protected void refreshFromMetadataLocation(String newLocation)
return;
}

TableMetadata newMetadata = Failsafe.with(RetryPolicy.builder()
.withMaxRetries(20)
.withBackoff(100, 5000, MILLIS, 4.0)
.withMaxDuration(Duration.ofMinutes(10))
.abortOn(AbstractIcebergTableOperations::isNotFoundException)
.build())
.get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation)));
TableMetadata newMetadata;
try {
newMetadata = Failsafe.with(RetryPolicy.builder()
.withMaxRetries(20)
.withBackoff(100, 5000, MILLIS, 4.0)
.withMaxDuration(Duration.ofMinutes(10))
.abortOn(AbstractIcebergTableOperations::isNotFoundException)
.build())
.get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation)));
}
catch (Throwable failure) {
if (isNotFoundException(failure)) {
throw new TrinoException(ICEBERG_MISSING_METADATA, "Metadata not found in metadata location for table " + getSchemaTableName(), failure);
}
throw failure;
}

String newUUID = newMetadata.uuid();
if (currentMetadata != null) {
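The retry wrapper above is what lets the new catch block distinguish a definitively missing metadata file from a transient read failure: the policy retries with backoff but aborts as soon as isNotFoundException matches. An isolated sketch of that retry-versus-abort split, using the Failsafe 3.x API (dev.failsafe.*) as in the diff:

    // Simplified sketch of the retry policy used above.
    RetryPolicy<TableMetadata> policy = RetryPolicy.<TableMetadata>builder()
            .withMaxRetries(20)
            .withBackoff(100, 5000, ChronoUnit.MILLIS, 4.0) // 100 ms up to 5 s, x4 per attempt
            .withMaxDuration(Duration.ofMinutes(10))
            .abortOn(AbstractIcebergTableOperations::isNotFoundException) // no point retrying a missing file
            .build();
    TableMetadata newMetadata = Failsafe.with(policy)
            .get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation)));

An aborted call rethrows the not-found failure immediately, which the surrounding try/catch then converts into a TrinoException with ICEBERG_MISSING_METADATA.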
@@ -81,6 +81,8 @@ Transaction newCreateTableTransaction(

void dropTable(ConnectorSession session, SchemaTableName schemaTableName);

void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaTableName);

void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to);

/**
@@ -97,6 +97,7 @@
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition;
Expand Down Expand Up @@ -364,10 +365,29 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
}
dropTableData(table.io(), table.operations().current());
try {
dropTableData(table.io(), table.operations().current());
}
catch (RuntimeException e) {
// If the snapshot file is not found, an exception will be thrown by the dropTableData function.
// So log the exception and continue with deleting the table location
LOG.warn(e, "Failed to delete table data referenced by metadata");
}
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
}

@Override
public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaTableName)
{
com.amazonaws.services.glue.model.Table table = dropTableFromMetastore(session, schemaTableName);
String metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
if (metadataLocation == null) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table %s is missing [%s] property", schemaTableName, METADATA_LOCATION_PROP));
}
String tableLocation = metadataLocation.replaceFirst("/metadata/[^/]*$", "");
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation);
}

@Override
public Transaction newCreateTableTransaction(
ConnectorSession session,
@@ -399,6 +419,11 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableName

@Override
public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName)
{
dropTableFromMetastore(session, schemaTableName);
}

private com.amazonaws.services.glue.model.Table dropTableFromMetastore(ConnectorSession session, SchemaTableName schemaTableName)
{
com.amazonaws.services.glue.model.Table table = getTable(session, schemaTableName)
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
@@ -412,6 +437,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName)
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
}
return table;
}

@Override
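Because dropCorruptedTable cannot load the metadata to discover the table location, it derives the location by stripping the trailing metadata/&lt;file&gt; segment from the metastore's metadata pointer. For example (an illustrative path, not from the PR):

    // Illustrative only; the bucket and table paths below are made up.
    String metadataLocation = "s3://bucket/warehouse/orders/metadata/00002-abc.metadata.json";
    String tableLocation = metadataLocation.replaceFirst("/metadata/[^/]*$", "");
    // tableLocation is now "s3://bucket/warehouse/orders", which is what
    // deleteTableDirectory() receives.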
@@ -298,16 +298,7 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableName
@Override
public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName)
{
io.trino.plugin.hive.metastore.Table table = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
if (!isIcebergTable(table)) {
throw new UnknownTableTypeException(schemaTableName);
}

metastore.dropTable(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
false /* do not delete data */);
dropTableFromMetastore(schemaTableName);
}

@Override
@@ -333,12 +324,41 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
false /* do not delete data */);
// Use the Iceberg routine for dropping the table data because the data files
// of the Iceberg table may be located in different locations
dropTableData(table.io(), metadata);
try {
// Use the Iceberg routine for dropping the table data because the data files
// of the Iceberg table may be located in different locations
dropTableData(table.io(), metadata);
}
catch (RuntimeException e) {
// If the snapshot file is not found, an exception will be thrown by the dropTableData function.
// So log the exception and continue with deleting the table location
log.warn(e, "Failed to delete table data referenced by metadata");
}
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, metastoreTable.getStorage().getLocation());
}

@Override
public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaTableName)
{
io.trino.plugin.hive.metastore.Table table = dropTableFromMetastore(schemaTableName);
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.getStorage().getLocation());
}

private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableName schemaTableName)
{
io.trino.plugin.hive.metastore.Table table = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
if (!isIcebergTable(table)) {
throw new UnknownTableTypeException(schemaTableName);
}

metastore.dropTable(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
false /* do not delete data */);
return table;
}

@Override
public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to)
{
Expand Down
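In sum, both the Glue and Hive catalogs end up with three related removal paths built on a shared dropTableFromMetastore helper. A condensed comparison of their behavior, taken from the diffs above:

    // unregisterTable:    drop the metastore entry only; all files stay in place.
    // dropCorruptedTable: drop the metastore entry, then delete the table
    //                     directory. With unreadable metadata there is no way to
    //                     chase data files that may live outside that directory.
    // dropTable:          drop the metastore entry, attempt Iceberg's
    //                     dropTableData (best effort, since data files can live
    //                     in other locations), then delete the table directory.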