diff --git a/doc/source/ray-air/doc_code/air_ingest.py b/doc/source/ray-air/doc_code/air_ingest.py index f8ec5442d44b7..6f2e29d6a58ac 100644 --- a/doc/source/ray-air/doc_code/air_ingest.py +++ b/doc/source/ray-air/doc_code/air_ingest.py @@ -27,7 +27,7 @@ datasets={"train": dataset}, preprocessor=preprocessor, num_epochs=1, # Stop after this number of epochs is read. - prefetch_blocks=1, # Number of blocks to prefetch when reading data. + prefetch_batches=1, # Number of batches to prefetch when reading data. batch_size=None, # Use whole blocks as batches. ) trainer.fit() diff --git a/python/ray/air/util/check_ingest.py b/python/ray/air/util/check_ingest.py index c91ac0330d909..feae9960e43e4 100755 --- a/python/ray/air/util/check_ingest.py +++ b/python/ray/air/util/check_ingest.py @@ -29,7 +29,7 @@ class DummyTrainer(DataParallelTrainer): scaling_config: Configuration for how to scale training. This is the same as for :class:`~ray.train.base_trainer.BaseTrainer`. num_epochs: How many many times to iterate through the datasets for. - prefetch_blocks: The number of blocks to prefetch ahead of the + prefetch_batches: The number of batches to prefetch ahead of the current block during the scan. This is the same as :meth:`~ray.data.dataset.Dataset.iter_batches` time_preprocessing_separately: Whether to time the preprocessing separately @@ -44,16 +44,18 @@ def __init__( *args, scaling_config: Optional[ScalingConfig] = None, num_epochs: int = 1, - prefetch_blocks: int = 1, + prefetch_batches: int = 1, batch_size: Optional[int] = 4096, time_preprocessing_separately: bool = False, + # Deprecated. + prefetch_blocks: int = 0, **kwargs, ): if not scaling_config: scaling_config = ScalingConfig(num_workers=1) super().__init__( train_loop_per_worker=DummyTrainer.make_train_loop( - num_epochs, prefetch_blocks, batch_size + num_epochs, prefetch_batches, prefetch_blocks, batch_size ), *args, scaling_config=scaling_config, @@ -81,7 +83,10 @@ def preprocess_datasets(self): @staticmethod def make_train_loop( - num_epochs: int, prefetch_blocks: int, batch_size: Optional[int] + num_epochs: int, + prefetch_batches: int, + prefetch_blocks: int, + batch_size: Optional[int], ): """Make a debug train loop that runs for the given amount of epochs.""" @@ -99,7 +104,9 @@ def train_loop_per_worker(): epochs_read += 1 batch_start = time.perf_counter() for batch in data_shard.iter_batches( - prefetch_blocks=prefetch_blocks, batch_size=batch_size + prefetch_batches=prefetch_batches, + prefetch_blocks=prefetch_blocks, + batch_size=batch_size, ): batch_delay = time.perf_counter() - batch_start batch_delays.append(batch_delay) @@ -189,11 +196,11 @@ def make_local_dataset_iterator( "--num-epochs", "-e", type=int, default=1, help="Number of epochs to read." ) parser.add_argument( - "--prefetch-blocks", + "--prefetch-batches", "-b", type=int, default=1, - help="Number of blocks to prefetch when reading data.", + help="Number of batches to prefetch when reading data.", ) args = parser.parse_args() @@ -215,7 +222,7 @@ def make_local_dataset_iterator( datasets={"train": dataset}, preprocessor=preprocessor, num_epochs=args.num_epochs, - prefetch_blocks=args.prefetch_blocks, + prefetch_batches=args.prefetch_batches, dataset_config={"train": DatasetConfig()}, batch_size=None, ) diff --git a/python/ray/data/_internal/block_batching/block_batching.py b/python/ray/data/_internal/block_batching/block_batching.py index 8a4fdce0a85ed..414bc012e8d42 100644 --- a/python/ray/data/_internal/block_batching/block_batching.py +++ b/python/ray/data/_internal/block_batching/block_batching.py @@ -11,10 +11,10 @@ format_batches, collate, extract_data_from_batch, - make_async_gen, WaitBlockPrefetcher, ActorBlockPrefetcher, ) +from ray.data._internal.memory_tracing import trace_deallocation from ray.data._internal.stats import DatasetPipelineStats, DatasetStats from ray.data.block import Block, DataBatch from ray.data.context import DatasetContext @@ -45,7 +45,6 @@ def batch_block_refs( shuffle_buffer_min_size: Optional[int] = None, shuffle_seed: Optional[int] = None, ensure_copy: bool = False, - prefetch_batches: int = 0, ) -> Iterator[DataBatch]: """Create formatted batches of data from 1 or more block object references. @@ -79,17 +78,12 @@ def batch_block_refs( shuffle_seed: The seed to use for the local random shuffle. ensure_copy: Whether batches are always copied from the underlying base blocks (not zero-copy views). - prefetch_batches: The number of batches to fetch ahead of the current batch to - process. If set to greater than 0, a separate thread will be used to fetch - the specified amount of formatted batches from blocks. This improves - performance for non-CPU bound UDFs, allowing batch fetching compute and - formatting to be overlapped with the UDF. Defaults to 0 (no prefetching - enabled). Returns: An iterator over record batches. """ - + if stats: + stats._legacy_iter_batches = True context = DatasetContext.get_current() if ( @@ -107,11 +101,10 @@ def batch_block_refs( _prefetch_blocks( block_ref_iter=block_refs, prefetcher=prefetcher, - stats=stats, num_blocks_to_prefetch=prefetch_blocks, + eager_free=eager_free, ), stats=stats, - eager_free=eager_free, ) yield from batch_blocks( @@ -124,7 +117,6 @@ def batch_block_refs( shuffle_buffer_min_size=shuffle_buffer_min_size, shuffle_seed=shuffle_seed, ensure_copy=ensure_copy, - prefetch_batches=prefetch_batches, ) @@ -139,7 +131,6 @@ def batch_blocks( shuffle_buffer_min_size: Optional[int] = None, shuffle_seed: Optional[int] = None, ensure_copy: bool = False, - prefetch_batches: int = 0, ) -> Iterator[DataBatch]: """Create formatted batches of data from 1 or more blocks. @@ -164,17 +155,12 @@ def _iterator_fn(base_iterator: Iterator[Block]) -> Iterator[DataBatch]: ) if collate_fn is not None: - batch_iter = collate(batch_iter, collate_fn=collate_fn) + batch_iter = collate(batch_iter, collate_fn=collate_fn, stats=stats) batch_iter = extract_data_from_batch(batch_iter) yield from batch_iter - if prefetch_batches > 0: - batch_iter = make_async_gen( - blocks, fn=_iterator_fn, num_workers=prefetch_batches - ) - else: - batch_iter = _iterator_fn(blocks) + batch_iter = _iterator_fn(blocks) for formatted_batch in batch_iter: user_timer = stats.iter_user_s.timer() if stats else nullcontext() @@ -186,6 +172,7 @@ def _prefetch_blocks( block_ref_iter: Iterator[ObjectRef[Block]], prefetcher: BlockPrefetcher, num_blocks_to_prefetch: int, + eager_free: bool = False, stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None, ) -> Iterator[ObjectRef[Block]]: """Given an iterable of Block Object References, returns an iterator @@ -201,6 +188,9 @@ def _prefetch_blocks( if num_blocks_to_prefetch == 0: for block_ref in block_ref_iter: yield block_ref + trace_deallocation( + block_ref, "block_batching._prefetch_blocks", free=eager_free + ) window_size = num_blocks_to_prefetch # Create the initial set of blocks to prefetch. @@ -219,3 +209,6 @@ def _prefetch_blocks( except StopIteration: pass yield block_ref + trace_deallocation( + block_ref, "block_batching._prefetch_blocks", free=eager_free + ) diff --git a/python/ray/data/_internal/block_batching/iter_batches.py b/python/ray/data/_internal/block_batching/iter_batches.py index 24f8ba816e79e..01de86c0934d5 100644 --- a/python/ray/data/_internal/block_batching/iter_batches.py +++ b/python/ray/data/_internal/block_batching/iter_batches.py @@ -1,40 +1,233 @@ import collections -from typing import Dict, Iterator, Optional, Tuple +import sys +from typing import Any, Callable, Dict, Iterator, Optional, Tuple +import ray from ray.types import ObjectRef -from ray.data.block import Block, BlockMetadata +from ray.data.block import Block, BlockMetadata, DataBatch from ray.data._internal.block_batching.interfaces import ( Batch, BlockPrefetcher, ) +from ray.data._internal.block_batching.util import ( + ActorBlockPrefetcher, + WaitBlockPrefetcher, + resolve_block_refs, + blocks_to_batches, + format_batches, + collate, + extract_data_from_batch, + make_async_gen, +) +from ray.data._internal.memory_tracing import trace_deallocation +from ray.data._internal.stats import DatasetStats +from ray.data.context import DatasetContext + +if sys.version_info >= (3, 7): + from contextlib import nullcontext +else: + from contextlib import contextmanager + + @contextmanager + def nullcontext(enter_result=None): + yield enter_result + + +def iter_batches( + block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]], + *, + stats: Optional[DatasetStats] = None, + clear_block_after_read: bool = False, + batch_size: Optional[int] = None, + batch_format: Optional[str] = "default", + drop_last: bool = False, + collate_fn: Optional[Callable[[DataBatch], Any]] = None, + shuffle_buffer_min_size: Optional[int] = None, + shuffle_seed: Optional[int] = None, + ensure_copy: bool = False, + prefetch_batches: int = 1, +) -> Iterator[DataBatch]: + """Create formatted batches of data from an iterator of block object references and + corresponding metadata. + + This takes a block iterator and creates batch_size batches, slicing, + unioning, shuffling, prefetching, and formatting blocks as needed. + + + The algorithm uses both pipeline parallelism and data parallelism: + + If prefetch_batches=2, these are all the batches in flight: + + [User thread] trains on Batch 0 + - [Fetch thread] Batch 1 in output queue + - [Worker thread 1] Batch 2 formatting + collating + - [Worker thread 2] Batch 3 formatting + collating + - [Raylet] Batches 4 + 5 fetched to local object store memory + + At any point in time there are prefetch_batches+1 batches in local heap memory. + And the next set of prefetch_batches in local object store memory. + + The actual steps are as follows: + + In a single async thread, do the following: + 1. Trigger Ray local prefetching of `prefetch_batches` worth of block object + references. + 2. Resolve (i.e. call `ray.get()`) on the block references. + 3. Perform the necessary batch slicing to construct full batches, possibly + shuffling if necessary. + 4. Then, in a threadpool consisting of `prefetch_batches` threads: + a. Format the batches to the provided batch format. + b. Apply the collate function. + 5. Fetch outputs from the threadpool, maintaining order of the batches. + + Args: + block_refs: An iterator over block object references and their corresponding + metadata. + stats: DatasetStats object to record timing and other statistics. + clear_block_after_read: Whether to clear the block from object store + manually (i.e. without waiting for Python's automatic GC) after it + is read. Doing so will reclaim memory faster and hence reduce the + memory footprint. However, the caller has to ensure the safety, i.e. + the block will never be accessed again. + batch_size: Record batch size, or None to let the system pick. + batch_format: The format in which to return each batch. + Specify "default" to use the current block format (promoting + Arrow to pandas automatically), "pandas" to + select ``pandas.DataFrame`` or "pyarrow" to select + ``pyarrow.Table``, or None to use entire blocks + as batches. Default is "default". + drop_last: Whether to drop the last batch if it's incomplete. + collate_fn: A function to apply to each data batch before returning it. + shuffle_buffer_min_size: If non-None, the data will be randomly shuffled using a + local in-memory shuffle buffer, and this value will serve as the minimum + number of rows that must be in the local in-memory shuffle buffer in order + to yield a batch. + shuffle_seed: The seed to use for the local random shuffle. + ensure_copy: Whether batches are always copied from the underlying base + blocks (not zero-copy views). + prefetch_batches: The number of batches to fetch ahead of the current batch to + process. If set to greater than 0, a separate thread will be used to fetch + the specified amount of formatted batches from blocks. This improves + performance for non-CPU bound UDFs, allowing batch fetching compute and + formatting to be overlapped with the UDF. Defaults to 0 (no prefetching + enabled). + + Returns: + An iterator over record batches. + """ + context = DatasetContext.get_current() + + if ( + prefetch_batches > 0 + and context.actor_prefetcher_enabled + and not ray.util.client.ray.is_connected() + ): + prefetcher = ActorBlockPrefetcher() + else: + prefetcher = WaitBlockPrefetcher() + + eager_free = clear_block_after_read and DatasetContext.get_current().eager_free + + def _async_iter_batches( + block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]], + ) -> Iterator[DataBatch]: + + # Step 1: Prefetch logical batches locally. + block_refs = prefetch_batches_locally( + block_ref_iter=block_refs, + prefetcher=prefetcher, + num_batches_to_prefetch=prefetch_batches, + batch_size=batch_size, + eager_free=eager_free, + ) + + # Step 2: Resolve the blocks. + block_iter = resolve_block_refs(block_ref_iter=block_refs, stats=stats) + + # Step 3: Batch and shuffle the resolved blocks. + batch_iter = blocks_to_batches( + block_iter=block_iter, + stats=stats, + batch_size=batch_size, + drop_last=drop_last, + shuffle_buffer_min_size=shuffle_buffer_min_size, + shuffle_seed=shuffle_seed, + ensure_copy=ensure_copy, + ) + + # Step 4: Use a threadpool for formatting and collation. + batch_iter = _format_in_threadpool( + batch_iter, + stats=stats, + batch_format=batch_format, + collate_fn=collate_fn, + num_threadpool_workers=prefetch_batches, + ) + + # Step 5: Restore original order. + batch_iter: Iterator[Batch] = restore_original_order(batch_iter) -""" -The algorithm uses both pipeline parallelism and data parallelism: + yield from extract_data_from_batch(batch_iter) -If prefetch_batches=2, these are all the batches in flight: + # Run everything in a separate thread to not block the main thread when waiting + # for streaming results. + async_batch_iter = make_async_gen(block_refs, fn=_async_iter_batches, num_workers=1) + + while True: + with stats.iter_total_blocked_s.timer() if stats else nullcontext(): + try: + next_batch = next(async_batch_iter) + except StopIteration: + break + with stats.iter_user_s.timer() if stats else nullcontext(): + yield next_batch -[User thread] trains on Batch 0 - - [Fetch thread] Batch 1 in output queue - - [Worker thread 1] Batch 2 formatting + collating - - [Worker thread 2] Batch 3 formatting + collating - - [Raylet] Batches 4 + 5 fetched to local object store memory -At any point in time there are prefetch_batches+1 batches in local heap memory. -And the next set of prefetch_batches in local object store memory. +def _format_in_threadpool( + batch_iter: Iterator[Batch], + stats: DatasetStats, + batch_format: Optional[str], + collate_fn: Optional[Callable[[DataBatch], Any]], + num_threadpool_workers: int, +) -> Iterator[Batch]: + """Executes the batching, formatting, and collation logic in a threadpool. -The actual steps are as follows: + Args: + logical_batch_iterator: An iterator over logical batches. + stats: DatasetStats object to record timing and other statistics. + batch_format: The format in which to return each batch. + Specify "default" to use the current block format (promoting + Arrow to pandas automatically), "pandas" to + select ``pandas.DataFrame`` or "pyarrow" to select + ``pyarrow.Table``, or None to use entire blocks + as batches. + collate_fn: A function to apply to each data batch before returning it. + num_threadpool_workers: The number of threads to use in the threadpool. + """ -In a single async thread, do the following: - 1. Trigger Ray local prefetching of `prefetch_batches` worth of block object - references. - 2. Resolve (i.e. call `ray.get()`) on the block references. - 3. Perform the necessary batch slicing to construct full batches, possibly - shuffling if necessary. - 4. Then, in a threadpool consisting of `prefetch_batches` threads: - a. Format the batches to the provided batch format. - b. Apply the collate function. - 5. Fetch outputs from the threadpool, maintaining order of the batches. -""" + def threadpool_computations( + batch_iter: Iterator[Batch], + ) -> Iterator[Batch]: + # Step 4a: Format the batches. + formatted_batch_iter = format_batches( + batch_iter, batch_format=batch_format, stats=stats + ) + + # Step 4b: Apply the collate function if applicable. + if collate_fn is not None: + formatted_batch_iter = collate( + formatted_batch_iter, collate_fn=collate_fn, stats=stats + ) + yield from formatted_batch_iter + + if num_threadpool_workers > 0: + return make_async_gen( + base_iterator=batch_iter, + fn=threadpool_computations, + num_workers=num_threadpool_workers, + ) + else: + return threadpool_computations(batch_iter) def prefetch_batches_locally( @@ -42,6 +235,7 @@ def prefetch_batches_locally( prefetcher: BlockPrefetcher, num_batches_to_prefetch: int, batch_size: Optional[int], + eager_free: bool = False, ) -> Iterator[ObjectRef[Block]]: """Given an iterator of batched block references, returns an iterator over the same block references while prefetching `num_batches_to_prefetch` batches in advance. @@ -52,11 +246,17 @@ def prefetch_batches_locally( num_batches_to_prefetch: The number of batches to prefetch ahead of the current batch during the scan. batch_size: User specified batch size, or None to let the system pick. + eager_free: Whether to eagerly free the object reference from the object store. """ sliding_window = collections.deque() current_window_size = 0 + if num_batches_to_prefetch <= 0: + for block_ref, metadata in block_ref_iter: + yield block_ref + return + if batch_size is not None: num_rows_to_prefetch = num_batches_to_prefetch * batch_size else: @@ -90,6 +290,7 @@ def prefetch_batches_locally( except StopIteration: pass yield block_ref + trace_deallocation(block_ref, loc="iter_batches", free=eager_free) def restore_original_order(batch_iter: Iterator[Batch]) -> Iterator[Batch]: diff --git a/python/ray/data/_internal/block_batching/util.py b/python/ray/data/_internal/block_batching/util.py index a7eb7e99c5126..7b6782100023f 100644 --- a/python/ray/data/_internal/block_batching/util.py +++ b/python/ray/data/_internal/block_batching/util.py @@ -5,8 +5,8 @@ from typing import Any, Callable, Iterator, List, Optional, Tuple, TypeVar, Union import ray -from ray.actor import ActorHandle from ray.types import ObjectRef +from ray.actor import ActorHandle from ray.data.block import Block, BlockAccessor, DataBatch from ray.data._internal.batcher import Batcher, ShufflingBatcher from ray.data._internal.block_batching.interfaces import ( @@ -14,7 +14,6 @@ CollatedBatch, BlockPrefetcher, ) -from ray.data._internal.memory_tracing import trace_deallocation from ray.data._internal.stats import DatasetPipelineStats, DatasetStats from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy @@ -49,14 +48,12 @@ def _calculate_ref_hits(refs: List[ObjectRef[Any]]) -> Tuple[int, int, int]: def resolve_block_refs( block_ref_iter: Iterator[ObjectRef[Block]], - eager_free: bool = False, stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None, ) -> Iterator[Block]: """Resolves the block references for each logical batch. Args: block_ref_iter: An iterator over block object references. - eager_free: Whether to eagerly free the object reference from the object store. stats: An optional stats object to recording block hits and misses. """ hits = 0 @@ -73,7 +70,6 @@ def resolve_block_refs( # `ray.get()` call. with stats.iter_get_s.timer() if stats else nullcontext(): block = ray.get(block_ref) - trace_deallocation(block_ref, loc="iter_batches", free=eager_free) yield block if stats: @@ -179,6 +175,7 @@ def format_batches( def collate( batch_iter: Iterator[Batch], collate_fn: Optional[Callable[[DataBatch], Any]], + stats: Optional[DatasetStats] = None, ) -> Iterator[CollatedBatch]: """Returns an iterator with the provided collate_fn applied to items of the batch iterator. @@ -187,7 +184,8 @@ def collate( batch_iter: An iterator over formatted batches. """ for batch in batch_iter: - collated_batch = collate_fn(batch.data) + with stats.iter_collate_batch_s.timer() if stats else nullcontext(): + collated_batch = collate_fn(batch.data) yield CollatedBatch(batch.batch_idx, collated_batch) @@ -210,12 +208,15 @@ def make_async_gen( Args: base_iterator: The iterator to asynchronously fetch from. fn: The function to run on the input iterator. - num_workers: The number of threads to use in the threadpool. + num_workers: The number of threads to use in the threadpool. Defaults to 1. Returns: An iterator with the same elements as outputted from `fn`. """ + if num_workers < 1: + raise ValueError("Size of threadpool must be at least 1.") + # Use a lock to fetch from the base_iterator in a thread-safe fashion. def convert_to_threadsafe_iterator(base_iterator: Iterator[T]) -> Iterator[T]: class ThreadSafeIterator: diff --git a/python/ray/data/_internal/dataset_iterator/__init__.py b/python/ray/data/_internal/dataset_iterator/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python/ray/data/_internal/dataset_iterator_impl.py b/python/ray/data/_internal/dataset_iterator/dataset_iterator_impl.py similarity index 59% rename from python/ray/data/_internal/dataset_iterator_impl.py rename to python/ray/data/_internal/dataset_iterator/dataset_iterator_impl.py index 082f45be9f123..ce888ea97a97b 100644 --- a/python/ray/data/_internal/dataset_iterator_impl.py +++ b/python/ray/data/_internal/dataset_iterator/dataset_iterator_impl.py @@ -1,11 +1,11 @@ -from typing import TYPE_CHECKING, Optional, Union, Iterator, Callable, Any -import time +from typing import TYPE_CHECKING, Optional, Union, Iterator, Tuple import warnings -from ray.data.block import DataBatch +from ray.types import ObjectRef +from ray.data.block import Block, BlockMetadata from ray.data.context import DatasetContext from ray.data.dataset_iterator import DatasetIterator -from ray.data._internal.block_batching import batch_block_refs +from ray.data._internal.stats import DatasetStats if TYPE_CHECKING: import pyarrow @@ -23,38 +23,15 @@ def __init__( def __repr__(self) -> str: return f"DatasetIterator({self._base_dataset})" - def iter_batches( + def _to_block_iterator( self, - *, - prefetch_blocks: int = 0, - batch_size: Optional[int] = 256, - batch_format: Optional[str] = "default", - drop_last: bool = False, - local_shuffle_buffer_size: Optional[int] = None, - local_shuffle_seed: Optional[int] = None, - _collate_fn: Optional[Callable[[DataBatch], Any]] = None, - ) -> Iterator[DataBatch]: - - DatasetContext._set_current(self._base_context) - + ) -> Tuple[ + Iterator[Tuple[ObjectRef[Block], BlockMetadata]], Optional[DatasetStats] + ]: ds = self._base_dataset block_iterator, stats, executor = ds._plan.execute_to_iterator() ds._current_executor = executor - time_start = time.perf_counter() - - yield from batch_block_refs( - block_iterator, - stats=stats, - prefetch_blocks=prefetch_blocks, - batch_size=batch_size, - batch_format=batch_format, - drop_last=drop_last, - collate_fn=_collate_fn, - shuffle_buffer_min_size=local_shuffle_buffer_size, - shuffle_seed=local_shuffle_seed, - ) - - stats.iter_total_s.add(time.perf_counter() - time_start) + return block_iterator, stats def stats(self) -> str: return self._base_dataset.stats() diff --git a/python/ray/data/_internal/pipelined_dataset_iterator.py b/python/ray/data/_internal/dataset_iterator/pipelined_dataset_iterator.py similarity index 74% rename from python/ray/data/_internal/pipelined_dataset_iterator.py rename to python/ray/data/_internal/dataset_iterator/pipelined_dataset_iterator.py index cdaf3f73e4d8a..8cbb579a48a34 100644 --- a/python/ray/data/_internal/pipelined_dataset_iterator.py +++ b/python/ray/data/_internal/dataset_iterator/pipelined_dataset_iterator.py @@ -1,8 +1,10 @@ -from typing import TYPE_CHECKING, Any, Callable, Optional, Union, Iterator +from typing import Any, TYPE_CHECKING, Callable, Optional, Union, Iterator, Tuple import warnings -from ray.data.block import DataBatch +from ray.types import ObjectRef +from ray.data.block import Block, BlockMetadata, DataBatch from ray.data.dataset_iterator import DatasetIterator +from ray.data._internal.stats import DatasetStats if TYPE_CHECKING: import pyarrow @@ -27,26 +29,42 @@ def _get_next_dataset(self) -> "DatasetPipeline": ds = next(self._epoch_iterator) return ds + def _to_block_iterator( + self, + ) -> Tuple[ + Iterator[Tuple[ObjectRef[Block], BlockMetadata]], Optional[DatasetStats] + ]: + epoch_pipeline = self._get_next_dataset() + + def block_iter(): + for ds in epoch_pipeline.iter_datasets(): + yield from ds._plan.execute().iter_blocks_with_metadata() + + return block_iter(), None + def iter_batches( self, *, - prefetch_blocks: int = 0, - batch_size: Optional[int] = 256, + prefetch_batches: int = 0, + batch_size: int = 256, batch_format: Optional[str] = "default", drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, _collate_fn: Optional[Callable[[DataBatch], Any]] = None, + # Deprecated. + prefetch_blocks: int = 0, ) -> Iterator[DataBatch]: - ds = self._get_next_dataset() - return ds.iter_batches( - prefetch_blocks=prefetch_blocks, + # Set prefetch_batches to default of 0 for DatasetPipeline. + return super().iter_batches( + prefetch_batches=prefetch_batches, batch_size=batch_size, batch_format=batch_format, drop_last=drop_last, local_shuffle_buffer_size=local_shuffle_buffer_size, local_shuffle_seed=local_shuffle_seed, _collate_fn=_collate_fn, + prefetch_blocks=prefetch_blocks, ) def stats(self) -> str: diff --git a/python/ray/data/_internal/stream_split_dataset_iterator.py b/python/ray/data/_internal/dataset_iterator/stream_split_dataset_iterator.py similarity index 86% rename from python/ray/data/_internal/stream_split_dataset_iterator.py rename to python/ray/data/_internal/dataset_iterator/stream_split_dataset_iterator.py index 14ee2d882d9e0..205439d361f83 100644 --- a/python/ray/data/_internal/stream_split_dataset_iterator.py +++ b/python/ray/data/_internal/dataset_iterator/stream_split_dataset_iterator.py @@ -7,8 +7,7 @@ Dict, Optional, Iterator, - Callable, - Any, + Tuple, Union, TYPE_CHECKING, ) @@ -16,15 +15,15 @@ import ray from ray.data.dataset_iterator import DatasetIterator -from ray.data.block import Block, DataBatch +from ray.data.block import Block, BlockMetadata from ray.data.context import DatasetContext from ray.data._internal.execution.streaming_executor import StreamingExecutor from ray.data._internal.execution.legacy_compat import ( execute_to_legacy_bundle_iterator, ) -from ray.data._internal.block_batching import batch_block_refs from ray.data._internal.execution.operators.output_splitter import OutputSplitter from ray.data._internal.execution.interfaces import NodeIdStr, RefBundle +from ray.data._internal.stats import DatasetStats from ray.types import ObjectRef from ray.util.debug import log_once from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy @@ -77,20 +76,12 @@ def __init__( self._coord_actor = coord_actor self._output_split_idx = output_split_idx - def iter_batches( + def _to_block_iterator( self, - *, - prefetch_blocks: int = 0, - batch_size: int = 256, - batch_format: Optional[str] = "default", - drop_last: bool = False, - local_shuffle_buffer_size: Optional[int] = None, - local_shuffle_seed: Optional[int] = None, - _collate_fn: Optional[Callable[[DataBatch], Any]] = None, - ) -> Iterator[DataBatch]: - """Implements DatasetIterator.""" - - def gen_blocks() -> Iterator[ObjectRef[Block]]: + ) -> Tuple[ + Iterator[Tuple[ObjectRef[Block], BlockMetadata]], Optional[DatasetStats] + ]: + def gen_blocks() -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]: cur_epoch = ray.get( self._coord_actor.start_epoch.remote(self._output_split_idx) ) @@ -98,7 +89,9 @@ def gen_blocks() -> Iterator[ObjectRef[Block]]: Optional[ObjectRef[Block]] ] = self._coord_actor.get.remote(cur_epoch, self._output_split_idx) while True: - block_ref: Optional[ObjectRef[Block]] = ray.get(future) + block_ref: Optional[Tuple[ObjectRef[Block], BlockMetadata]] = ray.get( + future + ) if not block_ref: break else: @@ -107,17 +100,7 @@ def gen_blocks() -> Iterator[ObjectRef[Block]]: ) yield block_ref - yield from batch_block_refs( - gen_blocks(), - stats=None, - prefetch_blocks=prefetch_blocks, - batch_size=batch_size, - batch_format=batch_format, - drop_last=drop_last, - collate_fn=_collate_fn, - shuffle_buffer_min_size=local_shuffle_buffer_size, - shuffle_seed=local_shuffle_seed, - ) + return gen_blocks(), None def stats(self) -> str: """Implements DatasetIterator.""" @@ -191,7 +174,9 @@ def start_epoch(self, split_idx: int) -> str: epoch_id = self._barrier(split_idx) return epoch_id - def get(self, epoch_id: int, output_split_idx: int) -> Optional[ObjectRef[Block]]: + def get( + self, epoch_id: int, output_split_idx: int + ) -> Optional[Tuple[ObjectRef[Block], BlockMetadata]]: """Blocking get operation. This is intended to be called concurrently from multiple clients. @@ -215,7 +200,7 @@ def get(self, epoch_id: int, output_split_idx: int) -> Optional[ObjectRef[Block] # This is a BLOCKING call, so do it outside the lock. next_bundle = self._output_iterator.get_next(output_split_idx) - block = next_bundle.blocks.pop()[0] + bundle = next_bundle.blocks.pop() # Accumulate any remaining blocks in next_bundle map as needed. with self._lock: @@ -223,7 +208,7 @@ def get(self, epoch_id: int, output_split_idx: int) -> Optional[ObjectRef[Block] if not next_bundle.blocks: del self._next_bundle[output_split_idx] - return block + return bundle except StopIteration: return None diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index 6fffec60d787c..b67d3a402cf91 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -41,14 +41,14 @@ def execute_to_legacy_block_iterator( plan: ExecutionPlan, allow_clear_input_blocks: bool, dataset_uuid: str, -) -> Iterator[ObjectRef[Block]]: - """Same as execute_to_legacy_bundle_iterator but returning blocks.""" +) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]: + """Same as execute_to_legacy_bundle_iterator but returning blocks and metadata.""" bundle_iter = execute_to_legacy_bundle_iterator( executor, plan, allow_clear_input_blocks, dataset_uuid ) for bundle in bundle_iter: - for block, _ in bundle.blocks: - yield block + for block, metadata in bundle.blocks: + yield block, metadata def execute_to_legacy_bundle_iterator( diff --git a/python/ray/data/_internal/logical/operators/map_operator.py b/python/ray/data/_internal/logical/operators/map_operator.py index 44453f50f76ba..0f41501ab72c8 100644 --- a/python/ray/data/_internal/logical/operators/map_operator.py +++ b/python/ray/data/_internal/logical/operators/map_operator.py @@ -87,7 +87,6 @@ def __init__( fn: BatchUDF, batch_size: Optional[int] = DEFAULT_BATCH_SIZE, batch_format: Optional[str] = "default", - prefetch_batches: int = 0, zero_copy_batch: bool = False, fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, @@ -111,7 +110,6 @@ def __init__( ) self._batch_size = batch_size self._batch_format = batch_format - self._prefetch_batches = prefetch_batches self._zero_copy_batch = zero_copy_batch diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 62dd255cc30cd..bb38b0c9589ff 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -16,6 +16,7 @@ ) import ray +from ray.data.block import BlockMetadata from ray.data._internal.util import capitalize from ray.types import ObjectRef from ray.data._internal.arrow_ops.transform_pyarrow import unify_schemas @@ -481,7 +482,11 @@ def execute_to_iterator( self, allow_clear_input_blocks: bool = True, force_read: bool = False, - ) -> Tuple[Iterator[ObjectRef[Block]], DatasetStats, Optional["Executor"]]: + ) -> Tuple[ + Iterator[Tuple[ObjectRef[Block], BlockMetadata]], + DatasetStats, + Optional["Executor"], + ]: """Execute this plan, returning an iterator. If the streaming execution backend is enabled, this will use streaming @@ -499,7 +504,9 @@ def execute_to_iterator( ctx = DatasetContext.get_current() if not ctx.use_streaming_executor: return ( - self.execute(allow_clear_input_blocks, force_read).iter_blocks(), + self.execute( + allow_clear_input_blocks, force_read + ).iter_blocks_with_metadata(), self._snapshot_stats, None, ) diff --git a/python/ray/data/_internal/planner/map_batches.py b/python/ray/data/_internal/planner/map_batches.py index 711f275b599a8..9db2461ff74a7 100644 --- a/python/ray/data/_internal/planner/map_batches.py +++ b/python/ray/data/_internal/planner/map_batches.py @@ -11,7 +11,6 @@ def generate_map_batches_fn( batch_size: Optional[int] = DEFAULT_BATCH_SIZE, batch_format: Optional[str] = "default", - prefetch_batches: int = 0, zero_copy_batch: bool = False, ) -> Callable[[Iterator[Block], TaskContext, BatchUDF], Iterator[Block]]: """Generate function to apply the batch UDF to blocks.""" @@ -93,7 +92,6 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]: batch_size=batch_size, batch_format=batch_format, ensure_copy=not zero_copy_batch and batch_size is not None, - prefetch_batches=prefetch_batches, ) for batch in formatted_batch_iter: diff --git a/python/ray/data/_internal/planner/plan_udf_map_op.py b/python/ray/data/_internal/planner/plan_udf_map_op.py index 49a47ae76ffe8..561f00025beeb 100644 --- a/python/ray/data/_internal/planner/plan_udf_map_op.py +++ b/python/ray/data/_internal/planner/plan_udf_map_op.py @@ -34,7 +34,6 @@ def _plan_udf_map_op( transform_fn = generate_map_batches_fn( batch_size=op._batch_size, batch_format=op._batch_format, - prefetch_batches=op._prefetch_batches, zero_copy_batch=op._zero_copy_batch, ) elif isinstance(op, MapRows): diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 4a402e0f213c3..79bedc02ad6e4 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -34,6 +34,9 @@ class Timer: def __init__(self): self._value: float = 0 + self._min: float = float("inf") + self._max: float = 0 + self._total_count: float = 0 @contextmanager def timer(self) -> None: @@ -41,14 +44,28 @@ def timer(self) -> None: try: yield finally: - self._value += time.perf_counter() - time_start + self.add(time.perf_counter() - time_start) def add(self, value: float) -> None: self._value += value + if value < self._min: + self._min = value + if value > self._max: + self._max = value + self._total_count += 1 def get(self) -> float: return self._value + def min(self) -> float: + return self._min + + def max(self) -> float: + return self._max + + def avg(self) -> float: + return self._value / self._total_count if self._total_count else float("inf") + class _DatasetStatsBuilder: """Helper class for building dataset stats. @@ -216,11 +233,14 @@ def __init__( self.needs_stats_actor = needs_stats_actor self.stats_uuid = stats_uuid + self._legacy_iter_batches = False # Iteration stats, filled out if the user iterates over the dataset. self.iter_wait_s: Timer = Timer() self.iter_get_s: Timer = Timer() self.iter_next_batch_s: Timer = Timer() self.iter_format_batch_s: Timer = Timer() + self.iter_collate_batch_s: Timer = Timer() + self.iter_total_blocked_s: Timer = Timer() self.iter_user_s: Timer = Timer() self.iter_total_s: Timer = Timer() self.extra_metrics = {} @@ -285,10 +305,13 @@ def to_summary(self) -> "DatasetStatsSummary": ) iter_stats = IterStatsSummary( + self._legacy_iter_batches, self.iter_wait_s, self.iter_get_s, self.iter_next_batch_s, self.iter_format_batch_s, + self.iter_collate_batch_s, + self.iter_total_blocked_s, self.iter_user_s, self.iter_total_s, self.iter_blocks_local, @@ -619,14 +642,20 @@ def __str__(self) -> str: @dataclass class IterStatsSummary: - # Time spent in `ray.wait()`, in seconds + # Whether the legacy `iter_batches` is being used. + legacy_iter_batches: bool + # Time spent in actor based prefetching, in seconds. wait_time: Timer # Time spent in `ray.get()`, in seconds get_time: Timer - # Time spent in `batcher.next_batch()`, in seconds + # Time spent in batch building, in seconds next_time: Timer # Time spent in `_format_batch_()`, in seconds format_time: Timer + # Time spent in collate fn, in seconds + collate_time: Timer + # Total time user thread is blocked by iter_batches + block_time: Timer # Time spent in user code, in seconds user_time: Timer # Total time taken by Dataset iterator, in seconds @@ -639,6 +668,80 @@ class IterStatsSummary: iter_unknown_location: int def __str__(self) -> str: + if self.legacy_iter_batches: + return self.to_string_legacy() + else: + return self.to_string() + + def to_string(self) -> str: + out = "" + if ( + self.block_time.get() + or self.total_time.get() + or self.get_time.get() + or self.next_time.get() + or self.format_time.get() + or self.collate_time.get() + ): + out += "\nDataset iterator time breakdown:\n" + if self.block_time.get(): + out += "* Total time user code is blocked: {}\n".format( + fmt(self.block_time.get()) + ) + if self.user_time.get(): + out += "* Total time in user code: {}\n".format( + fmt(self.user_time.get()) + ) + if self.total_time.get(): + out += "* Total time overall: {}\n".format(fmt(self.total_time.get())) + out += "* Num blocks local: {}\n".format(self.iter_blocks_local) + out += "* Num blocks remote: {}\n".format(self.iter_blocks_remote) + out += "* Num blocks unknown location: {}\n".format( + self.iter_unknown_location + ) + out += ( + "* Batch iteration time breakdown (summed across prefetch threads):\n" + ) + if self.get_time.get(): + out += " * In ray.get(): {} min, {} max, {} avg, {} total\n".format( + fmt(self.get_time.min()), + fmt(self.get_time.max()), + fmt(self.get_time.avg()), + fmt(self.get_time.get()), + ) + if self.next_time.get(): + batch_creation_str = ( + " * In batch creation: {} min, {} max, " "{} avg, {} total\n" + ) + out += batch_creation_str.format( + fmt(self.next_time.min()), + fmt(self.next_time.max()), + fmt(self.next_time.avg()), + fmt(self.next_time.get()), + ) + if self.format_time.get(): + format_str = ( + " * In batch formatting: {} min, {} max, " "{} avg, {} total\n" + ) + out += format_str.format( + fmt(self.format_time.min()), + fmt(self.format_time.max()), + fmt(self.format_time.avg()), + fmt(self.format_time.get()), + ) + if self.collate_time.get(): + out += " * In collate_fn: {} min, {} max, {} avg, {} total\n".format( + fmt(self.collate_time.min()), + fmt(self.collate_time.max()), + fmt(self.collate_time.avg()), + fmt(self.collate_time.get()), + ) + + return out + + def to_string_legacy(self) -> str: + """Iteration stats summary for legacy `iter_batches`.""" + out = "" if ( self.total_time.get() diff --git a/python/ray/data/context.py b/python/ray/data/context.py index b6ac7b2db261c..0fb87660ad41c 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -107,6 +107,9 @@ # Set this env var to enable distributed tqdm (experimental). DEFAULT_USE_RAY_TQDM = bool(int(os.environ.get("RAY_TQDM", "1"))) +# Set this to True to use the legacy iter_batches codepath prior to 2.4. +DEFAULT_USE_LEGACY_ITER_BATCHES = False + # Use this to prefix important warning messages for the user. WARN_PREFIX = "⚠️ " @@ -152,6 +155,7 @@ def __init__( optimizer_enabled: bool, execution_options: "ExecutionOptions", use_ray_tqdm: bool, + use_legacy_iter_batches: bool, ): """Private constructor (use get_current() instead).""" self.block_splitting_enabled = block_splitting_enabled @@ -182,6 +186,7 @@ def __init__( # TODO: expose execution options in Dataset public APIs. self.execution_options = execution_options self.use_ray_tqdm = use_ray_tqdm + self.use_legacy_iter_batches = use_legacy_iter_batches @staticmethod def get_current() -> "DatasetContext": @@ -228,6 +233,7 @@ def get_current() -> "DatasetContext": optimizer_enabled=DEFAULT_OPTIMIZER_ENABLED, execution_options=ExecutionOptions(), use_ray_tqdm=DEFAULT_USE_RAY_TQDM, + use_legacy_iter_batches=DEFAULT_USE_LEGACY_ITER_BATCHES, ) return _default_context diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index b7662805350a4..76d50afbe9244 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -51,8 +51,12 @@ from ray.data._internal.planner.write import generate_write_fn from ray.data.dataset_iterator import DatasetIterator from ray.data._internal.block_list import BlockList -from ray.data._internal.dataset_iterator_impl import DatasetIteratorImpl -from ray.data._internal.stream_split_dataset_iterator import StreamSplitDatasetIterator +from ray.data._internal.dataset_iterator.dataset_iterator_impl import ( + DatasetIteratorImpl, +) +from ray.data._internal.dataset_iterator.stream_split_dataset_iterator import ( + StreamSplitDatasetIterator, +) from ray.data._internal.compute import ( ActorPoolStrategy, CallableClass, @@ -380,7 +384,6 @@ def map_batches( batch_size: Optional[Union[int, Literal["default"]]] = "default", compute: Optional[Union[str, ComputeStrategy]] = None, batch_format: Optional[str] = "default", - prefetch_batches: int = 0, zero_copy_batch: bool = False, fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, @@ -543,14 +546,6 @@ def map_batches( ``Dict[str, numpy.ndarray]`` for tabular datasets, or None to return the underlying block exactly as is with no additional formatting. The default is "default". - prefetch_batches: The number of batches to fetch ahead of the current batch - to process. If set to greater than 0, a separate thread will be used - to fetch the specified amount of formatted batches from blocks. This - improves performance for non-CPU bound UDFs, allowing batch fetching - compute and formatting to be overlapped with the UDF. Defaults to 0 (no - prefetching enabled.) Increasing the number of batches to prefetch can - result in higher throughput, at the expense of requiring more heap - memory to buffer the batches. zero_copy_batch: Whether ``fn`` should be provided zero-copy, read-only batches. If this is ``True`` and no copy is required for the ``batch_format`` conversion, the batch will be a zero-copy, read-only @@ -650,7 +645,6 @@ def map_batches( transform_fn = generate_map_batches_fn( batch_size=batch_size, batch_format=batch_format, - prefetch_batches=prefetch_batches, zero_copy_batch=zero_copy_batch, ) @@ -3007,13 +3001,15 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Union[T, TableRow]] def iter_batches( self, *, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, batch_size: Optional[int] = 256, batch_format: Optional[str] = "default", drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, _collate_fn: Optional[Callable[[DataBatch], Any]] = None, + # Deprecated. + prefetch_blocks: int = 0, ) -> Iterator[DataBatch]: """Return a local batched iterator over the dataset. @@ -3025,8 +3021,12 @@ def iter_batches( Time complexity: O(1) Args: - prefetch_blocks: The number of blocks to prefetch ahead of the - current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. batch_size: The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if @@ -3056,6 +3056,7 @@ def iter_batches( ) return self.iterator().iter_batches( + prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, batch_size=batch_size, batch_format=batch_format, @@ -3069,7 +3070,7 @@ def iter_batches( def iter_torch_batches( self, *, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, batch_size: Optional[int] = 256, dtypes: Optional[Union["torch.dtype", Dict[str, "torch.dtype"]]] = None, device: Optional[str] = None, @@ -3079,6 +3080,8 @@ def iter_torch_batches( drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, + # Deprecated + prefetch_blocks: int = 0, ) -> Iterator["TorchTensorBatchType"]: """Return a local batched iterator of Torch Tensors over the dataset. @@ -3101,8 +3104,12 @@ def iter_torch_batches( Time complexity: O(1) Args: - prefetch_blocks: The number of blocks to prefetch ahead of the - current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. batch_size: The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if @@ -3133,6 +3140,7 @@ def iter_torch_batches( An iterator over Torch Tensor batches. """ return self.iterator().iter_torch_batches( + prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, batch_size=batch_size, dtypes=dtypes, @@ -3147,12 +3155,14 @@ def iter_torch_batches( def iter_tf_batches( self, *, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, batch_size: Optional[int] = 256, dtypes: Optional[Union["tf.dtypes.DType", Dict[str, "tf.dtypes.DType"]]] = None, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, + # Deprecated + prefetch_blocks: int = 0, ) -> Iterator[TensorFlowTensorBatchType]: """Return a local batched iterator of TensorFlow Tensors over the dataset. @@ -3178,8 +3188,12 @@ def iter_tf_batches( Time complexity: O(1) Args: - prefetch_blocks: The number of blocks to prefetch ahead of the - current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. batch_size: The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if @@ -3201,6 +3215,7 @@ def iter_tf_batches( An iterator over TensorFlow Tensor batches. """ return self.iterator().iter_tf_batches( + prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, batch_size=batch_size, dtypes=dtypes, @@ -3222,12 +3237,14 @@ def to_torch( Union["torch.dtype", List["torch.dtype"], Dict[str, "torch.dtype"]] ] = None, batch_size: int = 1, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, unsqueeze_label_tensor: bool = True, unsqueeze_feature_tensors: bool = True, + # Deprecated + prefetch_blocks: int = 0, ) -> "torch.utils.data.IterableDataset": """Return a Torch IterableDataset over this dataset. @@ -3288,8 +3305,12 @@ def to_torch( all tensors. If None, then automatically infer the dtype. batch_size: How many samples per batch to yield at a time. Defaults to 1. - prefetch_blocks: The number of blocks to prefetch ahead of - the current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. drop_last: Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch @@ -3324,6 +3345,7 @@ def to_torch( feature_column_dtypes=feature_column_dtypes, batch_size=batch_size, prefetch_blocks=prefetch_blocks, + prefetch_batches=prefetch_batches, drop_last=drop_last, local_shuffle_buffer_size=local_shuffle_buffer_size, local_shuffle_seed=local_shuffle_seed, @@ -3337,11 +3359,13 @@ def to_tf( feature_columns: Union[str, List[str]], label_columns: Union[str, List[str]], *, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, batch_size: int = 1, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, + # Deprecated + prefetch_blocks: int = 0, ) -> "tf.data.Dataset": """Return a TF Dataset over this dataset. @@ -3406,8 +3430,12 @@ def to_tf( label_column: Columns that correspond to model targets. If this is a string, the target data is a tensor. If this is a list, the target data is a ``dict`` that maps column names to their tensor representation. - prefetch_blocks: The number of blocks to prefetch ahead of the - current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. batch_size: Record batch size. Defaults to 1. drop_last: Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If @@ -3436,6 +3464,7 @@ def to_tf( return self.iterator().to_tf( feature_columns=feature_columns, label_columns=label_columns, + prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, drop_last=drop_last, batch_size=batch_size, diff --git a/python/ray/data/dataset_iterator.py b/python/ray/data/dataset_iterator.py index 374b3a89895a7..a79cfa41a9146 100644 --- a/python/ray/data/dataset_iterator.py +++ b/python/ray/data/dataset_iterator.py @@ -1,10 +1,26 @@ import abc import numpy as np -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union, Iterator - -from ray.data.block import BlockAccessor, DataBatch, T +import time +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + List, + Optional, + Tuple, + Union, + Iterator, +) + +from ray.types import ObjectRef +from ray.data.block import BlockAccessor, Block, BlockMetadata, DataBatch, T +from ray.data.context import DatasetContext from ray.data.row import TableRow from ray.util.annotations import PublicAPI +from ray.data._internal.block_batching import batch_block_refs +from ray.data._internal.block_batching.iter_batches import iter_batches +from ray.data._internal.stats import DatasetStats from ray.data._internal.util import _is_tensor_schema if TYPE_CHECKING: @@ -49,16 +65,32 @@ class DatasetIterator(abc.ABC): """ @abc.abstractmethod + def _to_block_iterator( + self, + ) -> Tuple[ + Iterator[Tuple[ObjectRef[Block], BlockMetadata]], Optional[DatasetStats] + ]: + """Returns the iterator to use for `iter_batches`. + + Returns: + A tuple. The first item of the tuple is an iterator over pairs of Block + object references and their corresponding metadata. The second item of the + tuple is a DatasetStats object used for recording stats during iteration. + """ + raise NotImplementedError + def iter_batches( self, *, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, batch_size: int = 256, batch_format: Optional[str] = "default", drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, _collate_fn: Optional[Callable[[DataBatch], Any]] = None, + # Deprecated. + prefetch_blocks: int = 0, ) -> Iterator[DataBatch]: """Return a local batched iterator over the dataset. @@ -72,8 +104,12 @@ def iter_batches( Time complexity: O(1) Args: - prefetch_blocks: The number of blocks to prefetch ahead of the - current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. batch_size: The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if @@ -97,7 +133,59 @@ def iter_batches( Returns: An iterator over record batches. """ - raise NotImplementedError + + context = DatasetContext.get_current() + if not context.use_streaming_executor: + # Always use legacy iter_batches for bulk executor. + use_legacy = True + else: + use_legacy = context.use_legacy_iter_batches + + if prefetch_blocks > 0 and not use_legacy: + raise DeprecationWarning( + "`prefetch_blocks` arg is deprecated in Ray 2.4. Use " + "the `prefetch_batches` arg instead to specify the amount of " + "prefetching in terms of batches instead of blocks. If you " + "would like to use the legacy `iter_batches` codepath, " + "you can enable it by setting `use_legacy_iter_batches` " + "to True in the DatasetContext." + ) + + time_start = time.perf_counter() + + block_iterator, stats = self._to_block_iterator() + if use_legacy: + # Legacy iter_batches does not use metadata. + def drop_metadata(block_iterator): + for block_ref, metadata in block_iterator: + yield block_ref + + yield from batch_block_refs( + drop_metadata(block_iterator), + stats=stats, + prefetch_blocks=prefetch_blocks, + batch_size=batch_size, + batch_format=batch_format, + drop_last=drop_last, + collate_fn=_collate_fn, + shuffle_buffer_min_size=local_shuffle_buffer_size, + shuffle_seed=local_shuffle_seed, + ) + else: + yield from iter_batches( + block_iterator, + stats=stats, + batch_size=batch_size, + batch_format=batch_format, + drop_last=drop_last, + collate_fn=_collate_fn, + shuffle_buffer_min_size=local_shuffle_buffer_size, + shuffle_seed=local_shuffle_seed, + prefetch_batches=prefetch_batches, + ) + + if stats: + stats.iter_total_s.add(time.perf_counter() - time_start) def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Union[T, TableRow]]: """Return a local row iterator over the dataset. @@ -121,11 +209,16 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Union[T, TableRow]] Returns: An iterator over rows of the dataset. """ - for batch in self.iter_batches( - batch_size=None, - prefetch_blocks=prefetch_blocks, - batch_format=None, - ): + iter_batch_args = {"batch_size": None, "batch_format": None} + + context = DatasetContext.get_current() + if context.use_legacy_iter_batches: + iter_batch_args["prefetch_blocks"] = prefetch_blocks + else: + # Since batch_size is None, 1 block is exactly 1 batch. + iter_batch_args["prefetch_batches"] = prefetch_blocks + + for batch in self.iter_batches(**iter_batch_args): batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch)) for row in batch.iter_rows(): yield row @@ -143,7 +236,7 @@ def schema(self) -> Union[type, "pyarrow.lib.Schema"]: def iter_torch_batches( self, *, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, batch_size: Optional[int] = 256, dtypes: Optional[Union["torch.dtype", Dict[str, "torch.dtype"]]] = None, device: Optional[str] = None, @@ -153,6 +246,8 @@ def iter_torch_batches( drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, + # Deprecated. + prefetch_blocks: int = 0, ) -> Iterator["TorchTensorBatchType"]: """Return a local batched iterator of Torch Tensors over the dataset. @@ -171,8 +266,12 @@ def iter_torch_batches( Time complexity: O(1) Args: - prefetch_blocks: The number of blocks to prefetch ahead of the - current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. batch_size: The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if @@ -229,6 +328,7 @@ def collate_fn(batch: Union[np.ndarray, Dict[str, np.ndarray]]): ) yield from self.iter_batches( + prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, batch_size=batch_size, batch_format="numpy", @@ -241,12 +341,14 @@ def collate_fn(batch: Union[np.ndarray, Dict[str, np.ndarray]]): def iter_tf_batches( self, *, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, batch_size: Optional[int] = 256, dtypes: Optional[Union["tf.dtypes.DType", Dict[str, "tf.dtypes.DType"]]] = None, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, + # Deprecated. + prefetch_blocks: int = 0, ) -> Iterator["TensorFlowTensorBatchType"]: """Return a local batched iterator of TensorFlow Tensors over the dataset. @@ -272,8 +374,12 @@ def iter_tf_batches( Time complexity: O(1) Args: - prefetch_blocks: The number of blocks to prefetch ahead of the - current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. batch_size: The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if @@ -299,6 +405,7 @@ def iter_tf_batches( ) for batch in self.iter_batches( + prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, batch_size=batch_size, batch_format="numpy", @@ -320,12 +427,14 @@ def to_torch( Union["torch.dtype", List["torch.dtype"], Dict[str, "torch.dtype"]] ] = None, batch_size: int = 1, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, unsqueeze_label_tensor: bool = True, unsqueeze_feature_tensors: bool = True, + # Deprecated. + prefetch_blocks: int = 0, ) -> "torch.utils.data.IterableDataset": """Return a Torch IterableDataset over this dataset. @@ -386,8 +495,12 @@ def to_torch( all tensors. If None, then automatically infer the dtype. batch_size: How many samples per batch to yield at a time. Defaults to 1. - prefetch_blocks: The number of blocks to prefetch ahead of - the current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. drop_last: Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch @@ -458,6 +571,7 @@ def make_generator(): batch_size=batch_size, batch_format="pandas", prefetch_blocks=prefetch_blocks, + prefetch_batches=prefetch_batches, drop_last=drop_last, local_shuffle_buffer_size=local_shuffle_buffer_size, local_shuffle_seed=local_shuffle_seed, @@ -502,11 +616,13 @@ def to_tf( feature_columns: Union[str, List[str]], label_columns: Union[str, List[str]], *, - prefetch_blocks: int = 0, + prefetch_batches: int = 1, batch_size: int = 1, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, + # Deprecated. + prefetch_blocks: int = 0, ) -> "tf.data.Dataset": """Return a TF Dataset over this dataset. @@ -573,8 +689,12 @@ def to_tf( label_column: Columns that correspond to model targets. If this is a string, the target data is a tensor. If this is a list, the target data is a ``dict`` that maps column names to their tensor representation. - prefetch_blocks: The number of blocks to prefetch ahead of the - current block during the scan. + prefetch_batches: The number of batches to fetch ahead of the current batch + to fetch. If set to greater than 0, a separate threadpool will be used + to fetch the objects to the local node, format the batches, and apply + the collate_fn. Defaults to 1. You can revert back to the old + prefetching behavior that uses `prefetch_blocks` by setting + `use_legacy_iter_batches` to True in the DatasetContext. batch_size: Record batch size. Defaults to 1. drop_last: Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If @@ -655,6 +775,7 @@ def convert_batch_to_tensors( def generator(): for batch in self.iter_batches( + prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, batch_size=batch_size, drop_last=drop_last, diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index dea973a43eac3..c02062b3d2c9f 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -28,7 +28,9 @@ PipelineExecutor, PipelineSplitExecutorCoordinator, ) -from ray.data._internal.pipelined_dataset_iterator import PipelinedDatasetIterator +from ray.data._internal.dataset_iterator.pipelined_dataset_iterator import ( + PipelinedDatasetIterator, +) from ray.data._internal.plan import ExecutionPlan from ray.data._internal.stats import DatasetPipelineStats, DatasetStats from ray.data.block import BatchUDF, Block, DataBatch, KeyFn, RowUDF @@ -793,7 +795,6 @@ def map_batches( batch_size: Optional[Union[int, Literal["default"]]] = "default", compute: Optional[Union[str, ComputeStrategy]] = None, batch_format: Optional[str] = "default", - prefetch_batches: int = 0, fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, @@ -808,7 +809,6 @@ def map_batches( batch_size=batch_size, compute=compute, batch_format=batch_format, - prefetch_batches=prefetch_batches, fn_args=fn_args, fn_kwargs=fn_kwargs, fn_constructor_args=fn_constructor_args, diff --git a/python/ray/data/tests/block_batching/test_block_batching.py b/python/ray/data/tests/block_batching/test_block_batching.py index 7351ac165af2a..67b44d8b95424 100644 --- a/python/ray/data/tests/block_batching/test_block_batching.py +++ b/python/ray/data/tests/block_batching/test_block_batching.py @@ -1,5 +1,4 @@ import pytest -import time from typing import List from unittest import mock @@ -74,46 +73,6 @@ def prefetch_blocks(self, blocks: List[Block]): assert all(len(window) == num_blocks_to_prefetch for window in windows) -# Test for 3 cases -# 1. Batch size is less than block size -# 2. Batch size is more than block size -# 3. Block size is not divisble by batch size -@pytest.mark.parametrize("batch_size", [4, 10, 7]) -def test_async_batch_fetching(batch_size): - blocks = block_generator(num_blocks=5, num_rows=8) - - def sleep_batch_format(batch_iter, *args, **kwargs): - for batch in batch_iter: - time.sleep(2) - yield batch - - with mock.patch( - "ray.data._internal.block_batching.util.format_batches", - sleep_batch_format, - ): - batch_iter = batch_blocks( - batch_size=batch_size, blocks=blocks, prefetch_batches=1 - ) - outputs = [] - start_time = time.time() - for batch in batch_iter: - time.sleep(3) - outputs.append(batch) - end_time = time.time() - - total_time = end_time - start_time - # Total time should be based on number of times the udf is called - # (which is equal to len(outputs)). - # The 2 seconds sleep in sleep_batch_format is overlapped, so does not count - # towards total time. - assert total_time < len(outputs) * 3 + 3 - - # There should be no dropped rows. - assert sum(len(output_batch) for output_batch in outputs) == 40, sum( - len(output_batch) for output_batch in outputs - ) # 5 blocks with 8 rows each. - - if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/block_batching/test_iter_batches.py b/python/ray/data/tests/block_batching/test_iter_batches.py index f27be2db7aa99..c43463ea5ccc6 100644 --- a/python/ray/data/tests/block_batching/test_iter_batches.py +++ b/python/ray/data/tests/block_batching/test_iter_batches.py @@ -1,14 +1,19 @@ +import itertools import pytest +import time from typing import Iterator, List, Tuple +import pandas as pd import pyarrow as pa +import ray from ray.data.block import Block, BlockMetadata from ray.data._internal.block_batching.interfaces import ( Batch, BlockPrefetcher, ) from ray.data._internal.block_batching.iter_batches import ( + iter_batches, prefetch_batches_locally, restore_original_order, ) @@ -94,6 +99,87 @@ def test_restore_from_original_order(): assert idx == [0, 1, 2, 3] +# Test for 3 cases +# 1. Batch size is less than block size +# 2. Batch size is more than block size +# 3. Block size is not divisble by batch size +@pytest.mark.parametrize("batch_size", [1, 4, 3]) +@pytest.mark.parametrize("drop_last", [True, False]) +@pytest.mark.parametrize("prefetch_batches", [0, 1]) +def test_iter_batches_e2e( + ray_start_regular_shared, batch_size, drop_last, prefetch_batches +): + def collate_fn(batch: pd.DataFrame): + return batch + 1 + + block_refs_iter = itertools.starmap( + lambda block, metadata: (ray.put(block), metadata), + block_generator(num_blocks=4, num_rows=2), + ) + + output_batches = iter_batches( + block_refs_iter, + batch_size=batch_size, + prefetch_batches=prefetch_batches, + batch_format="pandas", + collate_fn=collate_fn, + drop_last=drop_last, + ) + + output_batches = list(output_batches) + + assert len(output_batches) > 0 + for df in output_batches: + # Check batch formatting. + assert isinstance(df, pd.DataFrame) + # Check batch size. + if batch_size == 3 and not drop_last: + assert len(df) in {2, 3} + else: + assert len(df) == batch_size + + concat_df = pd.concat(output_batches) + # Test that collate_fn is applied. + assert concat_df["foo"].iloc[0] == 1 + # Make sure order is preserved. + for i in range(len(concat_df) - 1): + assert concat_df["foo"].iloc[i + 1] >= concat_df["foo"].iloc[i] + + +def test_iter_batches_e2e_async(ray_start_regular_shared): + """We add time.sleep in 3 places: + 1. In the base generator to simulate streaming executor blocking on next results. + 2. In the collate_fn to simulate expensive slicing/formatting/collation + 3. In the user thread to simulate training. + """ + + def collate_fn(batch): + time.sleep(2) + return batch + + block_refs_iter = itertools.starmap( + lambda block, metadata: (ray.put(block), metadata), + block_generator(num_blocks=20, num_rows=2), + ) + start_time = time.time() + output_batches = iter_batches( + block_refs_iter, batch_size=None, collate_fn=collate_fn, prefetch_batches=4 + ) + batches = [] + for batch in output_batches: + time.sleep(1.5) + batches.append(batch) + end_time = time.time() + + # 20 batches, 1.5 second sleep. Should be less than 45 seconds, even with some + # overhead. + # If there was no overlap, then we would expect this to take at least 20*2.5 = 50 + assert end_time - start_time < 45, end_time - start_time + + assert len(batches) == 20 + assert all(len(batch) == 2 for batch in batches) + + if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/block_batching/test_util.py b/python/ray/data/tests/block_batching/test_util.py index 66c1fb85108aa..86a4a12f2f074 100644 --- a/python/ray/data/tests/block_batching/test_util.py +++ b/python/ray/data/tests/block_batching/test_util.py @@ -7,6 +7,7 @@ import ray from ray.data._internal.block_batching.util import ( + _calculate_ref_hits, make_async_gen, blocks_to_batches, format_batches, @@ -172,6 +173,14 @@ def sleep_udf(item): assert end_time - start_time < 9.5 +def test_calculate_ref_hits(ray_start_regular_shared): + refs = [ray.put(0), ray.put(1)] + hits, misses, unknowns = _calculate_ref_hits(refs) + assert hits == 2 + assert misses == 0 + assert unknowns == 0 + + if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/test_bulk_executor.py b/python/ray/data/tests/test_bulk_executor.py index a5d84a0b3510a..c1135f82a07ce 100644 --- a/python/ray/data/tests/test_bulk_executor.py +++ b/python/ray/data/tests/test_bulk_executor.py @@ -98,6 +98,7 @@ def test_basic_stats(ray_start_10_cpus_shared): # TODO(ekl) remove this test once we have the new backend on by default. def test_e2e_bulk_sanity(ray_start_10_cpus_shared): DatasetContext.get_current().new_execution_backend = True + DatasetContext.get_current().use_streaming_executor = False result = ray.data.range(5).map(lambda x: x + 1) assert result.take_all() == [1, 2, 3, 4, 5], result diff --git a/python/ray/data/tests/test_dataset_consumption.py b/python/ray/data/tests/test_dataset_consumption.py index f4882dc4e743b..cfd216e76304d 100644 --- a/python/ray/data/tests/test_dataset_consumption.py +++ b/python/ray/data/tests/test_dataset_consumption.py @@ -609,7 +609,7 @@ def test_iter_batches_basic(ray_start_regular_shared): # Prefetch. batches = list( - ds.iter_batches(prefetch_blocks=1, batch_size=None, batch_format="pandas") + ds.iter_batches(prefetch_batches=1, batch_size=None, batch_format="pandas") ) assert len(batches) == len(dfs) for batch, df in zip(batches, dfs): @@ -618,7 +618,9 @@ def test_iter_batches_basic(ray_start_regular_shared): batch_size = 2 batches = list( - ds.iter_batches(prefetch_blocks=2, batch_size=batch_size, batch_format="pandas") + ds.iter_batches( + prefetch_batches=2, batch_size=batch_size, batch_format="pandas" + ) ) assert all(len(batch) == batch_size for batch in batches) assert len(batches) == math.ceil( @@ -631,7 +633,7 @@ def test_iter_batches_basic(ray_start_regular_shared): # Prefetch more than number of blocks. batches = list( ds.iter_batches( - prefetch_blocks=len(dfs), batch_size=None, batch_format="pandas" + prefetch_batches=len(dfs), batch_size=None, batch_format="pandas" ) ) assert len(batches) == len(dfs) @@ -645,7 +647,7 @@ def test_iter_batches_basic(ray_start_regular_shared): try: context.actor_prefetcher_enabled = False batches = list( - ds.iter_batches(prefetch_blocks=1, batch_size=None, batch_format="pandas") + ds.iter_batches(prefetch_batches=1, batch_size=None, batch_format="pandas") ) assert len(batches) == len(dfs) for batch, df in zip(batches, dfs): diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 0c4dc3c843679..a763741b79d9e 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -914,15 +914,16 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_dataset_context) {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, 'obj_store_mem_peak': N} Dataset iterator time breakdown: -* In ray.wait(): T -* In ray.get(): T +* Total time user code is blocked: T +* Total time in user code: T +* Total time overall: T * Num blocks local: Z * Num blocks remote: Z * Num blocks unknown location: N -* In next_batch(): T -* In format_batch(): T -* In user code: T -* Total time: T +* Batch iteration time breakdown (summed across prefetch threads): + * In ray.get(): T min, T max, T avg, T total + * In batch creation: T min, T max, T avg, T total + * In batch formatting: T min, T max, T avg, T total """ ) diff --git a/python/ray/train/batch_predictor.py b/python/ray/train/batch_predictor.py index 82345a70f395d..392be5501c34a 100644 --- a/python/ray/train/batch_predictor.py +++ b/python/ray/train/batch_predictor.py @@ -354,7 +354,6 @@ def __call__(self, input_batch: DataBatchType) -> DataBatchType: if override_prep is not None else predict_stage_batch_format, batch_size=batch_size, - prefetch_batches=int(num_gpus_per_worker > 0), fn_constructor_kwargs={"override_prep": override_prep}, **ray_remote_args, ) diff --git a/release/air_tests/air_benchmarks/workloads/data_benchmark.py b/release/air_tests/air_benchmarks/workloads/data_benchmark.py index 837704ac80906..dc34435cc6487 100644 --- a/release/air_tests/air_benchmarks/workloads/data_benchmark.py +++ b/release/air_tests/air_benchmarks/workloads/data_benchmark.py @@ -34,7 +34,7 @@ def run_ingest_bulk(dataset, num_workers, num_cpus_per_worker): datasets={"train": dataset}, preprocessor=dummy_prep, num_epochs=1, - prefetch_blocks=1, + prefetch_batches=1, dataset_config={"train": DatasetConfig(split=True)}, ) trainer.fit() diff --git a/release/nightly_tests/dataset/data_ingest_benchmark.py b/release/nightly_tests/dataset/data_ingest_benchmark.py index ec598e04a4aed..f36870638df9f 100644 --- a/release/nightly_tests/dataset/data_ingest_benchmark.py +++ b/release/nightly_tests/dataset/data_ingest_benchmark.py @@ -27,7 +27,7 @@ def get_location(self): def DoConsume(split, rank): - prefetch_blocks = 1 + prefetch_batches = 1 batch_size = 4096 num_epochs = 1 @@ -51,7 +51,7 @@ def generate_epochs(data, epochs: int): epochs_read += 1 batch_start = time.perf_counter() for batch in epoch_data.iter_batches( - prefetch_blocks=prefetch_blocks, batch_size=batch_size + prefetch_batches=prefetch_batches, batch_size=batch_size ): batch_delay = time.perf_counter() - batch_start batch_delays.append(batch_delay) diff --git a/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py b/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py index f3e3e1320c5bd..667ce3ba25bb3 100644 --- a/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py +++ b/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py @@ -1,3 +1,4 @@ +import argparse from typing import Optional, Union, List import ray @@ -55,9 +56,9 @@ def to_tf( return ds -def run_iter_tensor_batches_benchmark(benchmark: Benchmark): +def run_iter_tensor_batches_benchmark(benchmark: Benchmark, data_size_gb: int): ds = ray.data.read_images( - "s3://anonymous@air-example-data-2/1G-image-data-synthetic-raw" + f"s3://anonymous@air-example-data-2/{data_size_gb}G-image-data-synthetic-raw" ).cache() # Repartition both to align the block sizes so we can zip them. @@ -102,6 +103,18 @@ def run_iter_tensor_batches_benchmark(benchmark: Benchmark): batch_size=batch_size, ) + prefetch_batches = [1, 10] + # Test with varying prefetching for iter_torch_batches() + for prefetch_batch in prefetch_batches: + test_name = f"iter-torch-batches-prefetch-{32}-{prefetch_batches}" + benchmark.run( + test_name, + iter_torch_batches, + ds=ds, + batch_size=32, + prefetch_batches=prefetch_batch, + ) + # Test with varying batch sizes and shuffle for iter_torch_batches() and to_tf(). for batch_size in batch_sizes: for shuffle_buffer_size in [batch_size, 2 * batch_size]: @@ -128,8 +141,20 @@ def run_iter_tensor_batches_benchmark(benchmark: Benchmark): if __name__ == "__main__": ray.init() + parser = argparse.ArgumentParser( + description="Helper script to upload files to S3 bucket" + ) + parser.add_argument( + "--data-size-gb", + choices=[1, 10], + type=int, + help="The data size to use for the dataset.", + ) + + args = parser.parse_args() + benchmark = Benchmark("iter-tensor-batches") - run_iter_tensor_batches_benchmark(benchmark) + run_iter_tensor_batches_benchmark(benchmark, args.data_size_gb) benchmark.write_result() diff --git a/release/nightly_tests/dataset/multi_node_benchmark_compute.yaml b/release/nightly_tests/dataset/multi_node_benchmark_compute.yaml new file mode 100644 index 0000000000000..9634146e9a00d --- /dev/null +++ b/release/nightly_tests/dataset/multi_node_benchmark_compute.yaml @@ -0,0 +1,15 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west-2 + +max_workers: 0 + +head_node_type: + name: head_node + instance_type: m5.4xlarge + +worker_node_types: + - name: worker_node + instance_type: m5.4xlarge + max_workers: 3 + min_workers: 3 + use_spot: false diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 754b26318761c..42fb33b5d3f29 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3973,6 +3973,22 @@ timeout: 2400 script: python iter_tensor_batches_benchmark.py +- name: iter_tensor_batches_benchmark_multi_node + group: data-tests + working_dir: nightly_tests/dataset + + frequency: nightly + team: data + cluster: + cluster_env: app_config.yaml + cluster_compute: multi_node_benchmark_compute.yaml + + run: + # Expect the benchmark to finish around 30 minutes. + timeout: 2400 + script: python iter_tensor_batches_benchmark.py --data-size-gb=10 + + - name: iter_batches_benchmark_single_node group: data-tests