Skip to content

Commit

Permalink
[Delta Uniform] overwrite source column field id for Iceberg Partitio…
Browse files Browse the repository at this point in the history
…nField with field id assigned by Delta

Context: Delta and Iceberg traverse the schema and assign field ids in different ways. Delta Uniform currently uses an extra Iceberg transaction to overwrite the schema in the Iceberg table, whose field ids were incorrectly reassigned by Iceberg.

However, Iceberg always expects the source field ids of partition columns to stay the same and has no logic to reconcile a change, so if the source field id of a partition column differs in that schema-overwrite transaction, Iceberg fails the transaction. This PR adds logic that makes an Iceberg PartitionField adopt the new source column field id when it has changed, so the overwrite transaction can go through and set the correct source column field id for each PartitionField.

Closes #2676

GitOrigin-RevId: 37be472e9794d0a87c59a8fe06efe237ee1c609e
  • Loading branch information
lzlfred authored and vkorukanti committed Feb 29, 2024
1 parent 01b8da4 commit eb59d4a
Showing 1 changed file with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
Iceberg PartitionField takes source column field id from latest schema if changed

--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -664,13 +664,22 @@ public class TableMetadata implements Serializable {
return new Builder(this).upgradeFormatVersion(newFormatVersion).build();
}

- private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) {
+ private static PartitionSpec updateSpecSchema(Schema newSchema, Schema currSchema, PartitionSpec partitionSpec) {
PartitionSpec.Builder specBuilder =
- PartitionSpec.builderFor(schema).withSpecId(partitionSpec.specId());
+ PartitionSpec.builderFor(newSchema).withSpecId(partitionSpec.specId());

- // add all the fields to the builder. IDs should not change.
+ // add all the fields to the builder. IDs may change so it looks up the source field id by
+ // name from the new schema
for (PartitionField field : partitionSpec.fields()) {
- specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform());
+ String partFieldSourceName = currSchema.findField(field.sourceId()).name();
+ int partFieldSourceInt;
+ org.apache.iceberg.types.Types.NestedField partSourceFieldInNewSchema = newSchema.findField(partFieldSourceName);
+ if (partSourceFieldInNewSchema == null) {
+ partFieldSourceInt = field.sourceId();
+ } else {
+ partFieldSourceInt = partSourceFieldInNewSchema.fieldId();
+ }
+ specBuilder.add(partFieldSourceInt, field.fieldId(), field.name(), field.transform());
}

// build without validation because the schema may have changed in a way that makes this spec
@@ -970,7 +979,7 @@ public class TableMetadata implements Serializable {

// rebuild all the partition specs and sort orders for the new current schema
this.specs =
- Lists.newArrayList(Iterables.transform(specs, spec -> updateSpecSchema(schema, spec)));
+ Lists.newArrayList(Iterables.transform(specs, spec -> updateSpecSchema(schema, schemasById.get(currentSchemaId), spec)));
specsById.clear();
specsById.putAll(indexSpecs(specs));

0 comments on commit eb59d4a

Please sign in to comment.