Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort Order update #476

Merged
merged 11 commits into from
Feb 29, 2024
Merged
37 changes: 37 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,13 @@ def is_added_schema(self, schema_id: int) -> bool:
update.schema_.schema_id == schema_id for update in self._updates if update.action == TableUpdateAction.add_schema
)

def is_added_sort_order(self, sort_order_id: int) -> bool:
    """Report whether a recorded add-sort-order update carries the given order id.

    Only entries of ``self._updates`` whose action equals
    ``TableUpdateAction.add_sort_order`` are inspected; the first one whose
    ``sort_order.order_id`` equals ``sort_order_id`` yields ``True``.
    """
    for recorded in self._updates:
        if recorded.action == TableUpdateAction.add_sort_order and recorded.sort_order.order_id == sort_order_id:
            return True
    return False


@singledispatch
def _apply_table_update(update: TableUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
Expand Down Expand Up @@ -704,6 +711,36 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl
return base_metadata.model_copy(update=metadata_updates)


@_apply_table_update.register(AddSortOrderUpdate)
def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply an AddSortOrderUpdate: record it on the context and append its sort order.

    Returns a copy of ``base_metadata`` (via ``model_copy``) whose ``sort_orders``
    is the existing list followed by ``update.sort_order``.
    """
    # Record the update first so later updates in the same batch can ask the
    # context whether this sort order was added.
    context.add_update(update)
    extended_sort_orders = base_metadata.sort_orders + [update.sort_order]
    return base_metadata.model_copy(update={"sort_orders": extended_sort_orders})


@_apply_table_update.register(SetDefaultSortOrderUpdate)
def _(update: SetDefaultSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a SetDefaultSortOrderUpdate and return the resulting table metadata.

    A ``sort_order_id`` of ``-1`` is resolved to the highest ``order_id`` found in
    ``base_metadata.sort_orders``, and is only accepted when the context reports
    that this sort order was added by an earlier update.

    Raises:
        ValueError: If ``-1`` is requested but the resolved sort order was not
            added through the context, or if no sort order with the requested
            id exists in ``base_metadata``.
    """
    target_id = update.sort_order_id
    if target_id == -1:
        # The last added sort order should be in base_metadata.sort_orders at this point
        known_order_ids = [order.order_id for order in base_metadata.sort_orders]
        target_id = max(known_order_ids)
        if not context.is_added_sort_order(target_id):
            raise ValueError("Cannot set current sort order to the last added one when no sort order has been added")

    # Nothing to change: return the metadata untouched and do not record the update.
    if target_id == base_metadata.default_sort_order_id:
        return base_metadata

    if base_metadata.sort_order_by_id(target_id) is None:
        raise ValueError(f"Sort order with id {target_id} does not exist")

    context.add_update(update)
    return base_metadata.model_copy(update={"default_sort_order_id": target_id})


def update_table_metadata(base_metadata: TableMetadata, updates: Tuple[TableUpdate, ...]) -> TableMetadata:
"""Update the table metadata with the given updates in one transaction.

Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ def schema_by_id(self, schema_id: int) -> Optional[Schema]:
"""Get the schema by schema_id."""
return next((schema for schema in self.schemas if schema.schema_id == schema_id), None)

def sort_order_by_id(self, sort_order_id: int) -> Optional[SortOrder]:
    """Get the sort order by sort_order_id.

    Args:
        sort_order_id: The ``order_id`` of the sort order to look up.

    Returns:
        The first entry of ``self.sort_orders`` whose ``order_id`` equals
        ``sort_order_id``, or ``None`` when no entry matches.
    """
    # The elements come from self.sort_orders, so the result is a sort order
    # (not a Schema, as the annotation previously claimed).
    return next((sort_order for sort_order in self.sort_orders if sort_order.order_id == sort_order_id), None)


class TableMetadataV1(TableMetadataCommonFields, IcebergBaseModel):
"""Represents version 1 of the Table Metadata.
Expand Down
6 changes: 4 additions & 2 deletions pyiceberg/table/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ def __repr__(self) -> str:
UNSORTED_SORT_ORDER = SortOrder(order_id=UNSORTED_SORT_ORDER_ID)


def assign_fresh_sort_order_ids(sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema) -> SortOrder:
def assign_fresh_sort_order_ids(
sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema, sort_order_id: Optional[int] = None
Fokko marked this conversation as resolved.
Show resolved Hide resolved
) -> SortOrder:
if sort_order.is_unsorted:
return UNSORTED_SORT_ORDER

Expand All @@ -189,4 +191,4 @@ def assign_fresh_sort_order_ids(sort_order: SortOrder, old_schema: Schema, fresh
)
)

return SortOrder(*fresh_fields, order_id=INITIAL_SORT_ORDER_ID)
return SortOrder(*fresh_fields, order_id=sort_order_id if sort_order_id is not None else INITIAL_SORT_ORDER_ID)
Fokko marked this conversation as resolved.
Show resolved Hide resolved
28 changes: 28 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from pyiceberg.schema import Schema
from pyiceberg.table import (
AddSnapshotUpdate,
AddSortOrderUpdate,
AssertCreate,
AssertCurrentSchemaId,
AssertDefaultSortOrderId,
Expand All @@ -52,6 +53,7 @@
AssertRefSnapshotId,
AssertTableUUID,
RemovePropertiesUpdate,
SetDefaultSortOrderUpdate,
SetPropertiesUpdate,
SetSnapshotRefUpdate,
SnapshotRef,
Expand Down Expand Up @@ -664,6 +666,32 @@ def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None:
)


def test_update_metadata_add_update_sort_order(table_v2: Table) -> None:
    """Adding a sort order and then selecting the last added one (-1) makes it the default."""
    next_order_id = table_v2.sort_order().order_id + 1
    new_sort_order = SortOrder(order_id=next_order_id)
    updates = (
        AddSortOrderUpdate(sort_order=new_sort_order),
        SetDefaultSortOrderUpdate(sort_order_id=-1),
    )

    new_metadata = update_table_metadata(table_v2.metadata, updates)

    assert len(new_metadata.sort_orders) == 2
    assert new_metadata.sort_orders[-1] == new_sort_order
    assert new_metadata.default_sort_order_id == new_sort_order.order_id


def test_update_metadata_update_sort_order_invalid(table_v2: Table) -> None:
    """Setting the default sort order fails for -1 without a prior add, and for an unknown id."""
    invalid_order_id = 10
    failing_cases = (
        # -1 means "last added", but no sort order is added in this batch of updates.
        (-1, "Cannot set current sort order to the last added one when no sort order has been added"),
        # An explicit id that is expected not to exist in the table_v2 fixture.
        (invalid_order_id, f"Sort order with id {invalid_order_id} does not exist"),
    )

    for requested_id, expected_message in failing_cases:
        with pytest.raises(ValueError, match=expected_message):
            update_table_metadata(table_v2.metadata, (SetDefaultSortOrderUpdate(sort_order_id=requested_id),))


def test_update_metadata_with_multiple_updates(table_v1: Table) -> None:
base_metadata = table_v1.metadata
transaction = table_v1.transaction()
Expand Down
Loading