From 9dee7a063fe94f4cb6057052546be693c6433208 Mon Sep 17 00:00:00 2001
From: Alex Jo
Date: Tue, 9 Aug 2022 18:06:30 -0400
Subject: [PATCH] Handle empty Iceberg tables while executing procedures

If a table was just created it may not contain any snapshots.
Procedures run on tables that do not contain any snapshots can safely
do nothing.
---
 .../trino/plugin/iceberg/IcebergMetadata.java | 13 +++++++----
 .../procedure/IcebergOptimizeHandle.java      |  7 +++---
 .../TestIcebergSparkCompatibility.java        | 22 +++++++++++++++++++
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 2c9fbae6128d..f9e317031c66 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -920,7 +920,7 @@ private Optional getTableHandleForOptimize(Connecto
                 tableHandle.getSchemaTableName(),
                 OPTIMIZE,
                 new IcebergOptimizeHandle(
-                        tableHandle.getSnapshotId().orElseThrow(),
+                        tableHandle.getSnapshotId(),
                         SchemaParser.toJson(icebergTable.schema()),
                         PartitionSpecParser.toJson(icebergTable.spec()),
                         getColumns(icebergTable.schema(), typeManager),
@@ -1079,8 +1079,8 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
             newFiles.add(builder.build());
         }
 
-        if (scannedDataFiles.isEmpty() && fullyAppliedDeleteFiles.isEmpty() && newFiles.isEmpty()) {
-            // Table scan turned out to be empty, nothing to commit
+        if (optimizeHandle.getSnapshotId().isEmpty() || (scannedDataFiles.isEmpty() && fullyAppliedDeleteFiles.isEmpty() && newFiles.isEmpty())) {
+            // Either the table is empty, or the table scan turned out to be empty, nothing to commit
             transaction = null;
             return;
         }
@@ -1097,7 +1097,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
         RewriteFiles rewriteFiles = transaction.newRewrite();
         rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());
         // Table.snapshot method returns null if there is no matching snapshot
-        Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.getSnapshotId()), "snapshot is null");
+        Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.getSnapshotId().orElseThrow()), "snapshot is null");
         rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
         rewriteFiles.commit();
         transaction.commitTransaction();
@@ -1194,6 +1194,11 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu
                 IcebergConfig.REMOVE_ORPHAN_FILES_MIN_RETENTION,
                 IcebergSessionProperties.REMOVE_ORPHAN_FILES_MIN_RETENTION);
 
+        if (table.currentSnapshot() == null) {
+            log.debug("Skipping remove_orphan_files procedure for empty table " + table);
+            return;
+        }
+
         long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
         removeOrphanFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
         removeOrphanMetadataFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java
index 31a94b32730b..62d81e06f2e9 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java
@@ -23,6 +23,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
@@ -30,7 +31,7 @@
 public class IcebergOptimizeHandle
         extends IcebergProcedureHandle
 {
-    private final long snapshotId;
+    private final Optional<Long> snapshotId;
     private final String schemaAsJson;
     private final String partitionSpecAsJson;
     private final List tableColumns;
@@ -41,7 +42,7 @@ public class IcebergOptimizeHandle
 
     @JsonCreator
     public IcebergOptimizeHandle(
-            long snapshotId,
+            Optional<Long> snapshotId,
             String schemaAsJson,
             String partitionSpecAsJson,
             List tableColumns,
@@ -61,7 +62,7 @@ public IcebergOptimizeHandle(
     }
 
     @JsonProperty
-    public long getSnapshotId()
+    public Optional<Long> getSnapshotId()
     {
         return snapshotId;
     }
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index a429dc1dfb43..0e5c0ca9e115 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -1666,6 +1666,28 @@ public void testOptimizeOnV2IcebergTable()
                 .containsOnly(row(1, 2), row(2, 2), row(3, 2), row(11, 12), row(12, 12), row(13, 12));
     }
 
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
+    public void testAlterTableExecuteProceduresOnEmptyTable()
+    {
+        String baseTableName = "test_alter_table_execute_procedures_on_empty_table_" + randomTableSuffix();
+        String trinoTableName = trinoTableName(baseTableName);
+        String sparkTableName = sparkTableName(baseTableName);
+
+        onSpark().executeQuery(format(
+                "CREATE TABLE %s (" +
+                        " _string STRING" +
+                        ", _bigint BIGINT" +
+                        ", _integer INTEGER" +
+                        ") USING ICEBERG",
+                sparkTableName));
+
+        onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE optimize");
+        onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE expire_snapshots(retention_threshold => '7d')");
+        onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE remove_orphan_files(retention_threshold => '7d')");
+
+        assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).hasNoRows();
+    }
+
     private static String escapeSparkString(String value)
     {
         return value.replace("\\", "\\\\").replace("'", "\\'");