Support reading diff of versions for views
coufon committed Dec 29, 2023
1 parent a184289 commit f74fb42
Showing 13 changed files with 316 additions and 35 deletions.
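
In brief: views can now read the change data (diff) between two versions of their source dataset, executed through a new Ray runner. A minimal usage sketch under assumed names — `view` is any non-dataset view over a single source dataset, and the version numbers are illustrative:

```python
# Hypothetical usage of the API added in this commit; `view` is assumed to be
# a map/filter transform over a single source dataset.
runner = view.ray()  # RayReadOnlyRunner, defined in python/src/space/ray/runners.py

# start_version is excluded; end_version is included (see BaseReadOnlyRunner.diff).
for change_type, table in runner.diff(start_version=1, end_version=2):
    print(change_type, table.num_rows)
```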
7 changes: 6 additions & 1 deletion python/pyproject.toml
@@ -26,7 +26,12 @@ dependencies = [
]

[project.optional-dependencies]
dev = ["pyarrow-stubs", "tensorflow", "types-protobuf"]
dev = [
"pyarrow-stubs",
"ray",
"tensorflow",
"types-protobuf",
]

[project.urls]
Homepage = "https://github.com/google/space"
10 changes: 10 additions & 0 deletions python/src/space/core/datasets.py
@@ -23,6 +23,7 @@
from space.core.runners import LocalRunner
from space.core.serializers.base import DictSerializer
from space.core.storage import Storage
from space.core.utils.lazy_imports_utils import ray
from space.core.utils.plans import LogicalPlanBuilder
from space.core.views import View

@@ -33,6 +34,11 @@ class Dataset(View):
def __init__(self, storage: Storage):
self._storage = storage

@property
def storage(self) -> Storage:
"""Return storage of the dataset."""
return self._storage

@classmethod
def create(cls, location: str, schema: pa.Schema, primary_keys: List[str],
record_fields: List[str]) -> Dataset:
@@ -91,3 +97,7 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel:
location = self._storage.location
return Rel(read=ReadRel(named_table=ReadRel.NamedTable(names=[location]),
base_schema=self._storage.metadata.schema.fields))

def process_source(self, data: pa.Table) -> ray.Dataset:
# The dataset itself is the source; there is no transform, so return the data as-is.
return ray.data.from_arrow(data)
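
For context, `ray.data.from_arrow` lifts an in-memory Arrow table directly into a Ray Dataset; a standalone sketch of what `Dataset.process_source` returns (sample data is illustrative):

```python
import pyarrow as pa
import ray

data = pa.table({"id": [1, 2, 3]})
# A Dataset is the root of a view chain, so there is no transform to apply;
# the Arrow table is wrapped in a ray.Dataset as-is.
ray_ds = ray.data.from_arrow(data)
print(ray_ds.take_all())  # [{'id': 1}, {'id': 2}, {'id': 3}]
```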
57 changes: 25 additions & 32 deletions python/src/space/core/runners.py
@@ -22,6 +22,9 @@
import pyarrow as pa
import pyarrow.compute as pc

from space.core.loaders.array_record import ArrayRecordIndexFn
from space.core.loaders.array_record import LocalArrayRecordLoadOp
from space.core.loaders.parquet import LocalParquetLoadOp
from space.core.ops.append import LocalAppendOp
from space.core.ops.base import InputData
from space.core.ops.change_data import ChangeType, read_change_data
Expand All @@ -30,16 +33,11 @@
from space.core.ops.read import FileSetReadOp, ReadOptions
import space.core.proto.runtime_pb2 as runtime
from space.core.storage import Storage
from space.core.loaders.array_record import ArrayRecordIndexFn
from space.core.loaders.array_record import LocalArrayRecordLoadOp
from space.core.loaders.parquet import LocalParquetLoadOp

from space.core.versions.utils import version_to_snapshot_id

class BaseRunner(ABC):
"""Abstract base runner class."""

def __init__(self, storage: Storage):
self._storage = storage
class BaseReadOnlyRunner(ABC):
"""Abstract base read-only runner class."""

@abstractmethod
def read(self,
@@ -58,6 +56,21 @@ def read_all(self,
return pa.concat_tables(
list(self.read(filter_, fields, snapshot_id, reference_read)))

@abstractmethod
def diff(self, start_version: Union[int],
end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]:
"""Read the change data between two versions.
start_version is excluded; end_version is included.
"""


class BaseReadWriteRunner(BaseReadOnlyRunner):
"""Abstract base runner class."""

def __init__(self, storage: Storage):
self._storage = storage

@abstractmethod
def append(self, data: InputData) -> runtime.JobResult:
"""Append data into the dataset."""
@@ -111,22 +124,14 @@ def _insert(self, data: InputData,
def delete(self, filter_: pc.Expression) -> runtime.JobResult:
"""Delete data matching the filter from the dataset."""

@abstractmethod
def diff(self, start_version: Union[int],
end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]:
"""Read the change data between two versions.
start_version is excluded; end_version is included.
"""

def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult:
if patch is not None:
self._storage.commit(patch)

return _job_result(patch)


class LocalRunner(BaseRunner):
class LocalRunner(BaseReadWriteRunner):
"""A runner that runs operations locally."""

def read(self,
@@ -180,21 +185,9 @@ def delete(self, filter_: pc.Expression) -> runtime.JobResult:

def diff(self, start_version: Union[int],
end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]:
start_snapshot_id, end_snapshot_id = None, None

if isinstance(start_version, int):
start_snapshot_id = start_version

if isinstance(end_version, int):
end_snapshot_id = end_version

if start_snapshot_id is None:
raise RuntimeError(f"Start snapshot ID is invalid: {start_version}")

if end_snapshot_id is None:
raise RuntimeError(f"End snapshot ID is invalid: {end_version}")

return read_change_data(self._storage, start_snapshot_id, end_snapshot_id)
return read_change_data(self._storage,
version_to_snapshot_id(start_version),
version_to_snapshot_id(end_version))


def _job_result(patch: Optional[runtime.Patch]) -> runtime.JobResult:
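
The refactor splits the old `BaseRunner` into `BaseReadOnlyRunner` (`read`, `read_all`, `diff`) and `BaseReadWriteRunner` (append/insert/delete), and replaces the inline version validation with the shared `version_to_snapshot_id` helper. A hedged sketch of the local diff call — `storage` and the snapshot IDs are assumed to exist:

```python
from space.core.runners import LocalRunner

runner = LocalRunner(storage)  # `storage` is an existing Storage instance
# Yields changes committed after snapshot_a, up to and including snapshot_b.
for change_type, table in runner.diff(snapshot_a, snapshot_b):
    ...
```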
9 changes: 9 additions & 0 deletions python/src/space/core/transform.py
@@ -33,6 +33,7 @@
from space.core.datasets import Dataset
import space.core.proto.metadata_pb2 as meta
from space.core.schema import arrow
from space.core.utils.lazy_imports_utils import ray
from space.core.utils.plans import SIMPLE_UDF_URI
from space.core.utils.plans import LogicalPlanBuilder, UserDefinedFn
from space.core.views import View
@@ -127,6 +128,11 @@ def from_relation(cls, location: str, metadata: meta.StorageMetadata,
return MapTransform(*_load_udf(location, metadata, rel.project.
expressions[0], rel.project.input, plan))

def process_source(self, data: pa.Table) -> ray.Dataset:
batch_size = self.udf.batch_size if self.udf.batch_size >= 0 else 'default'
return self.input_.process_source(data).map_batches(self.udf.fn,
batch_size=batch_size)


@dataclass
class FilterTransform(BaseUdfTransform):
@@ -157,6 +163,9 @@ def from_relation(cls, location: str, metadata: meta.StorageMetadata,
return FilterTransform(*_load_udf(location, metadata, rel.filter.condition,
rel.filter.input, plan))

def process_source(self, data: pa.Table) -> ray.Dataset:
return self.input_.process_source(data).filter(self.udf.fn)


@dataclass
class _CompactPlan:
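
`MapTransform` and `FilterTransform` now execute their UDFs as Ray Dataset transforms; a non-negative `udf.batch_size` is passed through to `map_batches`, otherwise Ray's `"default"` batching is used. A standalone sketch of the same pattern in plain Ray (names and data are illustrative):

```python
import ray

def add_one(batch):
    # Batches arrive as dicts of arrays; assign out-of-place, as in the test below.
    batch["float64"] = batch["float64"] + 1
    return batch

ds = ray.data.from_items([{"float64": float(i)} for i in range(4)])
ds = ds.map_batches(add_one, batch_size="default")  # as in MapTransform.process_source
ds = ds.filter(lambda row: row["float64"] > 1.0)    # as in FilterTransform.process_source
print(ds.take_all())
```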
3 changes: 3 additions & 0 deletions python/src/space/core/utils/lazy_imports_utils.py
@@ -190,3 +190,6 @@ def array_record_error_callback(**kwargs):

with lazy_imports(error_callback=array_record_error_callback):
from array_record.python import array_record_module # type: ignore[import-untyped] # pylint: disable=unused-import

with lazy_imports():
import ray # type: ignore[import-untyped] # pylint: disable=unused-import
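
With this registration, other modules can import `ray` through the lazy-import utility so the heavy dependency is only loaded when first used. A sketch of the consumer side, matching the imports added in the other files (the deferred-loading behavior is inferred from the utility's name and its existing use for `array_record`):

```python
from space.core.utils.lazy_imports_utils import ray

# No real `import ray` has happened yet; it is assumed to be deferred until
# the first attribute access below.
ray_ds = ray.data.range(10)
```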
13 changes: 13 additions & 0 deletions python/src/space/core/versions/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
30 changes: 30 additions & 0 deletions python/src/space/core/versions/utils.py
@@ -0,0 +1,30 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Utilities for version management."""

from typing import Union


def version_to_snapshot_id(version: Union[int]) -> int:
"""Convert a version to a snapshot ID."""
snapshot_id = None

if isinstance(version, int):
snapshot_id = version

if snapshot_id is None:
raise RuntimeError(f"Invalid version: {version}")

return snapshot_id
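
Behavior in a nutshell — only integer versions are accepted for now (the one-member `Union[int]` hints that other version types, e.g. string tags, are planned):

```python
from space.core.versions.utils import version_to_snapshot_id

assert version_to_snapshot_id(42) == 42   # integers pass through unchanged
version_to_snapshot_id("v1")              # raises RuntimeError: Invalid version: v1
```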
13 changes: 13 additions & 0 deletions python/src/space/core/views.py
@@ -26,8 +26,10 @@
import space.core.proto.metadata_pb2 as meta
from space.core.schema import FieldIdManager
from space.core.storage import Storage
from space.core.utils.lazy_imports_utils import ray # pylint: disable=unused-import
from space.core.utils.paths import UDF_DIR, metadata_dir
from space.core.utils.plans import LogicalPlanBuilder, UserDefinedFn
from space.ray.runners import RayReadOnlyRunner

if TYPE_CHECKING:
from space.core.datasets import Dataset
@@ -36,6 +38,9 @@
class View(ABC):
"""A view is a dataset, or a transform applied to a dataset, or a transform
applied to another view.
Non-dataset views must use the Ray runner instead of the local runner,
because their transforms are implemented with Ray Dataset transforms.
"""

@property
@@ -68,6 +73,14 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel:
The relation describes a dataset or a transform in the Substrait format.
"""

@abstractmethod
def process_source(self, data: pa.Table) -> ray.Dataset:
"""Process input data using the transform defined by the view."""

def ray(self) -> RayReadOnlyRunner:
"""Return a Ray runner for the view."""
return RayReadOnlyRunner(self)

def materialize(self, location: str) -> MaterializedView:
"""Materialize a view to files in the Space storage format.
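
`process_source` is the hook the Ray runner uses to push a source dataset's change data through the view's transform chain. A hedged sketch of the contract (`view` and the column name are illustrative):

```python
import pyarrow as pa

table = pa.table({"float64": [1.0, 2.0, 3.0]})
# A Dataset returns ray.data.from_arrow(table); MapTransform/FilterTransform
# first delegate to their input view, then apply map_batches/filter on top.
ray_ds = view.process_source(table)
print(ray_ds.take_all())
```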
13 changes: 13 additions & 0 deletions python/src/space/ray/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
60 changes: 60 additions & 0 deletions python/src/space/ray/runners.py
@@ -0,0 +1,60 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Ray runner implementations."""

from __future__ import annotations
from typing import TYPE_CHECKING
from typing import Iterator, List, Optional, Tuple, Union

import pyarrow as pa
import pyarrow.compute as pc

from space.core.runners import BaseReadOnlyRunner
from space.core.ops.change_data import ChangeType, read_change_data
from space.core.utils.lazy_imports_utils import ray
from space.core.versions.utils import version_to_snapshot_id

if TYPE_CHECKING:
from space.core.views import View


class RayReadOnlyRunner(BaseReadOnlyRunner):
"""A read-only Ray runner."""

def __init__(self, view: View):
self._view = view

def read(self,
filter_: Optional[pc.Expression] = None,
fields: Optional[List[str]] = None,
snapshot_id: Optional[int] = None,
reference_read: bool = False) -> Iterator[pa.Table]:
raise NotImplementedError()

def diff(self, start_version: Union[int],
end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]:
sources = self._view.sources
assert len(sources) == 1, "Views only support a single source dataset"
ds = list(sources.values())[0]

source_changes = read_change_data(ds.storage,
version_to_snapshot_id(start_version),
version_to_snapshot_id(end_version))
for change_type, data in source_changes:
# TODO: skip processing the data for deletions; the caller is usually
# only interested in the deleted primary keys.
processed_ray_data = self._view.process_source(data)
processed_data = ray.get(processed_ray_data.to_arrow_refs())
yield change_type, pa.concat_tables(processed_data)
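
The conversion back from Ray to Arrow relies on `Dataset.to_arrow_refs`, which returns object references to the dataset's blocks as Arrow tables; a minimal standalone illustration of the same pattern:

```python
import pyarrow as pa
import ray

ds = ray.data.range(8)                # small demo dataset
tables = ray.get(ds.to_arrow_refs())  # List[pa.Table], one table per block
combined = pa.concat_tables(tables)
print(combined.num_rows)              # 8
```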
2 changes: 1 addition & 1 deletion python/src/space/tf/data_sources.py
@@ -114,7 +114,7 @@ def _file_instructions(
for file_path, indexes in zip(
aggregated[constants.FILE_PATH_FIELD],
aggregated[f"{constants.ROW_ID_FIELD}_list"]):
full_file_path = self._ds._storage.full_path(file_path) # pylint: disable=protected-access
full_file_path = self._ds.storage.full_path(file_path) # pylint: disable=protected-access
if not indexes:
continue

2 changes: 1 addition & 1 deletion python/tests/core/test_views.py
@@ -49,7 +49,7 @@ def test_map_batches_to_relation(self, tmp_path, sample_dataset,
sample_map_batch_plan):
# A sample UDF for testing.
def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["float64"] += 1
batch["float64"] = batch["float64"] + 1
return batch

view = sample_dataset.map_batches(fn=_sample_map_udf,
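
Side note: the switch from in-place `+=` to an out-of-place assignment is presumably needed because Ray's `map_batches` can hand the UDF batches backed by read-only NumPy arrays (zero-copy from Arrow), where in-place mutation raises an error; this rationale is an inference, not stated in the commit.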