Implement change data feed
coufon committed Dec 28, 2023
1 parent 3ddac64 commit 0136c0a
Showing 18 changed files with 443 additions and 57 deletions.
2 changes: 2 additions & 0 deletions python/pyproject.toml
@@ -19,6 +19,7 @@ dependencies = [
"numpy",
"protobuf",
"pyarrow >= 14.0.0",
"pyroaring",
"tensorflow_datasets",
"typing_extensions",
]
@@ -56,6 +57,7 @@ ignored-modules = [
"array_record",
"datasets",
"google.protobuf",
"pyroaring",
"space.core.proto",
"substrait"
]
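
Note: the new pyroaring dependency provides the compressed roaring bitmaps used throughout this commit to mark changed rows per file. A minimal round-trip sketch, using only calls that appear elsewhere in this commit plus list():

from pyroaring import BitMap

bm = BitMap([1, 3])  # row offsets within one index file
data = bm.serialize()  # bytes persisted in RowBitmap.roaring_bitmap below
assert list(BitMap.deserialize(data)) == [1, 3]  # lossless round trip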
5 changes: 5 additions & 0 deletions python/src/space/core/datasets.py
@@ -66,3 +66,8 @@ def index_files(self) -> List[str]:
"""A list of full path of index files."""
data_files = self._storage.data_files()
return [self._storage.full_path(f.path) for f in data_files.index_files]

@property
def snapshot_ids(self) -> List[int]:
"""A list of all alive snapshot IDs in the dataset."""
return self._storage.snapshot_ids
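
Note: a hedged usage sketch for the new property (ds is assumed to be an already-loaded Dataset; the variable names are illustrative). The alive snapshot IDs give the range that a change data read, added later in this commit, can consume:

ids = sorted(ds.snapshot_ids)  # all alive snapshots, oldest first
start_snapshot_id, end_snapshot_id = ids[0], ids[-1]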
21 changes: 15 additions & 6 deletions python/src/space/core/manifests/falsifiable_filters.py
@@ -20,7 +20,7 @@
https://vldb.org/pvldb/vol14/p3083-edara.pdf.
"""

from typing import Dict, List, Optional
from typing import Dict, List, Optional, Set
from functools import partial

from absl import logging # type: ignore[import-untyped]
@@ -36,12 +36,14 @@
from space.core.schema import constants


def build_manifest_filter(schema: pa.Schema, field_name_ids: Dict[str, int],
def build_manifest_filter(schema: pa.Schema, primary_keys: Set[str],
field_name_ids: Dict[str, int],
filter_: pc.Expression) -> Optional[pc.Expression]:
"""Build a falsifiable filter on index manifest files column statistics.
Args:
schema: the storage schema.
primary_keys: the primary keys.
field_name_ids: a dict of field names to IDs mapping.
filter_: a filter on data fields.
@@ -51,7 +53,8 @@ def build_manifest_filter(schema: pa.Schema, field_name_ids: Dict[str, int],
expr = _substrait_expr(schema, filter_)

try:
return ~_falsifiable_filter(expr, field_name_ids) # type: ignore[operator]
return ~_falsifiable_filter(expr, primary_keys,
field_name_ids) # type: ignore[operator]
except _ExpressionException as e:
logging.info(
"Index manifest filter push-down is not used, query may be slower; "
@@ -78,7 +81,7 @@ class _ExpressionException(Exception):
"""Raise for exceptions in Substrait expressions."""


def _falsifiable_filter(filter_: ExtendedExpression,
def _falsifiable_filter(filter_: ExtendedExpression, primary_keys: Set[str],
field_name_ids: Dict[str, int]) -> pc.Expression:
if len(filter_.referred_expr) != 1:
raise _ExpressionException(
@@ -87,14 +90,15 @@ def _falsifiable_filter(filter_: ExtendedExpression,
return _falsifiable_filter_internal(
filter_.extensions, # type: ignore[arg-type]
filter_.base_schema,
primary_keys,
field_name_ids,
filter_.referred_expr[0].expression.scalar_function)


# pylint: disable=too-many-locals,too-many-return-statements
def _falsifiable_filter_internal(
extensions: List[SimpleExtensionDeclaration], base_schema: NamedStruct,
field_name_ids: Dict[str, int],
primary_keys: Set[str], field_name_ids: Dict[str, int],
root: Expression.ScalarFunction) -> pc.Expression:
if len(root.arguments) != 2:
raise _ExpressionException(
@@ -105,7 +109,7 @@ def _falsifiable_filter_internal(
rhs = root.arguments[1].value

falsifiable_filter_fn = partial(_falsifiable_filter_internal, extensions,
base_schema, field_name_ids)
base_schema, primary_keys, field_name_ids)

# TODO: support the case where only one side has a scalar function, e.g., False | (a > 1).
if _has_scalar_function(lhs) and _has_scalar_function(rhs):
@@ -135,6 +139,11 @@ def _falsifiable_filter_internal(

field_index = lhs.selection.direct_reference.struct_field.field
field_name = base_schema.names[field_index]
# Only primary key fields have column statistics for falsifiable filter
# pruning.
if field_name not in primary_keys:
return pc.scalar(False)

field_id = field_name_ids[field_name]
field_min, field_max = _stats_field_min(field_id), _stats_field_max(field_id)
value = pc.scalar(
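
Note: the primary_keys parameter threaded through this module reflects that only primary-key fields carry column statistics in index manifests; for any other field the falsifiable filter is now pc.scalar(False) ("never falsifiable"), so the file is kept. A minimal sketch of the pruning idea itself; the statistics column names here are illustrative, the real ones come from _stats_field_min/_stats_field_max:

import pyarrow.compute as pc

field_min = pc.field("_STATS_f0_min")  # hypothetical stats column names
field_max = pc.field("_STATS_f0_max")

# For a query filter `a == 10`, a file can be skipped (the filter is
# falsifiable) when 10 lies outside the file's [min, max] range.
falsifiable = (field_min > pc.scalar(10)) | (field_max < pc.scalar(10))

# The manifest filter keeps only the files that may contain matches.
manifest_filter = ~falsifiable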
7 changes: 5 additions & 2 deletions python/src/space/core/ops/append.py
@@ -228,12 +228,15 @@ def _finish_index_writer(self) -> None:
self._index_writer_info.writer.close()

# Update metadata in manifest files.
file_path = self._index_writer_info.file_path
stats = self._index_manifest_writer.write(
self._index_writer_info.file_path,
self._index_writer_info.writer.writer.metadata)
file_path, self._index_writer_info.writer.writer.metadata)
utils.update_index_storage_stats(
base=self._patch.storage_statistics_update, update=stats)

self._patch.change_log.added_rows.append(
meta.RowBitmap(file=file_path, all_rows=True))

self._index_writer_info = None
self._cached_index_file_bytes = 0

125 changes: 125 additions & 0 deletions python/src/space/core/ops/change_data.py
@@ -0,0 +1,125 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Change data feed that computes delta between two snapshots."""

from enum import Enum
from typing import Iterator, Tuple, List

import pyarrow as pa
from pyroaring import BitMap # type: ignore[import-not-found]

from space.core.fs.base import BaseFileSystem
from space.core.fs.factory import create_fs
from space.core.ops.read import FileSetReadOp
import space.core.proto.metadata_pb2 as meta
import space.core.proto.runtime_pb2 as runtime
from space.core.storage import Storage
from space.core.utils.paths import StoragePathsMixin


class ChangeType(Enum):
"""Type of data changes."""
# For added rows.
ADD = 1
# For deleted rows.
DELETE = 2


def read_change_data(
storage: Storage, start_snapshot_id: int,
end_snapshot_id: int) -> Iterator[Tuple[ChangeType, pa.Table]]:
"""Read change data from a start to an end snapshot.
start_snapshot_id is excluded; end_snapshot_id is included.
"""
if start_snapshot_id > end_snapshot_id:
raise RuntimeError(
f"End snapshot ID {end_snapshot_id} should not be lower than start "
f"snapshot ID {start_snapshot_id}")

all_snapshot_ids = sorted(storage.snapshot_ids)
all_snapshot_ids_set = set(all_snapshot_ids)

if start_snapshot_id not in all_snapshot_ids_set:
raise RuntimeError(f"Start snapshot ID not found: {start_snapshot_id}")

if end_snapshot_id not in all_snapshot_ids_set:
raise RuntimeError(f"Start snapshot ID not found: {end_snapshot_id}")

for snapshot_id in all_snapshot_ids:
if snapshot_id <= start_snapshot_id:
continue

if snapshot_id > end_snapshot_id:
break

for result in iter(_LocalChangeDataReadOp(storage, snapshot_id)):
yield result


class _LocalChangeDataReadOp(StoragePathsMixin):
"""Read changes of data from a given snapshot of a dataset."""

def __init__(self, storage: Storage, snapshot_id: int):
StoragePathsMixin.__init__(self, storage.location)

self._storage = storage
self._metadata = self._storage.metadata

if snapshot_id not in self._metadata.snapshots:
raise RuntimeError(
f"Change data read can't find snapshot ID {snapshot_id}")

snapshot = self._metadata.snapshots[snapshot_id]

fs = create_fs(self._location)
change_log_file = self._storage.full_path(snapshot.change_log_file)
self._change_log = _read_change_log_proto(fs, change_log_file)

def __iter__(self) -> Iterator[Tuple[ChangeType, pa.Table]]:
for bitmap in self._change_log.deleted_rows:
yield self._read_bitmap_rows(ChangeType.DELETE, bitmap)

for bitmap in self._change_log.added_rows:
yield self._read_bitmap_rows(ChangeType.ADD, bitmap)

def _read_bitmap_rows(self, change_type: ChangeType,
bitmap: meta.RowBitmap) -> Tuple[ChangeType, pa.Table]:
file_set = runtime.FileSet(
index_files=[runtime.DataFile(path=bitmap.file)])
read_op = FileSetReadOp(self._storage.location, self._metadata, file_set)

data = pa.concat_tables(list(iter(read_op)))
# TODO: read index fields first, apply the mask, then read record fields.
if not bitmap.all_rows:
data = data.filter(
mask=_bitmap_mask(bitmap.roaring_bitmap, data.num_rows))

return (change_type, data)


def _read_change_log_proto(fs: BaseFileSystem,
file_path: str) -> meta.ChangeLog:
return fs.read_proto(file_path, meta.ChangeLog())


def _bitmap_mask(serialized_bitmap: bytes, num_rows: int) -> List[bool]:
bitmap = BitMap.deserialize(serialized_bitmap)

mask = [False] * num_rows
for row_id in bitmap.to_array():
mask[row_id] = True

return mask
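
Note: a hedged end-to-end sketch of consuming the feed; storage, start_id, and end_id are assumed to exist. Within each snapshot, deletions are yielded before additions, which matters when replaying changes in order (e.g., for a delete-then-add upsert):

for change_type, table in read_change_data(storage, start_id, end_id):
  if change_type == ChangeType.DELETE:
    print(f"-{table.num_rows} rows")
  else:  # ChangeType.ADD
    print(f"+{table.num_rows} rows")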
36 changes: 33 additions & 3 deletions python/src/space/core/ops/delete.py
@@ -17,9 +17,11 @@
from abc import abstractmethod
from typing import List, Optional, Set

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
from pyroaring import BitMap # type: ignore[import-not-found]

from space.core.ops import utils
from space.core.ops.append import LocalAppendOp
@@ -29,6 +31,9 @@
from space.core.utils.paths import StoragePathsMixin
from space.core.schema import constants

# A temporary row ID field used for generating the deletion row bitmap.
_ROW_ID_FIELD = "__ROW_ID"


class BaseDeleteOp(BaseOp):
"""Abstract base delete operation class.
@@ -73,6 +78,7 @@ def __init__(self, location: str, metadata: meta.StorageMetadata,
def delete(self) -> Optional[runtime.Patch]:
# The index files and manifests that were deleted, to be removed from
# index manifests.
patch = runtime.Patch()
deleted_files: List[str] = []
deleted_manifest_ids: Set[int] = set()

@@ -82,8 +88,13 @@ def delete(self) -> Optional[runtime.Patch]:
utils.update_index_storage_stats(stats_before_delete,
file.storage_statistics)

index_data = pq.read_table(self.full_path(file.path),
filters=self._reinsert_filter)
# TODO: this can be done at row group level.
index_data = pq.read_table(self.full_path(file.path))
index_data = index_data.append_column(
_ROW_ID_FIELD,
pa.array(np.arange(0, index_data.num_rows,
dtype=np.int32))) # type: ignore[arg-type]
index_data = index_data.filter(mask=self._reinsert_filter)

# No row is deleted. No need to re-insert rows.
if index_data.num_rows == file.storage_statistics.num_rows:
Expand All @@ -93,6 +104,11 @@ def delete(self) -> Optional[runtime.Patch]:
deleted_rows += (file.storage_statistics.num_rows - index_data.num_rows)
all_deleted = index_data.num_rows == 0

# Compute deleted row bitmap for change log.
patch.change_log.deleted_rows.append(
_build_bitmap(file, index_data, all_deleted))
index_data = index_data.drop(_ROW_ID_FIELD) # type: ignore[attr-defined]

# Record deleted files and manifests information.
deleted_files.append(file.path)
deleted_manifest_ids.add(file.manifest_file_id)
@@ -130,7 +146,6 @@ def delete(self) -> Optional[runtime.Patch]:
reinsert_patch = self._append_op.finish()

# Populate the patch for the delete.
patch = runtime.Patch()
if reinsert_patch is not None:
patch.addition.index_manifest_files.extend(
reinsert_patch.addition.index_manifest_files)
@@ -189,3 +204,18 @@ def _validate_files(file_set: runtime.FileSet) -> bool:
return False

return len(file_set.index_manifest_files) > 0


def _build_bitmap(file: runtime.DataFile, index_data: pa.Table,
all_deleted: bool) -> meta.RowBitmap:
row_bitmap = meta.RowBitmap(file=file.path)
if all_deleted:
row_bitmap.all_rows = True
else:
deleted_bitmaps = BitMap()
deleted_bitmaps.add_range(0, file.storage_statistics.num_rows)
survivor_bitmaps = BitMap(index_data.column(_ROW_ID_FIELD).to_numpy())
deleted_bitmaps.difference_update(survivor_bitmaps)
row_bitmap.roaring_bitmap = deleted_bitmaps.serialize()

return row_bitmap
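
Note: a worked example of the bitmap arithmetic in _build_bitmap. For a file of five rows where rows 0, 2, and 4 survive the reinsert filter, the deleted bitmap comes out as {1, 3}:

from pyroaring import BitMap

num_rows = 5  # file.storage_statistics.num_rows
deleted = BitMap()
deleted.add_range(0, num_rows)  # start with every row marked deleted
deleted.difference_update(BitMap([0, 2, 4]))  # subtract surviving __ROW_ID values
assert list(deleted) == [1, 3]  # the rows recorded in the change log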
4 changes: 2 additions & 2 deletions python/src/space/core/ops/insert.py
@@ -56,8 +56,8 @@ def write(self, data: InputData) -> Optional[runtime.Patch]:
class LocalInsertOp(BaseInsertOp, StoragePathsMixin):
"""Insert data into a dataset."""

def __init__(self, location: str, storage: Storage, options: InsertOptions):
StoragePathsMixin.__init__(self, location)
def __init__(self, storage: Storage, options: InsertOptions):
StoragePathsMixin.__init__(self, storage.location)

self._storage = storage
self._metadata = self._storage.metadata
32 changes: 31 additions & 1 deletion python/src/space/core/proto/metadata.proto
@@ -74,7 +74,7 @@ message Schema {

// Storage snapshot persisting physical metadata such as manifest file paths.
// It is used for obtaining all alive data file paths for a given snapshot.
// NEXT_ID: 5
// NEXT_ID: 6
message Snapshot {
// The snapshot ID.
int64 snapshot_id = 1;
@@ -91,6 +91,9 @@ message Snapshot {

// Statistics of all data in the storage.
StorageStatistics storage_statistics = 4;

// File path of the change log of the snapshot.
string change_log_file = 5;
}

// Stores information of manifest files.
@@ -118,3 +121,30 @@ message StorageStatistics {
// Uncompressed bytes of record data.
int64 record_uncompressed_bytes = 4;
}


// Change log stores changes made by a snapshot.
// NEXT_ID: 3
message ChangeLog {
// Rows deleted in this snapshot.
repeated RowBitmap deleted_rows = 1;

// New rows added in this snapshot.
repeated RowBitmap added_rows = 2;
}


// Mark rows in a file by bitmap.
// NEXT_ID: 4
message RowBitmap {
// File path that the bitmap applies to.
string file = 1;

// All rows are selected. Bitmap is empty in this case.
bool all_rows = 2;

oneof bitmap {
// Roaring bitmap.
bytes roaring_bitmap = 3;
}
}
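
Note: putting the two new messages together, a snapshot's change log might be assembled as below. This is a sketch: the file paths are illustrative and serialized_bitmap is assumed to be bytes produced by BitMap.serialize():

import space.core.proto.metadata_pb2 as meta

change_log = meta.ChangeLog(
    added_rows=[
        # A freshly appended index file: every row is an addition.
        meta.RowBitmap(file="data/index_00001.parquet", all_rows=True),
    ],
    deleted_rows=[
        # A partially deleted file: affected rows come from a roaring bitmap.
        meta.RowBitmap(file="data/index_00002.parquet",
                       roaring_bitmap=serialized_bitmap),
    ],
)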