-
Notifications
You must be signed in to change notification settings - Fork 167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve the InMemory Catalog Implementation #289
Changes from all commits
066e8c4
b1a99f7
32c449e
3c6e06a
21b1e50
e2541ac
3013bdb
be3eb1c
f80849a
faea973
4437a30
aabcde0
b91e1cf
2834bae
341f5ba
67c028a
96ba8de
c7f9053
8a7b876
10adb1c
a466e5f
03ec82b
58b34ca
810c3c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
kevinjqliu marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -16,6 +16,9 @@ | |||||
# under the License. | ||||||
# pylint:disable=redefined-outer-name | ||||||
|
||||||
|
||||||
import uuid | ||||||
from pathlib import PosixPath | ||||||
from typing import ( | ||||||
Dict, | ||||||
List, | ||||||
|
@@ -42,7 +45,7 @@ | |||||
NoSuchTableError, | ||||||
TableAlreadyExistsError, | ||||||
) | ||||||
from pyiceberg.io import load_file_io | ||||||
from pyiceberg.io import WAREHOUSE, load_file_io | ||||||
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec | ||||||
from pyiceberg.schema import Schema | ||||||
from pyiceberg.table import ( | ||||||
|
@@ -55,15 +58,21 @@ | |||||
TableIdentifier, | ||||||
update_table_metadata, | ||||||
) | ||||||
from pyiceberg.table.metadata import TableMetadataV1 | ||||||
from pyiceberg.table.metadata import new_table_metadata | ||||||
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder | ||||||
from pyiceberg.transforms import IdentityTransform | ||||||
from pyiceberg.typedef import EMPTY_DICT | ||||||
from pyiceberg.types import IntegerType, LongType, NestedField | ||||||
|
||||||
DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" | ||||||
|
||||||
|
||||||
class InMemoryCatalog(Catalog): | ||||||
"""An in-memory catalog implementation for testing purposes.""" | ||||||
""" | ||||||
An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables. | ||||||
|
||||||
This is useful for test, demo, and playground but not in production as data is not persisted. | ||||||
""" | ||||||
|
||||||
__tables: Dict[Identifier, Table] | ||||||
__namespaces: Dict[Identifier, Properties] | ||||||
|
@@ -72,6 +81,7 @@ def __init__(self, name: str, **properties: str) -> None: | |||||
super().__init__(name, **properties) | ||||||
self.__tables = {} | ||||||
self.__namespaces = {} | ||||||
self._warehouse_location = properties.get(WAREHOUSE, DEFAULT_WAREHOUSE_LOCATION) | ||||||
|
||||||
def create_table( | ||||||
self, | ||||||
|
@@ -81,6 +91,7 @@ def create_table( | |||||
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||||||
sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||||||
properties: Properties = EMPTY_DICT, | ||||||
table_uuid: Optional[uuid.UUID] = None, | ||||||
) -> Table: | ||||||
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore | ||||||
|
||||||
|
@@ -93,24 +104,26 @@ def create_table( | |||||
if namespace not in self.__namespaces: | ||||||
self.__namespaces[namespace] = {} | ||||||
|
||||||
new_location = location or f's3://warehouse/{"/".join(identifier)}/data' | ||||||
metadata = TableMetadataV1(**{ | ||||||
"format-version": 1, | ||||||
"table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", | ||||||
"location": new_location, | ||||||
"last-updated-ms": 1602638573874, | ||||||
"last-column-id": schema.highest_field_id, | ||||||
"schema": schema.model_dump(), | ||||||
"partition-spec": partition_spec.model_dump()["fields"], | ||||||
"properties": properties, | ||||||
"current-snapshot-id": -1, | ||||||
"snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}], | ||||||
}) | ||||||
if not location: | ||||||
location = f'{self._warehouse_location}/{"/".join(identifier)}' | ||||||
|
||||||
metadata_location = self._get_metadata_location(location=location) | ||||||
metadata = new_table_metadata( | ||||||
schema=schema, | ||||||
partition_spec=partition_spec, | ||||||
sort_order=sort_order, | ||||||
location=location, | ||||||
properties=properties, | ||||||
table_uuid=table_uuid, | ||||||
) | ||||||
io = load_file_io({**self.properties, **properties}, location=location) | ||||||
self._write_metadata(metadata, io, metadata_location) | ||||||
|
||||||
table = Table( | ||||||
identifier=identifier, | ||||||
metadata=metadata, | ||||||
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', | ||||||
io=load_file_io(), | ||||||
metadata_location=metadata_location, | ||||||
io=io, | ||||||
catalog=self, | ||||||
) | ||||||
self.__tables[identifier] = table | ||||||
|
@@ -120,14 +133,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: | |||||
raise NotImplementedError | ||||||
|
||||||
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: | ||||||
identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,) | ||||||
table = self.__tables[identifier] | ||||||
table.metadata = update_table_metadata(base_metadata=table.metadata, updates=table_request.updates) | ||||||
|
||||||
return CommitTableResponse( | ||||||
metadata=table.metadata.model_dump(), | ||||||
metadata_location=table.location(), | ||||||
identifier_tuple = self.identifier_to_tuple_without_catalog( | ||||||
tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) | ||||||
) | ||||||
current_table = self.load_table(identifier_tuple) | ||||||
base_metadata = current_table.metadata | ||||||
|
||||||
for requirement in table_request.requirements: | ||||||
requirement.validate(base_metadata) | ||||||
|
||||||
updated_metadata = update_table_metadata(base_metadata, table_request.updates) | ||||||
if updated_metadata == base_metadata: | ||||||
# no changes, do nothing | ||||||
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) | ||||||
|
||||||
# write new metadata | ||||||
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 | ||||||
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) | ||||||
self._write_metadata(updated_metadata, current_table.io, new_metadata_location) | ||||||
|
||||||
# update table state | ||||||
current_table.metadata = updated_metadata | ||||||
|
||||||
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) | ||||||
|
||||||
def load_table(self, identifier: Union[str, Identifier]) -> Table: | ||||||
identifier = self.identifier_to_tuple_without_catalog(identifier) | ||||||
|
@@ -162,7 +190,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U | |||||
identifier=to_identifier, | ||||||
metadata=table.metadata, | ||||||
metadata_location=table.metadata_location, | ||||||
io=load_file_io(), | ||||||
io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location), | ||||||
catalog=self, | ||||||
) | ||||||
return self.__tables[to_identifier] | ||||||
|
@@ -234,8 +262,8 @@ def update_namespace_properties( | |||||
|
||||||
|
||||||
@pytest.fixture | ||||||
def catalog() -> InMemoryCatalog: | ||||||
return InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"}) | ||||||
def catalog(tmp_path: PosixPath) -> InMemoryCatalog: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added the ability to write to temporary files for testing, which are then automatically cleaned up. |
||||||
return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"}) | ||||||
|
||||||
|
||||||
TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table") | ||||||
|
@@ -246,7 +274,6 @@ def catalog() -> InMemoryCatalog: | |||||
NestedField(2, "y", LongType(), doc="comment"), | ||||||
NestedField(3, "z", LongType()), | ||||||
) | ||||||
TEST_TABLE_LOCATION = "protocol://some/location" | ||||||
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) | ||||||
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"} | ||||||
NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)" | ||||||
|
@@ -263,7 +290,6 @@ def given_catalog_has_a_table( | |||||
return catalog.create_table( | ||||||
identifier=TEST_TABLE_IDENTIFIER, | ||||||
schema=TEST_TABLE_SCHEMA, | ||||||
location=TEST_TABLE_LOCATION, | ||||||
partition_spec=TEST_TABLE_PARTITION_SPEC, | ||||||
properties=properties or TEST_TABLE_PROPERTIES, | ||||||
) | ||||||
|
@@ -309,13 +335,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None: | |||||
table = catalog.create_table( | ||||||
identifier=TEST_TABLE_IDENTIFIER, | ||||||
schema=TEST_TABLE_SCHEMA, | ||||||
location=TEST_TABLE_LOCATION, | ||||||
partition_spec=TEST_TABLE_PARTITION_SPEC, | ||||||
properties=TEST_TABLE_PROPERTIES, | ||||||
) | ||||||
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table | ||||||
|
||||||
|
||||||
def test_create_table_location_override(catalog: InMemoryCatalog) -> None: | ||||||
new_location = f"{catalog._warehouse_location}/new_location" | ||||||
table = catalog.create_table( | ||||||
identifier=TEST_TABLE_IDENTIFIER, | ||||||
schema=TEST_TABLE_SCHEMA, | ||||||
location=new_location, | ||||||
partition_spec=TEST_TABLE_PARTITION_SPEC, | ||||||
properties=TEST_TABLE_PROPERTIES, | ||||||
) | ||||||
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table | ||||||
assert table.location() == new_location | ||||||
|
||||||
|
||||||
@pytest.mark.parametrize( | ||||||
"schema,expected", | ||||||
[ | ||||||
|
@@ -337,8 +375,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si | |||||
table = catalog.create_table( | ||||||
identifier=TEST_TABLE_IDENTIFIER, | ||||||
schema=pyarrow_schema_simple_without_ids, | ||||||
location=TEST_TABLE_LOCATION, | ||||||
partition_spec=TEST_TABLE_PARTITION_SPEC, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @syun64 FYI, I realized that the
The partition field's So iceberg-python/pyiceberg/partitioning.py Lines 203 to 204 in 102e043
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hey @kevinjqliu, thank you for flagging this 😄 I think the '-1' ID discrepancy is a symptom of the issue that makes the issue easy to understand, just as we decided in #305 (comment). The root cause of the issue, I think, is that we are introducing a way for a schema without IDs (a PyArrow Schema) to be used as an input to `create_table`, while not supporting the same for `partition_spec` and `sort_order` (`PartitionField` and `SortField` both require field IDs as inputs). So I think we should update both `assign_fresh_partition_spec_ids` and `assign_fresh_sort_order_ids` to support field lookup by name. @Fokko - does that sound like a good way to resolve this issue? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Created #338 to track this issue. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree with you @syun64 that, for creating tables, having to look up the IDs is not ideal. That API will probably have to be extended at some point. But for the metadata (and also for how Iceberg internally tracks columns, since names can change but IDs cannot), we need to track it by ID. I'm in doubt if assigning […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sounds good @Fokko 👍 and thanks again for flagging this, @kevinjqliu! |
||||||
properties=TEST_TABLE_PROPERTIES, | ||||||
) | ||||||
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table | ||||||
|
@@ -664,7 +700,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None: | |||||
|
||||||
def test_catalog_repr(catalog: InMemoryCatalog) -> None: | ||||||
s = repr(catalog) | ||||||
assert s == "test.in.memory.catalog (<class 'test_base.InMemoryCatalog'>)" | ||||||
assert s == "test.in_memory.catalog (<class 'test_base.InMemoryCatalog'>)" | ||||||
|
||||||
|
||||||
def test_table_properties_int_value(catalog: InMemoryCatalog) -> None: | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some `test_console.py` outputs are too long and end up with an extra `\n` in the middle of the string, causing tests to fail.