Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data] Read->SplitBlocks to ensure requested read parallelism is always met #36352

Merged
merged 28 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/source/data/getting-started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ transform datasets. Ray executes transformations in parallel for performance.
.. testoutput::

MaterializedDataset(
num_blocks=1,
num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
Expand Down Expand Up @@ -147,7 +147,7 @@ or remote filesystems.

import os

transformed_ds.write_parquet("/tmp/iris")
transformed_ds.repartition(1).write_parquet("/tmp/iris")

print(os.listdir("/tmp/iris"))

Expand Down
4 changes: 2 additions & 2 deletions doc/source/data/key-concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ blocks need to be materialized in the cluster memory at once.
Reading Data
============

Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces an output block:
Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces one or more output blocks:

.. image:: images/dataset-read.svg
:align: center

..
https://docs.google.com/drawings/d/15B4TB8b5xN15Q9S8-s0MjW6iIvo_PrH7JtV1fL123pU/edit

You can manually specify the number of read tasks, but the final parallelism is always capped by the number of files in the underlying dataset.
You can increase or decrease the number of output blocks by changing the ``parallelism`` parameter.

For an in-depth guide on creating datasets, read :ref:`Loading Data <loading_data>`.

Expand Down
3 changes: 2 additions & 1 deletion doc/source/data/performance-tips.rst
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ By default, Ray Data automatically selects the read ``parallelism`` according to
4. The parallelism is truncated to ``min(num_files, parallelism)``.

Occasionally, it is advantageous to manually tune the parallelism to optimize the application. This can be done when loading data via the ``parallelism`` parameter.
For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created.
For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created. Note that read tasks can produce multiple output
blocks per file in order to satisfy the requested parallelism.

Tuning Read Resources
~~~~~~~~~~~~~~~~~~~~~
Expand Down
4 changes: 2 additions & 2 deletions doc/source/data/transforming-data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ Repartitioning data

A :class:`~ray.data.dataset.Dataset` operates on a sequence of distributed data
:term:`blocks <block>`. If you want to achieve more fine-grained parallelization,
increase the number of blocks.
increase the number of blocks by setting a higher ``parallelism`` at read time.

To change the number of blocks, call
To change the number of blocks for an existing Dataset, call
:meth:`Dataset.repartition() <ray.data.Dataset.repartition>`.

.. testcode::
Expand Down
6 changes: 3 additions & 3 deletions python/ray/air/tests/test_legacy_dataset_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats
assert "RandomizeBlockOrder:" in stats, stats

ds = ray.data.range(5)
test = TestStream(
Expand All @@ -385,7 +385,7 @@ def checker(shard, results):
# we eliminate the ordering in comparison.
assert set(results[0]) == set(results[1]), results
stats = shard.stats()
assert "RandomizeBlockOrder: 5/5 blocks executed" in stats, stats
assert "RandomizeBlockOrder:" in stats, stats

ds = ray.data.range(5)
test = TestBatch(
Expand All @@ -400,7 +400,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats
assert "RandomizeBlockOrder:" in stats, stats

ds = ray.data.range(5)
test = TestStream(
Expand Down
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_splitblocks",
size = "medium",
srcs = ["tests/test_splitblocks.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_execution_optimizer",
size = "medium",
Expand Down
7 changes: 7 additions & 0 deletions python/ray/data/_internal/block_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def __init__(
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer
# This field can be set to indicate the number of estimated output blocks,
# since each read task may produce multiple output blocks after splitting.
self._estimated_num_blocks = None
ericl marked this conversation as resolved.
Show resolved Hide resolved

def __repr__(self):
return f"BlockList(owned_by_consumer={self._owned_by_consumer})"
Expand Down Expand Up @@ -217,6 +220,10 @@ def initial_num_blocks(self) -> int:
"""Returns the number of blocks of this BlockList."""
return self._num_blocks

def estimated_num_blocks(self) -> int:
"""Estimate of `executed_num_blocks()`, without triggering actual execution."""
return self._estimated_num_blocks or self._num_blocks

def executed_num_blocks(self) -> int:
"""Returns the number of output blocks after execution.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ def __init__(
self,
input_data: Optional[List[RefBundle]] = None,
input_data_factory: Callable[[], List[RefBundle]] = None,
num_output_blocks: Optional[int] = None,
):
"""Create an InputDataBuffer.

Args:
input_data: The list of bundles to output from this operator.
input_data_factory: The factory to get input data, if input_data is None.
num_output_blocks: The number of output blocks. If not specified, progress
bars total will be set based on num output bundles instead.
"""
if input_data is not None:
assert input_data_factory is None
Expand All @@ -37,6 +40,7 @@ def __init__(
assert input_data_factory is not None
self._input_data_factory = input_data_factory
self._is_input_initialized = False
self._num_output_blocks = num_output_blocks
super().__init__("Input", [])

def start(self, options: ExecutionOptions) -> None:
Expand All @@ -53,7 +57,7 @@ def get_next(self) -> RefBundle:
return self._input_data.pop(0)

def num_outputs_total(self) -> Optional[int]:
return self._num_outputs
return self._num_output_blocks or self._num_output_bundles

def get_stats(self) -> StatsDict:
return {}
Expand All @@ -64,7 +68,7 @@ def add_input(self, refs, input_index) -> None:
def _initialize_metadata(self):
assert self._input_data is not None and self._is_input_initialized

self._num_outputs = len(self._input_data)
self._num_output_bundles = len(self._input_data)
block_metadata = []
for bundle in self._input_data:
block_metadata.extend([m for (_, m) in bundle.blocks])
Expand Down
17 changes: 16 additions & 1 deletion python/ray/data/_internal/logical/operators/read_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,23 @@ def __init__(
self,
datasource: Datasource,
read_tasks: List[ReadTask],
estimated_num_blocks: int,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
super().__init__(f"Read{datasource.get_name()}", None, ray_remote_args)
if len(read_tasks) == estimated_num_blocks:
suffix = ""
else:
suffix = f"->SplitBlocks({int(estimated_num_blocks / len(read_tasks))})"
ericl marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like SplitBlocks is a separate op. What about Read(spit_blocks=N)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, +1 for ReadXXX(split_blocks=N), otherwise Dataset.__repr__ would become confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this--- the original proposal is that SplitBlock is supposed to be a logical operator, since it only applies to the output of the read. It seems more clear therefore using the chaining syntax of -> instead of making it part of the Read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if I miss any context, why don't we implement SplitBlock as a separate logical & physical operator?

The current implementation is inside Datasource, so it looks like part of Read & InputDataBuffer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should, but it would get fused with Read anyways. So here we only implement it as part of Read since we have yet to decide whether it should be a general operator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g., for dynamic_repartition() or such.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, +1 to make it a general operator.

super().__init__(f"Read{datasource.get_name()}{suffix}", None, ray_remote_args)
self._datasource = datasource
self._estimated_num_blocks = estimated_num_blocks
self._read_tasks = read_tasks

def fusable(self) -> bool:
ericl marked this conversation as resolved.
Show resolved Hide resolved
"""Whether this should be fused with downstream operators.

When we are outputting multiple blocks per read task, we should disable fusion,
as fusion would prevent the blocks from being dispatched to multiple processes
for parallel processing in downstream operators.
"""
return self._estimated_num_blocks == len(self._read_tasks)
4 changes: 4 additions & 0 deletions python/ray/data/_internal/logical/rules/operator_fusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Repartition,
)
from ray.data._internal.logical.operators.map_operator import AbstractUDFMap
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.stats import StatsDict
from ray.data.block import Block

Expand Down Expand Up @@ -132,6 +133,9 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
down_logical_op = self._op_map[down_op]
up_logical_op = self._op_map[up_op]

if isinstance(up_logical_op, Read) and not up_logical_op.fusable():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the extra check if it's a Read op?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fusable method is part of the Read class only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can define fusable in the base LogicalOperator class. Other op may need it as well in the future.

return False

# If the downstream operator takes no input, it cannot be fused with
# the upstream operator.
if not down_logical_op._input_dependencies:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def get_plan_as_string(self, classname: str) -> str:
if dataset_blocks is None:
num_blocks = "?"
else:
num_blocks = dataset_blocks.initial_num_blocks()
num_blocks = dataset_blocks.estimated_num_blocks()
ericl marked this conversation as resolved.
Show resolved Hide resolved
dataset_str = "{}(num_blocks={}, num_rows={}, schema={})".format(
classname, num_blocks, count, schema_str
)
Expand Down
4 changes: 3 additions & 1 deletion python/ray/data/_internal/planner/plan_read_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ def get_input_data() -> List[RefBundle]:
for read_task in read_tasks
]

inputs = InputDataBuffer(input_data_factory=get_input_data)
inputs = InputDataBuffer(
input_data_factory=get_input_data, num_output_blocks=op._estimated_num_blocks
)

def do_read(blocks: Iterator[ReadTask], _: TaskContext) -> Iterator[Block]:
for read_task in blocks:
Expand Down
9 changes: 5 additions & 4 deletions python/ray/data/_internal/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _autodetect_parallelism(
ctx: DataContext,
reader: Optional["Reader"] = None,
avail_cpus: Optional[int] = None,
) -> (int, int):
) -> (int, int, Optional[int]):
"""Returns parallelism to use and the min safe parallelism to avoid OOMs.

This detects parallelism using the following heuristics, applied in order:
Expand All @@ -111,8 +111,9 @@ def _autodetect_parallelism(
avail_cpus: Override avail cpus detection (for testing only).

Returns:
Tuple of detected parallelism (only if -1 was specified), and the min safe
parallelism (which can be used to generate warnings about large blocks).
Tuple of detected parallelism (only if -1 was specified), the min safe
parallelism (which can be used to generate warnings about large blocks),
and the estimated inmemory size of the dataset.
"""
min_safe_parallelism = 1
max_reasonable_parallelism = sys.maxsize
Expand Down Expand Up @@ -140,7 +141,7 @@ def _autodetect_parallelism(
f"estimated_available_cpus={avail_cpus} and "
f"estimated_data_size={mem_size}."
)
return parallelism, min_safe_parallelism
return parallelism, min_safe_parallelism, mem_size


def _estimate_avail_cpus(cur_pg: Optional["PlacementGroup"]) -> int:
Expand Down
10 changes: 5 additions & 5 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ def map_batches(
>>> ds = ds.map_batches(map_fn_with_large_output)
>>> ds
MapBatches(map_fn_with_large_output)
+- Dataset(num_blocks=1, num_rows=1, schema={item: int64})
+- Dataset(num_blocks=..., num_rows=1, schema={item: int64})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, what does this change mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a test rule where ellipsis can match any value.



Args:
Expand Down Expand Up @@ -753,7 +753,7 @@ def select_columns(
>>> ds
MapBatches(<lambda>)
+- Dataset(
num_blocks=10,
num_blocks=...,
num_rows=10,
schema={col1: int64, col2: int64, col3: int64}
)
Expand Down Expand Up @@ -1700,7 +1700,7 @@ def groupby(self, key: Optional[str]) -> "GroupedData":
... {"A": x % 3, "B": x} for x in range(100)]).groupby(
... "A").count()
Aggregate
+- Dataset(num_blocks=100, num_rows=100, schema={A: int64, B: int64})
+- Dataset(num_blocks=..., num_rows=100, schema={A: int64, B: int64})

Time complexity: O(dataset size * log(dataset size / parallelism))

Expand Down Expand Up @@ -3289,7 +3289,7 @@ def to_tf(
>>> ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
>>> ds
Dataset(
num_blocks=1,
num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
Expand Down Expand Up @@ -3320,7 +3320,7 @@ def to_tf(
>>> ds
Concatenator
+- Dataset(
num_blocks=1,
num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
Expand Down
35 changes: 34 additions & 1 deletion python/ray/data/datasource/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class ReadTask(Callable[[], Iterable[Block]]):
def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata):
self._metadata = metadata
self._read_fn = read_fn
self._additional_output_splits = 1

def get_metadata(self) -> BlockMetadata:
return self._metadata
Expand All @@ -211,13 +212,27 @@ def __call__(self) -> Iterable[Block]:

if context.block_splitting_enabled:
for block in result:
yield block
yield from self._do_additional_splits(block)
else:
builder = DelegatingBlockBuilder()
for block in result:
builder.add_block(block)
yield builder.build()

def _set_additional_split_factor(self, k: int) -> None:
self._additional_output_splits = k

def _do_additional_splits(self, block: Block) -> Iterable[Block]:
if self._additional_output_splits > 1:
block = BlockAccessor.for_block(block)
offset = 0
split_sizes = _splitrange(block.num_rows(), self._additional_output_splits)
for size in split_sizes:
yield block.slice(offset, offset + size, copy=True)
ericl marked this conversation as resolved.
Show resolved Hide resolved
offset += size
else:
yield block


@PublicAPI
class RangeDatasource(Datasource):
Expand Down Expand Up @@ -478,3 +493,21 @@ def make_block(count: int, num_columns: int) -> Block:
i += block_size

return read_tasks


def _splitrange(n, k):
"""Calculates array lens of np.array_split().

This is the equivalent of
`[len(x) for x in np.array_split(range(n), k)]`.
"""
base = n // k
output = [base] * k
rem = n - sum(output)
for i in range(len(output)):
if rem > 0:
output[i] += 1
rem -= 1
assert rem == 0, (rem, output, n, k)
assert sum(output) == n, (output, n, k)
return output
Loading