From 0136c0acdaa92da1ddf2979a3f648248b6e8574b Mon Sep 17 00:00:00 2001 From: coufon Date: Wed, 27 Dec 2023 17:23:26 +0000 Subject: [PATCH] Implement change data feed --- python/pyproject.toml | 2 + python/src/space/core/datasets.py | 5 + .../core/manifests/falsifiable_filters.py | 21 ++- python/src/space/core/ops/append.py | 7 +- python/src/space/core/ops/change_data.py | 125 ++++++++++++++++++ python/src/space/core/ops/delete.py | 36 ++++- python/src/space/core/ops/insert.py | 4 +- python/src/space/core/proto/metadata.proto | 32 ++++- python/src/space/core/proto/metadata_pb2.py | 16 ++- python/src/space/core/proto/metadata_pb2.pyi | 64 ++++++++- python/src/space/core/proto/runtime.proto | 5 +- python/src/space/core/proto/runtime_pb2.py | 12 +- python/src/space/core/proto/runtime_pb2.pyi | 11 +- python/src/space/core/runners.py | 36 ++++- python/src/space/core/storage.py | 20 ++- python/src/space/core/utils/paths.py | 6 + .../manifests/test_falsifiable_filters.py | 41 +++--- python/tests/core/ops/test_change_data.py | 57 ++++++++ 18 files changed, 443 insertions(+), 57 deletions(-) create mode 100644 python/src/space/core/ops/change_data.py create mode 100644 python/tests/core/ops/test_change_data.py diff --git a/python/pyproject.toml b/python/pyproject.toml index 7d6c6fc..6c5bb0c 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "numpy", "protobuf", "pyarrow >= 14.0.0", + "pyroaring", "tensorflow_datasets", "typing_extensions", ] @@ -56,6 +57,7 @@ ignored-modules = [ "array_record", "datasets", "google.protobuf", + "pyroaring", "space.core.proto", "substrait" ] diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 1b80577..fc01cba 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -66,3 +66,8 @@ def index_files(self) -> List[str]: """A list of full path of index files.""" data_files = self._storage.data_files() return [self._storage.full_path(f.path) for f in data_files.index_files] + + @property + def snapshot_ids(self) -> List[int]: + """A list of all alive snapshot IDs in the dataset.""" + return self._storage.snapshot_ids diff --git a/python/src/space/core/manifests/falsifiable_filters.py b/python/src/space/core/manifests/falsifiable_filters.py index e73848c..08b9edb 100644 --- a/python/src/space/core/manifests/falsifiable_filters.py +++ b/python/src/space/core/manifests/falsifiable_filters.py @@ -20,7 +20,7 @@ https://vldb.org/pvldb/vol14/p3083-edara.pdf. """ -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Set from functools import partial from absl import logging # type: ignore[import-untyped] @@ -36,12 +36,14 @@ from space.core.schema import constants -def build_manifest_filter(schema: pa.Schema, field_name_ids: Dict[str, int], +def build_manifest_filter(schema: pa.Schema, primary_keys: Set[str], + field_name_ids: Dict[str, int], filter_: pc.Expression) -> Optional[pc.Expression]: """Build a falsifiable filter on index manifest files column statistics. Args: schema: the storage schema. + primary_keys: the primary keys. filter_: a filter on data fields. field_name_ids: a dict of field names to IDs mapping. 
@@ -51,7 +53,8 @@ def build_manifest_filter(schema: pa.Schema, field_name_ids: Dict[str, int], expr = _substrait_expr(schema, filter_) try: - return ~_falsifiable_filter(expr, field_name_ids) # type: ignore[operator] + return ~_falsifiable_filter(expr, primary_keys, + field_name_ids) # type: ignore[operator] except _ExpressionException as e: logging.info( "Index manifest filter push-down is not used, query may be slower; " @@ -78,7 +81,7 @@ class _ExpressionException(Exception): """Raise for exceptions in Substrait expressions.""" -def _falsifiable_filter(filter_: ExtendedExpression, +def _falsifiable_filter(filter_: ExtendedExpression, primary_keys: Set[str], field_name_ids: Dict[str, int]) -> pc.Expression: if len(filter_.referred_expr) != 1: raise _ExpressionException( @@ -87,6 +90,7 @@ def _falsifiable_filter(filter_: ExtendedExpression, return _falsifiable_filter_internal( filter_.extensions, # type: ignore[arg-type] filter_.base_schema, + primary_keys, field_name_ids, filter_.referred_expr[0].expression.scalar_function) @@ -94,7 +98,7 @@ def _falsifiable_filter(filter_: ExtendedExpression, # pylint: disable=too-many-locals,too-many-return-statements def _falsifiable_filter_internal( extensions: List[SimpleExtensionDeclaration], base_schema: NamedStruct, - field_name_ids: Dict[str, int], + primary_keys: Set[str], field_name_ids: Dict[str, int], root: Expression.ScalarFunction) -> pc.Expression: if len(root.arguments) != 2: raise _ExpressionException( @@ -105,7 +109,7 @@ def _falsifiable_filter_internal( rhs = root.arguments[1].value falsifiable_filter_fn = partial(_falsifiable_filter_internal, extensions, - base_schema, field_name_ids) + base_schema, primary_keys, field_name_ids) # TODO: to support one side has scalar function, e.g., False | (a > 1). if _has_scalar_function(lhs) and _has_scalar_function(rhs): @@ -135,6 +139,11 @@ def _falsifiable_filter_internal( field_index = lhs.selection.direct_reference.struct_field.field field_name = base_schema.names[field_index] + # Only primary key fields have column statistics for falsifiable filter + # pruning. + if field_name not in primary_keys: + return pc.scalar(False) + field_id = field_name_ids[field_name] field_min, field_max = _stats_field_min(field_id), _stats_field_max(field_id) value = pc.scalar( diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py index 649785b..8babead 100644 --- a/python/src/space/core/ops/append.py +++ b/python/src/space/core/ops/append.py @@ -228,12 +228,15 @@ def _finish_index_writer(self) -> None: self._index_writer_info.writer.close() # Update metadata in manifest files. + file_path = self._index_writer_info.file_path stats = self._index_manifest_writer.write( - self._index_writer_info.file_path, - self._index_writer_info.writer.writer.metadata) + file_path, self._index_writer_info.writer.writer.metadata) utils.update_index_storage_stats( base=self._patch.storage_statistics_update, update=stats) + self._patch.change_log.added_rows.append( + meta.RowBitmap(file=file_path, all_rows=True)) + self._index_writer_info = None self._cached_index_file_bytes = 0 diff --git a/python/src/space/core/ops/change_data.py b/python/src/space/core/ops/change_data.py new file mode 100644 index 0000000..02dca98 --- /dev/null +++ b/python/src/space/core/ops/change_data.py @@ -0,0 +1,125 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Change data feed that computes the delta between two snapshots."""
+
+from enum import Enum
+from typing import Iterator, Tuple, List
+
+import pyarrow as pa
+from pyroaring import BitMap  # type: ignore[import-not-found]
+
+from space.core.fs.base import BaseFileSystem
+from space.core.fs.factory import create_fs
+from space.core.ops.read import FileSetReadOp
+import space.core.proto.metadata_pb2 as meta
+import space.core.proto.runtime_pb2 as runtime
+from space.core.storage import Storage
+from space.core.utils.paths import StoragePathsMixin
+
+
+class ChangeType(Enum):
+  """Type of data changes."""
+  # For added rows.
+  ADD = 1
+  # For deleted rows.
+  DELETE = 2
+
+
+def read_change_data(
+    storage: Storage, start_snapshot_id: int,
+    end_snapshot_id: int) -> Iterator[Tuple[ChangeType, pa.Table]]:
+  """Read change data from a start to an end snapshot.
+
+  start_snapshot_id is excluded; end_snapshot_id is included.
+  """
+  if start_snapshot_id > end_snapshot_id:
+    raise RuntimeError(
+        f"End snapshot ID {end_snapshot_id} should not be lower than start "
+        f"snapshot ID {start_snapshot_id}")
+
+  all_snapshot_ids = sorted(storage.snapshot_ids)
+  all_snapshot_ids_set = set(all_snapshot_ids)
+
+  if start_snapshot_id not in all_snapshot_ids_set:
+    raise RuntimeError(f"Start snapshot ID not found: {start_snapshot_id}")
+
+  if end_snapshot_id not in all_snapshot_ids_set:
+    raise RuntimeError(f"End snapshot ID not found: {end_snapshot_id}")
+
+  for snapshot_id in all_snapshot_ids:
+    if snapshot_id <= start_snapshot_id:
+      continue
+
+    if snapshot_id > end_snapshot_id:
+      break
+
+    yield from _LocalChangeDataReadOp(storage, snapshot_id)
+
+
+class _LocalChangeDataReadOp(StoragePathsMixin):
+  """Read changes of data from a given snapshot of a dataset."""
+
+  def __init__(self, storage: Storage, snapshot_id: int):
+    StoragePathsMixin.__init__(self, storage.location)
+
+    self._storage = storage
+    self._metadata = self._storage.metadata
+
+    if snapshot_id not in self._metadata.snapshots:
+      raise RuntimeError(
+          f"Change data read can't find snapshot ID {snapshot_id}")
+
+    snapshot = self._metadata.snapshots[snapshot_id]
+
+    fs = create_fs(self._location)
+    change_log_file = self._storage.full_path(snapshot.change_log_file)
+    self._change_log = _read_change_log_proto(fs, change_log_file)
+
+  def __iter__(self) -> Iterator[Tuple[ChangeType, pa.Table]]:
+    for bitmap in self._change_log.deleted_rows:
+      yield self._read_bitmap_rows(ChangeType.DELETE, bitmap)
+
+    for bitmap in self._change_log.added_rows:
+      yield self._read_bitmap_rows(ChangeType.ADD, bitmap)
+
+  def _read_bitmap_rows(self, change_type: ChangeType,
+                        bitmap: meta.RowBitmap) -> Tuple[ChangeType, pa.Table]:
+    file_set = runtime.FileSet(
+        index_files=[runtime.DataFile(path=bitmap.file)])
+    read_op = FileSetReadOp(self._storage.location, self._metadata, file_set)
+
+    data = pa.concat_tables(list(iter(read_op)))
+    # TODO: read index fields first, apply the mask, then read record fields.
+    if not bitmap.all_rows:
+      data = data.filter(
+          mask=_bitmap_mask(bitmap.roaring_bitmap, data.num_rows))
+
+    return (change_type, data)
+
+
+def _read_change_log_proto(fs: BaseFileSystem,
+                           file_path: str) -> meta.ChangeLog:
+  return fs.read_proto(file_path, meta.ChangeLog())
+
+
+def _bitmap_mask(serialized_bitmap: bytes, num_rows: int) -> List[bool]:
+  bitmap = BitMap.deserialize(serialized_bitmap)
+
+  mask = [False] * num_rows
+  for row_id in bitmap.to_array():
+    mask[row_id] = True
+
+  return mask
diff --git a/python/src/space/core/ops/delete.py b/python/src/space/core/ops/delete.py
index 89951f5..117605e 100644
--- a/python/src/space/core/ops/delete.py
+++ b/python/src/space/core/ops/delete.py
@@ -17,9 +17,11 @@
 from abc import abstractmethod
 from typing import List, Optional, Set
 
+import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pyarrow.compute as pc
+from pyroaring import BitMap  # type: ignore[import-not-found]
 
 from space.core.ops import utils
 from space.core.ops.append import LocalAppendOp
@@ -29,6 +31,9 @@
 from space.core.utils.paths import StoragePathsMixin
 from space.core.schema import constants
 
+# A temporary row ID field used for generating the deletion row bitmap.
+_ROW_ID_FIELD = "__ROW_ID"
+
 
 class BaseDeleteOp(BaseOp):
   """Abstract base delete operation class.
@@ -73,6 +78,7 @@ def __init__(self, location: str, metadata: meta.StorageMetadata,
   def delete(self) -> Optional[runtime.Patch]:
     # The index files and manifests deleted, to remove them from index
     # manifests.
+    patch = runtime.Patch()
     deleted_files: List[str] = []
     deleted_manifest_ids: Set[int] = set()
 
@@ -82,8 +88,13 @@ def delete(self) -> Optional[runtime.Patch]:
       utils.update_index_storage_stats(stats_before_delete,
                                        file.storage_statistics)
 
-      index_data = pq.read_table(self.full_path(file.path),
-                                 filters=self._reinsert_filter)
+      # TODO: this can be done at row group level.
+      index_data = pq.read_table(self.full_path(file.path))
+      index_data = index_data.append_column(
+          _ROW_ID_FIELD,
+          pa.array(np.arange(0, index_data.num_rows,
+                             dtype=np.int32)))  # type: ignore[arg-type]
+      index_data = index_data.filter(mask=self._reinsert_filter)
 
       # No row is deleted. No need to re-insert rows.
       if index_data.num_rows == file.storage_statistics.num_rows:
@@ -93,6 +104,11 @@ def delete(self) -> Optional[runtime.Patch]:
       deleted_rows += (file.storage_statistics.num_rows -
                        index_data.num_rows)
       all_deleted = index_data.num_rows == 0
 
+      # Compute the deleted row bitmap for the change log.
+      patch.change_log.deleted_rows.append(
+          _build_bitmap(file, index_data, all_deleted))
+      index_data = index_data.drop(_ROW_ID_FIELD)  # type: ignore[attr-defined]
+
       # Record deleted files and manifests information.
       deleted_files.append(file.path)
       deleted_manifest_ids.add(file.manifest_file_id)
@@ -130,7 +146,6 @@ def delete(self) -> Optional[runtime.Patch]:
     reinsert_patch = self._append_op.finish()
 
     # Populate the patch for the delete.
- patch = runtime.Patch() if reinsert_patch is not None: patch.addition.index_manifest_files.extend( reinsert_patch.addition.index_manifest_files) @@ -189,3 +204,18 @@ def _validate_files(file_set: runtime.FileSet) -> bool: return False return len(file_set.index_manifest_files) > 0 + + +def _build_bitmap(file: runtime.DataFile, index_data: pa.Table, + all_deleted: bool) -> meta.RowBitmap: + row_bitmap = meta.RowBitmap(file=file.path) + if all_deleted: + row_bitmap.all_rows = True + else: + deleted_bitmaps = BitMap() + deleted_bitmaps.add_range(0, file.storage_statistics.num_rows) + survivor_bitmaps = BitMap(index_data.column(_ROW_ID_FIELD).to_numpy()) + deleted_bitmaps.difference_update(survivor_bitmaps) + row_bitmap.roaring_bitmap = deleted_bitmaps.serialize() + + return row_bitmap diff --git a/python/src/space/core/ops/insert.py b/python/src/space/core/ops/insert.py index cbed6ac..9d8b646 100644 --- a/python/src/space/core/ops/insert.py +++ b/python/src/space/core/ops/insert.py @@ -56,8 +56,8 @@ def write(self, data: InputData) -> Optional[runtime.Patch]: class LocalInsertOp(BaseInsertOp, StoragePathsMixin): '''Append data to a dataset.''' - def __init__(self, location: str, storage: Storage, options: InsertOptions): - StoragePathsMixin.__init__(self, location) + def __init__(self, storage: Storage, options: InsertOptions): + StoragePathsMixin.__init__(self, storage.location) self._storage = storage self._metadata = self._storage.metadata diff --git a/python/src/space/core/proto/metadata.proto b/python/src/space/core/proto/metadata.proto index a66c227..82fad91 100644 --- a/python/src/space/core/proto/metadata.proto +++ b/python/src/space/core/proto/metadata.proto @@ -74,7 +74,7 @@ message Schema { // Storage snapshot persisting physical metadata such as manifest file paths. // It is used for obtaining all alive data file paths for a given snapshot. -// NEXT_ID: 5 +// NEXT_ID: 6 message Snapshot { // The snapshot ID. int64 snapshot_id = 1; @@ -91,6 +91,9 @@ message Snapshot { // Statistics of all data in the storage. StorageStatistics storage_statistics = 4; + + // File path of the change log of the snapshot. + string change_log_file = 5; } // Stores information of manifest files. @@ -118,3 +121,30 @@ message StorageStatistics { // Uncompressed bytes of record data. int64 record_uncompressed_bytes = 4; } + + +// Change log stores changes made by a snapshot. +// NEXT_ID: 3 +message ChangeLog { + // Rows deleted in this snapshot. + repeated RowBitmap deleted_rows = 1; + + // New rows added in this snapshot. + repeated RowBitmap added_rows = 2; +} + + +// Mark rows in a file by bitmap. +// NEXT_ID: 4 +message RowBitmap { + // File path that the bit map applies to. + string file = 1; + + // All rows are selected. Bitmap is empty in this case. + bool all_rows = 2; + + oneof bitmap { + // Roaring bitmap. 
+ bytes roaring_bitmap = 3; + } +} diff --git a/python/src/space/core/proto/metadata_pb2.py b/python/src/space/core/proto/metadata_pb2.py index 07481b1..ae41786 100644 --- a/python/src/space/core/proto/metadata_pb2.py +++ b/python/src/space/core/proto/metadata_pb2.py @@ -15,7 +15,7 @@ from substrait import type_pb2 as substrait_dot_type__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\x9f\x03\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\")\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\"]\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\x12\x15\n\rrecord_fields\x18\x03 \x03(\t\"\xcf\x01\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x0emanifest_files\x18\x03 \x01(\x0b\x32\x1a.space.proto.ManifestFilesH\x00\x12:\n\x12storage_statistics\x18\x04 \x01(\x0b\x32\x1e.space.proto.StorageStatisticsB\x0b\n\tdata_info\"L\n\rManifestFiles\x12\x1c\n\x14index_manifest_files\x18\x01 \x03(\t\x12\x1d\n\x15record_manifest_files\x18\x02 \x03(\t\"\x8a\x01\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x12!\n\x19record_uncompressed_bytes\x18\x04 \x01(\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\x9f\x03\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\")\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\"]\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\x12\x15\n\rrecord_fields\x18\x03 \x03(\t\"\xe8\x01\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x0emanifest_files\x18\x03 \x01(\x0b\x32\x1a.space.proto.ManifestFilesH\x00\x12:\n\x12storage_statistics\x18\x04 
\x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x17\n\x0f\x63hange_log_file\x18\x05 \x01(\tB\x0b\n\tdata_info\"L\n\rManifestFiles\x12\x1c\n\x14index_manifest_files\x18\x01 \x03(\t\x12\x1d\n\x15record_manifest_files\x18\x02 \x03(\t\"\x8a\x01\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x12!\n\x19record_uncompressed_bytes\x18\x04 \x01(\x03\"e\n\tChangeLog\x12,\n\x0c\x64\x65leted_rows\x18\x01 \x03(\x0b\x32\x16.space.proto.RowBitmap\x12*\n\nadded_rows\x18\x02 \x03(\x0b\x32\x16.space.proto.RowBitmap\"O\n\tRowBitmap\x12\x0c\n\x04\x66ile\x18\x01 \x01(\t\x12\x10\n\x08\x61ll_rows\x18\x02 \x01(\x08\x12\x18\n\x0eroaring_bitmap\x18\x03 \x01(\x0cH\x00\x42\x08\n\x06\x62itmapb\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.metadata_pb2', globals()) @@ -35,9 +35,13 @@ _SCHEMA._serialized_start=558 _SCHEMA._serialized_end=651 _SNAPSHOT._serialized_start=654 - _SNAPSHOT._serialized_end=861 - _MANIFESTFILES._serialized_start=863 - _MANIFESTFILES._serialized_end=939 - _STORAGESTATISTICS._serialized_start=942 - _STORAGESTATISTICS._serialized_end=1080 + _SNAPSHOT._serialized_end=886 + _MANIFESTFILES._serialized_start=888 + _MANIFESTFILES._serialized_end=964 + _STORAGESTATISTICS._serialized_start=967 + _STORAGESTATISTICS._serialized_end=1105 + _CHANGELOG._serialized_start=1107 + _CHANGELOG._serialized_end=1208 + _ROWBITMAP._serialized_start=1210 + _ROWBITMAP._serialized_end=1289 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/metadata_pb2.pyi b/python/src/space/core/proto/metadata_pb2.pyi index d6ea363..f397354 100644 --- a/python/src/space/core/proto/metadata_pb2.pyi +++ b/python/src/space/core/proto/metadata_pb2.pyi @@ -174,7 +174,7 @@ global___Schema = Schema class Snapshot(google.protobuf.message.Message): """Storage snapshot persisting physical metadata such as manifest file paths. It is used for obtaining all alive data file paths for a given snapshot. - NEXT_ID: 5 + NEXT_ID: 6 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -183,6 +183,7 @@ class Snapshot(google.protobuf.message.Message): CREATE_TIME_FIELD_NUMBER: builtins.int MANIFEST_FILES_FIELD_NUMBER: builtins.int STORAGE_STATISTICS_FIELD_NUMBER: builtins.int + CHANGE_LOG_FILE_FIELD_NUMBER: builtins.int snapshot_id: builtins.int """The snapshot ID.""" @property @@ -196,6 +197,8 @@ class Snapshot(google.protobuf.message.Message): @property def storage_statistics(self) -> global___StorageStatistics: """Statistics of all data in the storage.""" + change_log_file: builtins.str + """File path of the change log of the snapshot.""" def __init__( self, *, @@ -203,9 +206,10 @@ class Snapshot(google.protobuf.message.Message): create_time: google.protobuf.timestamp_pb2.Timestamp | None = ..., manifest_files: global___ManifestFiles | None = ..., storage_statistics: global___StorageStatistics | None = ..., + change_log_file: builtins.str = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ... 
- def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "snapshot_id", b"snapshot_id", "storage_statistics", b"storage_statistics"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["change_log_file", b"change_log_file", "create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "snapshot_id", b"snapshot_id", "storage_statistics", b"storage_statistics"]) -> None: ... def WhichOneof(self, oneof_group: typing_extensions.Literal["data_info", b"data_info"]) -> typing_extensions.Literal["manifest_files"] | None: ... global___Snapshot = Snapshot @@ -267,3 +271,59 @@ class StorageStatistics(google.protobuf.message.Message): def ClearField(self, field_name: typing_extensions.Literal["index_compressed_bytes", b"index_compressed_bytes", "index_uncompressed_bytes", b"index_uncompressed_bytes", "num_rows", b"num_rows", "record_uncompressed_bytes", b"record_uncompressed_bytes"]) -> None: ... global___StorageStatistics = StorageStatistics + +@typing_extensions.final +class ChangeLog(google.protobuf.message.Message): + """Change log stores changes made by a snapshot. + NEXT_ID: 3 + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + DELETED_ROWS_FIELD_NUMBER: builtins.int + ADDED_ROWS_FIELD_NUMBER: builtins.int + @property + def deleted_rows(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RowBitmap]: + """Rows deleted in this snapshot.""" + @property + def added_rows(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RowBitmap]: + """New rows added in this snapshot.""" + def __init__( + self, + *, + deleted_rows: collections.abc.Iterable[global___RowBitmap] | None = ..., + added_rows: collections.abc.Iterable[global___RowBitmap] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["added_rows", b"added_rows", "deleted_rows", b"deleted_rows"]) -> None: ... + +global___ChangeLog = ChangeLog + +@typing_extensions.final +class RowBitmap(google.protobuf.message.Message): + """Mark rows in a file by bitmap. + NEXT_ID: 4 + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + FILE_FIELD_NUMBER: builtins.int + ALL_ROWS_FIELD_NUMBER: builtins.int + ROARING_BITMAP_FIELD_NUMBER: builtins.int + file: builtins.str + """File path that the bit map applies to.""" + all_rows: builtins.bool + """All rows are selected. Bitmap is empty in this case.""" + roaring_bitmap: builtins.bytes + """Roaring bitmap.""" + def __init__( + self, + *, + file: builtins.str = ..., + all_rows: builtins.bool = ..., + roaring_bitmap: builtins.bytes = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["bitmap", b"bitmap", "roaring_bitmap", b"roaring_bitmap"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["all_rows", b"all_rows", "bitmap", b"bitmap", "file", b"file", "roaring_bitmap", b"roaring_bitmap"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["bitmap", b"bitmap"]) -> typing_extensions.Literal["roaring_bitmap"] | None: ... 
+ +global___RowBitmap = RowBitmap diff --git a/python/src/space/core/proto/runtime.proto b/python/src/space/core/proto/runtime.proto index dae9ceb..7eaf3ae 100644 --- a/python/src/space/core/proto/runtime.proto +++ b/python/src/space/core/proto/runtime.proto @@ -42,7 +42,7 @@ message FileSet { } // A patch describing metadata changes to the storage. -// NEXT_ID: 4 +// NEXT_ID: 5 message Patch { // Manifest files to add to the storage. ManifestFiles addition = 1; @@ -52,6 +52,9 @@ message Patch { // The change of the storage statistics. StorageStatistics storage_statistics_update = 3; + + // The change log describing the changes made by the patch. + ChangeLog change_log = 4; } // Result of a job. diff --git a/python/src/space/core/proto/runtime_pb2.py b/python/src/space/core/proto/runtime_pb2.py index 742a4e2..2d868b6 100644 --- a/python/src/space/core/proto/runtime_pb2.py +++ b/python/src/space/core/proto/runtime_pb2.py @@ -14,7 +14,7 @@ from space.core.proto import metadata_pb2 as space_dot_core_dot_proto_dot_metadata__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"n\n\x08\x44\x61taFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12:\n\x12storage_statistics\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x18\n\x10manifest_file_id\x18\x03 \x01(\x03\"\xbc\x01\n\x07\x46ileSet\x12*\n\x0bindex_files\x18\x01 \x03(\x0b\x32\x15.space.proto.DataFile\x12J\n\x14index_manifest_files\x18\x02 \x03(\x0b\x32,.space.proto.FileSet.IndexManifestFilesEntry\x1a\x39\n\x17IndexManifestFilesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xa6\x01\n\x05Patch\x12,\n\x08\x61\x64\x64ition\x18\x01 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12,\n\x08\x64\x65letion\x18\x02 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"\xc3\x01\n\tJobResult\x12+\n\x05state\x18\x01 \x01(\x0e\x32\x1c.space.proto.JobResult.State\x12\x41\n\x19storage_statistics_update\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\"F\n\x05State\x12\x15\n\x11STATE_UNSPECIFIED\x10\x00\x12\r\n\tSUCCEEDED\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\x12\x0b\n\x07SKIPPED\x10\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"n\n\x08\x44\x61taFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12:\n\x12storage_statistics\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x18\n\x10manifest_file_id\x18\x03 \x01(\x03\"\xbc\x01\n\x07\x46ileSet\x12*\n\x0bindex_files\x18\x01 \x03(\x0b\x32\x15.space.proto.DataFile\x12J\n\x14index_manifest_files\x18\x02 \x03(\x0b\x32,.space.proto.FileSet.IndexManifestFilesEntry\x1a\x39\n\x17IndexManifestFilesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xd2\x01\n\x05Patch\x12,\n\x08\x61\x64\x64ition\x18\x01 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12,\n\x08\x64\x65letion\x18\x02 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12*\n\nchange_log\x18\x04 \x01(\x0b\x32\x16.space.proto.ChangeLog\"\xc3\x01\n\tJobResult\x12+\n\x05state\x18\x01 \x01(\x0e\x32\x1c.space.proto.JobResult.State\x12\x41\n\x19storage_statistics_update\x18\x02 
\x01(\x0b\x32\x1e.space.proto.StorageStatistics\"F\n\x05State\x12\x15\n\x11STATE_UNSPECIFIED\x10\x00\x12\r\n\tSUCCEEDED\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\x12\x0b\n\x07SKIPPED\x10\x03\x62\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.runtime_pb2', globals()) @@ -30,9 +30,9 @@ _FILESET_INDEXMANIFESTFILESENTRY._serialized_start=324 _FILESET_INDEXMANIFESTFILESENTRY._serialized_end=381 _PATCH._serialized_start=384 - _PATCH._serialized_end=550 - _JOBRESULT._serialized_start=553 - _JOBRESULT._serialized_end=748 - _JOBRESULT_STATE._serialized_start=678 - _JOBRESULT_STATE._serialized_end=748 + _PATCH._serialized_end=594 + _JOBRESULT._serialized_start=597 + _JOBRESULT._serialized_end=792 + _JOBRESULT_STATE._serialized_start=722 + _JOBRESULT_STATE._serialized_end=792 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/runtime_pb2.pyi b/python/src/space/core/proto/runtime_pb2.pyi index 3b41bfd..5407d67 100644 --- a/python/src/space/core/proto/runtime_pb2.pyi +++ b/python/src/space/core/proto/runtime_pb2.pyi @@ -107,7 +107,7 @@ global___FileSet = FileSet @typing_extensions.final class Patch(google.protobuf.message.Message): """A patch describing metadata changes to the storage. - NEXT_ID: 4 + NEXT_ID: 5 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -115,6 +115,7 @@ class Patch(google.protobuf.message.Message): ADDITION_FIELD_NUMBER: builtins.int DELETION_FIELD_NUMBER: builtins.int STORAGE_STATISTICS_UPDATE_FIELD_NUMBER: builtins.int + CHANGE_LOG_FIELD_NUMBER: builtins.int @property def addition(self) -> space.core.proto.metadata_pb2.ManifestFiles: """Manifest files to add to the storage.""" @@ -124,15 +125,19 @@ class Patch(google.protobuf.message.Message): @property def storage_statistics_update(self) -> space.core.proto.metadata_pb2.StorageStatistics: """The change of the storage statistics.""" + @property + def change_log(self) -> space.core.proto.metadata_pb2.ChangeLog: + """The change log describing the changes made by the patch.""" def __init__( self, *, addition: space.core.proto.metadata_pb2.ManifestFiles | None = ..., deletion: space.core.proto.metadata_pb2.ManifestFiles | None = ..., storage_statistics_update: space.core.proto.metadata_pb2.StorageStatistics | None = ..., + change_log: space.core.proto.metadata_pb2.ChangeLog | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["addition", b"addition", "deletion", b"deletion", "storage_statistics_update", b"storage_statistics_update"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["addition", b"addition", "deletion", b"deletion", "storage_statistics_update", b"storage_statistics_update"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["addition", b"addition", "change_log", b"change_log", "deletion", b"deletion", "storage_statistics_update", b"storage_statistics_update"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["addition", b"addition", "change_log", b"change_log", "deletion", b"deletion", "storage_statistics_update", b"storage_statistics_update"]) -> None: ... 
global___Patch = Patch diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 23f1800..01d2611 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -16,17 +16,18 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Iterator, List, Optional +from typing import Iterator, List, Optional, Tuple, Union from absl import logging # type: ignore[import-untyped] import pyarrow as pa import pyarrow.compute as pc from space.core.ops.append import LocalAppendOp -from space.core.ops.insert import InsertOptions, LocalInsertOp +from space.core.ops.base import InputData +from space.core.ops.change_data import ChangeType, read_change_data from space.core.ops.delete import FileSetDeleteOp +from space.core.ops.insert import InsertOptions, LocalInsertOp from space.core.ops.read import FileSetReadOp, ReadOptions -from space.core.ops.base import InputData import space.core.proto.runtime_pb2 as runtime from space.core.storage import Storage from space.core.loaders.array_record import ArrayRecordIndexFn @@ -110,6 +111,14 @@ def _insert(self, data: InputData, def delete(self, filter_: pc.Expression) -> runtime.JobResult: """Delete data matching the filter from the dataset.""" + @abstractmethod + def diff(self, start_version: Union[int], + end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]: + """Read the change data between two versions. + + start_version is excluded; end_version is included. + """ + def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult: if patch is not None: self._storage.commit(patch) @@ -160,8 +169,7 @@ def append_parquet(self, input_dir: str) -> runtime.JobResult: def _insert(self, data: InputData, mode: InsertOptions.Mode) -> runtime.JobResult: - op = LocalInsertOp(self._storage.location, self._storage, - InsertOptions(mode=mode)) + op = LocalInsertOp(self._storage, InsertOptions(mode=mode)) return self._try_commit(op.write(data)) def delete(self, filter_: pc.Expression) -> runtime.JobResult: @@ -170,6 +178,24 @@ def delete(self, filter_: pc.Expression) -> runtime.JobResult: ds.data_files(filter_), filter_) return self._try_commit(op.delete()) + def diff(self, start_version: Union[int], + end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]: + start_snapshot_id, end_snapshot_id = None, None + + if isinstance(start_version, int): + start_snapshot_id = start_version + + if isinstance(end_version, int): + end_snapshot_id = end_version + + if start_snapshot_id is None: + raise RuntimeError(f"Start snapshot ID is invalid: {start_version}") + + if end_snapshot_id is None: + raise RuntimeError(f"End snapshot ID is invalid: {end_version}") + + return read_change_data(self._storage, start_snapshot_id, end_snapshot_id) + def _job_result(patch: Optional[runtime.Patch]) -> runtime.JobResult: if patch is None: diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index dba0f35..ee243b3 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -55,6 +55,8 @@ def __init__(self, location: str, metadata: meta.StorageMetadata): self._field_name_ids: Dict[str, int] = arrow.field_name_to_id_dict( self._physical_schema) + self._primary_keys = set(self._metadata.schema.primary_keys) + @property def metadata(self) -> meta.StorageMetadata: """Return the storage metadata.""" @@ -139,9 +141,6 @@ def load(cls, location: str) -> Storage: meta.StorageMetadata()) return Storage(location, metadata) - def 
_next_snapshot_id(self) -> int: - return self._metadata.current_snapshot_id + 1 - def commit(self, patch: runtime.Patch) -> None: """Commit changes to the storage. @@ -167,6 +166,11 @@ def commit(self, patch: runtime.Patch) -> None: storage_statistics=current_snapshot.storage_statistics) _patch_manifests(snapshot.manifest_files, patch) + if patch.HasField('change_log'): + change_log_file = self.new_change_log_path() + self._fs.write_proto(change_log_file, patch.change_log) + snapshot.change_log_file = self.short_path(change_log_file) + # Update storage statistics. ops_utils.update_index_storage_stats(snapshot.storage_statistics, patch.storage_statistics_update) @@ -198,6 +202,7 @@ def data_files(self, manifest_filter = None if filter_ is not None: manifest_filter = build_manifest_filter(self._physical_schema, + self._primary_keys, self._field_name_ids, filter_) for manifest_file in manifest_files.index_manifest_files: @@ -213,12 +218,21 @@ def data_files(self, return result + @property + def snapshot_ids(self) -> List[int]: + """A list of all alive snapshot IDs in the dataset.""" + return list(self._metadata.snapshots) + def _initialize(self, metadata_path: str) -> None: """Initialize a new storage by creating folders and files.""" self._fs.create_dir(self._data_dir) self._fs.create_dir(self._metadata_dir) + self._fs.create_dir(self._change_data_dir) self._write_metadata(metadata_path, self._metadata) + def _next_snapshot_id(self) -> int: + return self._metadata.current_snapshot_id + 1 + def _write_metadata( self, metadata_path: str, diff --git a/python/src/space/core/utils/paths.py b/python/src/space/core/utils/paths.py index ce73afa..45f15ee 100644 --- a/python/src/space/core/utils/paths.py +++ b/python/src/space/core/utils/paths.py @@ -21,6 +21,7 @@ _ENTRY_POINT_FILE = "entrypoint.txtpb" _DATA_DIR = "data" _METADATA_DIR = "metadata" +_CHANGE_DATA_DIR = "changes" def new_index_file_path(data_dir_: str): @@ -71,6 +72,7 @@ def __init__(self, location: str): self._data_dir = data_dir(self._location) self._metadata_dir = metadata_dir(self._location) + self._change_data_dir = path.join(self._metadata_dir, _CHANGE_DATA_DIR) self._entry_point_file = entry_point_path(self._location) @property @@ -99,3 +101,7 @@ def full_path(self, short_path: str) -> str: def new_metadata_path(self) -> str: """Return a random metadata file path.""" return new_metadata_path(self._metadata_dir) + + def new_change_log_path(self) -> str: + """Return a random change log file path.""" + return path.join(self._change_data_dir, f"change_{uuid_()}.txtpb") diff --git a/python/tests/core/manifests/test_falsifiable_filters.py b/python/tests/core/manifests/test_falsifiable_filters.py index 4a61e5f..05d8119 100644 --- a/python/tests/core/manifests/test_falsifiable_filters.py +++ b/python/tests/core/manifests/test_falsifiable_filters.py @@ -19,21 +19,28 @@ from space.core.manifests import falsifiable_filters as ff -@pytest.mark.parametrize("filter_,falsifiable_filter", - [((pc.field("a") < 10) | (pc.field("b") > 1), - (pc.field("_STATS_f0", "_MIN") >= 10) & - (pc.field("_STATS_f1", "_MIN") <= 10)), - ((pc.field("a") > 10) & (pc.field("b") == 1), - (pc.field("_STATS_f0", "_MAX") <= 10) | - ((pc.field("_STATS_f1", "_MIN") > 1) | - (pc.field("_STATS_f1", "_MAX") < 1)))]) -def test_build_manifest_filter(filter_, falsifiable_filter): - arrow_schema = pa.schema([("a", pa.int64()), ("b", pa.float64())]) - field_name_ids = {"a": 0, "b": 1} - - falsifiable_filter = ff.build_manifest_filter(arrow_schema, field_name_ids, - filter_) - 
assert str(falsifiable_filter) == str(falsifiable_filter) +@pytest.mark.parametrize( + "filter_,expected_falsifiable_filter", + [ + ((pc.field("a") < 10) | (pc.field("b") > 1), + (pc.field("_STATS_f0", "_MIN") >= 10) & + (pc.field("_STATS_f1", "_MAX") <= 1)), + ((pc.field("a") > 10) & (pc.field("b") == 1), + (pc.field("_STATS_f0", "_MAX") <= 10) | + ((pc.field("_STATS_f1", "_MIN") > 1) | + (pc.field("_STATS_f1", "_MAX") < 1))), + # Only primary keys are used. + ((pc.field("a") < 10) | (pc.field("c") > "a"), + (pc.field("_STATS_f0", "_MIN") >= 10) & False) + ]) +def test_build_manifest_filter(filter_, expected_falsifiable_filter): + arrow_schema = pa.schema([("a", pa.int64()), ("b", pa.float64()), + ("c", pa.string())]) + field_name_ids = {"a": 0, "b": 1, "c": 2} + + manifest_filter = ff.build_manifest_filter(arrow_schema, set(["a", "b"]), + field_name_ids, filter_) + assert str(manifest_filter) == str(~expected_falsifiable_filter) @pytest.mark.parametrize("filter_", [(pc.field("a") != 10), @@ -42,5 +49,5 @@ def test_build_manifest_filter_not_supported_return_none(filter_): arrow_schema = pa.schema([("a", pa.int64()), ("b", pa.float64())]) field_name_ids = {"a": 0, "b": 1} - assert ff.build_manifest_filter(arrow_schema, field_name_ids, - filter_) is None + assert ff.build_manifest_filter(arrow_schema, set(["a", "b"]), + field_name_ids, filter_) is None diff --git a/python/tests/core/ops/test_change_data.py b/python/tests/core/ops/test_change_data.py new file mode 100644 index 0000000..ae89471 --- /dev/null +++ b/python/tests/core/ops/test_change_data.py @@ -0,0 +1,57 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pyarrow as pa +import pyarrow.compute as pc + +from space import Dataset +from space.core.ops.change_data import ChangeType, read_change_data + + +def test_read_change_data(tmp_path, all_types_schema, all_types_input_data): + location = tmp_path / "dataset" + ds = Dataset.create(location=str(location), + schema=all_types_schema, + primary_keys=["int64"], + record_fields=[]) + + # Validate ADD changes. + def make_iter(): + for d in all_types_input_data: + yield d + + runner = ds.local() + runner.append_from(make_iter()) + + changes = list(runner.diff(0, 1)) + assert len(changes) == 1 + expected_change0 = (ChangeType.ADD, runner.read_all()) + assert changes[0] == expected_change0 + + # Validate DELETE changes. + runner.delete((pc.field("string") == "a") | (pc.field("string") == "A")) + changes = list(runner.diff(1, 2)) + assert len(changes) == 1 + expected_change1 = (ChangeType.DELETE, + pa.Table.from_pydict({ + "int64": [1, 0], + "float64": [0.1, -0.1], + "bool": [True, False], + "string": ["a", "A"] + })) + assert changes[0] == expected_change1 + + changes = list(runner.diff(0, 2)) + assert len(changes) == 2 + assert changes == [expected_change0, expected_change1]
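---

Usage sketch (reviewer note, not part of the patch): a minimal end-to-end flow
of the new change data feed API, mirroring
python/tests/core/ops/test_change_data.py. The location, schema, and field
names below are hypothetical; any schema with a primary key works. Snapshot 0
is the initial empty snapshot, the append creates snapshot 1, and the delete
creates snapshot 2; diff() excludes the start version and includes the end
version.

    import pyarrow as pa
    import pyarrow.compute as pc

    from space import Dataset
    from space.core.ops.change_data import ChangeType

    schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    ds = Dataset.create(location="/tmp/change_feed_demo",
                        schema=schema,
                        primary_keys=["id"],
                        record_fields=[])
    runner = ds.local()

    runner.append_from(iter([{"id": [1, 2, 3], "name": ["a", "b", "c"]}]))
    runner.delete(pc.field("id") == 2)

    for change_type, table in runner.diff(start_version=0, end_version=2):
      if change_type == ChangeType.ADD:
        print("added rows:", table.num_rows)    # 3 rows from snapshot 1
      elif change_type == ChangeType.DELETE:
        print("deleted rows:", table.num_rows)  # 1 row from snapshot 2

And a sketch of how a deletion is encoded into RowBitmap, matching
_build_bitmap in delete.py and _bitmap_mask in change_data.py; the file size
(5 rows) and surviving row IDs (1 and 3) are assumed for illustration:

    from pyroaring import BitMap

    deleted = BitMap()
    deleted.add_range(0, 5)                    # all row IDs in the file
    deleted.difference_update(BitMap([1, 3]))  # drop rows surviving the filter
    serialized = deleted.serialize()           # RowBitmap.roaring_bitmap bytes

    # Read side: expand the bitmap into a boolean mask over the file's rows.
    mask = [False] * 5
    for row_id in BitMap.deserialize(serialized).to_array():
      mask[row_id] = True
    assert mask == [True, False, True, False, True]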