From 066e8c442c3b0ab7be7dc3b03fd50ec4e0d44275 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 19 Jan 2024 20:36:23 -0800 Subject: [PATCH 01/23] extract InMemoryCatalog out of test --- pyiceberg/catalog/__init__.py | 8 ++ pyiceberg/catalog/memory.py | 225 ++++++++++++++++++++++++++++++++++ tests/catalog/test_base.py | 213 +------------------------------- 3 files changed, 236 insertions(+), 210 deletions(-) create mode 100644 pyiceberg/catalog/memory.py diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 6e5dc2748f..e5d6bd7a30 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -97,6 +97,7 @@ class CatalogType(Enum): GLUE = "glue" DYNAMODB = "dynamodb" SQL = "sql" + MEMORY = "memory" def load_rest(name: str, conf: Properties) -> Catalog: @@ -143,12 +144,19 @@ def load_sql(name: str, conf: Properties) -> Catalog: ) from exc +def load_memory(name: str, conf: Properties) -> Catalog: + from pyiceberg.catalog.memory import InMemoryCatalog + + return InMemoryCatalog(name, **conf) + + AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = { CatalogType.REST: load_rest, CatalogType.HIVE: load_hive, CatalogType.GLUE: load_glue, CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, + CatalogType.MEMORY: load_memory, } diff --git a/pyiceberg/catalog/memory.py b/pyiceberg/catalog/memory.py new file mode 100644 index 0000000000..b9295ad176 --- /dev/null +++ b/pyiceberg/catalog/memory.py @@ -0,0 +1,225 @@ +from typing import ( + Dict, + List, + Optional, + Set, + Union, +) + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import ( + NamespaceAlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + TableAlreadyExistsError, +) +from pyiceberg.io import load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import ( + AddSchemaUpdate, + CommitTableRequest, + CommitTableResponse, + Table, +) +from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + + +class InMemoryCatalog(Catalog): + """An in-memory catalog implementation for testing purposes.""" + + __tables: Dict[Identifier, Table] + __namespaces: Dict[Identifier, Properties] + + def __init__(self, name: str, **properties: str) -> None: + super().__init__(name, **properties) + self.__tables = {} + self.__namespaces = {} + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Schema, + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + identifier = Catalog.identifier_to_tuple(identifier) + namespace = Catalog.namespace_from(identifier) + + if identifier in self.__tables: + raise TableAlreadyExistsError(f"Table already exists: {identifier}") + else: + if namespace not in self.__namespaces: + self.__namespaces[namespace] = {} + + new_location = location or f's3://warehouse/{"/".join(identifier)}/data' + metadata = TableMetadataV1(**{ + "format-version": 1, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": new_location, + "last-updated-ms": 1602638573874, + "last-column-id": schema.highest_field_id, + "schema": schema.model_dump(), + "partition-spec": partition_spec.model_dump()["fields"], + "properties": properties, + "current-snapshot-id": -1, + "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}], + }) + table = Table( + identifier=identifier, + metadata=metadata, + metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', + io=load_file_io(), + catalog=self, + ) + self.__tables[identifier] = table + return table + + def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: + raise NotImplementedError + + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + new_metadata: Optional[TableMetadata] = None + metadata_location = "" + for update in table_request.updates: + if isinstance(update, AddSchemaUpdate): + add_schema_update: AddSchemaUpdate = update + identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,) + table = self.__tables[identifier] + new_metadata = new_table_metadata( + add_schema_update.schema_, + table.metadata.partition_specs[0], + table.sort_order(), + table.location(), + table.properties, + table.metadata.table_uuid, + ) + + table = Table( + identifier=identifier, + metadata=new_metadata, + metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', + io=load_file_io(), + catalog=self, + ) + + self.__tables[identifier] = table + metadata_location = f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json' + + return CommitTableResponse( + metadata=new_metadata.model_dump() if new_metadata else {}, + metadata_location=metadata_location if metadata_location else "", + ) + + def load_table(self, identifier: Union[str, Identifier]) -> Table: + identifier = self.identifier_to_tuple_without_catalog(identifier) + try: + return self.__tables[identifier] + except KeyError as error: + raise NoSuchTableError(f"Table does not exist: {identifier}") from error + + def drop_table(self, identifier: Union[str, Identifier]) -> None: + identifier = self.identifier_to_tuple_without_catalog(identifier) + try: + self.__tables.pop(identifier) + except KeyError as error: + raise NoSuchTableError(f"Table does not exist: {identifier}") from error + + def purge_table(self, identifier: Union[str, Identifier]) -> None: + self.drop_table(identifier) + + def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: + from_identifier = self.identifier_to_tuple_without_catalog(from_identifier) + try: + table = self.__tables.pop(from_identifier) + except KeyError as error: + raise NoSuchTableError(f"Table does not exist: {from_identifier}") from error + + to_identifier = Catalog.identifier_to_tuple(to_identifier) + to_namespace = Catalog.namespace_from(to_identifier) + if to_namespace not in self.__namespaces: + self.__namespaces[to_namespace] = {} + + self.__tables[to_identifier] = Table( + identifier=to_identifier, + metadata=table.metadata, + metadata_location=table.metadata_location, + io=load_file_io(), + catalog=self, + ) + return self.__tables[to_identifier] + + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + namespace = Catalog.identifier_to_tuple(namespace) + if namespace in self.__namespaces: + raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}") + else: + self.__namespaces[namespace] = properties if properties else {} + + def drop_namespace(self, namespace: Union[str, Identifier]) -> None: + namespace = Catalog.identifier_to_tuple(namespace) + if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]: + raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}") + try: + self.__namespaces.pop(namespace) + except KeyError as error: + raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error + + def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]: + if namespace: + namespace = Catalog.identifier_to_tuple(namespace) + list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]] + else: + list_tables = list(self.__tables.keys()) + + return list_tables + + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + # Hierarchical namespace is not supported. Return an empty list + if namespace: + return [] + + return list(self.__namespaces.keys()) + + def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: + namespace = Catalog.identifier_to_tuple(namespace) + try: + return self.__namespaces[namespace] + except KeyError as error: + raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error + + def update_namespace_properties( + self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT + ) -> PropertiesUpdateSummary: + removed: Set[str] = set() + updated: Set[str] = set() + + namespace = Catalog.identifier_to_tuple(namespace) + if namespace in self.__namespaces: + if removals: + for key in removals: + if key in self.__namespaces[namespace]: + del self.__namespaces[namespace][key] + removed.add(key) + if updates: + for key, value in updates.items(): + self.__namespaces[namespace][key] = value + updated.add(key) + else: + raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") + + expected_to_change = removed.difference(removals or set()) + + return PropertiesUpdateSummary( + removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change) + ) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index d15c90fee3..255b262a5c 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -16,13 +16,6 @@ # under the License. # pylint:disable=redefined-outer-name -from typing import ( - Dict, - List, - Optional, - Set, - Union, -) import pyarrow as pa import pytest @@ -30,10 +23,9 @@ from pyiceberg.catalog import ( Catalog, - Identifier, - Properties, PropertiesUpdateSummary, ) +from pyiceberg.catalog.memory import InMemoryCatalog from pyiceberg.exceptions import ( NamespaceAlreadyExistsError, NamespaceNotEmptyError, @@ -41,219 +33,20 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import load_file_io -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec +from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( AddSchemaUpdate, CommitTableRequest, - CommitTableResponse, Namespace, SetCurrentSchemaUpdate, Table, TableIdentifier, ) -from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata -from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.transforms import IdentityTransform -from pyiceberg.typedef import EMPTY_DICT from pyiceberg.types import IntegerType, LongType, NestedField -class InMemoryCatalog(Catalog): - """An in-memory catalog implementation for testing purposes.""" - - __tables: Dict[Identifier, Table] - __namespaces: Dict[Identifier, Properties] - - def __init__(self, name: str, **properties: str) -> None: - super().__init__(name, **properties) - self.__tables = {} - self.__namespaces = {} - - def create_table( - self, - identifier: Union[str, Identifier], - schema: Union[Schema, "pa.Schema"], - location: Optional[str] = None, - partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, - sort_order: SortOrder = UNSORTED_SORT_ORDER, - properties: Properties = EMPTY_DICT, - ) -> Table: - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore - - identifier = Catalog.identifier_to_tuple(identifier) - namespace = Catalog.namespace_from(identifier) - - if identifier in self.__tables: - raise TableAlreadyExistsError(f"Table already exists: {identifier}") - else: - if namespace not in self.__namespaces: - self.__namespaces[namespace] = {} - - new_location = location or f's3://warehouse/{"/".join(identifier)}/data' - metadata = TableMetadataV1(**{ - "format-version": 1, - "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", - "location": new_location, - "last-updated-ms": 1602638573874, - "last-column-id": schema.highest_field_id, - "schema": schema.model_dump(), - "partition-spec": partition_spec.model_dump()["fields"], - "properties": properties, - "current-snapshot-id": -1, - "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}], - }) - table = Table( - identifier=identifier, - metadata=metadata, - metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', - io=load_file_io(), - catalog=self, - ) - self.__tables[identifier] = table - return table - - def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: - raise NotImplementedError - - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - new_metadata: Optional[TableMetadata] = None - metadata_location = "" - for update in table_request.updates: - if isinstance(update, AddSchemaUpdate): - add_schema_update: AddSchemaUpdate = update - identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,) - table = self.__tables[identifier] - new_metadata = new_table_metadata( - add_schema_update.schema_, - table.metadata.partition_specs[0], - table.sort_order(), - table.location(), - table.properties, - table.metadata.table_uuid, - ) - - table = Table( - identifier=identifier, - metadata=new_metadata, - metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', - io=load_file_io(), - catalog=self, - ) - - self.__tables[identifier] = table - metadata_location = f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json' - - return CommitTableResponse( - metadata=new_metadata.model_dump() if new_metadata else {}, - metadata_location=metadata_location if metadata_location else "", - ) - - def load_table(self, identifier: Union[str, Identifier]) -> Table: - identifier = self.identifier_to_tuple_without_catalog(identifier) - try: - return self.__tables[identifier] - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {identifier}") from error - - def drop_table(self, identifier: Union[str, Identifier]) -> None: - identifier = self.identifier_to_tuple_without_catalog(identifier) - try: - self.__tables.pop(identifier) - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {identifier}") from error - - def purge_table(self, identifier: Union[str, Identifier]) -> None: - self.drop_table(identifier) - - def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: - from_identifier = self.identifier_to_tuple_without_catalog(from_identifier) - try: - table = self.__tables.pop(from_identifier) - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {from_identifier}") from error - - to_identifier = Catalog.identifier_to_tuple(to_identifier) - to_namespace = Catalog.namespace_from(to_identifier) - if to_namespace not in self.__namespaces: - self.__namespaces[to_namespace] = {} - - self.__tables[to_identifier] = Table( - identifier=to_identifier, - metadata=table.metadata, - metadata_location=table.metadata_location, - io=load_file_io(), - catalog=self, - ) - return self.__tables[to_identifier] - - def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: - namespace = Catalog.identifier_to_tuple(namespace) - if namespace in self.__namespaces: - raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}") - else: - self.__namespaces[namespace] = properties if properties else {} - - def drop_namespace(self, namespace: Union[str, Identifier]) -> None: - namespace = Catalog.identifier_to_tuple(namespace) - if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]: - raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}") - try: - self.__namespaces.pop(namespace) - except KeyError as error: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error - - def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]: - if namespace: - namespace = Catalog.identifier_to_tuple(namespace) - list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]] - else: - list_tables = list(self.__tables.keys()) - - return list_tables - - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: - # Hierarchical namespace is not supported. Return an empty list - if namespace: - return [] - - return list(self.__namespaces.keys()) - - def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: - namespace = Catalog.identifier_to_tuple(namespace) - try: - return self.__namespaces[namespace] - except KeyError as error: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error - - def update_namespace_properties( - self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT - ) -> PropertiesUpdateSummary: - removed: Set[str] = set() - updated: Set[str] = set() - - namespace = Catalog.identifier_to_tuple(namespace) - if namespace in self.__namespaces: - if removals: - for key in removals: - if key in self.__namespaces[namespace]: - del self.__namespaces[namespace][key] - removed.add(key) - if updates: - for key, value in updates.items(): - self.__namespaces[namespace][key] = value - updated.add(key) - else: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") - - expected_to_change = removed.difference(removals or set()) - - return PropertiesUpdateSummary( - removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change) - ) - - @pytest.fixture def catalog() -> InMemoryCatalog: return InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"}) @@ -681,4 +474,4 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None: def test_catalog_repr(catalog: InMemoryCatalog) -> None: s = repr(catalog) - assert s == "test.in.memory.catalog ()" + assert s == "test.in.memory.catalog ()" From b1a99f7867688238854f28d77a3b6e367984988d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 20 Jan 2024 10:18:17 -0800 Subject: [PATCH 02/23] generalize InMemoryCatalog --- pyiceberg/catalog/memory.py | 40 +++++++++++++++++++------------------ tests/cli/test_console.py | 26 ++++++++++++++++++++---- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/pyiceberg/catalog/memory.py b/pyiceberg/catalog/memory.py index b9295ad176..c89e8f6bb1 100644 --- a/pyiceberg/catalog/memory.py +++ b/pyiceberg/catalog/memory.py @@ -1,3 +1,4 @@ +import uuid from typing import ( Dict, List, @@ -28,13 +29,13 @@ CommitTableResponse, Table, ) -from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata +from pyiceberg.table.metadata import TableMetadata, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT class InMemoryCatalog(Catalog): - """An in-memory catalog implementation for testing purposes.""" + """An in-memory catalog implementation.""" __tables: Dict[Identifier, Table] __namespaces: Dict[Identifier, Properties] @@ -52,6 +53,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + table_uuid: Optional[uuid.UUID] = None, ) -> Table: identifier = Catalog.identifier_to_tuple(identifier) namespace = Catalog.namespace_from(identifier) @@ -62,24 +64,24 @@ def create_table( if namespace not in self.__namespaces: self.__namespaces[namespace] = {} - new_location = location or f's3://warehouse/{"/".join(identifier)}/data' - metadata = TableMetadataV1(**{ - "format-version": 1, - "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", - "location": new_location, - "last-updated-ms": 1602638573874, - "last-column-id": schema.highest_field_id, - "schema": schema.model_dump(), - "partition-spec": partition_spec.model_dump()["fields"], - "properties": properties, - "current-snapshot-id": -1, - "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}], - }) + if not location: + location = f's3://warehouse/{"/".join(identifier)}/data' + + metadata_location = f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json' + + metadata = new_table_metadata( + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + location=location, + properties=properties, + table_uuid=table_uuid, + ) table = Table( identifier=identifier, metadata=metadata, - metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', - io=load_file_io(), + metadata_location=metadata_location, + io=self._load_file_io(properties=metadata.properties, location=metadata_location), catalog=self, ) self.__tables[identifier] = table @@ -109,7 +111,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons identifier=identifier, metadata=new_metadata, metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', - io=load_file_io(), + io=self._load_file_io(properties=new_metadata.properties, location=metadata_location), catalog=self, ) @@ -154,7 +156,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U identifier=to_identifier, metadata=table.metadata, metadata_location=table.metadata_location, - io=load_file_io(), + io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location), catalog=self, ) return self.__tables[to_identifier] diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 5eec231cca..098ce36214 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -14,7 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import datetime import os +import uuid +from unittest.mock import MagicMock import pytest from click.testing import CliRunner @@ -59,6 +62,13 @@ def fixture_namespace_properties() -> Properties: return TEST_NAMESPACE_PROPERTIES.copy() +@pytest.fixture() +def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None: + datetime_mock = MagicMock(wraps=datetime.datetime) + datetime_mock.now.return_value = datetime.datetime.fromtimestamp(TEST_TIMESTAMP / 1000.0).astimezone() + monkeypatch.setattr(datetime, "datetime", datetime_mock) + + TEST_TABLE_IDENTIFIER = ("default", "my_table") TEST_TABLE_NAMESPACE = "default" TEST_NAMESPACE_PROPERTIES = {"location": "s3://warehouse/database/location"} @@ -71,6 +81,8 @@ def fixture_namespace_properties() -> Properties: TEST_TABLE_LOCATION = "s3://bucket/test/location" TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) TEST_TABLE_PROPERTIES = {"read.split.target.size": "134217728"} +TEST_TABLE_UUID = uuid.UUID("d20125c8-7284-442c-9aea-15fee620737c") +TEST_TIMESTAMP = 1602638573874 MOCK_ENVIRONMENT = {"PYICEBERG_CATALOG__PRODUCTION__URI": "test://doesnotexist"} @@ -120,12 +132,14 @@ def test_describe_namespace_does_not_exists(catalog: InMemoryCatalog) -> None: assert result.output == "Namespace does not exist: ('doesnotexist',)\n" -def test_describe_table(catalog: InMemoryCatalog) -> None: +@pytest.fixture() +def test_describe_table(catalog: InMemoryCatalog, mock_datetime_now: None) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, + table_uuid=TEST_TABLE_UUID, ) runner = CliRunner() @@ -134,7 +148,7 @@ def test_describe_table(catalog: InMemoryCatalog) -> None: assert ( # Strip the whitespace on the end "\n".join([line.rstrip() for line in result.output.split("\n")]) - == """Table format version 1 + == """Table format version 2 Metadata location s3://warehouse/default/my_table/metadata/metadata.json Table UUID d20125c8-7284-442c-9aea-15fee620737c Last Updated 1602638573874 @@ -227,6 +241,7 @@ def test_uuid(catalog: InMemoryCatalog) -> None: schema=TEST_TABLE_SCHEMA, location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, + table_uuid=TEST_TABLE_UUID, ) runner = CliRunner() @@ -550,12 +565,14 @@ def test_json_describe_namespace_does_not_exists(catalog: InMemoryCatalog) -> No assert result.output == """{"type": "NoSuchNamespaceError", "message": "Namespace does not exist: ('doesnotexist',)"}\n""" -def test_json_describe_table(catalog: InMemoryCatalog) -> None: +@pytest.fixture() +def test_json_describe_table(catalog: InMemoryCatalog, mock_datetime_now: None) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, + table_uuid=TEST_TABLE_UUID, ) runner = CliRunner() @@ -563,7 +580,7 @@ def test_json_describe_table(catalog: InMemoryCatalog) -> None: assert result.exit_code == 0 assert ( result.output - == """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n""" + == """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n""" ) @@ -634,6 +651,7 @@ def test_json_uuid(catalog: InMemoryCatalog) -> None: schema=TEST_TABLE_SCHEMA, location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, + table_uuid=TEST_TABLE_UUID, ) runner = CliRunner() From 32c449e2be3ff73b416c2e3dbbe25c51673353a8 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 21 Jan 2024 10:58:16 -0800 Subject: [PATCH 03/23] make write work --- pyiceberg/catalog/memory.py | 70 +++++++++++++++++-------------------- pyiceberg/io/pyarrow.py | 2 ++ tests/cli/test_console.py | 5 +-- 3 files changed, 38 insertions(+), 39 deletions(-) diff --git a/pyiceberg/catalog/memory.py b/pyiceberg/catalog/memory.py index c89e8f6bb1..e1a2908c0b 100644 --- a/pyiceberg/catalog/memory.py +++ b/pyiceberg/catalog/memory.py @@ -20,19 +20,20 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( - AddSchemaUpdate, CommitTableRequest, CommitTableResponse, Table, + update_table_metadata, ) -from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT +DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" + class InMemoryCatalog(Catalog): """An in-memory catalog implementation.""" @@ -40,8 +41,10 @@ class InMemoryCatalog(Catalog): __tables: Dict[Identifier, Table] __namespaces: Dict[Identifier, Properties] - def __init__(self, name: str, **properties: str) -> None: + def __init__(self, name: str, warehouse_location: Optional[str] = None, **properties: str) -> None: super().__init__(name, **properties) + + self._warehouse_location = warehouse_location or DEFAULT_WAREHOUSE_LOCATION self.__tables = {} self.__namespaces = {} @@ -64,10 +67,11 @@ def create_table( if namespace not in self.__namespaces: self.__namespaces[namespace] = {} - if not location: - location = f's3://warehouse/{"/".join(identifier)}/data' + # if not location: + location = f'{self._warehouse_location}/{"/".join(identifier)}' - metadata_location = f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json' + # _get_default_warehouse_location + metadata_location = f'{self._warehouse_location}/{"/".join(identifier)}/metadata/metadata.json' metadata = new_table_metadata( schema=schema, @@ -91,37 +95,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: raise NotImplementedError def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - new_metadata: Optional[TableMetadata] = None - metadata_location = "" - for update in table_request.updates: - if isinstance(update, AddSchemaUpdate): - add_schema_update: AddSchemaUpdate = update - identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,) - table = self.__tables[identifier] - new_metadata = new_table_metadata( - add_schema_update.schema_, - table.metadata.partition_specs[0], - table.sort_order(), - table.location(), - table.properties, - table.metadata.table_uuid, - ) - - table = Table( - identifier=identifier, - metadata=new_metadata, - metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', - io=self._load_file_io(properties=new_metadata.properties, location=metadata_location), - catalog=self, - ) - - self.__tables[identifier] = table - metadata_location = f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json' - - return CommitTableResponse( - metadata=new_metadata.model_dump() if new_metadata else {}, - metadata_location=metadata_location if metadata_location else "", + identifier_tuple = self.identifier_to_tuple_without_catalog( + tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) ) + current_table = self.load_table(identifier_tuple) + base_metadata = current_table.metadata + + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + # update table state + current_table.metadata = updated_metadata + + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier = self.identifier_to_tuple_without_catalog(identifier) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7a94ce4c7d..54c9bbcfd4 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -296,6 +296,8 @@ def create(self, overwrite: bool = False) -> OutputStream: try: if not overwrite and self.exists() is True: raise FileExistsError(f"Cannot create file, already exists: {self.location}") + # Some FS (such as LocalFileSystem) requires directories to exist before creating files + self._filesystem.create_dir(os.path.dirname(self._path), recursive=True) output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size) except PermissionError: raise diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 098ce36214..c3718c6a51 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -23,6 +23,7 @@ from click.testing import CliRunner from pytest_mock import MockFixture +from pyiceberg.catalog.memory import DEFAULT_WAREHOUSE_LOCATION from pyiceberg.cli.console import run from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -270,7 +271,7 @@ def test_location(catalog: InMemoryCatalog) -> None: runner = CliRunner() result = runner.invoke(run, ["location", "default.my_table"]) assert result.exit_code == 0 - assert result.output == """s3://bucket/test/location\n""" + assert result.output == f"""{DEFAULT_WAREHOUSE_LOCATION}/default/my_table\n""" def test_location_does_not_exists(catalog: InMemoryCatalog) -> None: @@ -680,7 +681,7 @@ def test_json_location(catalog: InMemoryCatalog) -> None: runner = CliRunner() result = runner.invoke(run, ["--output=json", "location", "default.my_table"]) assert result.exit_code == 0 - assert result.output == """"s3://bucket/test/location"\n""" + assert result.output == f'"{DEFAULT_WAREHOUSE_LOCATION}/default/my_table"\n' def test_json_location_does_not_exists(catalog: InMemoryCatalog) -> None: From 3c6e06ab5c8ad1d9663abaf6b5cc73e7c09f58f3 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 21 Jan 2024 11:12:30 -0800 Subject: [PATCH 04/23] write to temporary location --- tests/catalog/test_base.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 255b262a5c..b56ace12de 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -18,6 +18,8 @@ import pyarrow as pa +from pathlib import PosixPath + import pytest from pytest_lazyfixture import lazy_fixture @@ -48,8 +50,10 @@ @pytest.fixture -def catalog() -> InMemoryCatalog: - return InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"}) +def catalog(tmp_path: PosixPath) -> InMemoryCatalog: + return InMemoryCatalog( + "test.in.memory.catalog", warehouse_location=tmp_path.absolute().as_posix(), **{"test.key": "test.value"} + ) TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table") From 21b1e500b9f1cad59fef80672d6ce4487046aa3d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 21 Jan 2024 16:53:20 -0800 Subject: [PATCH 05/23] can override table location --- pyiceberg/catalog/memory.py | 11 +++++------ tests/catalog/test_base.py | 16 ++++++++++++---- tests/cli/test_console.py | 30 ++++++++++++++++++++++++++++-- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/pyiceberg/catalog/memory.py b/pyiceberg/catalog/memory.py index e1a2908c0b..a9e42e428a 100644 --- a/pyiceberg/catalog/memory.py +++ b/pyiceberg/catalog/memory.py @@ -20,6 +20,7 @@ NoSuchTableError, TableAlreadyExistsError, ) +from pyiceberg.io import WAREHOUSE from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( @@ -41,12 +42,11 @@ class InMemoryCatalog(Catalog): __tables: Dict[Identifier, Table] __namespaces: Dict[Identifier, Properties] - def __init__(self, name: str, warehouse_location: Optional[str] = None, **properties: str) -> None: + def __init__(self, name: str, **properties: str) -> None: super().__init__(name, **properties) - - self._warehouse_location = warehouse_location or DEFAULT_WAREHOUSE_LOCATION self.__tables = {} self.__namespaces = {} + self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION def create_table( self, @@ -67,10 +67,9 @@ def create_table( if namespace not in self.__namespaces: self.__namespaces[namespace] = {} - # if not location: - location = f'{self._warehouse_location}/{"/".join(identifier)}' + if not location: + location = f'{self._warehouse_location}/{"/".join(identifier)}' - # _get_default_warehouse_location metadata_location = f'{self._warehouse_location}/{"/".join(identifier)}/metadata/metadata.json' metadata = new_table_metadata( diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index b56ace12de..567265fb2d 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -35,6 +35,7 @@ NoSuchTableError, TableAlreadyExistsError, ) +from pyiceberg.io import WAREHOUSE from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( @@ -51,9 +52,7 @@ @pytest.fixture def catalog(tmp_path: PosixPath) -> InMemoryCatalog: - return InMemoryCatalog( - "test.in.memory.catalog", warehouse_location=tmp_path.absolute().as_posix(), **{"test.key": "test.value"} - ) + return InMemoryCatalog("test.in.memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"}) TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table") @@ -78,7 +77,6 @@ def given_catalog_has_a_table(catalog: InMemoryCatalog) -> Table: return catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -121,6 +119,16 @@ def test_name_from_str() -> None: def test_create_table(catalog: InMemoryCatalog) -> None: + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + properties=TEST_TABLE_PROPERTIES, + ) + assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table + + +def test_create_table_override(catalog: InMemoryCatalog) -> None: table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index c3718c6a51..11bdd3b191 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -264,7 +264,6 @@ def test_location(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -274,6 +273,20 @@ def test_location(catalog: InMemoryCatalog) -> None: assert result.output == f"""{DEFAULT_WAREHOUSE_LOCATION}/default/my_table\n""" +def test_location_override(catalog: InMemoryCatalog) -> None: + catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + location=TEST_TABLE_LOCATION, + partition_spec=TEST_TABLE_PARTITION_SPEC, + ) + + runner = CliRunner() + result = runner.invoke(run, ["location", "default.my_table"]) + assert result.exit_code == 0 + assert result.output == f"""{TEST_TABLE_LOCATION}\n""" + + def test_location_does_not_exists(catalog: InMemoryCatalog) -> None: # pylint: disable=unused-argument @@ -674,7 +687,6 @@ def test_json_location(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -684,6 +696,20 @@ def test_json_location(catalog: InMemoryCatalog) -> None: assert result.output == f'"{DEFAULT_WAREHOUSE_LOCATION}/default/my_table"\n' +def test_json_location_override(catalog: InMemoryCatalog) -> None: + catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + location=TEST_TABLE_LOCATION, + partition_spec=TEST_TABLE_PARTITION_SPEC, + ) + + runner = CliRunner() + result = runner.invoke(run, ["--output=json", "location", "default.my_table"]) + assert result.exit_code == 0 + assert result.output == f'"{TEST_TABLE_LOCATION}"\n' + + def test_json_location_does_not_exists(catalog: InMemoryCatalog) -> None: # pylint: disable=unused-argument From e2541ac199322eabde8059b5f14f9eebe6ac9859 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 21 Jan 2024 17:08:13 -0800 Subject: [PATCH 06/23] memory.py -> in_memory.py --- pyiceberg/catalog/__init__.py | 2 +- pyiceberg/catalog/{memory.py => in_memory.py} | 0 tests/catalog/test_base.py | 6 +++--- tests/cli/test_console.py | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) rename pyiceberg/catalog/{memory.py => in_memory.py} (100%) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index e5d6bd7a30..73c2552719 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -145,7 +145,7 @@ def load_sql(name: str, conf: Properties) -> Catalog: def load_memory(name: str, conf: Properties) -> Catalog: - from pyiceberg.catalog.memory import InMemoryCatalog + from pyiceberg.catalog.in_memory import InMemoryCatalog return InMemoryCatalog(name, **conf) diff --git a/pyiceberg/catalog/memory.py b/pyiceberg/catalog/in_memory.py similarity index 100% rename from pyiceberg/catalog/memory.py rename to pyiceberg/catalog/in_memory.py diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 567265fb2d..853283c55e 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -27,7 +27,7 @@ Catalog, PropertiesUpdateSummary, ) -from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.catalog.in_memory import InMemoryCatalog from pyiceberg.exceptions import ( NamespaceAlreadyExistsError, NamespaceNotEmptyError, @@ -52,7 +52,7 @@ @pytest.fixture def catalog(tmp_path: PosixPath) -> InMemoryCatalog: - return InMemoryCatalog("test.in.memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"}) + return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"}) TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table") @@ -486,4 +486,4 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None: def test_catalog_repr(catalog: InMemoryCatalog) -> None: s = repr(catalog) - assert s == "test.in.memory.catalog ()" + assert s == "test.in_memory.catalog ()" diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 11bdd3b191..1f9f831d21 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -23,7 +23,7 @@ from click.testing import CliRunner from pytest_mock import MockFixture -from pyiceberg.catalog.memory import DEFAULT_WAREHOUSE_LOCATION +from pyiceberg.catalog.in_memory import DEFAULT_WAREHOUSE_LOCATION from pyiceberg.cli.console import run from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -53,7 +53,7 @@ def env_vars(mocker: MockFixture) -> None: @pytest.fixture(name="catalog") def fixture_catalog(mocker: MockFixture) -> InMemoryCatalog: - in_memory_catalog = InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"}) + in_memory_catalog = InMemoryCatalog("test.in_memory.catalog", **{"test.key": "test.value"}) mocker.patch("pyiceberg.cli.console.load_catalog", return_value=in_memory_catalog) return in_memory_catalog From 3013bdb4c16554fcd5f5758d647b80914b893610 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 22 Jan 2024 09:29:18 -0800 Subject: [PATCH 07/23] fix test_commit_table --- tests/catalog/test_base.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 853283c55e..f993b40e01 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -407,6 +407,7 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), NestedField(4, "add", LongType()), + schema_id=1, ) # When @@ -422,8 +423,10 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: # Then assert response.metadata.table_uuid == given_table.metadata.table_uuid - assert len(response.metadata.schemas) == 1 - assert response.metadata.schemas[0] == new_schema + assert given_table.metadata.current_schema_id == 1 + assert len(response.metadata.schemas) == 2 + assert response.metadata.schemas[1] == new_schema.model_copy(update={"schema_id": 1}) + assert given_table.metadata.last_column_id == new_schema.highest_field_id def test_add_column(catalog: InMemoryCatalog) -> None: From be3eb1ca2a44f861aed3e9593e7a73523a481092 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 19:24:34 -0800 Subject: [PATCH 08/23] rebase from main --- pyiceberg/catalog/in_memory.py | 6 +++++- tests/catalog/test_base.py | 5 ++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/in_memory.py b/pyiceberg/catalog/in_memory.py index a9e42e428a..a32dad3996 100644 --- a/pyiceberg/catalog/in_memory.py +++ b/pyiceberg/catalog/in_memory.py @@ -7,6 +7,8 @@ Union, ) +import pyarrow as pa + from pyiceberg.catalog import ( Catalog, Identifier, @@ -51,13 +53,15 @@ def __init__(self, name: str, **properties: str) -> None: def create_table( self, identifier: Union[str, Identifier], - schema: Schema, + schema: Union[Schema, "pa.Schema"], location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, table_uuid: Optional[uuid.UUID] = None, ) -> Table: + schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + identifier = Catalog.identifier_to_tuple(identifier) namespace = Catalog.namespace_from(identifier) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index f993b40e01..572c38d348 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -17,9 +17,10 @@ # pylint:disable=redefined-outer-name -import pyarrow as pa from pathlib import PosixPath +from typing import Union +import pyarrow as pa import pytest from pytest_lazyfixture import lazy_fixture @@ -160,8 +161,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=pyarrow_schema_simple_without_ids, - location=TEST_TABLE_LOCATION, - partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table From f80849afcaa3a99e4320b5c0f07458f9c7958264 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 19:58:41 -0800 Subject: [PATCH 09/23] revert fs changes --- pyiceberg/io/pyarrow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 54c9bbcfd4..7a94ce4c7d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -296,8 +296,6 @@ def create(self, overwrite: bool = False) -> OutputStream: try: if not overwrite and self.exists() is True: raise FileExistsError(f"Cannot create file, already exists: {self.location}") - # Some FS (such as LocalFileSystem) requires directories to exist before creating files - self._filesystem.create_dir(os.path.dirname(self._path), recursive=True) output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size) except PermissionError: raise From faea973a6a17cadb3d18b79331b44f48c2c310bd Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 20:10:19 -0800 Subject: [PATCH 10/23] fix tests --- tests/catalog/test_base.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 572c38d348..26e156480c 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -120,16 +120,6 @@ def test_name_from_str() -> None: def test_create_table(catalog: InMemoryCatalog) -> None: - table = catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - partition_spec=TEST_TABLE_PARTITION_SPEC, - properties=TEST_TABLE_PROPERTIES, - ) - assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table - - -def test_create_table_override(catalog: InMemoryCatalog) -> None: table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, @@ -160,6 +150,7 @@ def test_convert_schema_if_needed( def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_simple_without_ids: pa.Schema) -> None: table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, + location=TEST_TABLE_LOCATION, schema=pyarrow_schema_simple_without_ids, properties=TEST_TABLE_PROPERTIES, ) From 4437a3049673d82d79133b1018b9f5ae1b196b93 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 21:14:30 -0800 Subject: [PATCH 11/23] add docs and comments --- mkdocs/docs/configuration.md | 16 +++++++++++++++- pyiceberg/catalog/__init__.py | 6 +++--- pyiceberg/catalog/in_memory.py | 6 +++++- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index bfe1e62fac..404485481f 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -24,7 +24,7 @@ hide: # Catalogs -PyIceberg currently has native support for REST, SQL, Hive, Glue and DynamoDB. +PyIceberg currently has native support for REST, SQL, Hive, Glue, DynamoDB and In-Memory catalogs. There are three ways to pass in configuration: @@ -231,6 +231,20 @@ catalog: region_name: ``` +## In-Memory Catalog + +In-memory catalog uses in-memory data-structures to store information. +This is useful for test, demo, and playground. Do not use in production as the data is not persisted. + +While you can specify In-Memory catalog in the configuration file like this, it is not recommended since information is only persisted for the duration of the function call. + +```yaml +catalog: + default: + type: in_memory + warehouse: /tmp/warehouse # default warehouse location +``` + # Concurrency PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details. diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 73c2552719..4559c5cf21 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -97,7 +97,7 @@ class CatalogType(Enum): GLUE = "glue" DYNAMODB = "dynamodb" SQL = "sql" - MEMORY = "memory" + IN_MEMORY = "in_memory" def load_rest(name: str, conf: Properties) -> Catalog: @@ -144,7 +144,7 @@ def load_sql(name: str, conf: Properties) -> Catalog: ) from exc -def load_memory(name: str, conf: Properties) -> Catalog: +def load_in_memory(name: str, conf: Properties) -> Catalog: from pyiceberg.catalog.in_memory import InMemoryCatalog return InMemoryCatalog(name, **conf) @@ -156,7 +156,7 @@ def load_memory(name: str, conf: Properties) -> Catalog: CatalogType.GLUE: load_glue, CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, - CatalogType.MEMORY: load_memory, + CatalogType.IN_MEMORY: load_in_memory, } diff --git a/pyiceberg/catalog/in_memory.py b/pyiceberg/catalog/in_memory.py index a32dad3996..66c79d2f27 100644 --- a/pyiceberg/catalog/in_memory.py +++ b/pyiceberg/catalog/in_memory.py @@ -39,7 +39,11 @@ class InMemoryCatalog(Catalog): - """An in-memory catalog implementation.""" + """ + An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables. + + This is useful for test, demo, and playground but not in production as data is not persisted. + """ __tables: Dict[Identifier, Table] __namespaces: Dict[Identifier, Properties] From aabcde0368864668d395001bc655d4dfcbcde1d8 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 21:17:39 -0800 Subject: [PATCH 12/23] comma --- mkdocs/docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 404485481f..94f37e8229 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -24,7 +24,7 @@ hide: # Catalogs -PyIceberg currently has native support for REST, SQL, Hive, Glue, DynamoDB and In-Memory catalogs. +PyIceberg currently has native support for REST, SQL, Hive, Glue, DynamoDB, and In-Memory catalogs. There are three ways to pass in configuration: From b91e1cf9c779b7f8e4f5589605d06e7120073a33 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 21:19:14 -0800 Subject: [PATCH 13/23] comment --- mkdocs/docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 94f37e8229..92934745fa 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -233,7 +233,7 @@ catalog: ## In-Memory Catalog -In-memory catalog uses in-memory data-structures to store information. +The In-Memory catalog uses in-memory data-structures to store information. This is useful for test, demo, and playground. Do not use in production as the data is not persisted. While you can specify In-Memory catalog in the configuration file like this, it is not recommended since information is only persisted for the duration of the function call. From 2834baebb6350675df027237ef9bc68261ef666d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 21:22:50 -0800 Subject: [PATCH 14/23] order --- tests/catalog/test_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 26e156480c..b9a9c70cc1 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -150,8 +150,8 @@ def test_convert_schema_if_needed( def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_simple_without_ids: pa.Schema) -> None: table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, - location=TEST_TABLE_LOCATION, schema=pyarrow_schema_simple_without_ids, + location=TEST_TABLE_LOCATION, properties=TEST_TABLE_PROPERTIES, ) assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table From 341f5ba506c78023161c7bff63b935998db6e60f Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 21:58:58 -0800 Subject: [PATCH 15/23] fix test --- tests/catalog/test_base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index b9a9c70cc1..0fd7d9de55 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -413,9 +413,11 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: # Then assert response.metadata.table_uuid == given_table.metadata.table_uuid + assert given_table.schema().schema_id == 1 assert given_table.metadata.current_schema_id == 1 assert len(response.metadata.schemas) == 2 - assert response.metadata.schemas[1] == new_schema.model_copy(update={"schema_id": 1}) + assert response.metadata.schemas[1] == new_schema + assert response.metadata.schemas[1].schema_id == 1 assert given_table.metadata.last_column_id == new_schema.highest_field_id From 67c028ac40dfeb109faa4410eb8f5e729205515d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Jan 2024 22:00:56 -0800 Subject: [PATCH 16/23] add license --- pyiceberg/catalog/in_memory.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pyiceberg/catalog/in_memory.py b/pyiceberg/catalog/in_memory.py index 66c79d2f27..1840f492db 100644 --- a/pyiceberg/catalog/in_memory.py +++ b/pyiceberg/catalog/in_memory.py @@ -1,3 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. import uuid from typing import ( Dict, From 96ba8defbd152933c8a13443b405329e5288154f Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 30 Jan 2024 22:07:17 -0800 Subject: [PATCH 17/23] `create_table` write metadata file --- pyiceberg/catalog/in_memory.py | 10 +++-- pyiceberg/cli/output.py | 2 +- tests/catalog/test_base.py | 16 ++++++-- tests/cli/test_console.py | 69 ++++------------------------------ 4 files changed, 28 insertions(+), 69 deletions(-) diff --git a/pyiceberg/catalog/in_memory.py b/pyiceberg/catalog/in_memory.py index 1840f492db..a7fb517fe4 100644 --- a/pyiceberg/catalog/in_memory.py +++ b/pyiceberg/catalog/in_memory.py @@ -38,7 +38,7 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import WAREHOUSE +from pyiceberg.io import WAREHOUSE, load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( @@ -94,8 +94,7 @@ def create_table( if not location: location = f'{self._warehouse_location}/{"/".join(identifier)}' - metadata_location = f'{self._warehouse_location}/{"/".join(identifier)}/metadata/metadata.json' - + metadata_location = self._get_metadata_location(location=location) metadata = new_table_metadata( schema=schema, partition_spec=partition_spec, @@ -104,11 +103,14 @@ def create_table( properties=properties, table_uuid=table_uuid, ) + io = load_file_io({**self.properties, **properties}, location=location) + self._write_metadata(metadata, io, metadata_location) + table = Table( identifier=identifier, metadata=metadata, metadata_location=metadata_location, - io=self._load_file_io(properties=metadata.properties, location=metadata_location), + io=io, catalog=self, ) self.__tables[identifier] = table diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py index 18cdab1556..3377ef3d2a 100644 --- a/pyiceberg/cli/output.py +++ b/pyiceberg/cli/output.py @@ -157,7 +157,7 @@ def describe_properties(self, properties: Properties) -> None: Console().print(output_table) def text(self, response: str) -> None: - Console().print(response) + Console(soft_wrap=True).print(response) def schema(self, schema: Schema) -> None: output_table = self._table diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 0fd7d9de55..66140b8947 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -64,7 +64,6 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), ) -TEST_TABLE_LOCATION = "protocol://some/location" TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"} NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)" @@ -123,13 +122,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None: table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table +def test_create_table_location_override(catalog: InMemoryCatalog) -> None: + new_location = f"{catalog._warehouse_location}/new_location" + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + location=new_location, + partition_spec=TEST_TABLE_PARTITION_SPEC, + properties=TEST_TABLE_PROPERTIES, + ) + assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table + assert table.location() == new_location + + @pytest.mark.parametrize( "schema,expected", [ @@ -151,7 +162,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=pyarrow_schema_simple_without_ids, - location=TEST_TABLE_LOCATION, properties=TEST_TABLE_PROPERTIES, ) assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 1f9f831d21..ec3199a4db 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -17,14 +17,15 @@ import datetime import os import uuid +from pathlib import PosixPath from unittest.mock import MagicMock import pytest from click.testing import CliRunner from pytest_mock import MockFixture -from pyiceberg.catalog.in_memory import DEFAULT_WAREHOUSE_LOCATION from pyiceberg.cli.console import run +from pyiceberg.io import WAREHOUSE from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.transforms import IdentityTransform @@ -52,8 +53,10 @@ def env_vars(mocker: MockFixture) -> None: @pytest.fixture(name="catalog") -def fixture_catalog(mocker: MockFixture) -> InMemoryCatalog: - in_memory_catalog = InMemoryCatalog("test.in_memory.catalog", **{"test.key": "test.value"}) +def fixture_catalog(mocker: MockFixture, tmp_path: PosixPath) -> InMemoryCatalog: + in_memory_catalog = InMemoryCatalog( + "test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"} + ) mocker.patch("pyiceberg.cli.console.load_catalog", return_value=in_memory_catalog) return in_memory_catalog @@ -79,7 +82,6 @@ def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), ) -TEST_TABLE_LOCATION = "s3://bucket/test/location" TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) TEST_TABLE_PROPERTIES = {"read.split.target.size": "134217728"} TEST_TABLE_UUID = uuid.UUID("d20125c8-7284-442c-9aea-15fee620737c") @@ -101,7 +103,6 @@ def test_list_namespace(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -138,7 +139,6 @@ def test_describe_table(catalog: InMemoryCatalog, mock_datetime_now: None) -> No catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, table_uuid=TEST_TABLE_UUID, ) @@ -182,7 +182,6 @@ def test_schema(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -211,7 +210,6 @@ def test_spec(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -240,7 +238,6 @@ def test_uuid(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, table_uuid=TEST_TABLE_UUID, ) @@ -266,25 +263,10 @@ def test_location(catalog: InMemoryCatalog) -> None: schema=TEST_TABLE_SCHEMA, partition_spec=TEST_TABLE_PARTITION_SPEC, ) - - runner = CliRunner() - result = runner.invoke(run, ["location", "default.my_table"]) - assert result.exit_code == 0 - assert result.output == f"""{DEFAULT_WAREHOUSE_LOCATION}/default/my_table\n""" - - -def test_location_override(catalog: InMemoryCatalog) -> None: - catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, - partition_spec=TEST_TABLE_PARTITION_SPEC, - ) - runner = CliRunner() result = runner.invoke(run, ["location", "default.my_table"]) assert result.exit_code == 0 - assert result.output == f"""{TEST_TABLE_LOCATION}\n""" + assert result.output == f"""{catalog._warehouse_location}/default/my_table\n""" def test_location_does_not_exists(catalog: InMemoryCatalog) -> None: @@ -300,7 +282,6 @@ def test_drop_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -341,7 +322,6 @@ def test_rename_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -364,7 +344,6 @@ def test_properties_get_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -379,7 +358,6 @@ def test_properties_get_table_specific_property(catalog: InMemoryCatalog) -> Non catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -394,7 +372,6 @@ def test_properties_get_table_specific_property_that_doesnt_exist(catalog: InMem catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -463,7 +440,6 @@ def test_properties_set_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -504,7 +480,6 @@ def test_properties_remove_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -519,7 +494,6 @@ def test_properties_remove_table_property_does_not_exists(catalog: InMemoryCatal catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -551,7 +525,6 @@ def test_json_list_namespace(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -584,7 +557,6 @@ def test_json_describe_table(catalog: InMemoryCatalog, mock_datetime_now: None) catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, table_uuid=TEST_TABLE_UUID, ) @@ -614,7 +586,6 @@ def test_json_schema(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -640,7 +611,6 @@ def test_json_spec(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -663,7 +633,6 @@ def test_json_uuid(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, table_uuid=TEST_TABLE_UUID, ) @@ -693,21 +662,7 @@ def test_json_location(catalog: InMemoryCatalog) -> None: runner = CliRunner() result = runner.invoke(run, ["--output=json", "location", "default.my_table"]) assert result.exit_code == 0 - assert result.output == f'"{DEFAULT_WAREHOUSE_LOCATION}/default/my_table"\n' - - -def test_json_location_override(catalog: InMemoryCatalog) -> None: - catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, - partition_spec=TEST_TABLE_PARTITION_SPEC, - ) - - runner = CliRunner() - result = runner.invoke(run, ["--output=json", "location", "default.my_table"]) - assert result.exit_code == 0 - assert result.output == f'"{TEST_TABLE_LOCATION}"\n' + assert result.output == f'"{catalog._warehouse_location}/default/my_table"\n' def test_json_location_does_not_exists(catalog: InMemoryCatalog) -> None: @@ -723,7 +678,6 @@ def test_json_drop_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -764,7 +718,6 @@ def test_json_rename_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -787,7 +740,6 @@ def test_json_properties_get_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -802,7 +754,6 @@ def test_json_properties_get_table_specific_property(catalog: InMemoryCatalog) - catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -817,7 +768,6 @@ def test_json_properties_get_table_specific_property_that_doesnt_exist(catalog: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -891,7 +841,6 @@ def test_json_properties_set_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -937,7 +886,6 @@ def test_json_properties_remove_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -952,7 +900,6 @@ def test_json_properties_remove_table_property_does_not_exists(catalog: InMemory catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) From 8a7b8768239a9bcfef20cfa44414413615c757b6 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 29 Feb 2024 20:03:14 -0800 Subject: [PATCH 18/23] move InMemoryCatalog back to test_base --- pyiceberg/catalog/__init__.py | 7 - pyiceberg/catalog/in_memory.py | 248 --------------------------------- tests/catalog/test_base.py | 97 ++++++++----- 3 files changed, 66 insertions(+), 286 deletions(-) delete mode 100644 pyiceberg/catalog/in_memory.py diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 3250b2426a..58a3d5999f 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -144,19 +144,12 @@ def load_sql(name: str, conf: Properties) -> Catalog: ) from exc -def load_in_memory(name: str, conf: Properties) -> Catalog: - from pyiceberg.catalog.in_memory import InMemoryCatalog - - return InMemoryCatalog(name, **conf) - - AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = { CatalogType.REST: load_rest, CatalogType.HIVE: load_hive, CatalogType.GLUE: load_glue, CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, - CatalogType.IN_MEMORY: load_in_memory, } diff --git a/pyiceberg/catalog/in_memory.py b/pyiceberg/catalog/in_memory.py deleted file mode 100644 index a7fb517fe4..0000000000 --- a/pyiceberg/catalog/in_memory.py +++ /dev/null @@ -1,248 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import uuid -from typing import ( - Dict, - List, - Optional, - Set, - Union, -) - -import pyarrow as pa - -from pyiceberg.catalog import ( - Catalog, - Identifier, - Properties, - PropertiesUpdateSummary, -) -from pyiceberg.exceptions import ( - NamespaceAlreadyExistsError, - NamespaceNotEmptyError, - NoSuchNamespaceError, - NoSuchTableError, - TableAlreadyExistsError, -) -from pyiceberg.io import WAREHOUSE, load_file_io -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec -from pyiceberg.schema import Schema -from pyiceberg.table import ( - CommitTableRequest, - CommitTableResponse, - Table, - update_table_metadata, -) -from pyiceberg.table.metadata import new_table_metadata -from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder -from pyiceberg.typedef import EMPTY_DICT - -DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" - - -class InMemoryCatalog(Catalog): - """ - An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables. - - This is useful for test, demo, and playground but not in production as data is not persisted. - """ - - __tables: Dict[Identifier, Table] - __namespaces: Dict[Identifier, Properties] - - def __init__(self, name: str, **properties: str) -> None: - super().__init__(name, **properties) - self.__tables = {} - self.__namespaces = {} - self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION - - def create_table( - self, - identifier: Union[str, Identifier], - schema: Union[Schema, "pa.Schema"], - location: Optional[str] = None, - partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, - sort_order: SortOrder = UNSORTED_SORT_ORDER, - properties: Properties = EMPTY_DICT, - table_uuid: Optional[uuid.UUID] = None, - ) -> Table: - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore - - identifier = Catalog.identifier_to_tuple(identifier) - namespace = Catalog.namespace_from(identifier) - - if identifier in self.__tables: - raise TableAlreadyExistsError(f"Table already exists: {identifier}") - else: - if namespace not in self.__namespaces: - self.__namespaces[namespace] = {} - - if not location: - location = f'{self._warehouse_location}/{"/".join(identifier)}' - - metadata_location = self._get_metadata_location(location=location) - metadata = new_table_metadata( - schema=schema, - partition_spec=partition_spec, - sort_order=sort_order, - location=location, - properties=properties, - table_uuid=table_uuid, - ) - io = load_file_io({**self.properties, **properties}, location=location) - self._write_metadata(metadata, io, metadata_location) - - table = Table( - identifier=identifier, - metadata=metadata, - metadata_location=metadata_location, - io=io, - catalog=self, - ) - self.__tables[identifier] = table - return table - - def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: - raise NotImplementedError - - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - identifier_tuple = self.identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) - current_table = self.load_table(identifier_tuple) - base_metadata = current_table.metadata - - for requirement in table_request.requirements: - requirement.validate(base_metadata) - - updated_metadata = update_table_metadata(base_metadata, table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) - - # update table state - current_table.metadata = updated_metadata - - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) - - def load_table(self, identifier: Union[str, Identifier]) -> Table: - identifier = self.identifier_to_tuple_without_catalog(identifier) - try: - return self.__tables[identifier] - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {identifier}") from error - - def drop_table(self, identifier: Union[str, Identifier]) -> None: - identifier = self.identifier_to_tuple_without_catalog(identifier) - try: - self.__tables.pop(identifier) - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {identifier}") from error - - def purge_table(self, identifier: Union[str, Identifier]) -> None: - self.drop_table(identifier) - - def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: - from_identifier = self.identifier_to_tuple_without_catalog(from_identifier) - try: - table = self.__tables.pop(from_identifier) - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {from_identifier}") from error - - to_identifier = Catalog.identifier_to_tuple(to_identifier) - to_namespace = Catalog.namespace_from(to_identifier) - if to_namespace not in self.__namespaces: - self.__namespaces[to_namespace] = {} - - self.__tables[to_identifier] = Table( - identifier=to_identifier, - metadata=table.metadata, - metadata_location=table.metadata_location, - io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location), - catalog=self, - ) - return self.__tables[to_identifier] - - def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: - namespace = Catalog.identifier_to_tuple(namespace) - if namespace in self.__namespaces: - raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}") - else: - self.__namespaces[namespace] = properties if properties else {} - - def drop_namespace(self, namespace: Union[str, Identifier]) -> None: - namespace = Catalog.identifier_to_tuple(namespace) - if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]: - raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}") - try: - self.__namespaces.pop(namespace) - except KeyError as error: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error - - def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]: - if namespace: - namespace = Catalog.identifier_to_tuple(namespace) - list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]] - else: - list_tables = list(self.__tables.keys()) - - return list_tables - - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: - # Hierarchical namespace is not supported. Return an empty list - if namespace: - return [] - - return list(self.__namespaces.keys()) - - def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: - namespace = Catalog.identifier_to_tuple(namespace) - try: - return self.__namespaces[namespace] - except KeyError as error: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error - - def update_namespace_properties( - self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT - ) -> PropertiesUpdateSummary: - removed: Set[str] = set() - updated: Set[str] = set() - - namespace = Catalog.identifier_to_tuple(namespace) - if namespace in self.__namespaces: - if removals: - for key in removals: - if key in self.__namespaces[namespace]: - del self.__namespaces[namespace][key] - removed.add(key) - if updates: - for key, value in updates.items(): - self.__namespaces[namespace][key] = value - updated.add(key) - else: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") - - expected_to_change = removed.difference(removals or set()) - - return PropertiesUpdateSummary( - removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change) - ) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 7e38d8f793..ae943c384b 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -17,8 +17,15 @@ # pylint:disable=redefined-outer-name +import uuid from pathlib import PosixPath -from typing import Union +from typing import ( + Dict, + List, + Optional, + Set, + Union, +) import pyarrow as pa import pytest @@ -27,9 +34,10 @@ from pyiceberg.catalog import ( Catalog, + Identifier, + Properties, PropertiesUpdateSummary, ) -from pyiceberg.catalog.in_memory import InMemoryCatalog from pyiceberg.exceptions import ( NamespaceAlreadyExistsError, NamespaceNotEmptyError, @@ -37,26 +45,34 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import WAREHOUSE -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.io import WAREHOUSE, load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( AddSchemaUpdate, CommitTableRequest, + CommitTableResponse, Namespace, SetCurrentSchemaUpdate, Table, TableIdentifier, update_table_metadata, ) -from pyiceberg.table.metadata import TableMetadataV1 +from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import EMPTY_DICT from pyiceberg.types import IntegerType, LongType, NestedField +DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" + class InMemoryCatalog(Catalog): - """An in-memory catalog implementation for testing purposes.""" + """ + An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables. + + This is useful for test, demo, and playground but not in production as data is not persisted. + """ __tables: Dict[Identifier, Table] __namespaces: Dict[Identifier, Properties] @@ -65,6 +81,7 @@ def __init__(self, name: str, **properties: str) -> None: super().__init__(name, **properties) self.__tables = {} self.__namespaces = {} + self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION def create_table( self, @@ -74,6 +91,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + table_uuid: Optional[uuid.UUID] = None, ) -> Table: schema: Schema = self._convert_schema_if_needed(schema) # type: ignore @@ -86,24 +104,26 @@ def create_table( if namespace not in self.__namespaces: self.__namespaces[namespace] = {} - new_location = location or f's3://warehouse/{"/".join(identifier)}/data' - metadata = TableMetadataV1(**{ - "format-version": 1, - "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", - "location": new_location, - "last-updated-ms": 1602638573874, - "last-column-id": schema.highest_field_id, - "schema": schema.model_dump(), - "partition-spec": partition_spec.model_dump()["fields"], - "properties": properties, - "current-snapshot-id": -1, - "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}], - }) + if not location: + location = f'{self._warehouse_location}/{"/".join(identifier)}' + + metadata_location = self._get_metadata_location(location=location) + metadata = new_table_metadata( + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + location=location, + properties=properties, + table_uuid=table_uuid, + ) + io = load_file_io({**self.properties, **properties}, location=location) + self._write_metadata(metadata, io, metadata_location) + table = Table( identifier=identifier, metadata=metadata, - metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', - io=load_file_io(), + metadata_location=metadata_location, + io=io, catalog=self, ) self.__tables[identifier] = table @@ -113,14 +133,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: raise NotImplementedError def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,) - table = self.__tables[identifier] - table.metadata = update_table_metadata(base_metadata=table.metadata, updates=table_request.updates) - - return CommitTableResponse( - metadata=table.metadata.model_dump(), - metadata_location=table.location(), + identifier_tuple = self.identifier_to_tuple_without_catalog( + tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) ) + current_table = self.load_table(identifier_tuple) + base_metadata = current_table.metadata + + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + # update table state + current_table.metadata = updated_metadata + + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier = self.identifier_to_tuple_without_catalog(identifier) @@ -155,7 +190,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U identifier=to_identifier, metadata=table.metadata, metadata_location=table.metadata_location, - io=load_file_io(), + io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location), catalog=self, ) return self.__tables[to_identifier] @@ -666,7 +701,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None: def test_catalog_repr(catalog: InMemoryCatalog) -> None: s = repr(catalog) - assert s == "test.in_memory.catalog ()" + assert s == "test.in_memory.catalog ()" def test_table_properties_int_value(catalog: InMemoryCatalog) -> None: @@ -680,4 +715,4 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None property_with_none = {"property_name": None} with pytest.raises(ValidationError) as exc_info: _ = given_catalog_has_a_table(catalog, properties=property_with_none) - assert "None type is not a supported value in properties: property_name" in str(exc_info.value) \ No newline at end of file + assert "None type is not a supported value in properties: property_name" in str(exc_info.value) From 10adb1cb03d4fd2b6ec457b5d8689b24e353bd00 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 29 Feb 2024 20:05:38 -0800 Subject: [PATCH 19/23] remove unused references --- mkdocs/docs/configuration.md | 9 --------- pyiceberg/catalog/__init__.py | 1 - 2 files changed, 10 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index e1a6d3281b..4a461ccc0d 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -265,15 +265,6 @@ catalog: The In-Memory catalog uses in-memory data-structures to store information. This is useful for test, demo, and playground. Do not use in production as the data is not persisted. -While you can specify In-Memory catalog in the configuration file like this, it is not recommended since information is only persisted for the duration of the function call. - -```yaml -catalog: - default: - type: in_memory - warehouse: /tmp/warehouse # default warehouse location -``` - # Concurrency PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details. diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 58a3d5999f..db83658f1f 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -97,7 +97,6 @@ class CatalogType(Enum): GLUE = "glue" DYNAMODB = "dynamodb" SQL = "sql" - IN_MEMORY = "in_memory" def load_rest(name: str, conf: Properties) -> Catalog: From a466e5fa2f1400d320141c63c53eaa114a532bd2 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 5 Mar 2024 10:57:35 -0500 Subject: [PATCH 20/23] Update mkdocs/docs/configuration.md Co-authored-by: Fokko Driesprong --- mkdocs/docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 4a461ccc0d..dd0772cadc 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -24,7 +24,7 @@ hide: # Catalogs -PyIceberg currently has native support for REST, SQL, Hive, Glue, DynamoDB, and In-Memory catalogs. +PyIceberg currently has native support for REST, SQL, Hive, Glue and DynamoDB. There are three ways to pass in configuration: From 03ec82bb3a8ebc837c1bcc0fcde6a705be14eb14 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 5 Mar 2024 10:58:06 -0500 Subject: [PATCH 21/23] Update mkdocs/docs/configuration.md Co-authored-by: Fokko Driesprong --- mkdocs/docs/configuration.md | 5 ----- 1 file changed, 5 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index dd0772cadc..f0eb56ebd5 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -260,11 +260,6 @@ catalog: region_name: ``` -## In-Memory Catalog - -The In-Memory catalog uses in-memory data-structures to store information. -This is useful for test, demo, and playground. Do not use in production as the data is not persisted. - # Concurrency PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details. From 58b34ca512a86c649d608fe08817bff64668244e Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 5 Mar 2024 10:58:14 -0500 Subject: [PATCH 22/23] Update tests/catalog/test_base.py Co-authored-by: Fokko Driesprong --- tests/catalog/test_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index ae943c384b..314543db07 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -81,7 +81,7 @@ def __init__(self, name: str, **properties: str) -> None: super().__init__(name, **properties) self.__tables = {} self.__namespaces = {} - self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION + self._warehouse_location = properties.get(WAREHOUSE, DEFAULT_WAREHOUSE_LOCATION) def create_table( self, From 810c3c54854ca62e5a5a8d5e817c99e7801117a0 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 9 Mar 2024 08:55:30 -0800 Subject: [PATCH 23/23] remove schema_id --- tests/catalog/test_base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 314543db07..b9ddcfb0b7 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -620,7 +620,6 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), NestedField(4, "add", LongType()), - schema_id=1, ) # When