[pick] [data] Rename .cache() to .materialize() (#34169) #34184

Merged
merged 1 commit on Apr 8, 2023
3 changes: 1 addition & 2 deletions doc/source/data/api/dataset.rst
@@ -138,8 +138,7 @@ Execution
.. autosummary::
:toctree: doc/

Dataset.cache
Dataset.is_cached
Dataset.materialize

Serialization
-------------
2 changes: 1 addition & 1 deletion doc/source/data/creating-datasets.rst
@@ -884,4 +884,4 @@ inspection functions like :meth:`ds.schema() <ray.data.Dataset.schema>` and
:meth:`ds.show() <ray.data.Dataset.show>` will trigger execution of only one or some
tasks, instead of all tasks. This allows metadata to be inspected right away. Execution
of all read tasks can be triggered manually using the
:meth:`ds.cache() <ray.data.Dataset.cache>` API.
:meth:`ds.materialize() <ray.data.Dataset.materialize>` API.
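For illustration, a minimal sketch of the behavior this doc section describes, reusing the example file referenced elsewhere in these docs (everything beyond the documented read_parquet/schema/materialize calls is assumed):

    import ray

    # Reading is lazy: creating the dataset only fetches metadata.
    ds = ray.data.read_parquet("example://iris.parquet")
    print(ds.schema())  # inspection triggers at most a few read tasks

    # materialize() runs every read task and pins the resulting blocks in memory.
    ds = ds.materialize()
    print(ds.count())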
4 changes: 2 additions & 2 deletions doc/source/data/dataset-internals.rst
@@ -68,7 +68,7 @@ The Datasets execution by default is:

- **Lazy**: This means that transformations on Dataset are not executed until a
consumption operation (e.g. :meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>`)
or :meth:`Dataset.cache() <ray.data.Dataset.cache>` is called. This creates
or :meth:`Dataset.materialize() <ray.data.Dataset.materialize>` is called. This creates
opportunities for optimizing the execution plan (e.g. :ref:`stage fusion <datasets_stage_fusion>`).
- **Pipelined**: This means that Dataset transformations will be executed in a
streaming way, incrementally on the base data, instead of on all of the data
@@ -88,7 +88,7 @@ to stage fusion optimizations and aggressive garbage collection of intermediate
Dataset creation and transformation APIs are lazy, with execution only triggered via "sink"
APIs, such as consuming (:meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>`),
writing (:meth:`ds.write_parquet() <ray.data.Dataset.write_parquet>`), or manually triggering via
:meth:`ds.cache() <ray.data.Dataset.cache>`. There are a few
:meth:`ds.materialize() <ray.data.Dataset.materialize>`. There are a few
exceptions to this rule, where transformations such as :meth:`ds.union()
<ray.data.Dataset.union>` and
:meth:`ds.limit() <ray.data.Dataset.limit>` trigger execution; we plan to make these
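As a rough sketch of the lazy "sink" behavior described above (the identity transform and output path are illustrative assumptions, not from this PR):

    import ray

    # Transformations only build up a lazy plan; nothing executes yet.
    ds = ray.data.read_parquet("example://iris.parquet")
    ds = ds.map_batches(lambda df: df, batch_format="pandas")  # identity, for illustration

    # Any "sink" API then triggers execution of the plan:
    for batch in ds.iter_batches(batch_size=32):  # consuming
        pass
    ds.write_parquet("/tmp/iris_out")  # writing (output path is made up)
    ds = ds.materialize()              # manual full execution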
2 changes: 1 addition & 1 deletion doc/source/data/doc_code/creating_datasets.py
@@ -298,7 +298,7 @@
"example://iris.parquet",
columns=["sepal.length", "variety"],
filter=pa.dataset.field("sepal.length") > 5.0,
).cache() # Force a full read of the file.
).materialize() # Force a full read of the file.
# -> Dataset(num_blocks=1, num_rows=118, schema={sepal.length: double, variety: string})

ds.show(2)
12 changes: 6 additions & 6 deletions doc/source/data/doc_code/tensor.py
@@ -52,7 +52,7 @@ def single_col_udf(batch: pd.DataFrame) -> pd.DataFrame:


ds.map_batches(single_col_udf)
ds.cache()
ds.materialize()
# -> Dataset(num_blocks=17, num_rows=1000,
# schema={__value__: TensorDtype(shape=(128, 128, 3), dtype=int64)})
# __create_pandas_end__
@@ -74,7 +74,7 @@ def multi_col_udf(batch: pd.DataFrame) -> pd.DataFrame:


ds.map_batches(multi_col_udf)
ds.cache()
ds.materialize()
# -> Dataset(num_blocks=17, num_rows=1000,
# schema={image: TensorDtype(shape=(128, 128, 3), dtype=int64),
# embed: TensorDtype(shape=(256,), dtype=uint8)})
@@ -156,7 +156,7 @@ def multi_col_udf(batch: pd.DataFrame) -> pd.DataFrame:
# two: extension<arrow.py_extension_type<ArrowTensorType>>
# __create_parquet_2_end__

ds.cache()
ds.materialize()
shutil.rmtree(path)

# __create_parquet_3_begin__
@@ -193,7 +193,7 @@ def cast_udf(block: pa.Table) -> pa.Table:
# -> one: int64
# two: extension<arrow.py_extension_type<ArrowTensorType>>
# __create_parquet_3_end__
ds.cache()
ds.materialize()

# __create_images_begin__
ds = ray.data.read_images("example://image-datasets/simple")
@@ -449,7 +449,7 @@ def add_one(batch: Dict[str, Any]) -> Dict[str, Any]:
# __consume_numpy_2_end__


ds.cache()
ds.materialize()
shutil.rmtree("/tmp/some_path")

# __write_1_begin__
@@ -468,7 +468,7 @@ def add_one(batch: Dict[str, Any]) -> Dict[str, Any]:
# label: string
# __write_1_end__

read_ds.cache()
read_ds.materialize()
shutil.rmtree("/tmp/some_path")

# __write_2_begin__
4 changes: 2 additions & 2 deletions doc/source/data/examples/advanced-pipelines.rst
@@ -11,7 +11,7 @@ This page covers more advanced examples for dataset pipelines.
Pre-repeat vs post-repeat transforms
====================================

Transformations prior to the call to ``.repeat()`` will be cached. However, note that the initial read will not be cached unless there is a subsequent transformation or ``.cache()`` call. Transformations made to the DatasetPipeline after the repeat will always be executed once for each repetition of the Dataset.
Transformations prior to the call to ``.repeat()`` will be cached. However, note that the initial read will not be cached unless there is a subsequent transformation or ``.materialize()`` call. Transformations made to the DatasetPipeline after the repeat will always be executed once for each repetition of the Dataset.

For example, in the following pipeline, the ``map(func)`` transformation only occurs once. However, the random shuffle is applied to each repetition in the pipeline. However, if we omitted the map transformation, then the pipeline would re-read from the base data on each repetition.

@@ -50,7 +50,7 @@ For example, in the following pipeline, the ``map(func)`` transformation only oc

.. important::

Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ``ray.data.read_parquet(...).repeat()``), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.cache().repeat()``.
Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ``ray.data.read_parquet(...).repeat()``), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.materialize().repeat()``.
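A small sketch of the difference (the read path reuses the example file from the docs; the repetition count is arbitrary):

    import ray

    ds = ray.data.read_parquet("example://iris.parquet")

    # Repeating right after the read: the file is re-read on every repetition.
    pipe = ds.repeat(3)

    # Materializing first caches the read result, so repetitions reuse the blocks.
    cached_pipe = ds.materialize().repeat(3)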

Changing Pipeline Structure
===========================
4 changes: 2 additions & 2 deletions doc/source/data/examples/nyc_taxi_basic_processing.ipynb
@@ -317,7 +317,7 @@
}
],
"source": [
"ds.cache().size_bytes()"
"ds.materialize().size_bytes()"
]
},
{
@@ -654,7 +654,7 @@
")\n",
"\n",
"# Force full execution of both of the file reads.\n",
"pushdown_ds = pushdown_ds.cache()\n",
"pushdown_ds = pushdown_ds.materialize()\n",
"pushdown_ds"
]
},
2 changes: 1 addition & 1 deletion doc/source/data/key-concepts.rst
@@ -92,7 +92,7 @@ Execution mode
==============

Most transformations are lazy. They don't execute until you consume a dataset or call
:meth:`Dataset.cache() <ray.data.Dataset.cache>`.
:meth:`Dataset.materialize() <ray.data.Dataset.materialize>`.

The transformations are executed in a streaming way, incrementally on the data and
with operators processed in parallel, see :ref:`Streaming Execution <datasets_streaming_execution>`.
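A brief sketch of that execution model (column selection and the trivial transform are assumptions for illustration):

    import ray

    ds = ray.data.read_parquet("example://iris.parquet", columns=["sepal.length"])
    ds = ds.map_batches(lambda df: df + 1.0, batch_format="pandas")

    # Consuming streams batches through the lazy plan, incrementally and in parallel.
    for batch in ds.iter_batches(batch_size=32):
        pass

    # materialize() instead executes the whole plan and pins the blocks in object
    # store memory, so later passes over the data can reuse them.
    materialized = ds.materialize()
    print(materialized.stats())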
4 changes: 2 additions & 2 deletions doc/source/data/transforming-datasets.rst
@@ -473,7 +473,7 @@ aggregation has been computed.
for x in range(10)])

# Group by the A column and calculate the per-group mean for B and C columns.
agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"]).cache()
agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"]).materialize()
# -> Sort Sample: 100%|███████████████████████████████████████| 10/10 [00:01<00:00, 9.04it/s]
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 23.66it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 10/10 [00:00<00:00, 937.21it/s]
@@ -542,7 +542,7 @@ with calculated column means.
return df

ds = ds.map_batches(batch_standard_scaler, batch_format="pandas")
ds.cache()
ds.materialize()
# -> Map Progress: 100%|██████████████████████████████████████| 10/10 [00:00<00:00, 144.79it/s]
# -> Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: double, C: double})

2 changes: 1 addition & 1 deletion doc/source/ray-air/check-ingest.rst
@@ -417,7 +417,7 @@ Performance Tips
Dataset Sharing
~~~~~~~~~~~~~~~

When you pass Datasets to a Tuner, Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call ``ds = ds.cache()`` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial.
When you pass Datasets to a Tuner, Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call ``ds = ds.materialize()`` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial.
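A hedged sketch of that pattern (the trainer setup, path, and hyperparameters are placeholders; the point being documented is only the ``ds.materialize()`` call before the Dataset is handed to the Tuner):

    import ray
    from ray import tune
    from ray.air.config import ScalingConfig
    from ray.train.torch import TorchTrainer

    def train_loop_per_worker(config):
        ...  # placeholder training loop

    ds = ray.data.read_parquet("s3://bucket/train-data")  # illustrative path
    ds = ds.materialize()  # execute the read once so all trials share the blocks

    trainer = TorchTrainer(
        train_loop_per_worker,
        scaling_config=ScalingConfig(num_workers=2),
        datasets={"train": ds},
    )
    tuner = tune.Tuner(
        trainer,
        param_space={"train_loop_config": {"lr": tune.grid_search([0.01, 0.1])}},
    )
    results = tuner.fit()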


FAQ
4 changes: 2 additions & 2 deletions doc/source/ray-air/examples/torch_image_example.ipynb
@@ -195,8 +195,8 @@
" return {\"image\": images, \"label\": labels}\n",
"\n",
"\n",
"train_dataset = train_dataset.map_batches(convert_batch_to_numpy).cache()\n",
"test_dataset = test_dataset.map_batches(convert_batch_to_numpy).cache()"
"train_dataset = train_dataset.map_batches(convert_batch_to_numpy).materialize()\n",
"test_dataset = test_dataset.map_batches(convert_batch_to_numpy).materialize()"
]
},
{
@@ -294,7 +294,7 @@
"\n",
" return {\"image\": images, \"label\": labels}\n",
"\n",
" mnist_dataset = mnist_dataset.map_batches(convert_batch_to_numpy).cache()\n",
" mnist_dataset = mnist_dataset.map_batches(convert_batch_to_numpy).materialize()\n",
" return mnist_dataset"
]
},
@@ -244,7 +244,7 @@
"metadata": {},
"outputs": [],
"source": [
"preds = predictions.cache()\n",
"preds = predictions.materialize()\n",
"preds.schema()\n"
]
},
4 changes: 2 additions & 2 deletions python/ray/air/util/check_ingest.py
@@ -67,11 +67,11 @@ def preprocess_datasets(self):
print("Starting dataset preprocessing")
super().preprocess_datasets()
if self.time_preprocessing_separately:
for dataset_name, ds in self.datasets.items():
for dataset_name, ds in list(self.datasets.items()):
start = time.perf_counter()
# Force execution to time preprocessing since Datasets are lazy by
# default.
ds.cache()
self.datasets[dataset_name] = ds.materialize()
print(
f"Preprocessed {dataset_name} in",
time.perf_counter() - start,
2 changes: 1 addition & 1 deletion python/ray/data/_internal/pipeline_executor.py
@@ -19,7 +19,7 @@ def pipeline_stage(fn: Callable[[], Dataset[T]]) -> Dataset[T]:
# Force eager evaluation of all blocks in the pipeline stage. This
# prevents resource deadlocks due to overlapping stage execution (e.g.,
# task -> actor stage).
return fn().cache()
return fn().materialize()


class PipelineExecutor:
6 changes: 3 additions & 3 deletions python/ray/data/_internal/plan.py
@@ -585,9 +585,9 @@ def execute(
preserve_order=preserve_order,
)
# TODO(ekl) we shouldn't need to set this in the future once we move
# to a fully lazy execution model, unless .cache() is used. The reason
# we need it right now is since the user may iterate over a Dataset
# multiple times after fully executing it once.
# to a fully lazy execution model, unless .materialize() is used. The
# reason we need it right now is since the user may iterate over a
# Dataset multiple times after fully executing it once.
if not self._run_by_consumer:
blocks._owned_by_consumer = False
stats = executor.get_stats()
36 changes: 15 additions & 21 deletions python/ray/data/dataset.py
@@ -2906,7 +2906,7 @@ def write_fn_wrapper(blocks: Iterator[Block], ctx, fn) -> Iterator[Block]:
try:
self._write_ds = Dataset(
plan, self._epoch, self._lazy, logical_plan
).cache()
).materialize()
blocks = ray.get(self._write_ds._plan.execute().get_blocks())
assert all(
isinstance(block, list) and len(block) == 1 for block in blocks
@@ -4061,37 +4061,31 @@ def __iter__(self):
)
return pipe

@Deprecated(message="Use `Dataset.cache()` instead.")
@Deprecated(message="Use `Dataset.materialize()` instead.")
def fully_executed(self) -> "Dataset[T]":
logger.warning(
"The 'fully_executed' call has been renamed to 'cache'.",
"Deprecation warning: use Dataset.materialize() instead of "
"fully_executed()."
)
return self.cache()
self._plan.execute(force_read=True)
return self

@Deprecated(message="Use `Dataset.is_cached()` instead.")
# Note: will be deprecated in 2.5.
def is_fully_executed(self) -> bool:
logger.warning(
"The 'is_fully_executed' call has been renamed to 'is_cached'.",
)
return self.is_cached()

def is_cached(self) -> bool:
"""Returns whether this Dataset has been cached in memory.

This will return False if the output of its final stage hasn't been computed
yet.
"""
return self._plan.has_computed_output()

@ConsumptionAPI(pattern="store memory.", insert_after=True)
def cache(self) -> "Dataset[T]":
"""Evaluate and cache the blocks of this Dataset in object store memory.
def materialize(self) -> "Dataset[T]":
"""Execute and materialize this dataset into object store memory.

This can be used to read all blocks into memory. By default, Datasets
This can be used to read all blocks into memory. By default, Dataset
doesn't read blocks from the datasource until the first transform.

Note that this does not mutate the original Dataset. Only the blocks of the
returned Dataset class are pinned in memory.

Returns:
A Dataset with all blocks fully materialized in memory.
A Dataset holding the materialized data blocks.
"""
self._plan.execute(force_read=True)
return self
@@ -4138,7 +4132,7 @@ def lazy(self) -> "Dataset[T]":
The returned dataset is a lazy dataset, where all subsequent operations on the
dataset won't be executed until the dataset is consumed (e.g. ``.take()``,
``.iter_batches()``, ``.to_torch()``, ``.to_tf()``, etc.) or execution is
manually triggered via ``.cache()``.
manually triggered via ``.materialize()``.
"""
ds = Dataset(
self._plan, self._epoch, lazy=True, logical_plan=self._logical_plan
2 changes: 1 addition & 1 deletion python/ray/data/tests/preprocessors/test_concatenator.py
@@ -28,7 +28,7 @@ def test_raise_if_missing(self):
)

with pytest.raises(ValueError, match="'b'"):
prep.transform(ds).cache()
prep.transform(ds).materialize()

@pytest.mark.parametrize("exclude", ("b", ["b"]))
def test_exclude(self, exclude):
8 changes: 4 additions & 4 deletions python/ray/data/tests/preprocessors/test_encoder.py
@@ -97,7 +97,7 @@ def test_ordinal_encoder():

# Verify transform fails for null values.
with pytest.raises(ValueError):
null_encoder.transform(null_ds).cache()
null_encoder.transform(null_ds).materialize()
null_encoder.transform(nonnull_ds)

# Verify transform_batch fails for null values.
@@ -299,7 +299,7 @@ def test_one_hot_encoder():

# Verify transform fails for null values.
with pytest.raises(ValueError):
null_encoder.transform(null_ds).cache()
null_encoder.transform(null_ds).materialize()
null_encoder.transform(nonnull_ds)

# Verify transform_batch fails for null values.
@@ -408,7 +408,7 @@ def test_multi_hot_encoder():

# Verify transform fails for null values.
with pytest.raises(ValueError):
null_encoder.transform(null_ds).cache()
null_encoder.transform(null_ds).materialize()
null_encoder.transform(nonnull_ds)

# Verify transform_batch fails for null values.
@@ -511,7 +511,7 @@ def test_label_encoder():

# Verify transform fails for null values.
with pytest.raises(ValueError):
null_encoder.transform(null_ds).cache()
null_encoder.transform(null_ds).materialize()
null_encoder.transform(nonnull_ds)

# Verify transform_batch fails for null values.
2 changes: 1 addition & 1 deletion python/ray/data/tests/preprocessors/test_torch.py
@@ -118,7 +118,7 @@ def test_invalid_transform_raises_value_error(self):
preprocessor = TorchVisionPreprocessor(columns=["image"], transform=transform)

with pytest.raises(ValueError):
preprocessor.transform(dataset).cache()
preprocessor.transform(dataset).materialize()


if __name__ == "__main__":
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_bulk_executor.py
@@ -75,7 +75,7 @@ def reverse_sort(inputs: List[RefBundle], ctx):

def test_basic_stats(ray_start_10_cpus_shared):
executor = BulkExecutor(ExecutionOptions())
prev_stats = ray.data.range(10).cache()._plan.stats()
prev_stats = ray.data.range(10).materialize()._plan.stats()
inputs = make_ref_bundles([[x] for x in range(20)])
o1 = InputDataBuffer(inputs)
o2 = MapOperator.create(