Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the InMemory Catalog Implementation #289

Merged
merged 24 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ hide:

# Catalogs

PyIceberg currently has native support for REST, SQL, Hive, Glue and DynamoDB.
PyIceberg currently has native support for REST, SQL, Hive, Glue, DynamoDB, and In-Memory catalogs.
kevinjqliu marked this conversation as resolved.
Show resolved Hide resolved

There are three ways to pass in configuration:

Expand Down Expand Up @@ -231,6 +231,20 @@ catalog:
region_name: <REGION_NAME>
```

## In-Memory Catalog

The In-Memory catalog uses in-memory data-structures to store information.
This is useful for test, demo, and playground. Do not use in production as the data is not persisted.

kevinjqliu marked this conversation as resolved.
Show resolved Hide resolved
While you can specify In-Memory catalog in the configuration file like this, it is not recommended since information is only persisted for the duration of the function call.

```yaml
catalog:
default:
type: in_memory
warehouse: /tmp/warehouse # default warehouse location
```

# Concurrency

PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details.
8 changes: 8 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class CatalogType(Enum):
GLUE = "glue"
DYNAMODB = "dynamodb"
SQL = "sql"
IN_MEMORY = "in_memory"


def load_rest(name: str, conf: Properties) -> Catalog:
Expand Down Expand Up @@ -143,12 +144,19 @@ def load_sql(name: str, conf: Properties) -> Catalog:
) from exc


def load_in_memory(name: str, conf: Properties) -> Catalog:
from pyiceberg.catalog.in_memory import InMemoryCatalog

return InMemoryCatalog(name, **conf)


AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
CatalogType.REST: load_rest,
CatalogType.HIVE: load_hive,
CatalogType.GLUE: load_glue,
CatalogType.DYNAMODB: load_dynamodb,
CatalogType.SQL: load_sql,
CatalogType.IN_MEMORY: load_in_memory,
}


Expand Down
248 changes: 248 additions & 0 deletions pyiceberg/catalog/in_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import uuid
from typing import (
Dict,
List,
Optional,
Set,
Union,
)

import pyarrow as pa

from pyiceberg.catalog import (
Catalog,
Identifier,
Properties,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import WAREHOUSE, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
update_table_metadata,
)
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT

DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"
Copy link
Contributor Author

@kevinjqliu kevinjqliu Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by default, write on disk to /tmp/warehouse



class InMemoryCatalog(Catalog):
"""
An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables.

This is useful for test, demo, and playground but not in production as data is not persisted.
"""

__tables: Dict[Identifier, Table]
__namespaces: Dict[Identifier, Properties]

def __init__(self, name: str, **properties: str) -> None:
super().__init__(name, **properties)
self.__tables = {}
self.__namespaces = {}
self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can pass a warehouse location using properties. warehouse location can be another fs such as s3


def create_table(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
table_uuid: Optional[uuid.UUID] = None,
) -> Table:
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

identifier = Catalog.identifier_to_tuple(identifier)
namespace = Catalog.namespace_from(identifier)

if identifier in self.__tables:
raise TableAlreadyExistsError(f"Table already exists: {identifier}")
else:
if namespace not in self.__namespaces:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other implementations don't auto-create namespaces, however I think it is fine for the InMemory one.

self.__namespaces[namespace] = {}

if not location:
location = f'{self._warehouse_location}/{"/".join(identifier)}'

metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
location=location,
properties=properties,
table_uuid=table_uuid,
)
io = load_file_io({**self.properties, **properties}, location=location)
self._write_metadata(metadata, io, metadata_location)

table = Table(
identifier=identifier,
metadata=metadata,
metadata_location=metadata_location,
io=io,
catalog=self,
)
self.__tables[identifier] = table
return table

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
current_table = self.load_table(identifier_tuple)
base_metadata = current_table.metadata

for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

# update table state
current_table.metadata = updated_metadata

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier = self.identifier_to_tuple_without_catalog(identifier)
try:
return self.__tables[identifier]
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier}") from error

def drop_table(self, identifier: Union[str, Identifier]) -> None:
identifier = self.identifier_to_tuple_without_catalog(identifier)
try:
self.__tables.pop(identifier)
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier}") from error

def purge_table(self, identifier: Union[str, Identifier]) -> None:
self.drop_table(identifier)

def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier = self.identifier_to_tuple_without_catalog(from_identifier)
try:
table = self.__tables.pop(from_identifier)
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {from_identifier}") from error

to_identifier = Catalog.identifier_to_tuple(to_identifier)
to_namespace = Catalog.namespace_from(to_identifier)
if to_namespace not in self.__namespaces:
self.__namespaces[to_namespace] = {}

self.__tables[to_identifier] = Table(
identifier=to_identifier,
metadata=table.metadata,
metadata_location=table.metadata_location,
io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location),
catalog=self,
)
return self.__tables[to_identifier]

def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
namespace = Catalog.identifier_to_tuple(namespace)
if namespace in self.__namespaces:
raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}")
else:
self.__namespaces[namespace] = properties if properties else {}

def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
namespace = Catalog.identifier_to_tuple(namespace)
if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]:
raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}")
try:
self.__namespaces.pop(namespace)
except KeyError as error:
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error

def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]:
if namespace:
namespace = Catalog.identifier_to_tuple(namespace)
list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]
else:
list_tables = list(self.__tables.keys())

return list_tables

def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
# Hierarchical namespace is not supported. Return an empty list
if namespace:
return []

return list(self.__namespaces.keys())

def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
namespace = Catalog.identifier_to_tuple(namespace)
try:
return self.__namespaces[namespace]
except KeyError as error:
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error

def update_namespace_properties(
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
removed: Set[str] = set()
updated: Set[str] = set()

namespace = Catalog.identifier_to_tuple(namespace)
if namespace in self.__namespaces:
if removals:
for key in removals:
if key in self.__namespaces[namespace]:
del self.__namespaces[namespace][key]
removed.add(key)
if updates:
for key, value in updates.items():
self.__namespaces[namespace][key] = value
updated.add(key)
else:
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

expected_to_change = removed.difference(removals or set())

return PropertiesUpdateSummary(
removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change)
)
2 changes: 1 addition & 1 deletion pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def describe_properties(self, properties: Properties) -> None:
Console().print(output_table)

def text(self, response: str) -> None:
Console().print(response)
Console(soft_wrap=True).print(response)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some test_console.py outputs are too long and end up with an extra \n in the middle of the string, causing tests to fail


def schema(self, schema: Schema) -> None:
output_table = self._table
Expand Down
Loading