[data] Read->SplitBlocks to ensure requested read parallelism is always met (#36352)

Today, the number of initial blocks of a dataset is limited to the number of input files of the datasource, regardless of the requested parallelism. This is problematic because increasing the number of blocks then requires a `repartition()` call, which is not always practical in the streaming setting.

This PR inserts a streaming SplitBlocks operator that is fused with the read tasks in this case, allowing arbitrarily high requested parallelism (up to the number of individual records) without a blocking repartition.

Before:
```
ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000)
# -> num_blocks = 100
```

After:
```
ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000)
# -> num_blocks = 2000
```

Limitations:
- Until #36071 merges and is integrated with Ray Data, downstream operators of the read may still block until the entire file is read, even if the read would produce multiple blocks.
- The SplitBlocks operator cannot be fused with downstream Map stages, since it changes the physical partitioning of the stream. If it were fused, the parallelism increase would not be realized, because the read output could not be split across multiple processes.
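
To make the before/after behaviour concrete, here is a minimal pure-Python sketch, not Ray's implementation. Plain lists stand in for blocks, and `read_file`, `split_block`, and `read_with_parallelism` are hypothetical names. It also assumes the split factor is the requested parallelism divided by the number of files, rounded down, which this commit message does not spell out.

```python
from typing import Iterator, List


def read_file(num_rows: int) -> List[int]:
    """Stand-in for a read task: one file becomes one block of rows."""
    return list(range(num_rows))


def split_block(block: List[int], k: int) -> Iterator[List[int]]:
    """Yield k contiguous slices whose sizes differ by at most one row."""
    base, rem = divmod(len(block), k)
    offset = 0
    for i in range(k):
        size = base + (1 if i < rem else 0)
        yield block[offset:offset + size]
        offset += size


def read_with_parallelism(rows_per_file: List[int], parallelism: int) -> List[List[int]]:
    # Assumption: split factor = requested parallelism // number of files (at least 1).
    k = max(1, parallelism // len(rows_per_file))
    blocks: List[List[int]] = []
    for num_rows in rows_per_file:
        blocks.extend(split_block(read_file(num_rows), k))
    return blocks


blocks = read_with_parallelism([1000] * 100, parallelism=2000)
print(len(blocks))  # 2000
```

With 100 files and a requested parallelism of 2000, each simulated read output is cut into 20 slices, which matches the `num_blocks = 2000` figure in the "After" example.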
ericl authored Jun 22, 2023
1 parent e50e182 commit 0ab00ec
Showing 29 changed files with 325 additions and 124 deletions.
4 changes: 2 additions & 2 deletions doc/source/data/getting-started.rst
@@ -66,7 +66,7 @@ transform datasets. Ray executes transformations in parallel for performance.
.. testoutput::

MaterializedDataset(
-num_blocks=1,
+num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
@@ -147,7 +147,7 @@ or remote filesystems.

import os

-transformed_ds.write_parquet("/tmp/iris")
+transformed_ds.repartition(1).write_parquet("/tmp/iris")

print(os.listdir("/tmp/iris"))

4 changes: 2 additions & 2 deletions doc/source/data/key-concepts.rst
@@ -30,15 +30,15 @@ blocks need to be materialized in the cluster memory at once.
Reading Data
============

-Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces an output block:
+Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces one or more output blocks:

.. image:: images/dataset-read.svg
:align: center

..
https://docs.google.com/drawings/d/15B4TB8b5xN15Q9S8-s0MjW6iIvo_PrH7JtV1fL123pU/edit
-You can manually specify the number of read tasks, but the final parallelism is always capped by the number of files in the underlying dataset.
+You can increase or decrease the number of output blocks by changing the ``parallelism`` parameter.

For an in-depth guide on creating datasets, read :ref:`Loading Data <loading_data>`.

3 changes: 2 additions & 1 deletion doc/source/data/performance-tips.rst
@@ -135,7 +135,8 @@ By default, Ray Data automatically selects the read ``parallelism`` according to
4. The parallelism is truncated to ``min(num_files, parallelism)``.

Occasionally, it is advantageous to manually tune the parallelism to optimize the application. This can be done when loading data via the ``parallelism`` parameter.
-For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created.
+For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created. Note that read tasks can produce multiple output
+blocks per file in order to satisfy the requested parallelism.

Tuning Read Resources
~~~~~~~~~~~~~~~~~~~~~
4 changes: 2 additions & 2 deletions doc/source/data/transforming-data.rst
@@ -335,9 +335,9 @@ Repartitioning data

A :class:`~ray.data.dataset.Dataset` operates on a sequence of distributed data
:term:`blocks <block>`. If you want to achieve more fine-grained parallelization,
-increase the number of blocks.
+increase the number of blocks by setting a higher ``parallelism`` at read time.

-To change the number of blocks, call
+To change the number of blocks for an existing Dataset, call
:meth:`Dataset.repartition() <ray.data.Dataset.repartition>`.

.. testcode::
6 changes: 3 additions & 3 deletions python/ray/air/tests/test_legacy_dataset_config.py
@@ -358,7 +358,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
-assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats
+assert "RandomizeBlockOrder:" in stats, stats

ds = ray.data.range(5)
test = TestStream(
@@ -385,7 +385,7 @@ def checker(shard, results):
# we eliminate the ordering in comparison.
assert set(results[0]) == set(results[1]), results
stats = shard.stats()
-assert "RandomizeBlockOrder: 5/5 blocks executed" in stats, stats
+assert "RandomizeBlockOrder:" in stats, stats

ds = ray.data.range(5)
test = TestBatch(
@@ -400,7 +400,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
-assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats
+assert "RandomizeBlockOrder:" in stats, stats

ds = ray.data.range(5)
test = TestStream(
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -289,6 +289,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

+py_test(
+name = "test_splitblocks",
+size = "medium",
+srcs = ["tests/test_splitblocks.py"],
+tags = ["team:data", "exclusive"],
+deps = ["//:ray_lib", ":conftest"],
+)
+
py_test(
name = "test_execution_optimizer",
size = "medium",
7 changes: 7 additions & 0 deletions python/ray/data/_internal/block_list.py
@@ -32,6 +32,9 @@ def __init__(
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer
+# This field can be set to indicate the number of estimated output blocks,
+# since each read task may produce multiple output blocks after splitting.
+self._estimated_num_blocks = None

def __repr__(self):
return f"BlockList(owned_by_consumer={self._owned_by_consumer})"
@@ -217,6 +220,10 @@ def initial_num_blocks(self) -> int:
"""Returns the number of blocks of this BlockList."""
return self._num_blocks

+def estimated_num_blocks(self) -> int:
+"""Estimate of `executed_num_blocks()`, without triggering actual execution."""
+return self._estimated_num_blocks or self._num_blocks
+
def executed_num_blocks(self) -> int:
"""Returns the number of output blocks after execution.
@@ -19,12 +19,15 @@ def __init__(
self,
input_data: Optional[List[RefBundle]] = None,
input_data_factory: Callable[[], List[RefBundle]] = None,
+num_output_blocks: Optional[int] = None,
):
"""Create an InputDataBuffer.
Args:
input_data: The list of bundles to output from this operator.
input_data_factory: The factory to get input data, if input_data is None.
+num_output_blocks: The number of output blocks. If not specified, progress
+bars total will be set based on num output bundles instead.
"""
if input_data is not None:
assert input_data_factory is None
@@ -37,6 +40,7 @@ def __init__(
assert input_data_factory is not None
self._input_data_factory = input_data_factory
self._is_input_initialized = False
+self._num_output_blocks = num_output_blocks
super().__init__("Input", [])

def start(self, options: ExecutionOptions) -> None:
@@ -53,7 +57,7 @@ def get_next(self) -> RefBundle:
return self._input_data.pop(0)

def num_outputs_total(self) -> Optional[int]:
-return self._num_outputs
+return self._num_output_blocks or self._num_output_bundles

def get_stats(self) -> StatsDict:
return {}
@@ -64,7 +68,7 @@ def add_input(self, refs, input_index) -> None:
def _initialize_metadata(self):
assert self._input_data is not None and self._is_input_initialized

-self._num_outputs = len(self._input_data)
+self._num_output_bundles = len(self._input_data)
block_metadata = []
for bundle in self._input_data:
block_metadata.extend([m for (_, m) in bundle.blocks])
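
The progress-bar total chosen by `num_outputs_total()` in this file can be illustrated in isolation. The sketch below is a hypothetical free function, not the operator method, and the numbers (100 bundles, 2000 blocks) are borrowed from the example in the commit message.

```python
from typing import Optional


def num_outputs_total(num_output_blocks: Optional[int], num_output_bundles: int) -> int:
    # `or` falls back to the bundle count when the block estimate is None
    # (it also falls back when the estimate is 0, since 0 is falsy).
    return num_output_blocks or num_output_bundles


print(num_outputs_total(None, 100))  # 100
print(num_outputs_total(2000, 100))  # 2000
```

The same `or` fallback pattern appears in `BlockList.estimated_num_blocks()` in this commit.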
17 changes: 16 additions & 1 deletion python/ray/data/_internal/logical/operators/read_operator.py
@@ -11,8 +11,23 @@ def __init__(
self,
datasource: Datasource,
read_tasks: List[ReadTask],
+estimated_num_blocks: int,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
-super().__init__(f"Read{datasource.get_name()}", None, ray_remote_args)
+if len(read_tasks) == estimated_num_blocks:
+suffix = ""
+else:
+suffix = f"->SplitBlocks({int(estimated_num_blocks / len(read_tasks))})"
+super().__init__(f"Read{datasource.get_name()}{suffix}", None, ray_remote_args)
self._datasource = datasource
+self._estimated_num_blocks = estimated_num_blocks
self._read_tasks = read_tasks
+
+def fusable(self) -> bool:
+"""Whether this should be fused with downstream operators.
+When we are outputting multiple blocks per read task, we should disable fusion,
+as fusion would prevent the blocks from being dispatched to multiple processes
+for parallel processing in downstream operators.
+"""
+return self._estimated_num_blocks == len(self._read_tasks)
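
As a rough illustration of the naming and fusion logic in this hunk, the standalone sketch below mirrors the two expressions outside of Ray. `read_op_name` and `is_fusable` are hypothetical helpers, and the datasource name `"Parquet"` is an assumed return value of `get_name()`.

```python
def read_op_name(datasource_name: str, num_read_tasks: int, estimated_num_blocks: int) -> str:
    """Build the operator name the same way the Read constructor in the diff does."""
    if num_read_tasks == estimated_num_blocks:
        suffix = ""
    else:
        # Number of output blocks each read task is expected to produce.
        suffix = f"->SplitBlocks({int(estimated_num_blocks / num_read_tasks)})"
    return f"Read{datasource_name}{suffix}"


def is_fusable(num_read_tasks: int, estimated_num_blocks: int) -> bool:
    """Fusion stays enabled only when each read task yields exactly one block."""
    return estimated_num_blocks == num_read_tasks


print(read_op_name("Parquet", 100, 2000))  # ReadParquet->SplitBlocks(20)
print(read_op_name("Parquet", 100, 100))   # ReadParquet
print(is_fusable(100, 2000))               # False
```

In this sketch, any mismatch between the two counts both adds the `->SplitBlocks(...)` suffix and disables fusion, which is consistent with the second limitation listed in the commit message.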
4 changes: 4 additions & 0 deletions python/ray/data/_internal/logical/rules/operator_fusion.py
@@ -24,6 +24,7 @@
Repartition,
)
from ray.data._internal.logical.operators.map_operator import AbstractUDFMap
+from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.stats import StatsDict
from ray.data.block import Block

@@ -132,6 +133,9 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
down_logical_op = self._op_map[down_op]
up_logical_op = self._op_map[up_op]

+if isinstance(up_logical_op, Read) and not up_logical_op.fusable():
+return False
+
# If the downstream operator takes no input, it cannot be fused with
# the upstream operator.
if not down_logical_op._input_dependencies:
2 changes: 1 addition & 1 deletion python/ray/data/_internal/plan.py
@@ -216,7 +216,7 @@ def get_plan_as_string(self, classname: str) -> str:
if dataset_blocks is None:
num_blocks = "?"
else:
-num_blocks = dataset_blocks.initial_num_blocks()
+num_blocks = dataset_blocks.estimated_num_blocks()
dataset_str = "{}(num_blocks={}, num_rows={}, schema={})".format(
classname, num_blocks, count, schema_str
)
4 changes: 3 additions & 1 deletion python/ray/data/_internal/planner/plan_read_op.py
@@ -58,7 +58,9 @@ def get_input_data() -> List[RefBundle]:
for read_task in read_tasks
]

-inputs = InputDataBuffer(input_data_factory=get_input_data)
+inputs = InputDataBuffer(
+input_data_factory=get_input_data, num_output_blocks=op._estimated_num_blocks
+)

def do_read(blocks: Iterator[ReadTask], _: TaskContext) -> Iterator[Block]:
for read_task in blocks:
9 changes: 5 additions & 4 deletions python/ray/data/_internal/util.py
@@ -90,7 +90,7 @@ def _autodetect_parallelism(
ctx: DataContext,
reader: Optional["Reader"] = None,
avail_cpus: Optional[int] = None,
-) -> (int, int):
+) -> (int, int, Optional[int]):
"""Returns parallelism to use and the min safe parallelism to avoid OOMs.
This detects parallelism using the following heuristics, applied in order:
@@ -111,8 +111,9 @@ def _autodetect_parallelism(
avail_cpus: Override avail cpus detection (for testing only).
Returns:
-Tuple of detected parallelism (only if -1 was specified), and the min safe
-parallelism (which can be used to generate warnings about large blocks).
+Tuple of detected parallelism (only if -1 was specified), the min safe
+parallelism (which can be used to generate warnings about large blocks),
+and the estimated inmemory size of the dataset.
"""
min_safe_parallelism = 1
max_reasonable_parallelism = sys.maxsize
@@ -140,7 +141,7 @@ def _autodetect_parallelism(
f"estimated_available_cpus={avail_cpus} and "
f"estimated_data_size={mem_size}."
)
-return parallelism, min_safe_parallelism
+return parallelism, min_safe_parallelism, mem_size


def _estimate_avail_cpus(cur_pg: Optional["PlacementGroup"]) -> int:
10 changes: 5 additions & 5 deletions python/ray/data/dataset.py
@@ -480,7 +480,7 @@ def map_batches(
>>> ds = ds.map_batches(map_fn_with_large_output)
>>> ds
MapBatches(map_fn_with_large_output)
-+- Dataset(num_blocks=1, num_rows=1, schema={item: int64})
++- Dataset(num_blocks=..., num_rows=1, schema={item: int64})
Args:
@@ -753,7 +753,7 @@ def select_columns(
>>> ds
MapBatches(<lambda>)
+- Dataset(
-num_blocks=10,
+num_blocks=...,
num_rows=10,
schema={col1: int64, col2: int64, col3: int64}
)
@@ -1700,7 +1700,7 @@ def groupby(self, key: Optional[str]) -> "GroupedData":
... {"A": x % 3, "B": x} for x in range(100)]).groupby(
... "A").count()
Aggregate
-+- Dataset(num_blocks=100, num_rows=100, schema={A: int64, B: int64})
++- Dataset(num_blocks=..., num_rows=100, schema={A: int64, B: int64})
Time complexity: O(dataset size * log(dataset size / parallelism))
@@ -3289,7 +3289,7 @@ def to_tf(
>>> ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
>>> ds
Dataset(
-num_blocks=1,
+num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
Expand Down Expand Up @@ -3320,7 +3320,7 @@ def to_tf(
>>> ds
Concatenator
+- Dataset(
num_blocks=1,
num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
35 changes: 34 additions & 1 deletion python/ray/data/datasource/datasource.py
@@ -195,6 +195,7 @@ class ReadTask(Callable[[], Iterable[Block]]):
def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata):
self._metadata = metadata
self._read_fn = read_fn
+self._additional_output_splits = 1

def get_metadata(self) -> BlockMetadata:
return self._metadata
@@ -211,13 +212,27 @@ def __call__(self) -> Iterable[Block]:

if context.block_splitting_enabled:
for block in result:
-yield block
+yield from self._do_additional_splits(block)
else:
builder = DelegatingBlockBuilder()
for block in result:
builder.add_block(block)
yield builder.build()

+def _set_additional_split_factor(self, k: int) -> None:
+self._additional_output_splits = k
+
+def _do_additional_splits(self, block: Block) -> Iterable[Block]:
+if self._additional_output_splits > 1:
+block = BlockAccessor.for_block(block)
+offset = 0
+split_sizes = _splitrange(block.num_rows(), self._additional_output_splits)
+for size in split_sizes:
+yield block.slice(offset, offset + size, copy=True)
+offset += size
+else:
+yield block
+

@PublicAPI
class RangeDatasource(Datasource):
@@ -478,3 +493,21 @@ def make_block(count: int, num_columns: int) -> Block:
i += block_size

return read_tasks
+
+
+def _splitrange(n, k):
+"""Calculates array lens of np.array_split().
+This is the equivalent of
+`[len(x) for x in np.array_split(range(n), k)]`.
+"""
+base = n // k
+output = [base] * k
+rem = n - sum(output)
+for i in range(len(output)):
+if rem > 0:
+output[i] += 1
+rem -= 1
+assert rem == 0, (rem, output, n, k)
+assert sum(output) == n, (output, n, k)
+return output
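
The helper above can be exercised on its own. The sketch below copies `_splitrange` with indentation restored (renamed `splitrange` here) and compares it with a `divmod`-based one-liner, `splitrange_divmod`, which is a hypothetical alternative and not part of the commit.

```python
def splitrange(n, k):
    """Lengths of the k chunks, following the helper in the diff."""
    base = n // k
    output = [base] * k
    rem = n - sum(output)
    # Hand out the remaining rows one at a time to the leading chunks.
    for i in range(len(output)):
        if rem > 0:
            output[i] += 1
            rem -= 1
    assert rem == 0, (rem, output, n, k)
    assert sum(output) == n, (output, n, k)
    return output


def splitrange_divmod(n, k):
    """Hypothetical compact form: the first `rem` chunks get one extra row."""
    base, rem = divmod(n, k)
    return [base + 1] * rem + [base] * (k - rem)


# The two formulations agree on a small grid of inputs.
for n in range(50):
    for k in range(1, 12):
        assert splitrange(n, k) == splitrange_divmod(n, k)

print(splitrange(10, 3))  # [4, 3, 3]
print(splitrange(2, 5))   # [1, 1, 0, 0, 0]
```

Note that when `n < k` some sizes are zero, so in this sketch a split factor larger than the row count would produce empty slices.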