diff --git a/doc/source/data/getting-started.rst b/doc/source/data/getting-started.rst index 6136564e3cb3..f33b089c341b 100644 --- a/doc/source/data/getting-started.rst +++ b/doc/source/data/getting-started.rst @@ -66,7 +66,7 @@ transform datasets. Ray executes transformations in parallel for performance. .. testoutput:: MaterializedDataset( - num_blocks=1, + num_blocks=..., num_rows=150, schema={ sepal length (cm): double, @@ -147,7 +147,7 @@ or remote filesystems. import os - transformed_ds.write_parquet("/tmp/iris") + transformed_ds.repartition(1).write_parquet("/tmp/iris") print(os.listdir("/tmp/iris")) diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index e430842e598f..cd2db277cc03 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -30,7 +30,7 @@ blocks need to be materialized in the cluster memory at once. Reading Data ============ -Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces an output block: +Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces one or more output blocks: .. image:: images/dataset-read.svg :align: center @@ -38,7 +38,7 @@ Dataset uses Ray tasks to read data from remote storage in parallel. Each read t .. https://docs.google.com/drawings/d/15B4TB8b5xN15Q9S8-s0MjW6iIvo_PrH7JtV1fL123pU/edit -You can manually specify the number of read tasks, but the final parallelism is always capped by the number of files in the underlying dataset. +You can increase or decrease the number of output blocks by changing the ``parallelism`` parameter. For an in-depth guide on creating datasets, read :ref:`Loading Data `. diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index 21c025108a95..d23eb3cfa334 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -135,7 +135,8 @@ By default, Ray Data automatically selects the read ``parallelism`` according to 4. The parallelism is truncated to ``min(num_files, parallelism)``. Occasionally, it is advantageous to manually tune the parallelism to optimize the application. This can be done when loading data via the ``parallelism`` parameter. -For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created. +For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created. Note that read tasks can produce multiple output +blocks per file in order to satisfy the requested parallelism. Tuning Read Resources ~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/data/transforming-data.rst b/doc/source/data/transforming-data.rst index 5ef6205d3b6f..ddae72bbdc32 100644 --- a/doc/source/data/transforming-data.rst +++ b/doc/source/data/transforming-data.rst @@ -335,9 +335,9 @@ Repartitioning data A :class:`~ray.data.dataset.Dataset` operates on a sequence of distributed data :term:`blocks `. If you want to achieve more fine-grained parallelization, -increase the number of blocks. +increase the number of blocks by setting a higher ``parallelism`` at read time. -To change the number of blocks, call +To change the number of blocks for an existing Dataset, call :meth:`Dataset.repartition() `. .. 
testcode:: diff --git a/python/ray/air/tests/test_legacy_dataset_config.py b/python/ray/air/tests/test_legacy_dataset_config.py index 9d0af345592b..23344f71b0a6 100644 --- a/python/ray/air/tests/test_legacy_dataset_config.py +++ b/python/ray/air/tests/test_legacy_dataset_config.py @@ -358,7 +358,7 @@ def checker(shard, results): assert len(results[0]) == 5, results assert results[0] != results[1], results stats = shard.stats() - assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats + assert "RandomizeBlockOrder:" in stats, stats ds = ray.data.range(5) test = TestStream( @@ -385,7 +385,7 @@ def checker(shard, results): # we eliminate the ordering in comparison. assert set(results[0]) == set(results[1]), results stats = shard.stats() - assert "RandomizeBlockOrder: 5/5 blocks executed" in stats, stats + assert "RandomizeBlockOrder:" in stats, stats ds = ray.data.range(5) test = TestBatch( @@ -400,7 +400,7 @@ def checker(shard, results): assert len(results[0]) == 5, results assert results[0] != results[1], results stats = shard.stats() - assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats + assert "RandomizeBlockOrder:" in stats, stats ds = ray.data.range(5) test = TestStream( diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index 90b4b3caf24e..e98954fa4d20 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -289,6 +289,14 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) +py_test( + name = "test_splitblocks", + size = "medium", + srcs = ["tests/test_splitblocks.py"], + tags = ["team:data", "exclusive"], + deps = ["//:ray_lib", ":conftest"], +) + py_test( name = "test_execution_optimizer", size = "medium", diff --git a/python/ray/data/_internal/block_list.py b/python/ray/data/_internal/block_list.py index f90898e38526..4db400605281 100644 --- a/python/ray/data/_internal/block_list.py +++ b/python/ray/data/_internal/block_list.py @@ -32,6 +32,9 @@ def __init__( # Whether the block list is owned by consuming APIs, and if so it can be # eagerly deleted after read by the consumer. self._owned_by_consumer = owned_by_consumer + # This field can be set to indicate the number of estimated output blocks, + # since each read task may produce multiple output blocks after splitting. + self._estimated_num_blocks = None def __repr__(self): return f"BlockList(owned_by_consumer={self._owned_by_consumer})" @@ -217,6 +220,10 @@ def initial_num_blocks(self) -> int: """Returns the number of blocks of this BlockList.""" return self._num_blocks + def estimated_num_blocks(self) -> int: + """Estimate of `executed_num_blocks()`, without triggering actual execution.""" + return self._estimated_num_blocks or self._num_blocks + def executed_num_blocks(self) -> int: """Returns the number of output blocks after execution. diff --git a/python/ray/data/_internal/execution/operators/input_data_buffer.py b/python/ray/data/_internal/execution/operators/input_data_buffer.py index 41b542d2d614..0c1560b8c124 100644 --- a/python/ray/data/_internal/execution/operators/input_data_buffer.py +++ b/python/ray/data/_internal/execution/operators/input_data_buffer.py @@ -19,12 +19,15 @@ def __init__( self, input_data: Optional[List[RefBundle]] = None, input_data_factory: Callable[[], List[RefBundle]] = None, + num_output_blocks: Optional[int] = None, ): """Create an InputDataBuffer. Args: input_data: The list of bundles to output from this operator. input_data_factory: The factory to get input data, if input_data is None. + num_output_blocks: The number of output blocks. 
If not specified, progress + bars total will be set based on num output bundles instead. """ if input_data is not None: assert input_data_factory is None @@ -37,6 +40,7 @@ def __init__( assert input_data_factory is not None self._input_data_factory = input_data_factory self._is_input_initialized = False + self._num_output_blocks = num_output_blocks super().__init__("Input", []) def start(self, options: ExecutionOptions) -> None: @@ -53,7 +57,7 @@ def get_next(self) -> RefBundle: return self._input_data.pop(0) def num_outputs_total(self) -> Optional[int]: - return self._num_outputs + return self._num_output_blocks or self._num_output_bundles def get_stats(self) -> StatsDict: return {} @@ -64,7 +68,7 @@ def add_input(self, refs, input_index) -> None: def _initialize_metadata(self): assert self._input_data is not None and self._is_input_initialized - self._num_outputs = len(self._input_data) + self._num_output_bundles = len(self._input_data) block_metadata = [] for bundle in self._input_data: block_metadata.extend([m for (_, m) in bundle.blocks]) diff --git a/python/ray/data/_internal/logical/operators/read_operator.py b/python/ray/data/_internal/logical/operators/read_operator.py index 2755c8b446aa..a1a94b4e7b04 100644 --- a/python/ray/data/_internal/logical/operators/read_operator.py +++ b/python/ray/data/_internal/logical/operators/read_operator.py @@ -11,8 +11,23 @@ def __init__( self, datasource: Datasource, read_tasks: List[ReadTask], + estimated_num_blocks: int, ray_remote_args: Optional[Dict[str, Any]] = None, ): - super().__init__(f"Read{datasource.get_name()}", None, ray_remote_args) + if len(read_tasks) == estimated_num_blocks: + suffix = "" + else: + suffix = f"->SplitBlocks({int(estimated_num_blocks / len(read_tasks))})" + super().__init__(f"Read{datasource.get_name()}{suffix}", None, ray_remote_args) self._datasource = datasource + self._estimated_num_blocks = estimated_num_blocks self._read_tasks = read_tasks + + def fusable(self) -> bool: + """Whether this should be fused with downstream operators. + + When we are outputting multiple blocks per read task, we should disable fusion, + as fusion would prevent the blocks from being dispatched to multiple processes + for parallel processing in downstream operators. + """ + return self._estimated_num_blocks == len(self._read_tasks) diff --git a/python/ray/data/_internal/logical/rules/operator_fusion.py b/python/ray/data/_internal/logical/rules/operator_fusion.py index 5ac00a7cdb44..613eb0b507d2 100644 --- a/python/ray/data/_internal/logical/rules/operator_fusion.py +++ b/python/ray/data/_internal/logical/rules/operator_fusion.py @@ -24,6 +24,7 @@ Repartition, ) from ray.data._internal.logical.operators.map_operator import AbstractUDFMap +from ray.data._internal.logical.operators.read_operator import Read from ray.data._internal.stats import StatsDict from ray.data.block import Block @@ -132,6 +133,9 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: down_logical_op = self._op_map[down_op] up_logical_op = self._op_map[up_op] + if isinstance(up_logical_op, Read) and not up_logical_op.fusable(): + return False + # If the downstream operator takes no input, it cannot be fused with # the upstream operator. 
if not down_logical_op._input_dependencies: diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 669729dcb6e5..e61f356afd4a 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -221,7 +221,7 @@ def get_plan_as_string(self, classname: str) -> str: if dataset_blocks is None: num_blocks = "?" else: - num_blocks = dataset_blocks.initial_num_blocks() + num_blocks = dataset_blocks.estimated_num_blocks() dataset_str = "{}(num_blocks={}, num_rows={}, schema={})".format( classname, num_blocks, count, schema_str ) diff --git a/python/ray/data/_internal/planner/plan_read_op.py b/python/ray/data/_internal/planner/plan_read_op.py index fb3695f78237..086d69eeb014 100644 --- a/python/ray/data/_internal/planner/plan_read_op.py +++ b/python/ray/data/_internal/planner/plan_read_op.py @@ -58,7 +58,9 @@ def get_input_data() -> List[RefBundle]: for read_task in read_tasks ] - inputs = InputDataBuffer(input_data_factory=get_input_data) + inputs = InputDataBuffer( + input_data_factory=get_input_data, num_output_blocks=op._estimated_num_blocks + ) def do_read(blocks: Iterator[ReadTask], _: TaskContext) -> Iterator[Block]: for read_task in blocks: diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py index 10139dc54414..cfe9c16c8d55 100644 --- a/python/ray/data/_internal/util.py +++ b/python/ray/data/_internal/util.py @@ -90,7 +90,7 @@ def _autodetect_parallelism( ctx: DataContext, reader: Optional["Reader"] = None, avail_cpus: Optional[int] = None, -) -> (int, int): +) -> (int, int, Optional[int]): """Returns parallelism to use and the min safe parallelism to avoid OOMs. This detects parallelism using the following heuristics, applied in order: @@ -111,8 +111,9 @@ def _autodetect_parallelism( avail_cpus: Override avail cpus detection (for testing only). Returns: - Tuple of detected parallelism (only if -1 was specified), and the min safe - parallelism (which can be used to generate warnings about large blocks). + Tuple of detected parallelism (only if -1 was specified), the min safe + parallelism (which can be used to generate warnings about large blocks), + and the estimated inmemory size of the dataset. """ min_safe_parallelism = 1 max_reasonable_parallelism = sys.maxsize @@ -140,7 +141,7 @@ def _autodetect_parallelism( f"estimated_available_cpus={avail_cpus} and " f"estimated_data_size={mem_size}." ) - return parallelism, min_safe_parallelism + return parallelism, min_safe_parallelism, mem_size def _estimate_avail_cpus(cur_pg: Optional["PlacementGroup"]) -> int: diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 091d247825b4..8d88bdcfc473 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -480,7 +480,7 @@ def map_batches( >>> ds = ds.map_batches(map_fn_with_large_output) >>> ds MapBatches(map_fn_with_large_output) - +- Dataset(num_blocks=1, num_rows=1, schema={item: int64}) + +- Dataset(num_blocks=..., num_rows=1, schema={item: int64}) Args: @@ -753,7 +753,7 @@ def select_columns( >>> ds MapBatches() +- Dataset( - num_blocks=10, + num_blocks=..., num_rows=10, schema={col1: int64, col2: int64, col3: int64} ) @@ -1698,7 +1698,7 @@ def groupby(self, key: Optional[str]) -> "GroupedData": ... {"A": x % 3, "B": x} for x in range(100)]).groupby( ... 
"A").count() Aggregate - +- Dataset(num_blocks=100, num_rows=100, schema={A: int64, B: int64}) + +- Dataset(num_blocks=..., num_rows=100, schema={A: int64, B: int64}) Time complexity: O(dataset size * log(dataset size / parallelism)) @@ -3287,7 +3287,7 @@ def to_tf( >>> ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv") >>> ds Dataset( - num_blocks=1, + num_blocks=..., num_rows=150, schema={ sepal length (cm): double, @@ -3318,7 +3318,7 @@ def to_tf( >>> ds Concatenator +- Dataset( - num_blocks=1, + num_blocks=..., num_rows=150, schema={ sepal length (cm): double, diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index d25100923782..a174561f9802 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -195,6 +195,7 @@ class ReadTask(Callable[[], Iterable[Block]]): def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata): self._metadata = metadata self._read_fn = read_fn + self._additional_output_splits = 1 def get_metadata(self) -> BlockMetadata: return self._metadata @@ -211,13 +212,27 @@ def __call__(self) -> Iterable[Block]: if context.block_splitting_enabled: for block in result: - yield block + yield from self._do_additional_splits(block) else: builder = DelegatingBlockBuilder() for block in result: builder.add_block(block) yield builder.build() + def _set_additional_split_factor(self, k: int) -> None: + self._additional_output_splits = k + + def _do_additional_splits(self, block: Block) -> Iterable[Block]: + if self._additional_output_splits > 1: + block = BlockAccessor.for_block(block) + offset = 0 + split_sizes = _splitrange(block.num_rows(), self._additional_output_splits) + for size in split_sizes: + yield block.slice(offset, offset + size, copy=True) + offset += size + else: + yield block + @PublicAPI class RangeDatasource(Datasource): @@ -478,3 +493,21 @@ def make_block(count: int, num_columns: int) -> Block: i += block_size return read_tasks + + +def _splitrange(n, k): + """Calculates array lens of np.array_split(). + + This is the equivalent of + `[len(x) for x in np.array_split(range(n), k)]`. + """ + base = n // k + output = [base] * k + rem = n - sum(output) + for i in range(len(output)): + if rem > 0: + output[i] += 1 + rem -= 1 + assert rem == 0, (rem, output, n, k) + assert sum(output) == n, (output, n, k) + return output diff --git a/python/ray/data/datasource/parquet_datasource.py b/python/ray/data/datasource/parquet_datasource.py index db74bb2722ba..8b950a9f4bfc 100644 --- a/python/ray/data/datasource/parquet_datasource.py +++ b/python/ray/data/datasource/parquet_datasource.py @@ -288,6 +288,25 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]: if meta.size_bytes is not None: meta.size_bytes = int(meta.size_bytes * self._encoding_ratio) + + if meta.num_rows and meta.size_bytes: + # Make sure the batches read are small enough to enable yielding of + # output blocks incrementally during the read. + row_size = meta.size_bytes / meta.num_rows + # Make sure the row batch size is small enough that block splitting + # is still effective. 
+ max_parquet_reader_row_batch_size = ( + DataContext.get_current().target_max_block_size // 10 + ) + default_read_batch_size = max( + 1, + min( + PARQUET_READER_ROW_BATCH_SIZE, + max_parquet_reader_row_batch_size // row_size, + ), + ) + else: + default_read_batch_size = PARQUET_READER_ROW_BATCH_SIZE block_udf, reader_args, columns, schema = ( self._block_udf, self._reader_args, @@ -299,6 +318,7 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]: lambda p=serialized_pieces: _read_pieces( block_udf, reader_args, + default_read_batch_size, columns, schema, p, @@ -363,7 +383,12 @@ def _estimate_files_encoding_ratio(self) -> float: def _read_pieces( - block_udf, reader_args, columns, schema, serialized_pieces: List[_SerializedPiece] + block_udf, + reader_args, + default_read_batch_size, + columns, + schema, + serialized_pieces: List[_SerializedPiece], ) -> Iterator["pyarrow.Table"]: # This import is necessary to load the tensor extension type. from ray.data.extensions.tensor_extension import ArrowTensorType # noqa @@ -387,7 +412,7 @@ def _read_pieces( logger.debug(f"Reading {len(pieces)} parquet pieces") use_threads = reader_args.pop("use_threads", False) - batch_size = reader_args.pop("batch_size", PARQUET_READER_ROW_BATCH_SIZE) + batch_size = reader_args.pop("batch_size", default_read_batch_size) for piece in pieces: part = _get_partition_keys(piece.partition_expression) batches = piece.to_batches( diff --git a/python/ray/data/iterator.py b/python/ray/data/iterator.py index 3e80bab15900..4b63432a6e44 100644 --- a/python/ray/data/iterator.py +++ b/python/ray/data/iterator.py @@ -48,9 +48,9 @@ class DataIterator(abc.ABC): >>> import ray >>> ds = ray.data.range(5) >>> ds - Dataset(num_blocks=5, num_rows=5, schema={id: int64}) + Dataset(num_blocks=..., num_rows=5, schema={id: int64}) >>> ds.iterator() - DataIterator(Dataset(num_blocks=5, num_rows=5, schema={id: int64})) + DataIterator(Dataset(num_blocks=..., num_rows=5, schema={id: int64})) .. tip:: For debugging purposes, use @@ -635,7 +635,7 @@ def to_tf( ... 
) >>> it = ds.iterator(); it DataIterator(Dataset( - num_blocks=1, + num_blocks=..., num_rows=150, schema={ sepal length (cm): double, @@ -666,7 +666,7 @@ def to_tf( >>> it DataIterator(Concatenator +- Dataset( - num_blocks=1, + num_blocks=..., num_rows=150, schema={ sepal length (cm): double, diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index ed11093a2eff..d1649c9c601c 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1,5 +1,6 @@ import collections import logging +import math from typing import ( TYPE_CHECKING, Any, @@ -136,7 +137,7 @@ def from_items( if parallelism == 0: raise ValueError(f"parallelism must be -1 or > 0, got: {parallelism}") - detected_parallelism, _ = _autodetect_parallelism( + detected_parallelism, _, _ = _autodetect_parallelism( parallelism, ray.util.get_current_placement_group(), DataContext.get_current(), @@ -193,7 +194,7 @@ def range(n: int, *, parallelism: int = -1) -> Dataset: >>> import ray >>> ds = ray.data.range(10000) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=200, num_rows=10000, schema={id: int64}) + Dataset(num_blocks=..., num_rows=10000, schema={id: int64}) >>> ds.map(lambda x: {"id": x["id"] * 2}).take(4) # doctest: +SKIP [{"id": 0}, {"id": 2}, {"id": 4}, {"id": 6}] @@ -317,9 +318,12 @@ def read_datasource( force_local = True if force_local: - requested_parallelism, min_safe_parallelism, read_tasks = _get_read_tasks( - datasource, ctx, cur_pg, parallelism, local_uri, read_args - ) + ( + requested_parallelism, + min_safe_parallelism, + inmemory_size, + read_tasks, + ) = _get_read_tasks(datasource, ctx, cur_pg, parallelism, local_uri, read_args) else: # Prepare read in a remote task at same node. # NOTE: in Ray client mode, this is expected to be run on head node. @@ -332,7 +336,12 @@ def read_datasource( _get_read_tasks, retry_exceptions=False, num_cpus=0 ).options(scheduling_strategy=scheduling_strategy) - requested_parallelism, min_safe_parallelism, read_tasks = ray.get( + ( + requested_parallelism, + min_safe_parallelism, + inmemory_size, + read_tasks, + ) = ray.get( get_read_tasks.remote( datasource, ctx, @@ -343,28 +352,35 @@ def read_datasource( ) ) - if read_tasks and len(read_tasks) < min_safe_parallelism * 0.7: - perc = 1 + round((min_safe_parallelism - len(read_tasks)) / len(read_tasks), 1) - logger.warning( - f"{WARN_PREFIX} The blocks of this dataset are estimated to be {perc}x " - "larger than the target block size " - f"of {int(ctx.target_max_block_size / 1024 / 1024)} MiB. This may lead to " - "out-of-memory errors during processing. Consider reducing the size of " - "input files or using `.repartition(n)` to increase the number of " - "dataset blocks." - ) - elif len(read_tasks) < requested_parallelism and ( - len(read_tasks) < ray.available_resources().get("CPU", 1) // 2 - ): - logger.warning( - f"{WARN_PREFIX} The number of blocks in this dataset " - f"({len(read_tasks)}) " - f"limits its parallelism to {len(read_tasks)} concurrent tasks. " - "This is much less than the number " - "of available CPU slots in the cluster. Use `.repartition(n)` to " - "increase the number of " - "dataset blocks." - ) + # Compute the number of blocks the read will return. If the number of blocks is + # expected to be less than the requested parallelism, boost the number of blocks + # by adding an additional split into `k` pieces to each read task. 
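+    # Size-based splitting (derived from the estimated in-memory size) is applied
+    # first; the additional per-task split factor below is only applied when that
+    # estimate still falls short of the requested parallelism.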
+ if read_tasks: + if inmemory_size: + expected_block_size = inmemory_size / len(read_tasks) + logger.debug(f"Expected block size {expected_block_size}") + size_based_splits = round( + max(1, expected_block_size / ctx.target_max_block_size) + ) + else: + size_based_splits = 1 + logger.debug(f"Size based split factor {size_based_splits}") + estimated_num_blocks = len(read_tasks) * size_based_splits + logger.debug(f"Blocks after size splits {estimated_num_blocks}") + + # Add more output splitting for each read task if needed. + if estimated_num_blocks < requested_parallelism: + k = math.ceil(requested_parallelism / estimated_num_blocks) + logger.info( + f"To satisfy the requested parallelism of {requested_parallelism}, " + f"each read task output will be split into {k} smaller blocks." + ) + for r in read_tasks: + r._set_additional_split_factor(k) + estimated_num_blocks = estimated_num_blocks * k + logger.debug("Estimated num output blocks {estimated_num_blocks}") + else: + estimated_num_blocks = 0 read_stage_name = f"Read{datasource.get_name()}" available_cpu_slots = ray.available_resources().get("CPU", 1) @@ -390,10 +406,11 @@ def read_datasource( ray_remote_args=ray_remote_args, owned_by_consumer=False, ) + block_list._estimated_num_blocks = estimated_num_blocks # TODO(hchen): move _get_read_tasks and related code to the Read physical operator, # after removing LazyBlockList code path. - read_op = Read(datasource, read_tasks, ray_remote_args) + read_op = Read(datasource, read_tasks, estimated_num_blocks, ray_remote_args) logical_plan = LogicalPlan(read_op) return Dataset( @@ -516,7 +533,7 @@ def read_parquet( >>> ray.data.read_parquet("example://iris.parquet", ... schema=pa.schema(fields)) Dataset( - num_blocks=1, + num_blocks=..., num_rows=150, schema={ sepal.length: double, @@ -616,13 +633,13 @@ def read_images( >>> path = "s3://anonymous@air-example-data-2/movie-image-small-filesize-1GB" >>> ds = ray.data.read_images(path) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=200, num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8)}) + Dataset(num_blocks=..., num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8)}) If you need image file paths, set ``include_paths=True``. >>> ds = ray.data.read_images(path, include_paths=True) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=200, num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8), path: string}) + Dataset(num_blocks=..., num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8), path: string}) >>> ds.take(1)[0]["path"] # doctest: +SKIP 'air-example-data-2/movie-image-small-filesize-1GB/0.jpg' @@ -645,7 +662,7 @@ def read_images( >>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root) >>> ds = ray.data.read_images(root, size=(224, 224), partitioning=partitioning) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=176, num_rows=94946, schema={image: TensorDtype(shape=(224, 224, 3), dtype=uint8), class: object}) + Dataset(num_blocks=..., num_rows=94946, schema={image: TensorDtype(shape=(224, 224, 3), dtype=uint8), class: object}) Args: paths: A single file/directory path or a list of file/directory paths. 
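To make the arithmetic above easier to follow, here is a small standalone sketch of the block-count estimation, written outside of Ray. The helper name ``estimate_output_blocks`` and the sample numbers are illustrative only; the sketch mirrors the logic added to ``read_datasource`` above (size-based splits first, then an extra per-task split factor) and is not part of the Ray API.

import math

def estimate_output_blocks(
    num_read_tasks: int,
    inmemory_size: int,
    target_max_block_size: int,
    requested_parallelism: int,
) -> tuple:
    # Size-based splitting: large per-task outputs are split toward the
    # target block size.
    if inmemory_size:
        expected_block_size = inmemory_size / num_read_tasks
        size_based_splits = round(max(1, expected_block_size / target_max_block_size))
    else:
        size_based_splits = 1
    estimated = num_read_tasks * size_based_splits
    # Parallelism-based splitting: if the estimate still falls short of the
    # requested parallelism, split each read task's output into k more pieces.
    k = 1
    if estimated < requested_parallelism:
        k = math.ceil(requested_parallelism / estimated)
        estimated *= k
    return estimated, k

# One 100 MiB file, 128 MiB target blocks, parallelism=100:
# size_based_splits=1 and k=100, so roughly 100 output blocks.
print(estimate_output_blocks(1, 100 * 1024**2, 128 * 1024**2, 100))  # (100, 100)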
@@ -1748,22 +1765,22 @@ def from_huggingface( >>> ray_ds = ray.data.from_huggingface(hf_dataset) >>> ray_ds {'train': MaterializedDataset( - num_blocks=1, + num_blocks=..., num_rows=3257, schema={text: string, label: int64} ), 'test': MaterializedDataset( - num_blocks=1, + num_blocks=..., num_rows=1421, schema={text: string, label: int64} ), 'validation': MaterializedDataset( - num_blocks=1, + num_blocks=..., num_rows=374, schema={text: string, label: int64} )} >>> ray_ds = ray.data.from_huggingface(hf_dataset["train"]) >>> ray_ds MaterializedDataset( - num_blocks=1, + num_blocks=..., num_rows=3257, schema={text: string, label: int64} ) @@ -1831,7 +1848,7 @@ def from_tf( >>> dataset, _ = tfds.load('cifar10', split=["train", "test"]) # doctest: +SKIP >>> ds = ray.data.from_tf(dataset) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=200, num_rows=50000, schema={id: binary, image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8), label: int64}) + Dataset(num_blocks=..., num_rows=50000, schema={id: binary, image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8), label: int64}) >>> ds.take(1) # doctest: +SKIP [{'id': b'train_16399', 'image': array([[[143, 96, 70], [141, 96, 72], @@ -1885,7 +1902,7 @@ def from_torch( >>> dataset = datasets.MNIST("data", download=True) # doctest: +SKIP >>> ds = ray.data.from_torch(dataset) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=200, num_rows=60000, schema={item: object}) + Dataset(num_blocks=..., num_rows=60000, schema={item: object}) >>> ds.take(1) # doctest: +SKIP {"item": (, 5)} @@ -1905,7 +1922,7 @@ def _get_read_tasks( parallelism: int, local_uri: bool, kwargs: dict, -) -> Tuple[int, int, List[ReadTask]]: +) -> Tuple[int, int, Optional[int], List[ReadTask]]: """Generates read tasks. Args: @@ -1917,19 +1934,20 @@ def _get_read_tasks( Returns: Request parallelism from the datasource, the min safe parallelism to avoid - OOM, and the list of read tasks generated. + OOM, the estimated inmemory data size, and list of read tasks generated. """ kwargs = _unwrap_arrow_serialization_workaround(kwargs) if local_uri: kwargs["local_uri"] = local_uri DataContext._set_current(ctx) reader = ds.create_reader(**kwargs) - requested_parallelism, min_safe_parallelism = _autodetect_parallelism( + requested_parallelism, min_safe_parallelism, mem_size = _autodetect_parallelism( parallelism, cur_pg, DataContext.get_current(), reader ) return ( requested_parallelism, min_safe_parallelism, + mem_size, reader.get_read_tasks(requested_parallelism), ) diff --git a/python/ray/data/tests/test_auto_parallelism.py b/python/ray/data/tests/test_auto_parallelism.py index 55355eef2426..a16309369714 100644 --- a/python/ray/data/tests/test_auto_parallelism.py +++ b/python/ray/data/tests/test_auto_parallelism.py @@ -87,7 +87,7 @@ class MockReader: def estimate_inmemory_data_size(self): return data_size - result, _ = _autodetect_parallelism( + result, _, _ = _autodetect_parallelism( parallelism=-1, cur_pg=None, ctx=DataContext.get_current(), diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index c832033289d8..5e4b9c41bf14 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -157,7 +157,7 @@ def test_empty_dataset(ray_start_regular_shared): ds = ds.materialize() assert ( str(ds) - == "MaterializedDataset(num_blocks=1, num_rows=0, schema=Unknown schema)" + == "MaterializedDataset(num_blocks=2, num_rows=0, schema=Unknown schema)" ) # Test map on empty dataset. 
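For context on the test updates that follow, the user-visible behavior this PR adds is that a requested read ``parallelism`` larger than the number of input files is now satisfied by splitting each read task's output into smaller blocks. A brief, hedged usage sketch (the example CSV path is the one used in the new ``test_splitblocks.py``; exact block counts and stage names may vary):

import ray

# A single CSV file no longer caps the dataset at one block when a higher
# parallelism is requested; each read task's output is split instead.
ds = ray.data.read_csv("example://iris.csv", parallelism=10)
print(ds.materialize().num_blocks())  # expected to be about 10, not 1

# The extra splitting is expected to show up in the stats as a
# "ReadCSV->SplitBlocks(10)" stage, and it disables fusion with downstream maps.
print(ds.map_batches(lambda batch: batch).materialize().stats())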
@@ -1468,7 +1468,6 @@ def test_read_write_local_node(ray_start_cluster): def check_dataset_is_local(ds): blocks = ds.get_internal_block_refs() - assert len(blocks) == num_files ray.wait(blocks, num_returns=len(blocks), fetch_local=False) location_data = ray.experimental.get_object_locations(blocks) locations = [] @@ -1488,7 +1487,7 @@ def check_dataset_is_local(ds): check_dataset_is_local(ds) # With fusion. - ds = ray.data.read_parquet(local_path).map(lambda x: x).materialize() + ds = ray.data.read_parquet(local_path, parallelism=1).map(lambda x: x).materialize() check_dataset_is_local(ds) # Write back to local scheme. @@ -1666,7 +1665,7 @@ def test_dataset_plan_as_string(ray_start_cluster): ds = ray.data.read_parquet("example://iris.parquet") assert ds._plan.get_plan_as_string("Dataset") == ( "Dataset(\n" - " num_blocks=1,\n" + " num_blocks=8,\n" " num_rows=150,\n" " schema={\n" " sepal.length: double,\n" @@ -1686,7 +1685,7 @@ def test_dataset_plan_as_string(ray_start_cluster): " +- MapBatches()\n" " +- MapBatches()\n" " +- Dataset(\n" - " num_blocks=1,\n" + " num_blocks=8,\n" " num_rows=150,\n" " schema={\n" " sepal.length: double,\n" diff --git a/python/ray/data/tests/test_csv.py b/python/ray/data/tests/test_csv.py index 6fe4f74eae52..2273cb9196fe 100644 --- a/python/ray/data/tests/test_csv.py +++ b/python/ray/data/tests/test_csv.py @@ -660,7 +660,7 @@ def keep_expected_partitions(kv_dict): data_path, partition_filter=partition_path_filter, filesystem=fs, - parallelism=100, + parallelism=6, ) assert_base_partitioned_ds(ds, num_input_files=6, num_computed=6) assert ray.get(kept_file_counter.get.remote()) == 6 diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index ab7a61b564c7..58f9ed755b9b 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -72,7 +72,7 @@ def _check_usage_record(op_names: List[str], clear_after_check: Optional[bool] = def test_read_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - op = Read(ParquetDatasource(), []) + op = Read(ParquetDatasource(), [], 0) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag @@ -149,7 +149,7 @@ def method(self, x): for udf, expected_name in zip(udf_list, expected_names): op = MapRows( - Read(ParquetDatasource(), []), + Read(ParquetDatasource(), [], 0), udf, ) assert op.name == f"Map({expected_name})" @@ -157,7 +157,7 @@ def method(self, x): def test_map_batches_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = MapBatches( read_op, lambda x: x, @@ -180,7 +180,7 @@ def test_map_batches_e2e(ray_start_regular_shared, enable_optimizer): def test_map_rows_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = MapRows( read_op, lambda x: x, @@ -203,7 +203,7 @@ def test_map_rows_e2e(ray_start_regular_shared, enable_optimizer): def test_filter_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = Filter( read_op, lambda x: x, @@ -226,7 +226,7 @@ def test_filter_e2e(ray_start_regular_shared, enable_optimizer): def test_flat_map(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = 
Read(ParquetDatasource(), [], 0) op = FlatMap( read_op, lambda x: x, @@ -285,7 +285,7 @@ def ensure_sample_size_close(dataset, sample_percent=0.5): def test_random_shuffle_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = RandomShuffle( read_op, seed=0, @@ -317,7 +317,7 @@ def test_random_shuffle_e2e( ) def test_repartition_operator(ray_start_regular_shared, enable_optimizer, shuffle): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = Repartition(read_op, num_outputs=5, shuffle=shuffle) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag @@ -383,9 +383,9 @@ def _check_repartition_usage_and_stats(ds): @pytest.mark.parametrize("preserve_order", (True, False)) def test_union_operator(ray_start_regular_shared, enable_optimizer, preserve_order): planner = Planner() - read_parquet_op = Read(ParquetDatasource(), []) - read_range_op = Read(RangeDatasource(), []) - read_json_op = Read(JSONDatasource(), []) + read_parquet_op = Read(ParquetDatasource(), [], 0) + read_range_op = Read(RangeDatasource(), [], 0) + read_json_op = Read(JSONDatasource(), [], 0) union_op = Union( read_parquet_op, read_range_op, @@ -451,7 +451,7 @@ def test_union_e2e(ray_start_regular_shared, enable_optimizer, preserve_order): def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that Read is fused with MapBatches. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = MapBatches( read_op, lambda x: x, @@ -471,7 +471,7 @@ def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optim def test_read_map_chain_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that a chain of different map operators are fused. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = MapRows(read_op, lambda x: x) op = MapBatches(op, lambda x: x) op = FlatMap(op, lambda x: x) @@ -517,6 +517,7 @@ def test_read_map_batches_operator_fusion_compatible_remote_args( read_op = Read( ParquetDatasource(), [], + 0, # This case is testing fusing the following 2 map_batches operators. # So we add incompatible remote args to the read op to make sure # it doesn't get fused. @@ -565,6 +566,7 @@ def test_read_map_batches_operator_fusion_incompatible_remote_args( read_op = Read( ParquetDatasource(), [], + 0, # This case is testing fusing the following 2 map_batches operators. # So we add incompatible remote args to the read op to make sure # it doesn't get fused. @@ -599,7 +601,7 @@ def test_read_map_batches_operator_fusion_compute_tasks_to_actors( # Test that a task-based map operator is fused into an actor-based map operator when # the former comes before the latter. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = MapBatches(read_op, lambda x: x) op = MapBatches(op, lambda x: x, compute=ray.data.ActorPoolStrategy()) logical_plan = LogicalPlan(op) @@ -619,7 +621,7 @@ def test_read_map_batches_operator_fusion_compute_read_to_actors( ): # Test that reads fuse into an actor-based map operator. 
planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = MapBatches(read_op, lambda x: x, compute=ray.data.ActorPoolStrategy()) logical_plan = LogicalPlan(op) physical_plan = planner.plan(logical_plan) @@ -638,7 +640,7 @@ def test_read_map_batches_operator_fusion_incompatible_compute( ): # Test that map operators are not fused when compute strategies are incompatible. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = MapBatches(read_op, lambda x: x, compute=ray.data.ActorPoolStrategy()) op = MapBatches(op, lambda x: x) logical_plan = LogicalPlan(op) @@ -662,7 +664,7 @@ def test_read_map_batches_operator_fusion_target_block_size( # Test that fusion of map operators merges their block sizes in the expected way # (taking the max). planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = MapBatches(read_op, lambda x: x, target_block_size=2) op = MapBatches(op, lambda x: x, target_block_size=5) op = MapBatches(op, lambda x: x, target_block_size=3) @@ -864,7 +866,7 @@ def test_write_fusion(ray_start_regular_shared, enable_optimizer, tmp_path): def test_write_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() datasource = ParquetDatasource() - read_op = Read(datasource, []) + read_op = Read(datasource, [], 0) op = Write( read_op, datasource, @@ -880,7 +882,7 @@ def test_write_operator(ray_start_regular_shared, enable_optimizer): def test_sort_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = Sort( read_op, key="col1", @@ -955,7 +957,7 @@ def test_sort_validate_keys( def test_aggregate_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource(), [], 0) op = Aggregate( read_op, key="col1", @@ -1025,8 +1027,8 @@ def test_aggregate_validate_keys( def test_zip_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op1 = Read(ParquetDatasource(), []) - read_op2 = Read(ParquetDatasource(), []) + read_op1 = Read(ParquetDatasource(), [], 0) + read_op2 = Read(ParquetDatasource(), [], 0) op = Zip(read_op1, read_op2) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag diff --git a/python/ray/data/tests/test_mongo.py b/python/ray/data/tests/test_mongo.py index bff4bf5192fb..5574946e76d4 100644 --- a/python/ray/data/tests/test_mongo.py +++ b/python/ray/data/tests/test_mongo.py @@ -131,7 +131,7 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): ) assert str(ds) == ( "Dataset(\n" - " num_blocks=5,\n" + " num_blocks=200,\n" " num_rows=5,\n" " schema={_id: fixed_size_binary[12], float_field: double, " "int_field: int32}\n" @@ -148,7 +148,7 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): ) assert str(ds) == ( "Dataset(\n" - " num_blocks=5,\n" + " num_blocks=1000,\n" " num_rows=5,\n" " schema={_id: fixed_size_binary[12], float_field: double, " "int_field: int32}\n" @@ -247,7 +247,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): ).materialize() assert str(ds) == ( "MaterializedDataset(\n" - " num_blocks=5,\n" + " num_blocks=200,\n" " num_rows=5,\n" " schema={_id: fixed_size_binary[12], float_field: double, " "int_field: int32}\n" @@ -265,7 +265,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): ) assert 
str(ds) == ( "Dataset(\n" - " num_blocks=5,\n" + " num_blocks=1000,\n" " num_rows=5,\n" " schema={_id: fixed_size_binary[12], float_field: double, " "int_field: int32}\n" diff --git a/python/ray/data/tests/test_numpy.py b/python/ray/data/tests/test_numpy.py index 082f65c24b07..995da1f78862 100644 --- a/python/ray/data/tests/test_numpy.py +++ b/python/ray/data/tests/test_numpy.py @@ -130,7 +130,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): path = os.path.join(tmp_path, "test_np_dir") os.mkdir(path) np.save(os.path.join(path, "test.npy"), np.expand_dims(np.arange(0, 10), 1)) - ds = ray.data.read_numpy(path) + ds = ray.data.read_numpy(path, parallelism=1) assert str(ds) == ( "Dataset(\n" " num_blocks=1,\n" @@ -146,7 +146,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): with open(os.path.join(path, "foo.txt"), "w") as f: f.write("foobar") - ds = ray.data.read_numpy(path) + ds = ray.data.read_numpy(path, parallelism=1) assert ds.num_blocks() == 1 assert ds.count() == 10 assert str(ds) == ( @@ -186,7 +186,9 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): os.mkdir(path) path = os.path.join(path, "test.npy") np.save(path, np.expand_dims(np.arange(0, 10), 1)) - ds = ray.data.read_numpy(path, meta_provider=FastFileMetadataProvider()) + ds = ray.data.read_numpy( + path, meta_provider=FastFileMetadataProvider(), parallelism=1 + ) assert str(ds) == ( "Dataset(\n" " num_blocks=1,\n" diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index 4dfc1a8a1d2e..c69ba0e4bd40 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -1045,8 +1045,6 @@ def get_node_id(): # Force reads. blocks = ds.get_internal_block_refs() - assert len(blocks) == 2 - ray.wait(blocks, num_returns=len(blocks), fetch_local=False) location_data = ray.experimental.get_object_locations(blocks) locations = [] diff --git a/python/ray/data/tests/test_randomize_block_order.py b/python/ray/data/tests/test_randomize_block_order.py index 79048dd4e1f2..797d60a7bb4e 100644 --- a/python/ray/data/tests/test_randomize_block_order.py +++ b/python/ray/data/tests/test_randomize_block_order.py @@ -21,7 +21,9 @@ def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(datasource=ParquetDatasource(), read_tasks=[]) + read_op = Read( + datasource=ParquetDatasource(), read_tasks=[], estimated_num_blocks=0 + ) op = RandomizeBlocks( read_op, seed=0, @@ -36,7 +38,7 @@ def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer): def test_randomize_block_order_rule(): - read = Read(datasource=ParquetDatasource(), read_tasks=[]) + read = Read(datasource=ParquetDatasource(), read_tasks=[], estimated_num_blocks=0) operator1 = RandomizeBlocks(input_op=read, seed=None) operator2 = RandomizeBlocks(input_op=operator1, seed=None) operator3 = MapBatches(input_op=operator2, fn=lambda x: x) @@ -59,7 +61,7 @@ def test_randomize_block_order_rule(): def test_randomize_block_order_rule_seed(): - read = Read(datasource=ParquetDatasource(), read_tasks=[]) + read = Read(datasource=ParquetDatasource(), read_tasks=[], estimated_num_blocks=0) operator1 = RandomizeBlocks(input_op=read, seed=None) operator2 = RandomizeBlocks(input_op=operator1, seed=2) operator3 = MapBatches(input_op=operator2, fn=lambda x: x) @@ -86,7 +88,7 @@ def test_randomize_block_order_rule_seed(): def test_randomize_block_order_after_repartition(): - read = Read(datasource=ParquetDatasource(), 
read_tasks=[]) + read = Read(datasource=ParquetDatasource(), read_tasks=[], estimated_num_blocks=0) operator1 = RandomizeBlocks(input_op=read) operator2 = Repartition(input_op=operator1, num_outputs=1, shuffle=False) operator3 = RandomizeBlocks(input_op=operator2) diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py index 2b9c61752b8d..11245da54c3c 100644 --- a/python/ray/data/tests/test_size_estimation.py +++ b/python/ray/data/tests/test_size_estimation.py @@ -86,7 +86,7 @@ def gen(name): ray.data.range(1000, parallelism=1).map( lambda _: {"out": LARGE_VALUE} ).write_csv(path) - return ray.data.read_csv(path) + return ray.data.read_csv(path, parallelism=1) # 20MiB ctx.target_max_block_size = 20_000_000 @@ -132,7 +132,7 @@ def gen(name): # will only write to one file, even though there are multiple # blocks created by block splitting. ds.write_parquet(path) - return ray.data.read_parquet(path, parallelism=200) + return ray.data.read_parquet(path, parallelism=1) # 20MiB ctx.target_max_block_size = 20_000_000 @@ -143,17 +143,17 @@ def gen(name): ctx.target_max_block_size = 3_000_000 ds2 = gen("out2") nrow = ds2._block_num_rows() - assert 3 < len(nrow) < 5, nrow + assert 2 < len(nrow) < 5, nrow for x in nrow[:-1]: - assert 50000 < x < 75000, (x, nrow) + assert 50000 < x < 95000, (x, nrow) # 1MiB ctx.target_max_block_size = 1_000_000 ds3 = gen("out3") nrow = ds3._block_num_rows() - assert 8 < len(nrow) < 12, nrow + assert 6 < len(nrow) < 12, nrow for x in nrow[:-1]: - assert 20000 < x < 25000, (x, nrow) + assert 20000 < x < 35000, (x, nrow) @pytest.mark.parametrize("use_actors", [False, True]) diff --git a/python/ray/data/tests/test_splitblocks.py b/python/ray/data/tests/test_splitblocks.py new file mode 100644 index 000000000000..f5839b8e85de --- /dev/null +++ b/python/ray/data/tests/test_splitblocks.py @@ -0,0 +1,80 @@ +import numpy as np +import pytest + +import ray +from ray.data.datasource.datasource import _splitrange +from ray.data.tests.conftest import * # noqa +from ray.tests.conftest import * # noqa + + +def test_splitrange(): + def f(n, k): + assert _splitrange(n, k) == [len(a) for a in np.array_split(range(n), k)] + + f(0, 1) + f(5, 1) + f(5, 3) + f(5, 5) + f(5, 10) + f(50, 1) + f(50, 2) + f(50, 3) + f(50, 4) + f(50, 5) + + +def test_small_file_split(ray_start_10_cpus_shared): + ds = ray.data.read_csv("example://iris.csv", parallelism=1) + assert ds.num_blocks() == 1 + assert ds.materialize().num_blocks() == 1 + assert ds.map_batches(lambda x: x).materialize().num_blocks() == 1 + + ds = ds.map_batches(lambda x: x).materialize() + stats = ds.stats() + assert "Stage 1 ReadCSV->MapBatches" in stats, stats + + ds = ray.data.read_csv("example://iris.csv", parallelism=10) + assert ds.num_blocks() == 1 + assert ds.map_batches(lambda x: x).materialize().num_blocks() == 10 + assert ds.materialize().num_blocks() == 10 + + ds = ray.data.read_csv("example://iris.csv", parallelism=100) + assert ds.num_blocks() == 1 + assert ds.map_batches(lambda x: x).materialize().num_blocks() == 100 + assert ds.materialize().num_blocks() == 100 + + ds = ds.map_batches(lambda x: x).materialize() + stats = ds.stats() + assert "Stage 1 ReadCSV->SplitBlocks(100)" in stats, stats + assert "Stage 2 MapBatches" in stats, stats + + +def test_large_file_additional_split(ray_start_10_cpus_shared, tmp_path): + ctx = ray.data.context.DataContext.get_current() + ctx.target_max_block_size = 10 * 1024 * 1024 + + # ~100MiB of tensor data + ds = ray.data.range_tensor(1000, 
shape=(10000,)) + ds.repartition(1).write_parquet(tmp_path) + + ds = ray.data.read_parquet(tmp_path, parallelism=1) + assert ds.num_blocks() == 1 + assert 5 < ds.materialize().num_blocks() < 20 # Size-based block split + + ds = ray.data.read_parquet(tmp_path, parallelism=10) + assert ds.num_blocks() == 1 + assert 5 < ds.materialize().num_blocks() < 20 + + ds = ray.data.read_parquet(tmp_path, parallelism=100) + assert ds.num_blocks() == 1 + assert 50 < ds.materialize().num_blocks() < 200 + + ds = ray.data.read_parquet(tmp_path, parallelism=1000) + assert ds.num_blocks() == 1 + assert 500 < ds.materialize().num_blocks() < 2000 + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/train/tests/test_batch_predictor.py b/python/ray/train/tests/test_batch_predictor.py index d600ccf4e787..1e2f5ef73cf8 100644 --- a/python/ray/train/tests/test_batch_predictor.py +++ b/python/ray/train/tests/test_batch_predictor.py @@ -498,7 +498,7 @@ def test_get_and_set_preprocessor(): test_dataset = ray.data.range(4) output_ds = batch_predictor.predict(test_dataset) - assert output_ds.to_pandas().to_numpy().squeeze().tolist() == [ + assert sorted(output_ds.to_pandas().to_numpy().squeeze().tolist()) == [ 0.0, 2.0, 4.0, @@ -510,7 +510,7 @@ def test_get_and_set_preprocessor(): assert batch_predictor.get_preprocessor() == preprocessor2 output_ds = batch_predictor.predict(test_dataset) - assert output_ds.to_pandas().to_numpy().squeeze().tolist() == [ + assert sorted(output_ds.to_pandas().to_numpy().squeeze().tolist()) == [ 0.0, 4.0, 8.0, @@ -522,7 +522,7 @@ def test_get_and_set_preprocessor(): assert batch_predictor.get_preprocessor() is None output_ds = batch_predictor.predict(test_dataset) - assert output_ds.to_pandas().to_numpy().squeeze().tolist() == [ + assert sorted(output_ds.to_pandas().to_numpy().squeeze().tolist()) == [ 0.0, 2.0, 4.0,