Skip to content

Commit

Permalink
Ignore partition fields that are dropped from the current-schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Dec 24, 2024
1 parent dbd7d1c commit 2a1703b
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 21 deletions.
16 changes: 13 additions & 3 deletions api/src/main/java/org/apache/iceberg/PartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public List<PartitionField> fields() {
return lazyFieldList();
}

public List<PartitionField> activeFields() {
return this.fields().stream()
.filter(f -> this.schema.findField(f.sourceId()) != null)
.collect(Collectors.toList());
}

public boolean isPartitioned() {
return fields.length > 0 && fields().stream().anyMatch(f -> !f.transform().isVoid());
}
Expand Down Expand Up @@ -131,6 +137,12 @@ public StructType partitionType() {
for (PartitionField field : fields) {
Type sourceType = schema.findType(field.sourceId());
Type resultType = field.transform().getResultType(sourceType);

// When the source field has been dropped we cannot determine the type
if (resultType == null) {
resultType = Types.UnknownType.get();
}

structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
}

Expand Down Expand Up @@ -632,9 +644,7 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) {
// https://iceberg.apache.org/spec/#partition-transforms
// We don't care about the source type since a VoidTransform is always compatible and skip the
// checks
if (!transform.equals(Transforms.alwaysNull())) {
ValidationException.check(
sourceType != null, "Cannot find source column for partition field: %s", field);
if (sourceType != null && !transform.equals(Transforms.alwaysNull())) {
ValidationException.check(
sourceType.isPrimitiveType(),
"Cannot partition by non-primitive source field: %s",
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/types/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ enum TypeID {
STRUCT(StructLike.class),
LIST(List.class),
MAP(Map.class),
VARIANT(Object.class);
VARIANT(Object.class),
UNKNOWN(Object.class);

private final Class<?> javaClass;

Expand Down
19 changes: 19 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private Types() {}
.put(StringType.get().toString(), StringType.get())
.put(UUIDType.get().toString(), UUIDType.get())
.put(BinaryType.get().toString(), BinaryType.get())
.put(UnknownType.get().toString(), UnknownType.get())
.buildOrThrow();

private static final Pattern FIXED = Pattern.compile("fixed\\[\\s*(\\d+)\\s*\\]");
Expand Down Expand Up @@ -412,6 +413,24 @@ public String toString() {
}
}

public static class UnknownType extends PrimitiveType {
private static final UnknownType INSTANCE = new UnknownType();

public static UnknownType get() {
return INSTANCE;
}

@Override
public TypeID typeId() {
return TypeID.UNKNOWN;
}

@Override
public String toString() {
return "unknown";
}
}

public static class VariantType implements Type {
private static final VariantType INSTANCE = new VariantType();

Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/org/apache/iceberg/types/TestTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void fromPrimitiveString() {
assertThat(Types.fromPrimitiveString("Decimal(2,3)")).isEqualTo(Types.DecimalType.of(2, 3));

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> Types.fromPrimitiveString("Unknown"))
.withMessageContaining("Unknown");
.isThrownBy(() -> Types.fromPrimitiveString("unknown-type"))
.withMessageContaining("unknown-type");
}
}
8 changes: 5 additions & 3 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec
*/
public static StructType partitionType(Table table) {
Collection<PartitionSpec> specs = table.specs().values();
return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
return buildPartitionProjectionType(
"table partition", specs, allActiveFieldIds(table.schema(), specs));
}

/**
Expand Down Expand Up @@ -346,10 +347,11 @@ private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?>
|| t2.equals(Transforms.alwaysNull());
}

// collects IDs of all partition field used across specs
private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
// collects IDs of all partition field used across specs that are in the current schema
private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
return FluentIterable.from(specs)
.transformAndConcat(PartitionSpec::fields)
.filter(field -> schema.findField(field.sourceId()) != null)
.transform(PartitionField::fieldId)
.toSet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ static Schema toOption(Schema schema) {
Preconditions.checkArgument(
isOptionSchema(schema), "Union schemas are not supported: %s", schema);
return schema;
} else if (schema.getType() == Schema.Type.NULL) {
return schema;
} else {
return Schema.createUnion(NULL, schema);
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
private static final Schema UUID_SCHEMA =
LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
private static final Schema BINARY_SCHEMA = Schema.create(Schema.Type.BYTES);
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);

static {
TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
Expand Down Expand Up @@ -243,6 +244,9 @@ public Schema primitive(Type.PrimitiveType primitive) {
null,
TypeUtil.decimalRequiredBytes(decimal.precision())));
break;
case UNKNOWN:
primitiveSchema = NULL_SCHEMA;
break;
default:
throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private PartitionUtil() {}
}

List<Types.NestedField> partitionFields = spec.partitionType().fields();
List<PartitionField> fields = spec.fields();
List<PartitionField> fields = spec.activeFields();
for (int pos = 0; pos < fields.size(); pos += 1) {
PartitionField field = fields.get(pos);
if (field.transform().isIdentity()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.connector.catalog.CatalogManager;
Expand Down Expand Up @@ -530,26 +534,54 @@ public void testSparkTableAddDropPartitions() throws Exception {
}

@TestTemplate
public void testDropColumnOfOldPartitionFieldV1() {
// default table created in v1 format
public void testDropColumnOfOldPartitionField() {
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '1')",
tableName);
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = %d)",
tableName, formatVersion);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

@TestTemplate
public void testDropColumnOfOldPartitionFieldV2() {
private void runCreateAndDropPartitionField(
String column, String partitionType, List<Object[]> expected, String predicate) {
sql("DROP TABLE IF EXISTS %s", tableName);
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '2')",
tableName);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);
"CREATE TABLE %s (col_int INTEGER, col_ts TIMESTAMP_NTZ, col_long BIGINT) USING ICEBERG TBLPROPERTIES ('format-version' = %d)",
tableName, formatVersion);
sql("INSERT INTO %s VALUES (1000, CAST('2024-03-01 19:25:00' as TIMESTAMP), 2100)", tableName);
sql("ALTER TABLE %s ADD PARTITION FIELD %s AS col2_partition", tableName, partitionType);
sql("INSERT INTO %s VALUES (2000, CAST('2024-04-01 19:25:00' as TIMESTAMP), 2200)", tableName);
sql("ALTER TABLE %s DROP PARTITION FIELD col2_partition", tableName);
sql("INSERT INTO %s VALUES (3000, CAST('2024-05-01 19:25:00' as TIMESTAMP), 2300)", tableName);
sql("ALTER TABLE %s DROP COLUMN %s", tableName, column);

assertEquals(
"Should return correct data",
expected,
sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName, predicate));
}

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
@TestTemplate
public void testDropPartitionAndUnderlyingField() {
String predicateLong = "col_ts >= '2024-04-01 19:25:00'";
List<Object[]> expectedLong =
Lists.newArrayList(
new Object[] {2000, LocalDateTime.ofEpochSecond(1711999500, 0, ZoneOffset.UTC)},
new Object[] {3000, LocalDateTime.ofEpochSecond(1714591500, 0, ZoneOffset.UTC)});
runCreateAndDropPartitionField("col_long", "col_long", expectedLong, predicateLong);
runCreateAndDropPartitionField(
"col_long", "truncate(2, col_long)", expectedLong, predicateLong);
runCreateAndDropPartitionField("col_long", "bucket(16, col_long)", expectedLong, predicateLong);

String predicateTs = "col_long >= 2200";
List<Object[]> expectedTs =
Lists.newArrayList(new Object[] {2000, 2200L}, new Object[] {3000, 2300L});
runCreateAndDropPartitionField("col_ts", "col_ts", expectedTs, predicateTs);
runCreateAndDropPartitionField("col_ts", "year(col_ts)", expectedTs, predicateTs);
runCreateAndDropPartitionField("col_ts", "month(col_ts)", expectedTs, predicateTs);
runCreateAndDropPartitionField("col_ts", "day(col_ts)", expectedTs, predicateTs);
}

private void assertPartitioningEquals(SparkTable table, int len, String transform) {
Expand Down

0 comments on commit 2a1703b

Please sign in to comment.