Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate Redundant Identifier Support in TableIdentifier, and row_filter #994

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
RecursiveDict,
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.deprecated import deprecated, deprecation_message

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -613,6 +613,11 @@ def update_namespace_properties(
ValueError: If removals and updates have overlapping keys.
"""

@deprecated(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deprecates the public function

deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Support for parsing catalog level identifier in Catalog identifiers is deprecated. Please refer to the table using only its namespace and its table name.",
)
def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
"""Convert an identifier to a tuple and drop this catalog's name from the first element.

Expand All @@ -627,6 +632,25 @@ def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

def _identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where as this is now called by the PyIceberg functions and only prints the deprecation warning message if we are using the unsupported naming convention

"""Convert an identifier to a tuple and drop this catalog's name from the first element.

Args:
identifier (str | Identifier): Table identifier.

Returns:
Identifier: a tuple of strings with this catalog's name removed
"""
identifier_tuple = Catalog.identifier_to_tuple(identifier)
if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
deprecation_message(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Support for parsing catalog level identifier in Catalog identifiers is deprecated. Please refer to the table using only its namespace and its table name.",
)
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

@staticmethod
def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
"""Parse an identifier to a tuple.
Expand Down Expand Up @@ -769,7 +793,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
return False

def purge_table(self, identifier: Union[str, Identifier]) -> None:
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
table = self.load_table(identifier_tuple)
self.drop_table(identifier_tuple)
io = load_file_io(self.properties, table.metadata_location)
Expand Down Expand Up @@ -823,7 +847,7 @@ def _create_staged_table(
)
io = self._load_file_io(properties=properties, location=metadata_location)
return StagedTable(
identifier=(self.name, database_name, table_name),
identifier=(database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=io,
Expand Down
8 changes: 4 additions & 4 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)
Expand All @@ -260,7 +260,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)

try:
Expand Down Expand Up @@ -291,7 +291,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)

Expand Down Expand Up @@ -638,7 +638,7 @@ def _convert_dynamo_table_item_to_iceberg_table(self, dynamo_table_item: Dict[st
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name, database_name, table_name),
identifier=(database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
Expand Down
10 changes: 5 additions & 5 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name, database_name, table_name),
identifier=(database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
Expand Down Expand Up @@ -462,7 +462,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
Expand Down Expand Up @@ -541,7 +541,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)

return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))
Expand All @@ -555,7 +555,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
Expand All @@ -581,7 +581,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
Expand Down
14 changes: 7 additions & 7 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,15 @@ def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name, table.dbName, table.tableName),
identifier=(table.dbName, table.tableName),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
catalog=self,
)

def _convert_iceberg_into_hive(self, table: Table) -> HiveTable:
identifier_tuple = self.identifier_to_tuple_without_catalog(table.identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(table.identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
current_time_millis = int(time.time() * 1000)

Expand Down Expand Up @@ -431,7 +431,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
Expand Down Expand Up @@ -477,7 +477,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
# Table does not exist, create it.
hive_table = self._convert_iceberg_into_hive(
StagedTable(
identifier=(self.name, database_name, table_name),
identifier=(database_name, table_name),
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location,
io=updated_staged_table.io,
Expand Down Expand Up @@ -509,7 +509,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)

with self._client as open_client:
Expand All @@ -526,7 +526,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
Expand Down Expand Up @@ -554,7 +554,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchTableError: When a table with the name does not exist.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
Expand Down
12 changes: 6 additions & 6 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin

def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> Table:
return Table(
identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple,
identifier=identifier_tuple,
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
Expand All @@ -506,7 +506,7 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:

def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> StagedTable:
return StagedTable(
identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple,
identifier=identifier_tuple if self.name else identifier_tuple,
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
Expand Down Expand Up @@ -664,7 +664,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:

@retry(**_RETRY_ARGS)
def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.get(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
)
Expand All @@ -678,7 +678,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:

@retry(**_RETRY_ARGS)
def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
self.url(
Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple)
Expand All @@ -695,7 +695,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:

@retry(**_RETRY_ARGS)
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
payload = {
"source": self._split_identifier_for_json(from_identifier_tuple),
"destination": self._split_identifier_for_json(to_identifier),
Expand Down Expand Up @@ -830,7 +830,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
Returns:
bool: True if the table exists, False otherwise.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.head(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
)
Expand Down
16 changes: 8 additions & 8 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table:
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name,) + Catalog.identifier_to_tuple(table_namespace) + (table_name,),
identifier=Catalog.identifier_to_tuple(table_namespace) + (table_name,),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
Expand Down Expand Up @@ -192,7 +192,7 @@ def create_table(
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

identifier_nocatalog = self.identifier_to_tuple_without_catalog(identifier)
identifier_nocatalog = self._identifier_to_tuple_without_catalog(identifier)
namespace_identifier = Catalog.namespace_from(identifier_nocatalog)
table_name = Catalog.table_name_from(identifier_nocatalog)
if not self._namespace_exists(namespace_identifier):
Expand Down Expand Up @@ -238,7 +238,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
TableAlreadyExistsError: If the table already exists
NoSuchNamespaceError: If namespace does not exist
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
Expand Down Expand Up @@ -277,7 +277,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
Expand All @@ -301,7 +301,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
Expand Down Expand Up @@ -348,8 +348,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
TableAlreadyExistsError: If a table with the new name already exist.
NoSuchNamespaceError: If the target namespace does not exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
to_identifier_tuple = self.identifier_to_tuple_without_catalog(to_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
to_identifier_tuple = self._identifier_to_tuple_without_catalog(to_identifier)
from_namespace_tuple = Catalog.namespace_from(from_identifier_tuple)
from_namespace = Catalog.namespace_to_string(from_namespace_tuple)
from_table_name = Catalog.table_name_from(from_identifier_tuple)
Expand Down Expand Up @@ -407,7 +407,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
Expand Down
Loading