Skip to content

Commit

Permalink
Core: Only validate the current partition specs (#5707)
Browse files Browse the repository at this point in the history
If a field that used to be part of a partition spec is being deleted,
validation will throw an error because it can no longer resolve the field.

Closes #5676
Closes #5707
Closes #5399
  • Loading branch information
Fokko authored Sep 30, 2022
1 parent 509d682 commit 3b65cca
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 12 deletions.
30 changes: 19 additions & 11 deletions api/src/main/java/org/apache/iceberg/PartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -555,17 +555,25 @@ PartitionSpec buildUnchecked() {
/**
 * Validates that every field of the given partition spec can be resolved against the schema.
 *
 * <p>In a V1 table, a partition field whose source column was dropped is replaced with a void
 * transform rather than being removed (see
 * https://iceberg.apache.org/spec/#partition-transforms). A void transform produces null for any
 * input, so it is compatible with any (even missing) source column and the per-field checks are
 * skipped for it.
 *
 * @param spec partition spec to validate
 * @param schema schema to resolve the spec's source columns against
 * @throws ValidationException if a non-void field's source column is missing, is not a
 *     primitive type, or cannot be handled by the field's transform
 */
static void checkCompatibility(PartitionSpec spec, Schema schema) {
  for (PartitionField field : spec.fields) {
    Type sourceType = schema.findType(field.sourceId());
    Transform<?, ?> transform = field.transform();
    // Skip validation for void transforms: they accept any source, including one that no
    // longer exists in the schema (V1 dropped-column behavior).
    if (!transform.equals(Transforms.alwaysNull())) {
      ValidationException.check(
          sourceType != null, "Cannot find source column for partition field: %s", field);
      ValidationException.check(
          sourceType.isPrimitiveType(),
          "Cannot partition by non-primitive source field: %s",
          sourceType);
      ValidationException.check(
          transform.canTransform(sourceType),
          "Invalid source type %s for transform: %s",
          sourceType,
          transform);
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,12 @@ static TableMetadata fromJson(String metadataLocation, JsonNode node) {
// parse the spec array
ImmutableList.Builder<PartitionSpec> builder = ImmutableList.builder();
for (JsonNode spec : specArray) {
builder.add(PartitionSpecParser.fromJson(schema, spec));
UnboundPartitionSpec unboundSpec = PartitionSpecParser.fromJson(spec);
if (unboundSpec.specId() == defaultSpecId) {
builder.add(unboundSpec.bind(schema));
} else {
builder.add(unboundSpec.bindUnchecked(schema));
}
}
specs = builder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,31 @@ public void testSparkTableAddDropPartitions() throws Exception {
"spark table partition should be empty", 0, sparkTable().partitioning().length);
}

@Test
public void testDropColumnOfOldPartitionFieldV1() {
  // Create a V1 table (the default format version) partitioned by day_of_ts.
  sql(
      "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
      tableName);

  // Evolve the spec so day_of_ts is no longer part of the current partition spec.
  sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

  // Dropping the old partition source column must succeed: in V1 the historical
  // partition field becomes a void transform and is no longer validated.
  sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

@Test
public void testDropColumnOfOldPartitionFieldV2() {
  // Table starts in V1 format; partitioned by day_of_ts.
  sql(
      "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
      tableName);

  // Upgrade the table to format version 2 before evolving the spec.
  sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2');", tableName);

  // Evolve the spec so day_of_ts is no longer part of the current partition spec.
  sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

  // Dropping the old partition source column must succeed: only the current
  // (default) spec is strictly validated against the schema.
  sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

private void assertPartitioningEquals(SparkTable table, int len, String transform) {
Assert.assertEquals("spark table partition should be " + len, len, table.partitioning().length);
Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,31 @@ public void testSparkTableAddDropPartitions() throws Exception {
"spark table partition should be empty", 0, sparkTable().partitioning().length);
}

@Test
public void testDropColumnOfOldPartitionFieldV1() {
  // Create a V1 table (the default format version) partitioned by day_of_ts.
  sql(
      "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
      tableName);

  // Evolve the spec so day_of_ts is no longer part of the current partition spec.
  sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

  // Dropping the old partition source column must succeed: in V1 the historical
  // partition field becomes a void transform and is no longer validated.
  sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

@Test
public void testDropColumnOfOldPartitionFieldV2() {
  // Table starts in V1 format; partitioned by day_of_ts.
  sql(
      "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
      tableName);

  // Upgrade the table to format version 2 before evolving the spec.
  sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2');", tableName);

  // Evolve the spec so day_of_ts is no longer part of the current partition spec.
  sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

  // Dropping the old partition source column must succeed: only the current
  // (default) spec is strictly validated against the schema.
  sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

private void assertPartitioningEquals(SparkTable table, int len, String transform) {
Assert.assertEquals("spark table partition should be " + len, len, table.partitioning().length);
Assert.assertEquals(
Expand Down

0 comments on commit 3b65cca

Please sign in to comment.