diff --git a/python/pyproject.toml b/python/pyproject.toml index 02731e8..9193bc5 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -26,7 +26,12 @@ dependencies = [ ] [project.optional-dependencies] -dev = ["pyarrow-stubs", "tensorflow", "types-protobuf"] +dev = [ + "pyarrow-stubs", + "ray", + "tensorflow", + "types-protobuf", +] [project.urls] Homepage = "https://github.com/google/space" diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index fe89779..75c90ab 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -23,6 +23,7 @@ from space.core.runners import LocalRunner from space.core.serializers.base import DictSerializer from space.core.storage import Storage +from space.core.utils.lazy_imports_utils import ray from space.core.utils.plans import LogicalPlanBuilder from space.core.views import View @@ -33,6 +34,11 @@ class Dataset(View): def __init__(self, storage: Storage): self._storage = storage + @property + def storage(self) -> Storage: + """Return storage of the dataset.""" + return self._storage + @classmethod def create(cls, location: str, schema: pa.Schema, primary_keys: List[str], record_fields: List[str]) -> Dataset: @@ -91,3 +97,7 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel: location = self._storage.location return Rel(read=ReadRel(named_table=ReadRel.NamedTable(names=[location]), base_schema=self._storage.metadata.schema.fields)) + + def process_source(self, data: pa.Table) -> ray.Dataset: + # Dataset is the source, there is no transform, so simply return the data. 
+ return ray.data.from_arrow(data) diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 01d2611..b251cda 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -22,6 +22,9 @@ import pyarrow as pa import pyarrow.compute as pc +from space.core.loaders.array_record import ArrayRecordIndexFn +from space.core.loaders.array_record import LocalArrayRecordLoadOp +from space.core.loaders.parquet import LocalParquetLoadOp from space.core.ops.append import LocalAppendOp from space.core.ops.base import InputData from space.core.ops.change_data import ChangeType, read_change_data @@ -30,16 +33,11 @@ from space.core.ops.read import FileSetReadOp, ReadOptions import space.core.proto.runtime_pb2 as runtime from space.core.storage import Storage -from space.core.loaders.array_record import ArrayRecordIndexFn -from space.core.loaders.array_record import LocalArrayRecordLoadOp -from space.core.loaders.parquet import LocalParquetLoadOp - +from space.core.versions.utils import version_to_snapshot_id -class BaseRunner(ABC): - """Abstract base runner class.""" - def __init__(self, storage: Storage): - self._storage = storage +class BaseReadOnlyRunner(ABC): + """Abstract base read-only runner class.""" @abstractmethod def read(self, @@ -58,6 +56,21 @@ def read_all(self, return pa.concat_tables( list(self.read(filter_, fields, snapshot_id, reference_read))) + @abstractmethod + def diff(self, start_version: Union[int], + end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]: + """Read the change data between two versions. + + start_version is excluded; end_version is included. 
+ """ + + +class BaseReadWriteRunner(BaseReadOnlyRunner): + """Abstract base read-write runner class.""" + + def __init__(self, storage: Storage): + self._storage = storage + @abstractmethod def append(self, data: InputData) -> runtime.JobResult: """Append data into the dataset.""" @@ -111,14 +124,6 @@ def _insert(self, data: InputData, def delete(self, filter_: pc.Expression) -> runtime.JobResult: """Delete data matching the filter from the dataset.""" - @abstractmethod - def diff(self, start_version: Union[int], - end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]: - """Read the change data between two versions. - - start_version is excluded; end_version is included. - """ - def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult: if patch is not None: self._storage.commit(patch) @@ -126,7 +131,7 @@ def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult: return _job_result(patch) -class LocalRunner(BaseRunner): +class LocalRunner(BaseReadWriteRunner): """A runner that runs operations locally.""" def read(self, @@ -180,21 +185,9 @@ def delete(self, filter_: pc.Expression) -> runtime.JobResult: def diff(self, start_version: Union[int], end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]: - start_snapshot_id, end_snapshot_id = None, None - - if isinstance(start_version, int): - start_snapshot_id = start_version - - if isinstance(end_version, int): - end_snapshot_id = end_version - - if start_snapshot_id is None: - raise RuntimeError(f"Start snapshot ID is invalid: {start_version}") - - if end_snapshot_id is None: - raise RuntimeError(f"End snapshot ID is invalid: {end_version}") - - return read_change_data(self._storage, start_snapshot_id, end_snapshot_id) + return read_change_data(self._storage, + version_to_snapshot_id(start_version), + version_to_snapshot_id(end_version)) def _job_result(patch: Optional[runtime.Patch]) -> runtime.JobResult: diff --git a/python/src/space/core/transform.py 
b/python/src/space/core/transform.py index 09804a3..87fef4c 100644 --- a/python/src/space/core/transform.py +++ b/python/src/space/core/transform.py @@ -33,6 +33,7 @@ from space.core.datasets import Dataset import space.core.proto.metadata_pb2 as meta from space.core.schema import arrow +from space.core.utils.lazy_imports_utils import ray from space.core.utils.plans import SIMPLE_UDF_URI from space.core.utils.plans import LogicalPlanBuilder, UserDefinedFn from space.core.views import View @@ -127,6 +128,11 @@ def from_relation(cls, location: str, metadata: meta.StorageMetadata, return MapTransform(*_load_udf(location, metadata, rel.project. expressions[0], rel.project.input, plan)) + def process_source(self, data: pa.Table) -> ray.Dataset: + batch_size = self.udf.batch_size if self.udf.batch_size >= 0 else 'default' + return self.input_.process_source(data).map_batches(self.udf.fn, + batch_size=batch_size) + @dataclass class FilterTransform(BaseUdfTransform): @@ -157,6 +163,9 @@ def from_relation(cls, location: str, metadata: meta.StorageMetadata, return FilterTransform(*_load_udf(location, metadata, rel.filter.condition, rel.filter.input, plan)) + def process_source(self, data: pa.Table) -> ray.Dataset: + return self.input_.process_source(data).filter(self.udf.fn) + @dataclass class _CompactPlan: diff --git a/python/src/space/core/utils/lazy_imports_utils.py b/python/src/space/core/utils/lazy_imports_utils.py index f211e6e..e14a06d 100644 --- a/python/src/space/core/utils/lazy_imports_utils.py +++ b/python/src/space/core/utils/lazy_imports_utils.py @@ -190,3 +190,6 @@ def array_record_error_callback(**kwargs): with lazy_imports(error_callback=array_record_error_callback): from array_record.python import array_record_module # type: ignore[import-untyped] # pylint: disable=unused-import + +with lazy_imports(): + import ray # type: ignore[import-untyped] # pylint: disable=unused-import diff --git a/python/src/space/core/versions/__init__.py 
b/python/src/space/core/versions/__init__.py new file mode 100644 index 0000000..5678014 --- /dev/null +++ b/python/src/space/core/versions/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/python/src/space/core/versions/utils.py b/python/src/space/core/versions/utils.py new file mode 100644 index 0000000..483d6ad --- /dev/null +++ b/python/src/space/core/versions/utils.py @@ -0,0 +1,30 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Utilities for version management.""" + +from typing import Union + + +def version_to_snapshot_id(version: Union[int]) -> int: + """Convert a version to a snapshot ID.""" + snapshot_id = None + + if isinstance(version, int): + snapshot_id = version + + if snapshot_id is None: + raise RuntimeError(f"Invalid version: {version}") + + return snapshot_id diff --git a/python/src/space/core/views.py b/python/src/space/core/views.py index bb55784..e8d7a96 100644 --- a/python/src/space/core/views.py +++ b/python/src/space/core/views.py @@ -26,8 +26,10 @@ import space.core.proto.metadata_pb2 as meta from space.core.schema import FieldIdManager from space.core.storage import Storage +from space.core.utils.lazy_imports_utils import ray # pylint: disable=unused-import from space.core.utils.paths import UDF_DIR, metadata_dir from space.core.utils.plans import LogicalPlanBuilder, UserDefinedFn +from space.ray.runners import RayReadOnlyRunner if TYPE_CHECKING: from space.core.datasets import Dataset @@ -36,6 +38,9 @@ class View(ABC): """A view is a dataset, or a transform applied to a dataset, or a transform applied to another view. + + Non-dataset views must use Ray runner instead of local runner, because the + transforms are implemented with Ray dataset transforms. """ @property @@ -68,6 +73,14 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel: The relation describes a dataset or a transform in the Substrait format. """ + @abstractmethod + def process_source(self, data: pa.Table) -> ray.Dataset: + """Process input data using the transform defined by the view.""" + + def ray(self) -> RayReadOnlyRunner: + """Return a Ray runner for the view.""" + return RayReadOnlyRunner(self) + def materialize(self, location: str) -> MaterializedView: """Materialize a view to files in the Space storage format. 
diff --git a/python/src/space/ray/__init__.py b/python/src/space/ray/__init__.py new file mode 100644 index 0000000..5678014 --- /dev/null +++ b/python/src/space/ray/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py new file mode 100644 index 0000000..65a81dd --- /dev/null +++ b/python/src/space/ray/runners.py @@ -0,0 +1,60 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Ray runner implementations.""" + +from __future__ import annotations +from typing import TYPE_CHECKING +from typing import Iterator, List, Optional, Tuple, Union + +import pyarrow as pa +import pyarrow.compute as pc + +from space.core.runners import BaseReadOnlyRunner +from space.core.ops.change_data import ChangeType, read_change_data +from space.core.utils.lazy_imports_utils import ray +from space.core.versions.utils import version_to_snapshot_id + +if TYPE_CHECKING: + from space.core.views import View + + +class RayReadOnlyRunner(BaseReadOnlyRunner): + """A read-only Ray runner.""" + + def __init__(self, view: View): + self._view = view + + def read(self, + filter_: Optional[pc.Expression] = None, + fields: Optional[List[str]] = None, + snapshot_id: Optional[int] = None, + reference_read: bool = False) -> Iterator[pa.Table]: + raise NotImplementedError() + + def diff(self, start_version: Union[int], + end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]: + sources = self._view.sources + assert len(sources) == 1, "Views only support a single source dataset" + ds = list(sources.values())[0] + + source_changes = read_change_data(ds.storage, + version_to_snapshot_id(start_version), + version_to_snapshot_id(end_version)) + for change_type, data in source_changes: + # TODO: skip processing the data for deletions; the caller is usually + # only interested in deleted primary keys. 
+ processed_ray_data = self._view.process_source(data) + processed_data = ray.get(processed_ray_data.to_arrow_refs()) + yield change_type, pa.concat_tables(processed_data) diff --git a/python/src/space/tf/data_sources.py b/python/src/space/tf/data_sources.py index bd3f1b1..acb56b8 100644 --- a/python/src/space/tf/data_sources.py +++ b/python/src/space/tf/data_sources.py @@ -114,7 +114,7 @@ def _file_instructions( for file_path, indexes in zip( aggregated[constants.FILE_PATH_FIELD], aggregated[f"{constants.ROW_ID_FIELD}_list"]): - full_file_path = self._ds._storage.full_path(file_path) # pylint: disable=protected-access + full_file_path = self._ds.storage.full_path(file_path) # pylint: disable=protected-access if not indexes: continue diff --git a/python/tests/core/test_views.py b/python/tests/core/test_views.py index 148ba3b..f632591 100644 --- a/python/tests/core/test_views.py +++ b/python/tests/core/test_views.py @@ -49,7 +49,7 @@ def test_map_batches_to_relation(self, tmp_path, sample_dataset, sample_map_batch_plan): # A sample UDF for testing. def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: - batch["float64"] += 1 + batch["float64"] = batch["float64"] + 1 return batch view = sample_dataset.map_batches(fn=_sample_map_udf, diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py new file mode 100644 index 0000000..9e31bb3 --- /dev/null +++ b/python/tests/ray/test_runners.py @@ -0,0 +1,132 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import numpy as np +import pyarrow as pa +import pyarrow.compute as pc +import pytest +import ray + +from space import Dataset +from space.core.ops.change_data import ChangeType + + +def setup_module(module): + ray.init(ignore_reinit_error=True, num_cpus=1) + + +@pytest.fixture +def sample_schema(): + return pa.schema([ + pa.field("int64", pa.int64()), + pa.field("float64", pa.float64()), + pa.field("binary", pa.binary()) + ]) + + +@pytest.fixture +def sample_dataset(tmp_path, sample_schema): + ds = Dataset.create(str(tmp_path / "dataset"), + sample_schema, + primary_keys=["int64"], + record_fields=["binary"]) + return ds + + +class TestRayReadOnlyRunner: + + def test_diff_map_batches(self, sample_dataset): + # A sample UDF for testing. + def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: + batch["float64"] = batch["float64"] + 1 + del batch["binary"] + return batch + + view_schema = pa.schema( + [pa.field("int64", pa.int64()), + pa.field("float64", pa.float64())]) + view = sample_dataset.map_batches(fn=_sample_map_udf, + input_fields=["int64", "binary"], + output_schema=view_schema, + output_record_fields=["binary"]) + + ds_runner = sample_dataset.local() + view_runner = view.ray() + + # Test append. + ds_runner.append({ + "int64": [1, 2, 3], + "float64": [0.1, 0.2, 0.3], + "binary": [b"b1", b"b2", b"b3"] + }) + + expected_change0 = (ChangeType.ADD, + pa.Table.from_pydict({ + "int64": [1, 2, 3], + "float64": [1.1, 1.2, 1.3], + })) + assert list(view_runner.diff(0, 1)) == [expected_change0] + + # Test deletion. + ds_runner.delete(pc.field("int64") == 2) + expected_change1 = (ChangeType.DELETE, + pa.Table.from_pydict({ + "int64": [2], + "float64": [1.2] + })) + assert list(view_runner.diff(1, 2)) == [expected_change1] + + # Test several changes. 
+ assert list(view_runner.diff(0, 2)) == [expected_change0, expected_change1] + + def test_diff_filter(self, sample_dataset): + # A sample UDF for testing. + def _sample_filter_udf(row: Dict[str, Any]) -> Dict[str, Any]: + return row["float64"] > 0.1 + + view = sample_dataset.filter(fn=_sample_filter_udf, + input_fields=["int64", "float64"]) + + ds_runner = sample_dataset.local() + view_runner = view.ray() + + # Test append. + ds_runner.append({ + "int64": [1, 2, 3], + "float64": [0.1, 0.2, 0.3], + "binary": [b"b1", b"b2", b"b3"] + }) + + expected_change0 = (ChangeType.ADD, + pa.Table.from_pydict({ + "int64": [2, 3], + "float64": [0.2, 0.3], + "binary": [b"b2", b"b3"] + })) + assert list(view_runner.diff(0, 1)) == [expected_change0] + + # Test deletion. + ds_runner.delete(pc.field("int64") == 2) + expected_change1 = (ChangeType.DELETE, + pa.Table.from_pydict({ + "int64": [2], + "float64": [0.2], + "binary": [b"b2"] + })) + assert list(view_runner.diff(1, 2)) == [expected_change1] + + # Test several changes. + assert list(view_runner.diff(0, 2)) == [expected_change0, expected_change1]