From 33e571202b48311587557f79646c58f51621289d Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 20 Nov 2024 17:33:28 +0100 Subject: [PATCH] Ignore partition fields that are dropped from the current-schema Fixes #4563 --- .../org/apache/iceberg/PartitionSpec.java | 17 ++++-- .../java/org/apache/iceberg/Partitioning.java | 8 +-- .../apache/iceberg/util/PartitionUtil.java | 2 +- .../TestAlterTablePartitionFields.java | 54 +++++++++++++++---- 4 files changed, 62 insertions(+), 19 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java index 9b74893f1831..af01a38ba75a 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -88,6 +88,12 @@ public List fields() { return lazyFieldList(); } + public List activeFields() { + return this.fields().stream() + .filter(f -> this.schema.findField(f.sourceId()) != null) + .collect(Collectors.toList()); + } + public boolean isPartitioned() { return fields.length > 0 && fields().stream().anyMatch(f -> !f.transform().isVoid()); } @@ -131,7 +137,12 @@ public StructType partitionType() { for (PartitionField field : fields) { Type sourceType = schema.findType(field.sourceId()); Type resultType = field.transform().getResultType(sourceType); - structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType)); + + // When the source field has been dropped we cannot determine the type + if (resultType != null) { + structFields.add( + Types.NestedField.optional(field.fieldId(), field.name(), resultType)); + } } this.lazyPartitionType = Types.StructType.of(structFields); @@ -632,9 +643,7 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) { // https://iceberg.apache.org/spec/#partition-transforms // We don't care about the source type since a VoidTransform is always compatible and skip the // checks - if (!transform.equals(Transforms.alwaysNull())) { - ValidationException.check( - sourceType != null, "Cannot find source column for partition field: %s", field); + if (sourceType != null && !transform.equals(Transforms.alwaysNull())) { ValidationException.check( sourceType.isPrimitiveType(), "Cannot partition by non-primitive source field: %s", diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java index 832e0b59fe50..c708d39f523e 100644 --- a/core/src/main/java/org/apache/iceberg/Partitioning.java +++ b/core/src/main/java/org/apache/iceberg/Partitioning.java @@ -239,7 +239,8 @@ public static StructType groupingKeyType(Schema schema, Collection specs = table.specs().values(); - return buildPartitionProjectionType("table partition", specs, allFieldIds(specs)); + return buildPartitionProjectionType( + "table partition", specs, allActiveFieldIds(table.schema(), specs)); } /** @@ -346,10 +347,11 @@ private static boolean compatibleTransforms(Transform t1, Transform || t2.equals(Transforms.alwaysNull()); } - // collects IDs of all partition field used across specs - private static Set allFieldIds(Collection specs) { + // collects IDs of all partition field used across specs that are in the current schema + private static Set allActiveFieldIds(Schema schema, Collection specs) { return FluentIterable.from(specs) .transformAndConcat(PartitionSpec::fields) + .filter(field -> schema.findField(field.sourceId()) != null) .transform(PartitionField::fieldId) .toSet(); } diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java index 411d401075d6..0fe7a87dee6f 100644 --- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java @@ -77,7 +77,7 @@ private PartitionUtil() {} } List partitionFields = spec.partitionType().fields(); - List fields = spec.fields(); + List fields = spec.activeFields(); for (int pos = 0; pos < fields.size(); pos += 1) { PartitionField field = fields.get(pos); if (field.transform().isIdentity()) { diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java index 38e5c942c9ff..b3fd08572b21 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java @@ -20,6 +20,9 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; @@ -27,6 +30,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.source.SparkTable; import org.apache.spark.sql.connector.catalog.CatalogManager; @@ -530,26 +534,54 @@ public void testSparkTableAddDropPartitions() throws Exception { } @TestTemplate - public void testDropColumnOfOldPartitionFieldV1() { - // default table created in v1 format + public void testDropColumnOfOldPartitionField() { sql( - "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '1')", - tableName); + "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = %d)", + tableName, formatVersion); sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName); sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName); } - @TestTemplate - public void testDropColumnOfOldPartitionFieldV2() { + private void runCreateAndDropPartitionField( + String column, String partitionType, List expected, String predicate) { + sql("DROP TABLE IF EXISTS %s", tableName); sql( - "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '2')", - tableName); - - sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName); + "CREATE TABLE %s (col_int INTEGER, col_ts TIMESTAMP_NTZ, col_long BIGINT) USING ICEBERG TBLPROPERTIES ('format-version' = %d)", + tableName, formatVersion); + sql("INSERT INTO %s VALUES (1000, CAST('2024-03-01 19:25:00' as TIMESTAMP), 2100)", tableName); + sql("ALTER TABLE %s ADD PARTITION FIELD %s AS col2_partition", tableName, partitionType); + sql("INSERT INTO %s VALUES (2000, CAST('2024-04-01 19:25:00' as TIMESTAMP), 2200)", tableName); + sql("ALTER TABLE %s DROP PARTITION FIELD col2_partition", tableName); + sql("INSERT INTO %s VALUES (3000, CAST('2024-05-01 19:25:00' as TIMESTAMP), 2300)", tableName); + sql("ALTER TABLE %s DROP COLUMN %s", tableName, column); + + assertEquals( + "Should return correct data", + expected, + sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName, predicate)); + } - sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName); + @TestTemplate + public void testDropPartitionAndUnderlyingField() { + String predicateLong = "col_ts >= '2024-04-01 19:25:00'"; + List expectedLong = + Lists.newArrayList( + new Object[] {2000, LocalDateTime.ofEpochSecond(1711999500, 0, ZoneOffset.UTC)}, + new Object[] {3000, LocalDateTime.ofEpochSecond(1714591500, 0, ZoneOffset.UTC)}); + runCreateAndDropPartitionField("col_long", "col_long", expectedLong, predicateLong); + runCreateAndDropPartitionField( + "col_long", "truncate(2, col_long)", expectedLong, predicateLong); + runCreateAndDropPartitionField("col_long", "bucket(16, col_long)", expectedLong, predicateLong); + + String predicateTs = "col_long >= 2200"; + List expectedTs = + Lists.newArrayList(new Object[] {2000, 2200L}, new Object[] {3000, 2300L}); + runCreateAndDropPartitionField("col_ts", "col_ts", expectedTs, predicateTs); + runCreateAndDropPartitionField("col_ts", "year(col_ts)", expectedTs, predicateTs); + runCreateAndDropPartitionField("col_ts", "month(col_ts)", expectedTs, predicateTs); + runCreateAndDropPartitionField("col_ts", "day(col_ts)", expectedTs, predicateTs); } private void assertPartitioningEquals(SparkTable table, int len, String transform) {