diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 828dd1862..94ea2b699 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -165,6 +165,34 @@ catalog.create_table( ) ``` +To create or replace a table if it already exists: + +```python +from pyiceberg.catalog import load_catalog +from pyiceberg.schema import Schema +from pyiceberg.types import ( +    DoubleType, +    StringType, +    NestedField, +) + +catalog = load_catalog("default") + +schema = Schema( +    NestedField(1, "city", StringType(), required=False), +    NestedField(2, "lat", DoubleType(), required=False), +    NestedField(3, "long", DoubleType(), required=False), +) + +tbl = catalog.create_or_replace_table( +    identifier="docs_example.bids", +    schema=schema, +    location="s3://pyiceberg", +) +``` + +If the table with the specified identifier already exists, then the table metadata will be updated with the provided parameters (schema, location, etc.). + ## Load a table ### Catalog table diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index f2b46fcde..2bf56a452 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -39,16 +39,25 @@ from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, NotInstalledError, TableAlreadyExistsError from pyiceberg.io import FileIO, load_file_io from pyiceberg.manifest import ManifestFile -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec -from pyiceberg.schema import Schema +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids +from pyiceberg.schema import Schema, assign_fresh_schema_ids from pyiceberg.serializers import ToOutputFile from pyiceberg.table import ( + AddPartitionSpecUpdate, + AddSchemaUpdate, + AddSortOrderUpdate, + AssertTableUUID, CommitTableRequest, CommitTableResponse, + SetCurrentSchemaUpdate, + SetDefaultSortOrderUpdate, + SetDefaultSpecUpdate, Table, + TableRequirement, + TableUpdate, ) +from 
pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids from pyiceberg.table.metadata import TableMetadata -from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import ( EMPTY_DICT, Identifier, @@ -646,6 +655,47 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None: delete_files(io, prev_metadata_files, PREVIOUS_METADATA) delete_files(io, {table.metadata_location}, METADATA) + def create_or_replace_table( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + """Create a new table or replace an existing one. Replacing the table retains the table metadata history. + + Args: + identifier (str | Identifier): Table identifier. + schema (Schema): Table's schema. + location (str | None): Location for the table. Optional Argument. + partition_spec (PartitionSpec): PartitionSpec for the table. + sort_order (SortOrder): SortOrder for the table. + properties (Properties): Table properties that can be a string based dictionary. + + Returns: + Table: the new table instance. 
+ """ + try: + return self._replace_table( + identifier=identifier, + new_schema=schema, + new_location=location, + new_partition_spec=partition_spec, + new_sort_order=sort_order, + new_properties=properties, + ) + except NoSuchTableError: + return self.create_table( + identifier=identifier, + schema=schema, + location=location, + partition_spec=partition_spec, + sort_order=sort_order, + properties=properties, + ) + def table_exists(self, identifier: Union[str, Identifier]) -> bool: try: self.load_table(identifier) @@ -717,6 +767,45 @@ def _get_updated_props_and_update_summary( return properties_update_summary, updated_properties + def _replace_table( + self, + identifier: Union[str, Identifier], + new_schema: Union[Schema, "pa.Schema"], + new_partition_spec: PartitionSpec, + new_sort_order: SortOrder, + new_properties: Properties, + new_location: Optional[str] = None, + ) -> Table: + table = self.load_table(identifier) + with table.transaction() as tx: + base_schema = table.schema() + new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, base_schema=base_schema) + new_sort_order = assign_fresh_sort_order_ids( + sort_order=new_sort_order, + old_schema=base_schema, + fresh_schema=new_schema, + sort_order_id=table.sort_order().order_id + 1, + ) + new_partition_spec = assign_fresh_partition_spec_ids( + spec=new_partition_spec, old_schema=base_schema, fresh_schema=new_schema, spec_id=table.spec().spec_id + 1 + ) + + requirements: Tuple[TableRequirement, ...] = (AssertTableUUID(uuid=table.metadata.table_uuid),) + updates: Tuple[TableUpdate, ...] 
= ( + AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), + SetCurrentSchemaUpdate(schema_id=-1), + AddSortOrderUpdate(sort_order=new_sort_order), + SetDefaultSortOrderUpdate(sort_order_id=-1), + AddPartitionSpecUpdate(spec=new_partition_spec), + SetDefaultSpecUpdate(spec_id=-1), + ) + tx._apply(updates, requirements) + + tx.set_properties(**new_properties) + if new_location is not None: + tx.update_location(new_location) + return table + def __repr__(self) -> str: """Return the string representation of the Catalog class.""" return f"{self.name} ({self.__class__})" diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index 9f0d05449..7eb2738ba 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -501,7 +501,7 @@ def create_table( properties: Properties = EMPTY_DICT, ) -> Table: iceberg_schema = self._convert_schema_if_needed(schema) - fresh_schema = assign_fresh_schema_ids(iceberg_schema) + fresh_schema = assign_fresh_schema_ids(schema_or_type=iceberg_schema) fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema) fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 16f158828..86424aa0b 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -243,7 +243,9 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0) -def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fresh_schema: Schema) -> PartitionSpec: +def assign_fresh_partition_spec_ids( + spec: PartitionSpec, old_schema: Schema, fresh_schema: Schema, spec_id: int = INITIAL_PARTITION_SPEC_ID +) -> PartitionSpec: partition_fields = [] for pos, field in enumerate(spec.fields): original_column_name = old_schema.find_column_name(field.source_id) @@ -260,7 +262,7 @@ def 
assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre transform=field.transform, ) ) - return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID) + return PartitionSpec(*partition_fields, spec_id=spec_id) T = TypeVar("T") diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index b2739d861..451c1a0ad 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -1233,34 +1233,58 @@ def build_position_accessors(schema_or_type: Union[Schema, IcebergType]) -> Dict return visit(schema_or_type, _BuildPositionAccessors()) -def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], next_id: Optional[Callable[[], int]] = None) -> Schema: - """Traverses the schema, and sets new IDs.""" - return pre_order_visit(schema_or_type, _SetFreshIDs(next_id_func=next_id)) +def assign_fresh_schema_ids( + schema_or_type: Union[Schema, IcebergType], base_schema: Optional[Schema] = None, next_id: Optional[Callable[[], int]] = None +) -> Schema: + """Traverse the schema and assign IDs from the base_schema, or the next_id function.""" + visiting_schema = schema_or_type if isinstance(schema_or_type, Schema) else None + return pre_order_visit( + schema_or_type, _SetFreshIDs(visiting_schema=visiting_schema, base_schema=base_schema, next_id_func=next_id) + ) class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]): - """Traverses the schema and assigns monotonically increasing ids.""" - - old_id_to_new_id: Dict[int, int] - - def __init__(self, next_id_func: Optional[Callable[[], int]] = None) -> None: - self.old_id_to_new_id = {} - counter = itertools.count(1) + """Assign IDs from the base_schema, or generate fresh IDs from either the next_id function or monotonically increasing IDs starting from schema's highest ID.""" + + name_to_id: Dict[str, int] + + def __init__( + self, + visiting_schema: Optional[Schema] = None, + base_schema: Optional[Schema] = None, + next_id_func: Optional[Callable[[], int]] = None, + ) -> None: + self.name_to_id = 
{} + self.visiting_schema = visiting_schema + self.base_schema = base_schema + counter = itertools.count(1 + (base_schema.highest_field_id if base_schema is not None else 0)) self.next_id_func = next_id_func if next_id_func is not None else lambda: next(counter) - def _get_and_increment(self, current_id: int) -> int: - new_id = self.next_id_func() - self.old_id_to_new_id[current_id] = new_id + def _generate_id(self, field_name: Optional[str] = None) -> int: + field = None + if self.base_schema is not None and field_name is not None: + try: + field = self.base_schema.find_field(field_name) + except ValueError: + pass # field not found, generate new ID below + + new_id = field.field_id if field is not None else self.next_id_func() + if field_name is not None: + self.name_to_id[field_name] = new_id return new_id + def _name(self, id: int) -> Optional[str]: + return self.visiting_schema.find_column_name(id) if self.visiting_schema is not None else None + def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema: return Schema( *struct_result().fields, - identifier_field_ids=[self.old_id_to_new_id[field_id] for field_id in schema.identifier_field_ids], + identifier_field_ids=[self.name_to_id[field_name] for field_name in schema.identifier_field_names()], + schema_id=self.base_schema.schema_id + 1 if self.base_schema is not None else INITIAL_SCHEMA_ID, ) def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType: - new_ids = [self._get_and_increment(field.field_id) for field in struct.fields] + new_ids = [self._generate_id(self._name(field.field_id)) for field in struct.fields] new_fields = [] for field_id, field, field_type in zip(new_ids, struct.fields, field_results): new_fields.append( @@ -1278,7 +1302,7 @@ def field(self, field: NestedField, field_result: Callable[[], IcebergType]) -> return field_result() def list(self, list_type: ListType, element_result: Callable[[], IcebergType]) -> ListType: - 
element_id = self._get_and_increment(list_type.element_id) + element_id = self._generate_id(self._name(list_type.element_id)) return ListType( element_id=element_id, element=element_result(), @@ -1286,8 +1310,8 @@ def list(self, list_type: ListType, element_result: Callable[[], IcebergType]) - ) def map(self, map_type: MapType, key_result: Callable[[], IcebergType], value_result: Callable[[], IcebergType]) -> MapType: - key_id = self._get_and_increment(map_type.key_id) - value_id = self._get_and_increment(map_type.value_id) + key_id = self._generate_id(self._name(map_type.key_id)) + value_id = self._generate_id(self._name(map_type.value_id)) return MapType( key_id=key_id, key_type=key_result(), diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 787bdb860..f376372e8 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1797,7 +1797,7 @@ def add_column( self._added_name_to_id[full_name] = new_id self._id_to_parent[new_id] = parent_full_path - new_type = assign_fresh_schema_ids(field_type, self.assign_new_column_id) + new_type = assign_fresh_schema_ids(schema_or_type=field_type, next_id=self.assign_new_column_id) field = NestedField(field_id=new_id, name=name, field_type=new_type, required=required, doc=doc) if parent_id in self._adds: diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 3e1acf95f..c09864cba 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -474,7 +474,7 @@ def new_table_metadata( ) -> TableMetadata: from pyiceberg.table import TableProperties - fresh_schema = assign_fresh_schema_ids(schema) + fresh_schema = assign_fresh_schema_ids(schema_or_type=schema) fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema) fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 5f78eb3bc..54401e8b0 100644 --- 
a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -54,13 +54,15 @@ SetCurrentSchemaUpdate, Table, TableIdentifier, + UpdateSchema, update_table_metadata, ) from pyiceberg.table.metadata import new_table_metadata -from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder -from pyiceberg.transforms import IdentityTransform -from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties -from pyiceberg.types import IntegerType, LongType, NestedField +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortField, SortOrder +from pyiceberg.transforms import IdentityTransform, TruncateTransform +from pyiceberg.typedef import EMPTY_DICT +from pyiceberg.types import IntegerType, LongType, NestedField, StringType, StructType + DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" @@ -712,6 +714,48 @@ def test_catalog_repr(catalog: InMemoryCatalog) -> None: assert s == "test.in_memory.catalog ()" +def test_catalog_create_or_replace_table(catalog: InMemoryCatalog, table_schema_nested: Schema) -> None: + # Given + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=table_schema_nested, + partition_spec=PartitionSpec(PartitionField(name="foo", transform=IdentityTransform(), source_id=1, field_id=1000)), + properties=TEST_TABLE_PROPERTIES, + ) + highest_field_id = table_schema_nested.highest_field_id + new_schema = ( + UpdateSchema(transaction=None, schema=table_schema_nested) # type: ignore + .update_column("bar", LongType()) + .add_column( + "another_person", + field_type=StructType( + NestedField(field_id=highest_field_id + 2, name="name", field_type=StringType(), required=False), + NestedField(field_id=highest_field_id + 3, name="age", field_type=LongType(), required=True), + ), + ) + ._apply() + ) + new_sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform())) + new_spec = PartitionSpec( + PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id"), spec_id=1 + ) + 
new_properties = TEST_TABLE_PROPERTIES + new_properties["key3"] = "value3" + # When + new_table = catalog.create_or_replace_table( + identifier=table.identifier, + schema=new_schema, + partition_spec=new_spec, + sort_order=new_sort_order, + properties=new_properties, + ) + # Then + assert new_table.schema() == new_schema + assert new_table.spec() == new_spec + assert new_table.sort_order() == new_sort_order + assert new_table.properties == new_properties + + def test_table_properties_int_value(catalog: InMemoryCatalog) -> None: # table properties can be set to int, but still serialized to string property_with_int = {"property_name": 42}