
Support for REPLACE TABLE operation #433

Open · wants to merge 31 commits into base: main
Conversation

@anupam-saini (Contributor) commented Feb 15, 2024

Closes #281
API proposal (from PR feedback):

table = catalog.create_or_replace_table(identifier, schema, location, partition_spec, sort_order, properties)

TODO:

  • Update schema
  • Update partition spec
  • Update sort order
  • Update location
  • Update table properties

(Review thread on pyiceberg/schema.py — outdated, resolved)
@Fokko (Contributor) commented Feb 16, 2024

@anupam-saini Thanks for working on this. I'm not sure if the following API is where people would expect it:

with table.transaction() as transaction:
    transaction.replace_table_with(new_table)

Especially because this is an unsafe operation that breaks downstream consumers.

I would expect this operation on the catalog itself:

catalog = load_catalog('default')

catalog.create_table('schema.table', schema=...)
catalog.create_or_replace_table('schema.table', schema=...)

We want to generalize this operation, so we don't have to implement this for each of the catalogs. Therefore I would expect this on the Catalog(ABC) itself.
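The generic catalog-level operation can be sketched with a template-method pattern: the shared logic lives on the ABC, expressed in terms of primitives every concrete catalog implements. This is a minimal sketch, not pyiceberg's actual API — `MemoryCatalog` and the dict-based table representation are illustrative only:

```python
from abc import ABC, abstractmethod


class Catalog(ABC):
    """Sketch: generic create_or_replace_table on the ABC."""

    @abstractmethod
    def table_exists(self, identifier: str) -> bool: ...

    @abstractmethod
    def create_table(self, identifier: str, schema: dict) -> dict: ...

    @abstractmethod
    def replace_table(self, identifier: str, schema: dict) -> dict: ...

    def create_or_replace_table(self, identifier: str, schema: dict) -> dict:
        # Generic dispatch: replace keeps table history, create starts fresh.
        if self.table_exists(identifier):
            return self.replace_table(identifier, schema)
        return self.create_table(identifier, schema)


class MemoryCatalog(Catalog):
    """Illustrative in-memory catalog, standing in for a real backend."""

    def __init__(self) -> None:
        self._tables: dict = {}

    def table_exists(self, identifier: str) -> bool:
        return identifier in self._tables

    def create_table(self, identifier: str, schema: dict) -> dict:
        self._tables[identifier] = {"schemas": [schema], "current": 0}
        return self._tables[identifier]

    def replace_table(self, identifier: str, schema: dict) -> dict:
        tbl = self._tables[identifier]
        tbl["schemas"].append(schema)  # history retained, as Spark does
        tbl["current"] = len(tbl["schemas"]) - 1
        return tbl


catalog = MemoryCatalog()
catalog.create_or_replace_table("schema.table", {"fields": ["name"]})
tbl = catalog.create_or_replace_table("schema.table", {"fields": ["name", "age"]})
print(len(tbl["schemas"]), tbl["current"])  # 2 1
```

Each concrete catalog then only supplies the primitives, and the replace semantics stay consistent across backends.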

Just a heads up: for REPLACE TABLE, Spark keeps the table history:

[screenshot: Spark table history showing both snapshots]

And when we look at the metadata, we can see the previous schema/snapshot as well:

{
  "format-version" : 2,
  "table-uuid" : "9b8b02af-2097-453f-86e2-5b2715e9d37a",
  "location" : "s3://warehouse/default/fokko",
  "last-sequence-number" : 2,
  "last-updated-ms" : 1708081058809,
  "last-column-id" : 2,
  "current-schema-id" : 1,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  }, {
    "type" : "struct",
    "schema-id" : 1,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age"
      "required" : false,
      "type" : "int"
    } ],
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "created-at" : "2024-02-16T10:57:38.541088095Z",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : 398515508184271470,
  "refs" : {
    "main" : {
      "snapshot-id" : 398515508184271470,
      "type" : "branch"
    }
  },
  "snapshots" : [ {
    "sequence-number" : 1,
    "snapshot-id" : 4615041670163082108,
    "timestamp-ms" : 1708081058629,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "local-1708080918556",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "416",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-files-size" : "416",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "s3://warehouse/default/fokko/metadata/snap-4615041670163082108-1-d3852ba7-ff54-4abd-99a2-0265206cfbfa.avro",
    "schema-id" : 0
  }, {
    "sequence-number" : 2,
    "snapshot-id" : 398515508184271470,
    "timestamp-ms" : 1708081058809,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "local-1708080918556",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "628",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-files-size" : "628",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "s3://warehouse/default/fokko/metadata/snap-398515508184271470-1-4d03a8b5-8912-4235-8c18-75400fef9874.avro",
    "schema-id" : 1
  } ],
  "statistics" : [ ],
  "snapshot-log" : [ {
    "timestamp-ms" : 1708081058809,
    "snapshot-id" : 398515508184271470
  } ],
  "metadata-log" : [ {
    "timestamp-ms" : 1708081058629,
    "metadata-file" : "s3://warehouse/default/fokko/metadata/00000-10d2c8d5-f6a2-4dc4-90cb-c545d8ffd497.metadata.json"
  } ]
}
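The retained history above can be checked programmatically. A trimmed sketch, with the relevant field names and values copied from the metadata quoted in the comment:

```python
import json

# Only the history-bearing fields from the quoted metadata are kept here.
metadata = json.loads("""{
  "current-schema-id": 1,
  "schemas": [{"schema-id": 0}, {"schema-id": 1}],
  "current-snapshot-id": 398515508184271470,
  "snapshots": [{"snapshot-id": 4615041670163082108},
                {"snapshot-id": 398515508184271470}]
}""")

# After the replace, both the old and new schema/snapshot are present.
print(len(metadata["schemas"]), len(metadata["snapshots"]))  # 2 2
```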

@anupam-saini (Contributor, Author)
Thank you @Fokko for taking the time to explain this in such great detail. It now makes much more sense to have this as part of the Catalog API. I made the changes as suggested.

@Fokko Fokko added this to the PyIceberg 0.7.0 release milestone Feb 19, 2024
@anupam-saini anupam-saini marked this pull request as ready for review February 21, 2024 03:54
@sungwy sungwy mentioned this pull request Feb 22, 2024
@anupam-saini anupam-saini requested a review from sungwy March 1, 2024 03:12
@anupam-saini (Contributor, Author)
Now with the sort order and partition spec updates, this PR has all the necessary pieces for the create-or-replace table operation and is ready for review.

@Fokko @syun64

@Fokko (Contributor) left a comment

Sorry for the late reply, @anupam-saini. This is looking great, but it looks like there are still some discrepancies with the reference implementation in Java.

AddPartitionSpecUpdate(spec=new_partition_spec),
SetDefaultSpecUpdate(spec_id=-1),
)
tx._apply(updates, requirements)
@Fokko (Contributor):

With #471 in, this is not necessary anymore! 🥳

Suggested change: drop the line
tx._apply(updates, requirements)

table = self.load_table(identifier)
with table.transaction() as tx:
base_schema = table.schema()
new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, base_schema=base_schema)
@Fokko (Contributor):

Let's consider the following:

CREATE TABLE default.t1 (name string);

Results in:

{
  "format-version" : 2,
  "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
  "location" : "s3://warehouse/default/t1",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710407936565,
  "last-column-id" : 1,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}
CREATE OR REPLACE TABLE default.t1 (name string, age int);

The second schema is added:

{
  "format-version" : 2,
  "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
  "location" : "s3://warehouse/default/t1",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710407992389,
  "last-column-id" : 2,
  "current-schema-id" : 1,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  }, {
    "type" : "struct",
    "schema-id" : 1,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710407936565,
    "metadata-file" : "s3://warehouse/default/t1/metadata/00000-83e6fd53-d3c9-41f9-aeea-b978fb882c7f.metadata.json"
  } ]
}

And then go back to the original schema:

CREATE OR REPLACE TABLE default.t1 (name string);

You'll see that no new schema is being added:

{
  "format-version" : 2,
  "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
  "location" : "s3://warehouse/default/t1",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710408026710,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  }, {
    "type" : "struct",
    "schema-id" : 1,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710407936565,
    "metadata-file" : "s3://warehouse/default/t1/metadata/00000-83e6fd53-d3c9-41f9-aeea-b978fb882c7f.metadata.json"
  }, {
    "timestamp-ms" : 1710407992389,
    "metadata-file" : "s3://warehouse/default/t1/metadata/00001-00192fa8-9019-481a-b4da-ebd99d11eeb9.metadata.json"
  } ]
}

What do you think of re-using the update_schema() class:

with table.transaction() as transaction:
    with transaction.update_schema(allow_incompatible_changes=True) as update_schema:
        # Remove old fields
        removed_column_names = base_schema._name_to_id().keys() - schema._name_to_id().keys()
        for removed_column_name in removed_column_names:
            update_schema.delete_column(removed_column_name)
        # Add new and evolve existing fields
        update_schema.union_by_name(schema)

^ Pseudocode, could be cleaner. Ideally, the removal should be done with a visit_with_partner (that's the opposite of union_by_name).

@anupam-saini (Contributor, Author) commented Mar 17, 2024

Thanks @Fokko for this detailed explanation. But we also need to cover step 2 of this example, where we add a new schema, right?

So, from my understanding: if the schema fields match an old schema in the metadata, we do union_by_name with the old schema and set it as the current one; else, we add the new schema. Is this a correct assessment?

@Fokko (Contributor):

That's correct!
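The reuse-or-add rule agreed on above can be sketched in a few lines. Plain dicts and field tuples stand in for pyiceberg's Schema objects here; the function name is illustrative, not the PR's actual helper:

```python
def set_current_schema(metadata: dict, new_fields: list) -> dict:
    """Reuse a structurally matching schema, else append a new one."""
    for schema in metadata["schemas"]:
        if schema["fields"] == new_fields:  # structural match: reuse its id
            metadata["current-schema-id"] = schema["schema-id"]
            return metadata
    # No match: register the new schema under a fresh schema-id.
    new_id = max(s["schema-id"] for s in metadata["schemas"]) + 1
    metadata["schemas"].append({"schema-id": new_id, "fields": new_fields})
    metadata["current-schema-id"] = new_id
    return metadata


md = {"current-schema-id": 0,
      "schemas": [{"schema-id": 0, "fields": [("name", "string")]}]}
md = set_current_schema(md, [("name", "string"), ("age", "int")])  # adds id 1
md = set_current_schema(md, [("name", "string")])                  # reuses id 0
print(len(md["schemas"]), md["current-schema-id"])  # 2 0
```

This reproduces the behaviour in the Spark examples above: replacing back to a previously seen schema flips current-schema-id without growing the schemas list.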

Comment on lines +792 to +793
AddPartitionSpecUpdate(spec=new_partition_spec),
SetDefaultSpecUpdate(spec_id=-1),
@Fokko (Contributor):

Same goes here, the spec is being re-used:

CREATE TABLE default.t2 (name string, age int) PARTITIONED BY (name);
{
  "format-version" : 2,
  "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
  "location" : "s3://warehouse/default/t2",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710409060360,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    } ]
  } ],
  "last-partition-id" : 1000,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}
CREATE OR REPLACE TABLE default.t2 (name string, age int) PARTITIONED BY (name, age);
{
  "format-version" : 2,
  "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
  "location" : "s3://warehouse/default/t2",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710409079414,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 1,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    } ]
  }, {
    "spec-id" : 1,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    }, {
      "name" : "age",
      "transform" : "identity",
      "source-id" : 2,
      "field-id" : 1001
    } ]
  } ],
  "last-partition-id" : 1001,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710409060360,
    "metadata-file" : "s3://warehouse/default/t2/metadata/00000-bc21fe27-d037-4b98-a8ca-cc1502eb19ec.metadata.json"
  } ]
}
CREATE OR REPLACE TABLE default.t2 (name string, age int) PARTITIONED BY (name);
{
  "format-version" : 2,
  "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
  "location" : "s3://warehouse/default/t2",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710409086268,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    } ]
  }, {
    "spec-id" : 1,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    }, {
      "name" : "age",
      "transform" : "identity",
      "source-id" : 2,
      "field-id" : 1001
    } ]
  } ],
  "last-partition-id" : 1001,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710409060360,
    "metadata-file" : "s3://warehouse/default/t2/metadata/00000-bc21fe27-d037-4b98-a8ca-cc1502eb19ec.metadata.json"
  }, {
    "timestamp-ms" : 1710409079414,
    "metadata-file" : "s3://warehouse/default/t2/metadata/00001-8062294f-a8d6-493d-905f-b82dfe01cb29.metadata.json"
  } ]
}

Ideally, we also want to re-use the update_spec class here.
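The same reuse-or-add rule applies to partition specs: replacing with a previously seen spec should flip default-spec-id back rather than append a duplicate. A minimal sketch, with plain dicts standing in for pyiceberg's PartitionSpec and an illustrative function name:

```python
def set_default_spec(metadata: dict, new_fields: list) -> dict:
    """Reuse a structurally matching partition spec, else append a new one."""
    for spec in metadata["partition-specs"]:
        if spec["fields"] == new_fields:  # match: point default back at it
            metadata["default-spec-id"] = spec["spec-id"]
            return metadata
    new_id = max(s["spec-id"] for s in metadata["partition-specs"]) + 1
    metadata["partition-specs"].append({"spec-id": new_id, "fields": new_fields})
    metadata["default-spec-id"] = new_id
    return metadata


md = {"default-spec-id": 0,
      "partition-specs": [{"spec-id": 0, "fields": ["name"]}]}
md = set_default_spec(md, ["name", "age"])  # adds spec-id 1
md = set_default_spec(md, ["name"])         # reuses spec-id 0
print(len(md["partition-specs"]), md["default-spec-id"])  # 2 0
```

This matches the t2 example above, where PARTITIONED BY (name) after PARTITIONED BY (name, age) resets default-spec-id to 0 without adding a third spec.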

)

requirements: Tuple[TableRequirement, ...] = (AssertTableUUID(uuid=table.metadata.table_uuid),)
updates: Tuple[TableUpdate, ...] = (
@Fokko (Contributor):

We need to clear the snapshots here as well:

CREATE TABLE default.t3 AS SELECT 'Fokko' as name
{
  "format-version" : 2,
  "table-uuid" : "c61404c8-2211-46c7-866f-2eb87022b728",
  "location" : "s3://warehouse/default/t3",
  "last-sequence-number" : 1,
  "last-updated-ms" : 1710409653861,
  "last-column-id" : 1,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "created-at" : "2024-03-14T09:47:10.455199504Z",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ {
    "sequence-number" : 1,
    "snapshot-id" : 3622792816294171432,
    "timestamp-ms" : 1710409631964,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "local-1710405058122",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "416",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-files-size" : "416",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "s3://warehouse/default/t3/metadata/snap-3622792816294171432-1-e457c732-62e5-41eb-998b-abbb8f021ed5.avro",
    "schema-id" : 0
  } ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710409631964,
    "metadata-file" : "s3://warehouse/default/t3/metadata/00000-c82191f3-e6e2-4001-8e85-8623e3915ff7.metadata.json"
  } ]
}
CREATE OR REPLACE TABLE default.t3 (name string);
{
  "format-version" : 2,
  "table-uuid" : "c61404c8-2211-46c7-866f-2eb87022b728",
  "location" : "s3://warehouse/default/t3",
  "last-sequence-number" : 1,
  "last-updated-ms" : 1710411760623,
  "last-column-id" : 1,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "created-at" : "2024-03-14T09:47:10.455199504Z",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ {
    "sequence-number" : 1,
    "snapshot-id" : 3622792816294171432,
    "timestamp-ms" : 1710409631964,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "local-1710405058122",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "416",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-files-size" : "416",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "s3://warehouse/default/t3/metadata/snap-3622792816294171432-1-e457c732-62e5-41eb-998b-abbb8f021ed5.avro",
    "schema-id" : 0
  } ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710409631964,
    "metadata-file" : "s3://warehouse/default/t3/metadata/00000-c82191f3-e6e2-4001-8e85-8623e3915ff7.metadata.json"
  }, {
    "timestamp-ms" : 1710409653861,
    "metadata-file" : "s3://warehouse/default/t3/metadata/00001-0297d4e7-2468-4c0d-b4ed-ea717df8c3e6.metadata.json"
  } ]
}
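The missing step can be sketched on a plain metadata dict: a replace should drop the snapshot state (snapshots, refs, snapshot-log, current-snapshot-id) while leaving the schema and spec history intact. The function name is illustrative, not pyiceberg's TableMetadata API:

```python
def clear_snapshot_state(metadata: dict) -> dict:
    """Reset snapshot-related fields, as REPLACE TABLE should."""
    metadata["current-snapshot-id"] = -1
    metadata["snapshots"] = []
    metadata["refs"] = {}
    metadata["snapshot-log"] = []
    return metadata


# Values trimmed from the t3 metadata quoted above.
md = {"current-snapshot-id": 3622792816294171432,
      "snapshots": [{"snapshot-id": 3622792816294171432}],
      "refs": {"main": {"snapshot-id": 3622792816294171432}},
      "snapshot-log": []}
md = clear_snapshot_state(md)
print(md["current-snapshot-id"], len(md["snapshots"]))  # -1 0
```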

Development

Successfully merging this pull request may close these issues.

REPLACE TABLE Support
4 participants