From 321a7dd1ea5667b7f902809d9329a37a3936913d Mon Sep 17 00:00:00 2001 From: coufon Date: Wed, 20 Dec 2023 06:28:46 +0000 Subject: [PATCH] Add Append op for index files, manifests, and column stats. --- python/src/space/core/manifests/__init__.py | 14 ++ python/src/space/core/manifests/index.py | 215 +++++++++++++++++++ python/src/space/core/ops/__init__.py | 14 ++ python/src/space/core/ops/append.py | 171 +++++++++++++++ python/src/space/core/ops/base.py | 30 +++ python/src/space/core/ops/utils.py | 27 +++ python/src/space/core/proto/metadata.proto | 21 +- python/src/space/core/proto/metadata_pb2.py | 22 +- python/src/space/core/proto/metadata_pb2.pyi | 46 +++- python/src/space/core/proto/runtime.proto | 48 +++++ python/src/space/core/proto/runtime_pb2.py | 30 +++ python/src/space/core/proto/runtime_pb2.pyi | 107 +++++++++ python/src/space/core/schema/arrow.py | 112 +++++++++- python/src/space/core/schema/constants.py | 18 ++ python/src/space/core/schema/substrait.py | 18 +- python/src/space/core/storage.py | 23 +- python/tests/core/test_storage.py | 8 +- 17 files changed, 889 insertions(+), 35 deletions(-) create mode 100644 python/src/space/core/manifests/__init__.py create mode 100644 python/src/space/core/manifests/index.py create mode 100644 python/src/space/core/ops/__init__.py create mode 100644 python/src/space/core/ops/append.py create mode 100644 python/src/space/core/ops/base.py create mode 100644 python/src/space/core/ops/utils.py create mode 100644 python/src/space/core/proto/runtime.proto create mode 100644 python/src/space/core/proto/runtime_pb2.py create mode 100644 python/src/space/core/proto/runtime_pb2.pyi create mode 100644 python/src/space/core/schema/constants.py diff --git a/python/src/space/core/manifests/__init__.py b/python/src/space/core/manifests/__init__.py new file mode 100644 index 0000000..36a92ed --- /dev/null +++ b/python/src/space/core/manifests/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/src/space/core/manifests/index.py b/python/src/space/core/manifests/index.py new file mode 100644 index 0000000..e4368fe --- /dev/null +++ b/python/src/space/core/manifests/index.py @@ -0,0 +1,215 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+"""Writer and reader implementation for index manifest files."""
+
+from typing import Any, Dict, List, Optional, Tuple
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+import space.core.proto.metadata_pb2 as meta
+from space.core.schema.arrow import field_id, field_id_to_column_id_dict
+from space.core.utils import paths
+
+# Manifest file fields.
+_FILE_PATH_FIELD = "_FILE"
+_NUM_ROWS_FIELD = "_NUM_ROWS"
+_INDEX_COMPRESSED_BYTES_FIELD = "_INDEX_COMPRESSED_BYTES"
+_INDEX_UNCOMPRESSED_BYTES_FIELD = "_INDEX_UNCOMPRESSED_BYTES"
+
+# Constants for building column statistics field names.
+_STATS_FIELD = "_STATS"
+_MIN_FIELD = "_MIN"
+_MAX_FIELD = "_MAX"
+
+
+def _stats_field_name(name: str) -> str:
+  """Return the column stats struct field name."""
+  return f"{_STATS_FIELD}_{name}"
+
+
+def _stats_subfields(type_: pa.DataType) -> List[pa.Field]:
+  """Return the sub-fields of a column stats struct field."""
+  return [pa.field(_MIN_FIELD, type_), pa.field(_MAX_FIELD, type_)]
+
+
+def _manifest_schema(
+    schema: pa.Schema, primary_keys: List[str]
+) -> Tuple[pa.Schema, List[Tuple[int, pa.DataType]]]:
+  """Build the index manifest file schema based on the storage schema."""
+  primary_keys_ = set(primary_keys)
+
+  fields = [(_FILE_PATH_FIELD, pa.utf8()), (_NUM_ROWS_FIELD, pa.int64()),
+            (_INDEX_COMPRESSED_BYTES_FIELD, pa.int64()),
+            (_INDEX_UNCOMPRESSED_BYTES_FIELD, pa.int64())]
+
+  # Fields to collect Parquet column statistics: [(field_id, type), ...].
+  stats_fields: List[Tuple[int, pa.DataType]] = []
+
+  for f in schema:
+    if f.name not in primary_keys_:
+      continue
+
+    fields.append(
+        (_stats_field_name(f.name), pa.struct(_stats_subfields(f.type))))
+    stats_fields.append((field_id(f), f.type))
+
+  return pa.schema(fields), stats_fields  # type: ignore[arg-type]
+
+
+class _FieldStats:
+  """A local cache for merging stats from row groups."""
+
+  def __init__(self, type_: pa.DataType):
+    self._fields = _stats_subfields(type_)
+
+    self._min: Any = None
+    self._max: Any = None
+
+    # _min and _max cache the stats of one file; after finalizing, they are
+    # moved to _min_list and _max_list, which cache the stats of all files.
+    self._min_list: List[Any] = []
+    self._max_list: List[Any] = []
+
+  def merge(self, stats: pq.Statistics) -> None:
+    """Merge the stats of a new row group into the cached stats."""
+    if self._min is None or stats.min < self._min:
+      self._min = stats.min
+
+    if self._max is None or stats.max > self._max:
+      self._max = stats.max
+
+  def finish(self) -> None:
+    """Finalize the stats for the current file."""
+    self._min_list.append(self._min)
+    self._max_list.append(self._max)
+
+    self._min = None
+    self._max = None
+
+  def to_arrow(self) -> pa.Array:
+    """Return the stats in the Arrow format."""
+    return pa.StructArray.from_arrays(
+        [self._min_list, self._max_list],  # type: ignore[arg-type]
+        fields=self._fields)
+
+
+# pylint: disable=too-many-instance-attributes
+class IndexManifestWriter:
+  """Writer of index manifest files."""
+
+  def __init__(self, metadata_dir: str, schema: pa.Schema,
+               primary_keys: List[str]):
+    """
+    Args:
+      metadata_dir: full directory path of the metadata folder.
+      schema: the storage schema; either the logical or the physical schema
+        works, because only primary key fields are used for column stats.
+      primary_keys: un-enforced storage primary keys; index fields only.
+    """
+    self._metadata_dir = metadata_dir
+    self._manifest_schema, self._stats_fields = _manifest_schema(
+        schema, primary_keys)
+
+    # Cache manifest file field values in memory.
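+    # One entry is appended per index file; finish() converts these lists
+    # into Arrow arrays when materializing the manifest table.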
+ self._file_paths: List[str] = [] + self._num_rows: List[int] = [] + self._index_compressed_bytes: List[int] = [] + self._index_uncompressed_bytes: List[int] = [] + + # Caches for column stats. + self._stats_column_ids: List[int] = [] + # Key is column ID. + self._field_stats_dict: Dict[int, _FieldStats] = {} + + column_id_dict = field_id_to_column_id_dict(schema) + for field_id_, type_ in self._stats_fields: + column_id = column_id_dict[field_id_] + self._stats_column_ids.append(column_id) + self._field_stats_dict[column_id] = _FieldStats(type_) + + # Cache for manifest data to materialize. + self._manifest_data: List[pa.Table] = [] + + def write(self, file_path: str, + parquet_metadata: pq.FileMetaData) -> meta.StorageStatistics: + """Write a new manifest row. + + Args: + file_path: a relative file path of the index file. + parquet_metadata: the Parquet metadata of the index file. + + Returns: + The storage statistics of the index file. + """ + self._file_paths.append(file_path) + self._num_rows.append(parquet_metadata.num_rows) + + index_compressed_bytes = 0 + index_uncompressed_bytes = 0 + + for rg in range(parquet_metadata.num_row_groups): + rg_metadata = parquet_metadata.row_group(rg) + for column_id, field_stats in self._field_stats_dict.items(): + column_stats = rg_metadata.column(column_id) + field_stats.merge(column_stats.statistics) + index_compressed_bytes += column_stats.total_compressed_size + index_uncompressed_bytes += column_stats.total_uncompressed_size + + for field_stats in self._field_stats_dict.values(): + field_stats.finish() + + self._index_compressed_bytes.append(index_compressed_bytes) + self._index_uncompressed_bytes.append(index_uncompressed_bytes) + + return meta.StorageStatistics( + num_rows=parquet_metadata.num_rows, + index_compressed_bytes=index_compressed_bytes, + index_uncompressed_bytes=index_uncompressed_bytes) + + def finish(self) -> Optional[str]: + """Materialize the manifest file and return the file path.""" + # Convert cached manifest data to Arrow. + if self._file_paths: + arrays = [ + self._file_paths, self._num_rows, self._index_compressed_bytes, + self._index_uncompressed_bytes + ] + + for column_id in self._stats_column_ids: + arrays.append(self._field_stats_dict[column_id].to_arrow()) + + self._manifest_data.append( + pa.Table.from_arrays( + arrays=arrays, + schema=self._manifest_schema)) # type: ignore[call-arg] + + manifest_data = pa.concat_tables(self._manifest_data) + if manifest_data.num_rows == 0: + return None + + return _write_index_manifest(self._metadata_dir, self._manifest_schema, + manifest_data) + + +def _write_index_manifest(metadata_dir: str, schema: pa.Schema, + data: pa.Table) -> str: + # TODO: currently assume this file is small, so always write a single file. + file_path = paths.new_index_manifest_path(metadata_dir) + writer = pq.ParquetWriter(file_path, schema) + writer.write_table(data) + + writer.close() + return file_path diff --git a/python/src/space/core/ops/__init__.py b/python/src/space/core/ops/__init__.py new file mode 100644 index 0000000..36a92ed --- /dev/null +++ b/python/src/space/core/ops/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py
new file mode 100644
index 0000000..7214e9e
--- /dev/null
+++ b/python/src/space/core/ops/append.py
@@ -0,0 +1,171 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Local append operation implementation."""
+
+from __future__ import annotations
+from abc import abstractmethod
+from dataclasses import dataclass
+from typing import Optional
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+from space.core.manifests.index import IndexManifestWriter
+from space.core.ops import utils
+from space.core.ops.base import BaseOp, InputData
+from space.core.proto import metadata_pb2 as meta
+from space.core.proto import runtime_pb2 as runtime
+from space.core.schema.arrow import arrow_schema
+from space.core.utils import paths
+from space.core.utils.paths import StoragePaths
+
+# TODO: to obtain the values from user-provided options.
+# Thresholds for writing Parquet files. The sizes are uncompressed bytes.
+_MAX_ARRAY_RECORD_BYTES = 100 * 1024 * 1024
+_MAX_PARQUET_BYTES = 1 * 1024 * 1024
+_MAX_ROW_GROUP_BYTES = 100 * 1024
+
+
+class BaseAppendOp(BaseOp):
+  """Abstract base append operation class."""
+
+  @abstractmethod
+  def write(self, data: InputData) -> None:
+    """Append data into storage."""
+
+  @abstractmethod
+  def finish(self) -> Optional[runtime.Patch]:
+    """Complete the append operation and return a metadata patch."""
+
+
+@dataclass
+class IndexWriterInfo:
+  """Contains information about an index file writer."""
+  writer: pq.ParquetWriter
+  file_path: str
+
+
+class LocalAppendOp(BaseAppendOp, StoragePaths):
+  """Append operation running locally.
+
+  It can be used as a component of more complex operations, such as a
+  distributed append operation.
+
+  Not thread safe.
+  """
+
+  def __init__(self, location: str, metadata: meta.StorageMetadata):
+    StoragePaths.__init__(self, location)
+
+    self._metadata = metadata
+    self._schema = arrow_schema(self._metadata.schema.fields)
+
+    # Data file writers.
+    self._index_writer_info: Optional[IndexWriterInfo] = None
+
+    # Local runtime caches.
+    self._cached_index_data: Optional[pa.Table] = None
+    self._cached_index_file_bytes = 0
+
+    # Manifest file writers.
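+    # The index manifest writer accumulates one row per finalized index
+    # file; finish() materializes the rows into a manifest file.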
+    self._index_manifest_writer = IndexManifestWriter(
+        self._metadata_dir, self._schema,
+        self._metadata.schema.primary_keys)  # type: ignore[arg-type]
+
+    self._patch = runtime.Patch()
+
+  def write(self, data: InputData) -> None:
+    if not isinstance(data, pa.Table):
+      data = pa.Table.from_pydict(data)
+
+    self._append_arrow(data)
+
+  def finish(self) -> Optional[runtime.Patch]:
+    """Complete the append operation and return a metadata patch.
+
+    Returns:
+      A patch to the storage or None if no actual storage modification happens.
+    """
+    if self._patch.storage_statistics_update.num_rows == 0:
+      return None
+
+    # Flush all cached index data.
+    if self._cached_index_data is not None:
+      self._maybe_create_index_writer()
+      assert self._index_writer_info is not None
+      self._index_writer_info.writer.write_table(self._cached_index_data)
+
+    if self._index_writer_info is not None:
+      self._finish_index_writer()
+
+    index_manifest_full_path = self._index_manifest_writer.finish()
+    if index_manifest_full_path is not None:
+      self._patch.added_index_manifest_files.append(
+          self.short_path(index_manifest_full_path))
+
+    return self._patch
+
+  def _append_arrow(self, data: pa.Table) -> None:
+    # TODO: to verify the schema of the input data.
+    if data.num_rows == 0:
+      return
+
+    # TODO: index data will be a subset of the input data fields once record
+    # fields are supported.
+    index_data = data
+    self._maybe_create_index_writer()
+
+    self._cached_index_file_bytes += index_data.nbytes
+
+    if self._cached_index_data is None:
+      self._cached_index_data = index_data
+    else:
+      self._cached_index_data = pa.concat_tables(
+          [self._cached_index_data, index_data])
+
+    # _cached_index_data is written as a new row group.
+    if self._cached_index_data.nbytes > _MAX_ROW_GROUP_BYTES:
+      assert self._index_writer_info is not None
+      self._index_writer_info.writer.write_table(self._cached_index_data)
+      self._cached_index_data = None
+
+    # Materialize the index file.
+    if self._cached_index_file_bytes > _MAX_PARQUET_BYTES:
+      self._finish_index_writer()
+
+  def _maybe_create_index_writer(self) -> None:
+    """Create a new index file writer if needed."""
+    if self._index_writer_info is None:
+      full_file_path = paths.new_index_file_path(self._data_dir)
+      writer = pq.ParquetWriter(full_file_path, self._schema)
+      self._index_writer_info = IndexWriterInfo(
+          writer, self.short_path(full_file_path))
+
+  def _finish_index_writer(self) -> None:
+    """Materialize a new index file and update metadata and stats."""
+    if self._index_writer_info is None:
+      return
+
+    self._index_writer_info.writer.close()
+
+    # Update metadata in manifest files.
+    stats = self._index_manifest_writer.write(
+        self._index_writer_info.file_path,
+        self._index_writer_info.writer.writer.metadata)
+    utils.update_index_storage_statistics(
+        base=self._patch.storage_statistics_update, update=stats)
+
+    self._index_writer_info = None
+    self._cached_index_file_bytes = 0
diff --git a/python/src/space/core/ops/base.py b/python/src/space/core/ops/base.py
new file mode 100644
index 0000000..4293a97
--- /dev/null
+++ b/python/src/space/core/ops/base.py
@@ -0,0 +1,30 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Abstract base operation."""
+
+from __future__ import annotations
+from abc import ABC
+from typing import Any, Dict, Union
+from typing_extensions import TypeAlias
+
+import pyarrow as pa
+
+# Input data can be either a nested Py dict or an Arrow table.
+InputData: TypeAlias = Union[Dict[str, Any], pa.Table]
+
+
+# pylint: disable=too-few-public-methods
+class BaseOp(ABC):
+  """Abstract base operation class."""
diff --git a/python/src/space/core/ops/utils.py b/python/src/space/core/ops/utils.py
new file mode 100644
index 0000000..b7e3f9f
--- /dev/null
+++ b/python/src/space/core/ops/utils.py
@@ -0,0 +1,27 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Utilities for operation classes."""
+
+from space.core.proto import metadata_pb2 as meta
+
+
+def update_index_storage_statistics(
+    base: meta.StorageStatistics,
+    update: meta.StorageStatistics,
+) -> None:
+  """Update index storage statistics."""
+  base.num_rows += update.num_rows
+  base.index_compressed_bytes += update.index_compressed_bytes
+  base.index_uncompressed_bytes += update.index_uncompressed_bytes
diff --git a/python/src/space/core/proto/metadata.proto b/python/src/space/core/proto/metadata.proto
index d02e534..1215f95 100644
--- a/python/src/space/core/proto/metadata.proto
+++ b/python/src/space/core/proto/metadata.proto
@@ -32,7 +32,7 @@ message EntryPoint {
 // Metadata persisting the current status of a storage, including logical
 // metadata such as schema, and physical metadata persisted as a history of
 // snapshots
-// NEXT_ID: 7
+// NEXT_ID: 8
 message StorageMetadata {
   // Create time of the storage.
   google.protobuf.Timestamp create_time = 1;
@@ -56,6 +56,9 @@ message StorageMetadata {
 
   // All alive snapshots with snapshot ID as key.
   map<int64, Snapshot> snapshots = 6;
+
+  // Statistics of all data in the storage.
+  StorageStatistics storage_statistics = 7;
 }
 
 // The storage logical schema where user provided types are persisted instead
@@ -64,6 +67,9 @@ message StorageMetadata {
 message Schema {
   // Fields persisted as Substrait named struct.
   substrait.NamedStruct fields = 1;
+
+  // Primary key field names. Required but primary keys are un-enforced.
+  repeated string primary_keys = 2;
 }
 
 // Storage snapshot persisting physical metadata such as manifest file paths.
@@ -76,3 +82,16 @@ message Snapshot {
   // The create time of the snapshot.
   google.protobuf.Timestamp create_time = 2;
 }
+
+// Statistics of storage data.
+// NEXT_ID: 4
+message StorageStatistics {
+  // Number of rows.
+  int64 num_rows = 1;
+
+  // Compressed bytes of index data.
+ int64 index_compressed_bytes = 2; + + // Uncompressed bytes of index data. + int64 index_uncompressed_bytes = 3; +} diff --git a/python/src/space/core/proto/metadata_pb2.py b/python/src/space/core/proto/metadata_pb2.py index 7121cf9..d6e061e 100644 --- a/python/src/space/core/proto/metadata_pb2.py +++ b/python/src/space/core/proto/metadata_pb2.py @@ -15,7 +15,7 @@ from substrait import type_pb2 as substrait_dot_type__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\x9f\x03\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\")\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\"0\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\"P\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\xdb\x03\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x12:\n\x12storage_statistics\x18\x07 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\")\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\"F\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\"P\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"g\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x62\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.metadata_pb2', globals()) @@ -27,13 +27,15 @@ _ENTRYPOINT._serialized_start=103 _ENTRYPOINT._serialized_end=138 _STORAGEMETADATA._serialized_start=141 - _STORAGEMETADATA._serialized_end=556 - _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_start=442 - _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_end=513 - _STORAGEMETADATA_TYPE._serialized_start=515 - 
_STORAGEMETADATA_TYPE._serialized_end=556 - _SCHEMA._serialized_start=558 - _SCHEMA._serialized_end=606 - _SNAPSHOT._serialized_start=608 - _SNAPSHOT._serialized_end=688 + _STORAGEMETADATA._serialized_end=616 + _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_start=502 + _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_end=573 + _STORAGEMETADATA_TYPE._serialized_start=575 + _STORAGEMETADATA_TYPE._serialized_end=616 + _SCHEMA._serialized_start=618 + _SCHEMA._serialized_end=688 + _SNAPSHOT._serialized_start=690 + _SNAPSHOT._serialized_end=770 + _STORAGESTATISTICS._serialized_start=772 + _STORAGESTATISTICS._serialized_end=875 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/metadata_pb2.pyi b/python/src/space/core/proto/metadata_pb2.pyi index ab55fb5..949daaa 100644 --- a/python/src/space/core/proto/metadata_pb2.pyi +++ b/python/src/space/core/proto/metadata_pb2.pyi @@ -61,7 +61,7 @@ class StorageMetadata(google.protobuf.message.Message): """Metadata persisting the current status of a storage, including logical metadata such as schema, and physical metadata persisted as a history of snapshots - NEXT_ID: 7 + NEXT_ID: 8 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -107,6 +107,7 @@ class StorageMetadata(google.protobuf.message.Message): SCHEMA_FIELD_NUMBER: builtins.int CURRENT_SNAPSHOT_ID_FIELD_NUMBER: builtins.int SNAPSHOTS_FIELD_NUMBER: builtins.int + STORAGE_STATISTICS_FIELD_NUMBER: builtins.int @property def create_time(self) -> google.protobuf.timestamp_pb2.Timestamp: """Create time of the storage.""" @@ -122,6 +123,9 @@ class StorageMetadata(google.protobuf.message.Message): @property def snapshots(self) -> google.protobuf.internal.containers.MessageMap[builtins.int, global___Snapshot]: """All alive snapshots with snapshot ID as key.""" + @property + def storage_statistics(self) -> global___StorageStatistics: + """Statistics of all data in the storage.""" def __init__( self, *, @@ -131,9 +135,10 @@ class StorageMetadata(google.protobuf.message.Message): schema: global___Schema | None = ..., current_snapshot_id: builtins.int = ..., snapshots: collections.abc.Mapping[builtins.int, global___Snapshot] | None = ..., + storage_statistics: global___StorageStatistics | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "schema", b"schema"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "schema", b"schema", "snapshots", b"snapshots", "type", b"type"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "schema", b"schema", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "schema", b"schema", "snapshots", b"snapshots", "storage_statistics", b"storage_statistics", "type", b"type"]) -> None: ... 
global___StorageMetadata = StorageMetadata @@ -147,16 +152,21 @@ class Schema(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor FIELDS_FIELD_NUMBER: builtins.int + PRIMARY_KEYS_FIELD_NUMBER: builtins.int @property def fields(self) -> substrait.type_pb2.NamedStruct: """Fields persisted as Substrait named struct.""" + @property + def primary_keys(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Primary key field names. Required but primary keys are un-enforced.""" def __init__( self, *, fields: substrait.type_pb2.NamedStruct | None = ..., + primary_keys: collections.abc.Iterable[builtins.str] | None = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["fields", b"fields"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["fields", b"fields"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["fields", b"fields", "primary_keys", b"primary_keys"]) -> None: ... global___Schema = Schema @@ -186,3 +196,31 @@ class Snapshot(google.protobuf.message.Message): def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "snapshot_id", b"snapshot_id"]) -> None: ... global___Snapshot = Snapshot + +@typing_extensions.final +class StorageStatistics(google.protobuf.message.Message): + """Statistics of storage data. + NEXT_ID: 4 + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + NUM_ROWS_FIELD_NUMBER: builtins.int + INDEX_COMPRESSED_BYTES_FIELD_NUMBER: builtins.int + INDEX_UNCOMPRESSED_BYTES_FIELD_NUMBER: builtins.int + num_rows: builtins.int + """Number of rows.""" + index_compressed_bytes: builtins.int + """Compressed bytes of index data.""" + index_uncompressed_bytes: builtins.int + """Uncompressed bytes of index data.""" + def __init__( + self, + *, + num_rows: builtins.int = ..., + index_compressed_bytes: builtins.int = ..., + index_uncompressed_bytes: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["index_compressed_bytes", b"index_compressed_bytes", "index_uncompressed_bytes", b"index_uncompressed_bytes", "num_rows", b"num_rows"]) -> None: ... + +global___StorageStatistics = StorageStatistics diff --git a/python/src/space/core/proto/runtime.proto b/python/src/space/core/proto/runtime.proto new file mode 100644 index 0000000..77624f2 --- /dev/null +++ b/python/src/space/core/proto/runtime.proto @@ -0,0 +1,48 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "space/core/proto/metadata.proto"; + +package space.proto; + +// A patch describing metadata changes to the storage. +// NEXT_ID: 4 +message Patch { + // Index manifest file paths newly added to the storage. + repeated string added_index_manifest_files = 1; + + // Index manifest file paths to be removed from the storage. + repeated string deleted_index_manifest_files = 2; + + // The change of the storage statistics. 
+  StorageStatistics storage_statistics_update = 3;
+}
+
+// Result of a job.
+// NEXT_ID: 3
+message JobResult {
+  enum State {
+    STATE_UNSPECIFIED = 0;
+    SUCCEEDED = 1;
+    FAILED = 2;
+    SKIPPED = 3;
+  }
+
+  State state = 1;
+
+  // The change of the storage statistics.
+  StorageStatistics storage_statistics_update = 2;
+}
diff --git a/python/src/space/core/proto/runtime_pb2.py b/python/src/space/core/proto/runtime_pb2.py
new file mode 100644
index 0000000..97a3b99
--- /dev/null
+++ b/python/src/space/core/proto/runtime_pb2.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: space/core/proto/runtime.proto
+"""Generated protocol buffer code."""
+from google.protobuf.internal import builder as _builder
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from space.core.proto import metadata_pb2 as space_dot_core_dot_proto_dot_metadata__pb2
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"\x94\x01\n\x05Patch\x12\"\n\x1a\x61\x64\x64\x65\x64_index_manifest_files\x18\x01 \x03(\t\x12$\n\x1c\x64\x65leted_index_manifest_files\x18\x02 \x03(\t\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"\xc3\x01\n\tJobResult\x12+\n\x05state\x18\x01 \x01(\x0e\x32\x1c.space.proto.JobResult.State\x12\x41\n\x19storage_statistics_update\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"F\n\x05State\x12\x15\n\x11STATE_UNSPECIFIED\x10\x00\x12\r\n\tSUCCEEDED\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\x12\x0b\n\x07SKIPPED\x10\x03\x62\x06proto3')
+
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.runtime_pb2', globals())
+if _descriptor._USE_C_DESCRIPTORS == False:
+
+  DESCRIPTOR._options = None
+  _PATCH._serialized_start=81
+  _PATCH._serialized_end=229
+  _JOBRESULT._serialized_start=232
+  _JOBRESULT._serialized_end=427
+  _JOBRESULT_STATE._serialized_start=357
+  _JOBRESULT_STATE._serialized_end=427
+# @@protoc_insertion_point(module_scope)
diff --git a/python/src/space/core/proto/runtime_pb2.pyi b/python/src/space/core/proto/runtime_pb2.pyi
new file mode 100644
index 0000000..b72be8e
--- /dev/null
+++ b/python/src/space/core/proto/runtime_pb2.pyi
@@ -0,0 +1,107 @@
+"""
+@generated by mypy-protobuf.  Do not edit manually!
+isort:skip_file
+Copyright 2023 Google LLC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+""" +import builtins +import collections.abc +import google.protobuf.descriptor +import google.protobuf.internal.containers +import google.protobuf.internal.enum_type_wrapper +import google.protobuf.message +import space.core.proto.metadata_pb2 +import sys +import typing + +if sys.version_info >= (3, 10): + import typing as typing_extensions +else: + import typing_extensions + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing_extensions.final +class Patch(google.protobuf.message.Message): + """A patch describing metadata changes to the storage. + NEXT_ID: 4 + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ADDED_INDEX_MANIFEST_FILES_FIELD_NUMBER: builtins.int + DELETED_INDEX_MANIFEST_FILES_FIELD_NUMBER: builtins.int + STORAGE_STATISTICS_UPDATE_FIELD_NUMBER: builtins.int + @property + def added_index_manifest_files(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Index manifest file paths newly added to the storage.""" + @property + def deleted_index_manifest_files(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Index manifest file paths to be removed from the storage.""" + @property + def storage_statistics_update(self) -> space.core.proto.metadata_pb2.StorageStatistics: + """The change of the storage statistics.""" + def __init__( + self, + *, + added_index_manifest_files: collections.abc.Iterable[builtins.str] | None = ..., + deleted_index_manifest_files: collections.abc.Iterable[builtins.str] | None = ..., + storage_statistics_update: space.core.proto.metadata_pb2.StorageStatistics | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["storage_statistics_update", b"storage_statistics_update"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["added_index_manifest_files", b"added_index_manifest_files", "deleted_index_manifest_files", b"deleted_index_manifest_files", "storage_statistics_update", b"storage_statistics_update"]) -> None: ... + +global___Patch = Patch + +@typing_extensions.final +class JobResult(google.protobuf.message.Message): + """Result of a job. + NEXT_ID: 2 + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class _State: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _StateEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[JobResult._State.ValueType], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + STATE_UNSPECIFIED: JobResult._State.ValueType # 0 + SUCCEEDED: JobResult._State.ValueType # 1 + FAILED: JobResult._State.ValueType # 2 + SKIPPED: JobResult._State.ValueType # 3 + + class State(_State, metaclass=_StateEnumTypeWrapper): ... + STATE_UNSPECIFIED: JobResult.State.ValueType # 0 + SUCCEEDED: JobResult.State.ValueType # 1 + FAILED: JobResult.State.ValueType # 2 + SKIPPED: JobResult.State.ValueType # 3 + + STATE_FIELD_NUMBER: builtins.int + STORAGE_STATISTICS_UPDATE_FIELD_NUMBER: builtins.int + state: global___JobResult.State.ValueType + @property + def storage_statistics_update(self) -> space.core.proto.metadata_pb2.StorageStatistics: + """The change of the storage statistics.""" + def __init__( + self, + *, + state: global___JobResult.State.ValueType = ..., + storage_statistics_update: space.core.proto.metadata_pb2.StorageStatistics | None = ..., + ) -> None: ... 
+    def HasField(self, field_name: typing_extensions.Literal["storage_statistics_update", b"storage_statistics_update"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["state", b"state", "storage_statistics_update", b"storage_statistics_update"]) -> None: ...
+
+global___JobResult = JobResult
diff --git a/python/src/space/core/schema/arrow.py b/python/src/space/core/schema/arrow.py
index 4bc40c4..da3f0ca 100644
--- a/python/src/space/core/schema/arrow.py
+++ b/python/src/space/core/schema/arrow.py
@@ -14,10 +14,15 @@
 #
 """Utilities for schemas in the Arrow format."""
 
-from typing import Dict
+from dataclasses import dataclass
+from typing import Dict, List, Optional
+
 import pyarrow as pa
+from substrait.type_pb2 import NamedStruct, Type
 
 from space.core.utils.constants import UTF_8
+from space.core.schema.constants import TF_FEATURES_TYPE
+from space.core.schema.types import TfFeatures
 
 _PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
 
@@ -30,3 +35,108 @@ def field_metadata(field_id_: int) -> Dict[bytes, bytes]:
 def field_id(field: pa.Field) -> int:
   """Return field ID of an Arrow field."""
   return int(field.metadata[_PARQUET_FIELD_ID_KEY])
+
+
+@dataclass
+class _NamesVisitor:
+  names: List[str]
+  idx: int = 0
+
+  def next(self) -> str:
+    """Return the next name."""
+    name = self.names[self.idx]
+    self.idx += 1
+    return name
+
+
+def arrow_schema(fields: NamedStruct) -> pa.Schema:
+  """Return the Arrow schema converted from Substrait fields.
+
+  Args:
+    fields: schema fields in the Substrait format.
+  """
+  return pa.schema(
+      _arrow_fields(
+          _NamesVisitor(fields.names),  # type: ignore[arg-type]
+          fields.struct.types))  # type: ignore[arg-type]
+
+
+def _arrow_fields(names_visitor: _NamesVisitor,
+                  types: List[Type]) -> List[pa.Field]:
+  fields: List[pa.Field] = []
+
+  for type_ in types:
+    name = names_visitor.next()
+    arrow_field = pa.field(name,
+                           _arrow_type(type_, names_visitor),
+                           metadata=field_metadata(_substrait_field_id(type_)))
+    fields.append(arrow_field)
+
+  return fields
+
+
+def _substrait_field_id(type_: Type) -> int:
+  return getattr(type_, type_.WhichOneof(
+      "kind")).type_variation_reference  # type: ignore[arg-type]
+
+
+# pylint: disable=too-many-return-statements
+def _arrow_type(type_: Type,
+                names_visitor: Optional[_NamesVisitor] = None) -> pa.DataType:
+  """Return the Arrow type for a Substrait type."""
+  # TODO: to support more types in Substrait, e.g., fixed_size_list, map.
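+  # Each supported Substrait kind maps to exactly one Arrow type; list and
+  # struct recurse, with struct consuming subfield names from the visitor.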
+ if type_.HasField("bool"): + return pa.bool_() + if type_.HasField("i32"): + return pa.int32() + if type_.HasField("i64"): + return pa.int64() + if type_.HasField("fp32"): + return pa.float32() + if type_.HasField("fp64"): + return pa.float64() + if type_.HasField("string"): + return pa.string() + if type_.HasField("binary"): + return pa.binary() + if type_.HasField("list"): + return pa.list_(_arrow_type(type_.list.type, names_visitor)) + if type_.HasField("struct"): + assert names_visitor is not None + subfields = [] + for t in type_.struct.types: + subfields.append( + pa.field(names_visitor.next(), _arrow_type(t, names_visitor))) + return pa.struct(subfields) + if type_.HasField("user_defined"): + return _user_defined_arrow_type(type_) + + raise TypeError(f"Unsupported Substrait type: {type_}") + + +def _user_defined_arrow_type(type_: Type) -> pa.ExtensionType: + type_name = type_.user_defined.type_parameters[0].string + serialized = type_.user_defined.type_parameters[1].string + + if type_name == TF_FEATURES_TYPE: + return TfFeatures.__arrow_ext_deserialize__( + None, serialized) # type: ignore[arg-type] + + raise TypeError(f"Unsupported Substrait user defined type: {type_}") + + +def field_name_to_id_dict(schema: pa.Schema) -> Dict[str, int]: + """Return a dict with field name as key and field ID as value.""" + return {f.name: field_id(f) for f in schema} + + +def field_id_to_column_id_dict(schema: pa.Schema) -> Dict[int, int]: + """Return a dict with field ID as key and column ID as value.""" + field_id_dict = field_name_to_id_dict(schema) + return { + field_id_dict[name]: column_id + for column_id, name in enumerate(schema.names) + } diff --git a/python/src/space/core/schema/constants.py b/python/src/space/core/schema/constants.py new file mode 100644 index 0000000..b910a34 --- /dev/null +++ b/python/src/space/core/schema/constants.py @@ -0,0 +1,18 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Constants for schema.""" + +# Substrait type name of Arrow custom type TfFeatures. +TF_FEATURES_TYPE = "TF_FEATURES" diff --git a/python/src/space/core/schema/substrait.py b/python/src/space/core/schema/substrait.py index c74bfc9..f6734b4 100644 --- a/python/src/space/core/schema/substrait.py +++ b/python/src/space/core/schema/substrait.py @@ -20,12 +20,9 @@ import pyarrow as pa from substrait.type_pb2 import NamedStruct, Type +from space.core.schema.constants import TF_FEATURES_TYPE import space.core.schema.arrow as arrow_schema from space.core.schema.types import TfFeatures -from space.core.utils.constants import UTF_8 - -# Substrait type name of Arrow custom type TfFeatures. -TF_FEATURES_TYPE = "TF_FEATURES" def substrait_fields(schema: pa.Schema) -> NamedStruct: @@ -54,6 +51,7 @@ def _substrait_field(field: pa.Field, type_ = Type() field_id = arrow_schema.field_id(field) + # TODO: to support more types in Substrait, e.g., fixed_size_list, map. 
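+  # Field IDs are carried in Substrait's type_variation_reference and are
+  # read back by schema.arrow when reconstructing the Arrow schema.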
if pa.types.is_int64(field.type): _set_field_id(type_.i64, field_id) elif pa.types.is_int32(field.type): @@ -75,7 +73,6 @@ def _substrait_field(field: pa.Field, field.type.value_field, # type: ignore[attr-defined] mutable_names, is_list_item=True)) - # TODO: to support more types in Substrait, e.g., fixed_size_list, map. elif pa.types.is_struct(field.type): _set_field_id(type_.struct, field_id) subfields = list(field.type) # type: ignore[call-overload] @@ -84,13 +81,12 @@ def _substrait_field(field: pa.Field, # TfFeatures is persisted in Substrait as a user defined type, with # parameters [TF_FEATURES_TYPE, __arrow_ext_serialize__()]. _set_field_id(type_.user_defined, field_id) - type_.user_defined.type_parameters.extend([ - Type.Parameter(string=TF_FEATURES_TYPE), - Type.Parameter( - string=field.type.__arrow_ext_serialize__().decode(UTF_8)) - ]) + serialized = Type.Parameter( + string=field.type.__arrow_ext_serialize__()) # type: ignore[arg-type] + type_.user_defined.type_parameters.extend( + [Type.Parameter(string=TF_FEATURES_TYPE), serialized]) else: - raise ValueError( + raise TypeError( f"Type {field.type} of field {field.name} is not supported") return type_ diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index db5de40..cf3e894 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -16,7 +16,7 @@ from __future__ import annotations from os import path -from typing import Optional +from typing import List, Optional import pyarrow as pa @@ -74,20 +74,31 @@ def snapshot(self, snapshot_id: Optional[int] = None) -> meta.Snapshot: raise RuntimeError(f"Snapshot {snapshot_id} is not found") @classmethod - def create(cls, location: str, logical_schema: pa.Schema) -> Storage: # pylint: disable=unused-argument - """Create a new empty storage.""" + def create( + cls, + location: str, + schema: pa.Schema, + primary_keys: List[str], + ) -> Storage: # pylint: disable=unused-argument + """Create a new empty storage. + + Args: + location: the directory path to the storage. + schema: the schema of the storage. + primary_keys: un-enforced primary keys. + """ # TODO: to verify that location is an empty directory. field_id_mgr = FieldIdManager() - logical_schema = field_id_mgr.assign_field_ids(logical_schema) + schema = field_id_mgr.assign_field_ids(schema) now = proto_now() # TODO: to convert Arrow schema to Substrait schema. 
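+    # Primary keys are persisted in the schema so that later operations
+    # (e.g., append) can derive column stats fields from metadata alone.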
metadata = meta.StorageMetadata( create_time=now, last_update_time=now, - schema=meta.Schema( - fields=substrait_schema.substrait_fields(logical_schema)), + schema=meta.Schema(fields=substrait_schema.substrait_fields(schema), + primary_keys=primary_keys), current_snapshot_id=_INIT_SNAPSHOT_ID, type=meta.StorageMetadata.DATASET) diff --git a/python/tests/core/test_storage.py b/python/tests/core/test_storage.py index 0b04e25..74db350 100644 --- a/python/tests/core/test_storage.py +++ b/python/tests/core/test_storage.py @@ -65,7 +65,9 @@ def test_snapshot(self, storage): def test_create_storage(self, tmp_path): dir_path = tmp_path / "test_create_storage" / "dataset" - storage = Storage.create(location=str(dir_path), logical_schema=_SCHEMA) + storage = Storage.create(location=str(dir_path), + schema=_SCHEMA, + primary_keys=["a"]) entry_point_file = dir_path / "metadata" / _ENTRY_POINT_FILE assert entry_point_file.exists() @@ -87,7 +89,9 @@ def test_create_storage(self, tmp_path): def test_load_storage(self, tmp_path): dir_path = tmp_path / "test_create_storage" / "dataset" - storage = Storage.create(location=str(dir_path), logical_schema=_SCHEMA) + storage = Storage.create(location=str(dir_path), + schema=_SCHEMA, + primary_keys=["a"]) loaded_storage = Storage.load(str(dir_path)) assert loaded_storage.metadata == storage.metadata
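
--
A minimal usage sketch of the new append path (illustration only, not part of
the patch; the location, schema, and field names below are assumptions):

  import pyarrow as pa

  from space.core.ops.append import LocalAppendOp
  from space.core.storage import Storage

  # Create a storage with an un-enforced primary key "a".
  schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.utf8())])
  storage = Storage.create(location="/tmp/example_dataset",
                           schema=schema,
                           primary_keys=["a"])

  # write() accepts a Python dict or an Arrow table (InputData).
  op = LocalAppendOp(location="/tmp/example_dataset",
                     metadata=storage.metadata)
  op.write({"a": [1, 2, 3], "b": ["x", "y", "z"]})

  # finish() flushes cached index data, writes the index manifest file, and
  # returns a runtime.Patch (or None if nothing was written).
  patch = op.finish()
  if patch is not None:
    print(patch.storage_statistics_update.num_rows)  # 3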