diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index 0181178..98cf2c0 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -47,7 +47,8 @@ jobs: - name: Analysing test code with pylint working-directory: ./python run: | - pylint tests/**/* --disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code" + pylint tests/**/* \ + --disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code" - name: Checking type with mypy working-directory: ./python/src run: | diff --git a/python/src/space/core/manifests/index.py b/python/src/space/core/manifests/index.py index 734b0f4..fa7671d 100644 --- a/python/src/space/core/manifests/index.py +++ b/python/src/space/core/manifests/index.py @@ -30,10 +30,6 @@ from space.core.schema.arrow import field_id, field_id_to_column_id_dict from space.core.utils import paths -# Manifest file fields. -_INDEX_COMPRESSED_BYTES_FIELD = '_INDEX_COMPRESSED_BYTES' -_INDEX_UNCOMPRESSED_BYTES_FIELD = '_INDEX_UNCOMPRESSED_BYTES' - def _stats_subfields(type_: pa.DataType) -> List[pa.Field]: """Column stats struct field sub-fields.""" @@ -51,8 +47,8 @@ def _manifest_schema( fields = [(constants.FILE_PATH_FIELD, pa.utf8()), (constants.NUM_ROWS_FIELD, pa.int64()), - (_INDEX_COMPRESSED_BYTES_FIELD, pa.int64()), - (_INDEX_UNCOMPRESSED_BYTES_FIELD, pa.int64())] + (constants.INDEX_COMPRESSED_BYTES_FIELD, pa.int64()), + (constants.INDEX_UNCOMPRESSED_BYTES_FIELD, pa.int64())] # Fields to collect Parquet column statistics: [(field_id, type), ...]. stats_fields: List[Tuple[int, pa.DataType]] = [] @@ -139,6 +135,13 @@ def __init__(self, metadata_dir: str, schema: pa.Schema, self._stats_column_ids.append(column_id) self._field_stats_dict[column_id] = _FieldStats(type_) + self._cached_manifest_data: List[pa.Table] = [] + + @property + def manifest_schema(self) -> pa.Schema: + """The schema of index manifest files.""" + return self._manifest_schema + def write(self, file_path: str, parquet_metadata: pq.FileMetaData) -> meta.StorageStatistics: """Write a new manifest row. @@ -175,11 +178,13 @@ def write(self, file_path: str, index_compressed_bytes=index_compressed_bytes, index_uncompressed_bytes=index_uncompressed_bytes) + def write_arrow(self, manifest_data: pa.Table) -> None: + """Write manifest rows in Arrow format.""" + self._cached_manifest_data.append(manifest_data) + def finish(self) -> Optional[str]: """Materialize the manifest file and return the file path.""" # Convert cached manifest data to Arrow. - all_manifest_data: List[pa.Table] = [] - if self._file_paths: arrays = [ self._file_paths, self._num_rows, self._index_compressed_bytes, @@ -189,15 +194,15 @@ def finish(self) -> Optional[str]: for column_id in self._stats_column_ids: arrays.append(self._field_stats_dict[column_id].to_arrow()) - all_manifest_data.append( + self._cached_manifest_data.append( pa.Table.from_arrays( arrays=arrays, schema=self._manifest_schema)) # type: ignore[call-arg] - if not all_manifest_data: + if not self._cached_manifest_data: return None - manifest_data = pa.concat_tables(all_manifest_data) + manifest_data = pa.concat_tables(self._cached_manifest_data) if manifest_data.num_rows == 0: return None @@ -217,11 +222,13 @@ class _IndexManifests: def read_index_manifests( manifest_path: str, + manifest_file_id: int, filter_: Optional[pc.Expression] = None) -> runtime.FileSet: """Read an index manifest file. Args: manifest_path: full file path of the manifest file. + manifest_file_id: a temporary manifest file ID assigned by the caller. filter_: a filter on the index manifest rows. Returns: @@ -234,15 +241,13 @@ def read_index_manifests( file_set = runtime.FileSet() for i in range(table.num_rows): - file = runtime.DataFile() - file.path = manifests.file_path[i].as_py() - - stats = file.storage_statistics - stats.num_rows = manifests.num_rows[i].as_py() - stats.index_compressed_bytes = manifests.index_compressed_bytes[i].as_py() - stats.index_uncompressed_bytes = manifests.index_uncompressed_bytes[ - i].as_py() - + stats = meta.StorageStatistics( + num_rows=manifests.num_rows[i].as_py(), + index_compressed_bytes=manifests.index_compressed_bytes[i].as_py(), + index_uncompressed_bytes=manifests.index_uncompressed_bytes[i].as_py()) + file = runtime.DataFile(path=manifests.file_path[i].as_py(), + manifest_file_id=manifest_file_id, + storage_statistics=stats) file_set.index_files.append(file) return file_set @@ -254,6 +259,6 @@ def _index_manifests(table: pa.Table) -> _IndexManifests: file_path=table.column(constants.FILE_PATH_FIELD).combine_chunks(), num_rows=table.column(constants.NUM_ROWS_FIELD).combine_chunks(), index_compressed_bytes=table.column( - _INDEX_COMPRESSED_BYTES_FIELD).combine_chunks(), + constants.INDEX_COMPRESSED_BYTES_FIELD).combine_chunks(), index_uncompressed_bytes=table.column( - _INDEX_UNCOMPRESSED_BYTES_FIELD).combine_chunks()) + constants.INDEX_UNCOMPRESSED_BYTES_FIELD).combine_chunks()) diff --git a/python/src/space/core/ops/__init__.py b/python/src/space/core/ops/__init__.py index 4d69f09..6d7f9bf 100644 --- a/python/src/space/core/ops/__init__.py +++ b/python/src/space/core/ops/__init__.py @@ -15,4 +15,5 @@ """Space local data operations.""" from space.core.ops.append import LocalAppendOp +from space.core.ops.delete import FileSetDeleteOp from space.core.ops.read import FileSetReadOp diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py index 3169ae8..0a82dc0 100644 --- a/python/src/space/core/ops/append.py +++ b/python/src/space/core/ops/append.py @@ -80,7 +80,14 @@ class LocalAppendOp(BaseAppendOp, StoragePaths): Not thread safe. """ - def __init__(self, location: str, metadata: meta.StorageMetadata): + def __init__(self, + location: str, + metadata: meta.StorageMetadata, + record_address_input: bool = False): + """ + Args: + record_address_input: if true, input record fields are addresses. + """ StoragePaths.__init__(self, location) self._metadata = metadata @@ -91,6 +98,8 @@ def __init__(self, location: str, metadata: meta.StorageMetadata): self._index_fields, self._record_fields = arrow.classify_fields( self._physical_schema, record_fields, selected_fields=None) + self._record_address_input = record_address_input + # Data file writers. self._index_writer_info: Optional[_IndexWriterInfo] = None @@ -151,6 +160,10 @@ def finish(self) -> Optional[runtime.Patch]: return self._patch + def append_index_manifest(self, index_manifest_table: pa.Table) -> None: + """Append external index manifest data.""" + return self._index_manifest_writer.write_arrow(index_manifest_table) + def _append_arrow(self, data: pa.Table) -> None: # TODO: to verify the schema of input data. if data.num_rows == 0: @@ -165,15 +178,20 @@ def _append_arrow(self, data: pa.Table) -> None: # Write record fields into files. # TODO: to parallelize it. - record_addresses = [ - self._write_record_column(f, data.column(f.name)) - for f in self._record_fields - ] - # TODO: to preserve the field order in schema. - for field_name, address_column in record_addresses: - # TODO: the field/column added must have field ID. - index_data = index_data.append_column(field_name, address_column) + if self._record_address_input: + for f in self._record_fields: + index_data = index_data.append_column(f.name, data.column( + f.name)) # type: ignore[arg-type] + else: + record_addresses = [ + self._write_record_column(f, data.column(f.name)) + for f in self._record_fields + ] + + for field_name, address_column in record_addresses: + # TODO: the field/column added must have field ID. + index_data = index_data.append_column(field_name, address_column) # Write index fields into files. self._cached_index_file_bytes += index_data.nbytes diff --git a/python/src/space/core/ops/delete.py b/python/src/space/core/ops/delete.py new file mode 100644 index 0000000..69cbf20 --- /dev/null +++ b/python/src/space/core/ops/delete.py @@ -0,0 +1,191 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Local delete operation implementation.""" + +from abc import abstractmethod +from typing import List, Optional, Set + +import pyarrow as pa +import pyarrow.parquet as pq +import pyarrow.compute as pc + +from space.core.ops import utils +from space.core.ops.append import LocalAppendOp +from space.core.ops.base import BaseOp +from space.core.proto import metadata_pb2 as meta +from space.core.proto import runtime_pb2 as runtime +from space.core.utils.paths import StoragePaths +from space.core.schema import constants + + +class BaseDeleteOp(BaseOp): + """Abstract base delete operation class. + + The deletion only applies to index files. The rows in record files are + cleaned up by a separate garbage collection operation. + """ + + @abstractmethod + def delete(self) -> Optional[runtime.Patch]: + """Delete data matching the filter from the storage. + + TODO: a class is not needed for the current single thread implementation. + To revisit the interface. + """ + + +class FileSetDeleteOp(BaseDeleteOp, StoragePaths): + """Delete operation of a given file set running locally. + + It can be used as components of more complex operations and distributed + delete operation. + + Not thread safe. + """ + + def __init__(self, location: str, metadata: meta.StorageMetadata, + file_set: runtime.FileSet, filter_: pc.Expression): + StoragePaths.__init__(self, location) + + if not _validate_files(file_set): + raise RuntimeError(f"Invalid input file set for delete op:\n{file_set}") + + self._file_set = file_set + # Rows not matching not filter will be reinserted. + self._reinsert_filter = ~filter_ + + self._append_op = LocalAppendOp(location, + metadata, + record_address_input=True) + + def delete(self) -> Optional[runtime.Patch]: + # The index files and manifests deleted, to remove them from index + # manifests. + deleted_files: List[str] = [] + deleted_manifest_ids: Set[int] = set() + + deleted_rows = 0 + stats_before_delete = meta.StorageStatistics() + for file in self._file_set.index_files: + utils.update_index_storage_stats(stats_before_delete, + file.storage_statistics) + + index_data = pq.read_table(self.full_path(file.path), + filters=self._reinsert_filter) + + # No row is deleted. No need to re-insert rows. + if index_data.num_rows == file.storage_statistics.num_rows: + continue + + # Collect statistics. + deleted_rows += (file.storage_statistics.num_rows - index_data.num_rows) + all_deleted = index_data.num_rows == 0 + + # Record deleted files and manifests information. + deleted_files.append(file.path) + deleted_manifest_ids.add(file.manifest_file_id) + + # Write reinsert file for survived rows. + if all_deleted: + continue + + # Re-insert survived rows. + self._append_op.write(index_data) + + if deleted_rows == 0: + return None + + # Carry over unmodified index files in reinserted manifests. + deleted_manifest_files: List[str] = [] + for manifest_id in deleted_manifest_ids: + if manifest_id not in self._file_set.index_manifest_files: + raise RuntimeError( + f"Index manifest ID {manifest_id} not found in file set") + + deleted_manifest_files.append( + self._file_set.index_manifest_files[manifest_id]) + + # Carry over survivor files to the new manifest data. + # Survivor stands for unmodified files. + survivor_files_filter = ~_build_file_path_filter(constants.FILE_PATH_FIELD, + deleted_files) + survivor_index_manifests = pq.ParquetDataset( + [self.full_path(f) for f in deleted_manifest_files], + filters=survivor_files_filter).read() + if survivor_index_manifests.num_rows > 0: + self._append_op.append_index_manifest(survivor_index_manifests) + + reinsert_patch = self._append_op.finish() + + # Populate the patch for the delete. + patch = runtime.Patch() + if reinsert_patch is not None: + patch.addition.index_manifest_files.extend( + reinsert_patch.addition.index_manifest_files) + + for f in deleted_manifest_files: + patch.deletion.index_manifest_files.append(f) + + # Compute storage statistics update. + survivor_stats = _read_index_statistics(survivor_index_manifests) + reinsert_stats = (reinsert_patch.storage_statistics_update if + reinsert_patch is not None else meta.StorageStatistics()) + + deleted_compressed_bytes = (reinsert_stats.index_compressed_bytes + + survivor_stats.index_compressed_bytes + ) - stats_before_delete.index_compressed_bytes + deleted_uncompressed_bytes = ( + reinsert_stats.index_uncompressed_bytes + + survivor_stats.index_uncompressed_bytes + ) - stats_before_delete.index_uncompressed_bytes + + patch.storage_statistics_update.CopyFrom( + meta.StorageStatistics( + num_rows=-deleted_rows, + index_compressed_bytes=deleted_compressed_bytes, + index_uncompressed_bytes=deleted_uncompressed_bytes)) + return patch + + +def _build_file_path_filter(field_name: str, file_paths: List[str]): + filter_ = pc.scalar(False) + for f in file_paths: + filter_ |= (pc.field(field_name) == pc.scalar(f)) + return filter_ + + +def _read_index_statistics(manifest_data: pa.Table) -> meta.StorageStatistics: + """Read storage statistics of unmodified index files from manifests.""" + + def sum_bytes(field_name: str) -> int: + return sum(manifest_data.column( + field_name).combine_chunks().to_pylist()) # type: ignore[arg-type] + + return meta.StorageStatistics(index_compressed_bytes=sum_bytes( + constants.INDEX_COMPRESSED_BYTES_FIELD), + index_uncompressed_bytes=sum_bytes( + constants.INDEX_UNCOMPRESSED_BYTES_FIELD)) + + +def _validate_files(file_set: runtime.FileSet) -> bool: + """Return false if the file set does not contain sufficient information for + deletion. + """ + for f in file_set.index_files: + if (not f.path or f.storage_statistics.num_rows == 0 + or f.manifest_file_id == 0): + return False + + return len(file_set.index_manifest_files) > 0 diff --git a/python/src/space/core/ops/read.py b/python/src/space/core/ops/read.py index 5003df5..529559b 100644 --- a/python/src/space/core/ops/read.py +++ b/python/src/space/core/ops/read.py @@ -44,7 +44,13 @@ def __iter__(self) -> Iterator[pa.Table]: class FileSetReadOp(BaseReadOp, StoragePaths): - """Read data from a dataset.""" + """Read operation of a given file set running locally. + + It can be used as components of more complex operations and distributed + read operation. + + Not thread safe. + """ def __init__(self, location: str, @@ -78,12 +84,12 @@ def __iter__(self) -> Iterator[pa.Table]: yield self._read_index_and_record(file.path) def _read_index_and_record(self, index_path: str) -> pa.Table: - index_table = pq.read_table(self.full_path(index_path), - filters=self._filter) # type: ignore[arg-type] + index_data = pq.read_table(self.full_path(index_path), + filters=self._filter) # type: ignore[arg-type] index_column_ids: List[int] = [] record_columns: List[Tuple[int, pa.Field]] = [] - for column_id, field in enumerate(index_table.schema): + for column_id, field in enumerate(index_data.schema): field_id = arrow.field_id(field) if field_id in self._index_field_ids: index_column_ids.append(column_id) @@ -92,18 +98,18 @@ def _read_index_and_record(self, index_path: str) -> pa.Table: (column_id, arrow.binary_field(self._record_fields_dict[field_id]))) - result_table = index_table.select( - index_column_ids) # type: ignore[arg-type] + result_data = index_data.select(index_column_ids) # type: ignore[arg-type] - # Record record fields from addresses. + # Read record fields from addresses. for column_id, field in record_columns: - result_table = result_table.append_column( + result_data = result_data.append_column( field, self._read_record_column( - index_table.select([column_id]), # type: ignore[list-item] + index_data.select([column_id]), # type: ignore[list-item] field.name)) - return result_table + # TODO: to keep field order the same as schema. + return result_data def _read_record_column(self, record_address: pa.Table, field: str) -> pa.BinaryArray: diff --git a/python/src/space/core/proto/runtime.proto b/python/src/space/core/proto/runtime.proto index a1a70d6..67e50d1 100644 --- a/python/src/space/core/proto/runtime.proto +++ b/python/src/space/core/proto/runtime.proto @@ -19,18 +19,26 @@ import "space/core/proto/metadata.proto"; package space.proto; // Information of a data file. +// NEXT_ID: 4 message DataFile { // Data file path. string path = 1; // Storage statistics of data in the file. - StorageStatistics storage_statistics = 4; + StorageStatistics storage_statistics = 2; + + // Locally assigned manifest file IDs. + int64 manifest_file_id = 3; } // A set of associated data and manifest files. +// NEXT_ID: 2 message FileSet { // Index data files. repeated DataFile index_files = 1; + + // Key is locally assigned manifest IDs by a local operation. + map index_manifest_files = 2; } // A patch describing metadata changes to the storage. diff --git a/python/src/space/core/proto/runtime_pb2.py b/python/src/space/core/proto/runtime_pb2.py index 62a45cb..742a4e2 100644 --- a/python/src/space/core/proto/runtime_pb2.py +++ b/python/src/space/core/proto/runtime_pb2.py @@ -14,21 +14,25 @@ from space.core.proto import metadata_pb2 as space_dot_core_dot_proto_dot_metadata__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"T\n\x08\x44\x61taFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12:\n\x12storage_statistics\x18\x04 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"5\n\x07\x46ileSet\x12*\n\x0bindex_files\x18\x01 \x03(\x0b\x32\x15.space.proto.DataFile\"\xa6\x01\n\x05Patch\x12,\n\x08\x61\x64\x64ition\x18\x01 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12,\n\x08\x64\x65letion\x18\x02 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"\xc3\x01\n\tJobResult\x12+\n\x05state\x18\x01 \x01(\x0e\x32\x1c.space.proto.JobResult.State\x12\x41\n\x19storage_statistics_update\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"F\n\x05State\x12\x15\n\x11STATE_UNSPECIFIED\x10\x00\x12\r\n\tSUCCEEDED\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\x12\x0b\n\x07SKIPPED\x10\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"n\n\x08\x44\x61taFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12:\n\x12storage_statistics\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x18\n\x10manifest_file_id\x18\x03 \x01(\x03\"\xbc\x01\n\x07\x46ileSet\x12*\n\x0bindex_files\x18\x01 \x03(\x0b\x32\x15.space.proto.DataFile\x12J\n\x14index_manifest_files\x18\x02 \x03(\x0b\x32,.space.proto.FileSet.IndexManifestFilesEntry\x1a\x39\n\x17IndexManifestFilesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xa6\x01\n\x05Patch\x12,\n\x08\x61\x64\x64ition\x18\x01 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12,\n\x08\x64\x65letion\x18\x02 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"\xc3\x01\n\tJobResult\x12+\n\x05state\x18\x01 \x01(\x0e\x32\x1c.space.proto.JobResult.State\x12\x41\n\x19storage_statistics_update\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"F\n\x05State\x12\x15\n\x11STATE_UNSPECIFIED\x10\x00\x12\r\n\tSUCCEEDED\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\x12\x0b\n\x07SKIPPED\x10\x03\x62\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.runtime_pb2', globals()) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None + _FILESET_INDEXMANIFESTFILESENTRY._options = None + _FILESET_INDEXMANIFESTFILESENTRY._serialized_options = b'8\001' _DATAFILE._serialized_start=80 - _DATAFILE._serialized_end=164 - _FILESET._serialized_start=166 - _FILESET._serialized_end=219 - _PATCH._serialized_start=222 - _PATCH._serialized_end=388 - _JOBRESULT._serialized_start=391 - _JOBRESULT._serialized_end=586 - _JOBRESULT_STATE._serialized_start=516 - _JOBRESULT_STATE._serialized_end=586 + _DATAFILE._serialized_end=190 + _FILESET._serialized_start=193 + _FILESET._serialized_end=381 + _FILESET_INDEXMANIFESTFILESENTRY._serialized_start=324 + _FILESET_INDEXMANIFESTFILESENTRY._serialized_end=381 + _PATCH._serialized_start=384 + _PATCH._serialized_end=550 + _JOBRESULT._serialized_start=553 + _JOBRESULT._serialized_end=748 + _JOBRESULT_STATE._serialized_start=678 + _JOBRESULT_STATE._serialized_end=748 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/runtime_pb2.pyi b/python/src/space/core/proto/runtime_pb2.pyi index c5a5b69..9983c5a 100644 --- a/python/src/space/core/proto/runtime_pb2.pyi +++ b/python/src/space/core/proto/runtime_pb2.pyi @@ -34,44 +34,73 @@ DESCRIPTOR: google.protobuf.descriptor.FileDescriptor @typing_extensions.final class DataFile(google.protobuf.message.Message): - """Information of a data file.""" + """Information of a data file. + NEXT_ID: 4 + """ DESCRIPTOR: google.protobuf.descriptor.Descriptor PATH_FIELD_NUMBER: builtins.int STORAGE_STATISTICS_FIELD_NUMBER: builtins.int + MANIFEST_FILE_ID_FIELD_NUMBER: builtins.int path: builtins.str """Data file path.""" @property def storage_statistics(self) -> space.core.proto.metadata_pb2.StorageStatistics: """Storage statistics of data in the file.""" + manifest_file_id: builtins.int + """Locally assigned manifest file IDs.""" def __init__( self, *, path: builtins.str = ..., storage_statistics: space.core.proto.metadata_pb2.StorageStatistics | None = ..., + manifest_file_id: builtins.int = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["storage_statistics", b"storage_statistics"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["path", b"path", "storage_statistics", b"storage_statistics"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "storage_statistics", b"storage_statistics"]) -> None: ... global___DataFile = DataFile @typing_extensions.final class FileSet(google.protobuf.message.Message): - """A set of associated data and manifest files.""" + """A set of associated data and manifest files. + NEXT_ID: 2 + """ DESCRIPTOR: google.protobuf.descriptor.Descriptor + @typing_extensions.final + class IndexManifestFilesEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.int + value: builtins.str + def __init__( + self, + *, + key: builtins.int = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... + INDEX_FILES_FIELD_NUMBER: builtins.int + INDEX_MANIFEST_FILES_FIELD_NUMBER: builtins.int @property def index_files(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___DataFile]: """Index data files.""" + @property + def index_manifest_files(self) -> google.protobuf.internal.containers.ScalarMap[builtins.int, builtins.str]: + """Key is locally assigned manifest IDs by a local operation.""" def __init__( self, *, index_files: collections.abc.Iterable[global___DataFile] | None = ..., + index_manifest_files: collections.abc.Mapping[builtins.int, builtins.str] | None = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["index_files", b"index_files"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["index_files", b"index_files", "index_manifest_files", b"index_manifest_files"]) -> None: ... global___FileSet = FileSet diff --git a/python/src/space/core/schema/constants.py b/python/src/space/core/schema/constants.py index 51ec864..a725e3e 100644 --- a/python/src/space/core/schema/constants.py +++ b/python/src/space/core/schema/constants.py @@ -28,3 +28,7 @@ STATS_FIELD = "_STATS" MIN_FIELD = "_MIN" MAX_FIELD = "_MAX" + +# Manifest file fields. +INDEX_COMPRESSED_BYTES_FIELD = '_INDEX_COMPRESSED_BYTES' +INDEX_UNCOMPRESSED_BYTES_FIELD = '_INDEX_UNCOMPRESSED_BYTES' diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index b3526ac..b75bda9 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -176,18 +176,25 @@ def data_files(self, manifest_files = self.snapshot(snapshot_id).manifest_files result = runtime.FileSet() + # A temporily assigned identifier for tracking manifest files. + # Start from 1 to detect unassigned values 0 that is default. + manifest_file_id = 1 + # Construct falsifiable filter to prune manifest rows. manifest_filter = None if filter_ is not None: manifest_filter = build_manifest_filter(self._physical_schema, self._field_name_ids, filter_) - for f in manifest_files.index_manifest_files: - result_per_manifest = read_index_manifests(self.full_path(f), + for manifest_file in manifest_files.index_manifest_files: + result_per_manifest = read_index_manifests(self.full_path(manifest_file), + manifest_file_id, manifest_filter) if not result_per_manifest.index_files: continue + result.index_manifest_files[manifest_file_id] = manifest_file + manifest_file_id += 1 result.index_files.extend(result_per_manifest.index_files) return result @@ -212,6 +219,12 @@ def _write_metadata( def _patch_manifests(manifest_files: meta.ManifestFiles, patch: runtime.Patch): """Apply changes in a patch to manifest files for a commit.""" + # Process deleted index manifest files. + deleted_manifests = set(patch.deletion.index_manifest_files) + for i in range(len(manifest_files.index_manifest_files) - 1, -1, -1): + if manifest_files.index_manifest_files[i] in deleted_manifests: + del manifest_files.index_manifest_files[i] + # Process added manifest files. for f in patch.addition.index_manifest_files: manifest_files.index_manifest_files.append(f) diff --git a/python/tests/core/manifests/test_index.py b/python/tests/core/manifests/test_index.py index 8e8a8b3..a14cb42 100644 --- a/python/tests/core/manifests/test_index.py +++ b/python/tests/core/manifests/test_index.py @@ -73,10 +73,50 @@ def test_write_all_types_and_read(self, tmp_path): }) ])) + # Test directly write manifests. + external_manifests = { + "_FILE": ["data/file2", "data/file3"], + "_INDEX_COMPRESSED_BYTES": [100, 200], + "_INDEX_UNCOMPRESSED_BYTES": [300, 400], + "_NUM_ROWS": [10, 20], + "_STATS_f0": [{ + "_MAX": -100, + "_MIN": -200 + }, { + "_MAX": -300, + "_MIN": -400 + }], + "_STATS_f1": [{ + "_MAX": -100.1, + "_MIN": -200.1 + }, { + "_MAX": -300.1, + "_MIN": -400.1 + }], + "_STATS_f2": [{ + "_MAX": True, + "_MIN": False + }, { + "_MAX": False, + "_MIN": False + }], + "_STATS_f3": [{ + "_MAX": "z", + "_MIN": "A" + }, { + "_MAX": "abcedf", + "_MIN": "ABCDEF" + }] + } + manifest_writer.write_arrow( + pa.Table.from_pydict(external_manifests, + schema=manifest_writer.manifest_schema)) + manifest_path = manifest_writer.finish() assert manifest_path is not None - assert pq.read_table(manifest_path).to_pydict() == { + + expected_manifests = { "_FILE": ["data/file0", "data/file1"], "_INDEX_COMPRESSED_BYTES": [645, 334], "_INDEX_UNCOMPRESSED_BYTES": [624, 320], @@ -111,26 +151,46 @@ def test_write_all_types_and_read(self, tmp_path): }] } - assert read_index_manifests(manifest_path) == runtime.FileSet(index_files=[ - runtime.DataFile(path="data/file0", - storage_statistics=meta.StorageStatistics( - num_rows=5, - index_compressed_bytes=645, - index_uncompressed_bytes=624)), - runtime.DataFile(path="data/file1", - storage_statistics=meta.StorageStatistics( - num_rows=2, - index_compressed_bytes=334, - index_uncompressed_bytes=320)) - ]) + for k, v in external_manifests.items(): + expected_manifests[k] = v + expected_manifests[k] + + assert pq.read_table(manifest_path).to_pydict() == expected_manifests + assert read_index_manifests( + manifest_path, 123) == runtime.FileSet(index_files=[ + runtime.DataFile(path="data/file2", + manifest_file_id=123, + storage_statistics=meta.StorageStatistics( + num_rows=10, + index_compressed_bytes=100, + index_uncompressed_bytes=300)), + runtime.DataFile(path="data/file3", + manifest_file_id=123, + storage_statistics=meta.StorageStatistics( + num_rows=20, + index_compressed_bytes=200, + index_uncompressed_bytes=400)), + runtime.DataFile(path="data/file0", + manifest_file_id=123, + storage_statistics=meta.StorageStatistics( + num_rows=5, + index_compressed_bytes=645, + index_uncompressed_bytes=624)), + runtime.DataFile(path="data/file1", + manifest_file_id=123, + storage_statistics=meta.StorageStatistics( + num_rows=2, + index_compressed_bytes=334, + index_uncompressed_bytes=320)) + ]) # Test index manifest filtering. # TODO: to move it to a separate test and add more test cases. filtered_manifests = read_index_manifests( - manifest_path, + manifest_path, 0, pc.field("_STATS_f3", "_MIN") >= "ABCDEF") - assert len(filtered_manifests.index_files) == 1 - assert filtered_manifests.index_files[0].path == "data/file1" + assert len(filtered_manifests.index_files) == 2 + assert filtered_manifests.index_files[0].path == "data/file3" + assert filtered_manifests.index_files[1].path == "data/file1" def test_write_collet_stats_for_primary_keys_only(self, tmp_path): data_dir = tmp_path / "dataset" / "data" diff --git a/python/tests/core/ops/test_append.py b/python/tests/core/ops/test_append.py index 7cbee2b..d96b005 100644 --- a/python/tests/core/ops/test_append.py +++ b/python/tests/core/ops/test_append.py @@ -104,6 +104,8 @@ def test_write_pydict_with_record_fields(self, tmp_path, # Data file exists. self._check_file_exists(location, index_manifest["_FILE"]) self._check_file_exists(location, record_manifest["_FILE"]) + assert self._file_schema( + location, index_manifest["_FILE"][0]) == storage.physical_schema # Validate statistics. assert patch.storage_statistics_update == meta.StorageStatistics( @@ -134,3 +136,6 @@ def _read_manifests(self, storage: Storage, def _check_file_exists(self, location, file_paths: List[str]): for f in file_paths: assert (location / f).exists() + + def _file_schema(self, location, file_path: str) -> pa.Schema: + return pq.read_schema(str(location / file_path)) diff --git a/python/tests/core/ops/test_delete.py b/python/tests/core/ops/test_delete.py new file mode 100644 index 0000000..edf1453 --- /dev/null +++ b/python/tests/core/ops/test_delete.py @@ -0,0 +1,75 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pyarrow as pa +import pyarrow.compute as pc + +from space.core.ops import LocalAppendOp +from space.core.ops import FileSetReadOp +from space.core.ops import FileSetDeleteOp +from space.core.storage import Storage + + +class TestFileSetDeleteOp: + + # TODO: to add tests using Arrow table input. + def test_delete_all_types(self, tmp_path, all_types_schema, + all_types_input_data): + location = tmp_path / "dataset" + storage = Storage.create(location=str(location), + schema=all_types_schema, + primary_keys=["int64"], + record_fields=[]) + + append_op = LocalAppendOp(str(location), storage.metadata) + # TODO: the test should cover all types supported by column stats. + input_data = [pa.Table.from_pydict(d) for d in all_types_input_data] + for batch in input_data: + append_op.write(batch) + + storage.commit(append_op.finish()) + old_data_files = storage.data_files() + + delete_op = FileSetDeleteOp( + str(location), + storage.metadata, + storage.data_files(), + # pylint: disable=singleton-comparison + filter_=pc.field("bool") == False) + patch = delete_op.delete() + assert patch is not None + storage.commit(patch) + + # Verify storage metadata after patch. + new_data_files = storage.data_files() + + def validate_data_files(data_files, patch_manifests): + assert len(data_files.index_manifest_files) == 1 + assert len(patch_manifests.index_manifest_files) == 1 + assert data_files.index_manifest_files[ + 1] == patch_manifests.index_manifest_files[0] + + validate_data_files(old_data_files, patch.deletion) + validate_data_files(new_data_files, patch.addition) + + read_op = FileSetReadOp(str(location), storage.metadata, + storage.data_files()) + results = list(iter(read_op)) + assert len(results) == 1 + assert list(iter(read_op))[0] == pa.Table.from_pydict({ + "int64": [1], + "float64": [0.1], + "bool": [True], + "string": ["a"] + }) diff --git a/python/tests/core/ops/test_read.py b/python/tests/core/ops/test_read.py index 046ac47..ce3e0f9 100644 --- a/python/tests/core/ops/test_read.py +++ b/python/tests/core/ops/test_read.py @@ -38,9 +38,9 @@ def test_read_all_types(self, tmp_path, all_types_schema, append_op.write(batch) storage.commit(append_op.finish()) - data_files = storage.data_files() - read_op = FileSetReadOp(str(location), storage.metadata, data_files) + read_op = FileSetReadOp(str(location), storage.metadata, + storage.data_files()) results = list(iter(read_op)) assert len(results) == 1 assert list(iter(read_op))[0] == pa.concat_tables(input_data) @@ -49,7 +49,7 @@ def test_read_all_types(self, tmp_path, all_types_schema, read_op = FileSetReadOp( str(location), storage.metadata, - data_files, + storage.data_files(), # pylint: disable=singleton-comparison filter_=pc.field("bool") == True) results = list(iter(read_op)) diff --git a/python/tests/core/test_storage.py b/python/tests/core/test_storage.py index 985c324..6e4707d 100644 --- a/python/tests/core/test_storage.py +++ b/python/tests/core/test_storage.py @@ -135,14 +135,15 @@ def test_commit(self, tmp_path): assert new_snapshot.storage_statistics == added_storage_statistics # Add more manifests + added_storage_statistics2 = meta.StorageStatistics( + num_rows=100, + index_compressed_bytes=100, + index_uncompressed_bytes=200, + record_uncompressed_bytes=300) patch = runtime.Patch(addition=meta.ManifestFiles( index_manifest_files=["data/index_manifest1"], record_manifest_files=["data/record_manifest1"]), - storage_statistics_update=meta.StorageStatistics( - num_rows=100, - index_compressed_bytes=100, - index_uncompressed_bytes=200, - record_uncompressed_bytes=300)) + storage_statistics_update=added_storage_statistics2) storage.commit(patch) new_snapshot = storage.snapshot(2) @@ -157,6 +158,21 @@ def test_commit(self, tmp_path): index_uncompressed_bytes=220, record_uncompressed_bytes=330) + # Test deletion. + patch = runtime.Patch(deletion=meta.ManifestFiles( + index_manifest_files=["data/index_manifest0"]), + storage_statistics_update=meta.StorageStatistics( + num_rows=-123, + index_compressed_bytes=-10, + index_uncompressed_bytes=-20, + record_uncompressed_bytes=-30)) + storage.commit(patch) + new_snapshot = storage.snapshot(3) + assert new_snapshot.manifest_files.index_manifest_files == [ + "data/index_manifest1" + ] + assert new_snapshot.storage_statistics == added_storage_statistics2 + def test_data_files(self, tmp_path): location = tmp_path / "dataset" data_dir = location / "data" @@ -186,14 +202,19 @@ def commit_add_index_manifest(manifest_path: str): "string": ["a", "b", "c"] }) ])) - commit_add_index_manifest(manifest_writer.finish()) + manifest_file = manifest_writer.finish() + commit_add_index_manifest(manifest_file) + manifests_dict1 = {1: storage.short_path(manifest_file)} index_file0 = runtime.DataFile(path="data/file0", + manifest_file_id=1, storage_statistics=meta.StorageStatistics( num_rows=3, index_compressed_bytes=110, index_uncompressed_bytes=109)) - assert storage.data_files() == runtime.FileSet(index_files=[index_file0]) + + assert storage.data_files() == runtime.FileSet( + index_files=[index_file0], index_manifest_files=manifests_dict1) # Write the 2nd data file, generate manifest, and commit. manifest_writer = create_index_manifest_writer() @@ -205,22 +226,30 @@ def commit_add_index_manifest(manifest_path: str): "string": ["abcedf", "ABCDEF"] }) ])) - commit_add_index_manifest(manifest_writer.finish()) + manifest_file = manifest_writer.finish() + commit_add_index_manifest(manifest_file) + manifests_dict2 = manifests_dict1.copy() + manifests_dict2[2] = storage.short_path(manifest_file) index_file1 = runtime.DataFile(path="data/file1", + manifest_file_id=2, storage_statistics=meta.StorageStatistics( num_rows=2, index_compressed_bytes=104, index_uncompressed_bytes=100)) + assert storage.data_files() == runtime.FileSet( - index_files=[index_file0, index_file1]) + index_files=[index_file0, index_file1], + index_manifest_files=manifests_dict2) # Test time travel data_files(). assert storage.data_files(snapshot_id=0) == runtime.FileSet() assert storage.data_files(snapshot_id=1) == runtime.FileSet( - index_files=[index_file0]) + index_files=[index_file0], index_manifest_files=manifests_dict1) # Test data_files() with filters. + index_file1.manifest_file_id = 1 assert storage.data_files( filter_=pc.field("int64") > 1000) == runtime.FileSet( - index_files=[index_file1]) + index_files=[index_file1], + index_manifest_files={1: manifests_dict2[2]})