Revert "[data] Read->SplitBlocks to ensure requested read parallelism…
Browse files Browse the repository at this point in the history
… is always met (ray-project#36352)" (ray-project#36747)

This reverts commit 0ab00ec.

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
vitsai authored and arvind-chandra committed Aug 31, 2023
1 parent 91dfc87 commit 9871594
Showing 29 changed files with 124 additions and 325 deletions.
4 changes: 2 additions & 2 deletions doc/source/data/getting-started.rst
@@ -66,7 +66,7 @@ transform datasets. Ray executes transformations in parallel for performance.
.. testoutput::

MaterializedDataset(
num_blocks=...,
num_blocks=1,
num_rows=150,
schema={
sepal length (cm): double,
@@ -147,7 +147,7 @@ or remote filesystems.

import os

transformed_ds.repartition(1).write_parquet("/tmp/iris")
transformed_ds.write_parquet("/tmp/iris")

print(os.listdir("/tmp/iris"))

4 changes: 2 additions & 2 deletions doc/source/data/key-concepts.rst
@@ -30,15 +30,15 @@ blocks need to be materialized in the cluster memory at once.
Reading Data
============

Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces one or more output blocks:
Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces an output block:

.. image:: images/dataset-read.svg
:align: center

..
https://docs.google.com/drawings/d/15B4TB8b5xN15Q9S8-s0MjW6iIvo_PrH7JtV1fL123pU/edit
You can increase or decrease the number of output blocks by changing the ``parallelism`` parameter.
You can manually specify the number of read tasks, but the final parallelism is always capped by the number of files in the underlying dataset.

For an in-depth guide on creating datasets, read :ref:`Loading Data <loading_data>`.
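A minimal sketch of the capping behavior described above, assuming a hypothetical local directory that holds exactly two CSV files (the path and file count are placeholders, not taken from this change):

    import ray

    # Even though 100 read tasks are requested, at most one task is
    # created per input file, so the read runs with a parallelism of 2
    # and each task produces one output block.
    ds = ray.data.read_csv("/tmp/two_csv_files", parallelism=100)
    print(ds.num_blocks())  # expected: 2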

3 changes: 1 addition & 2 deletions doc/source/data/performance-tips.rst
@@ -135,8 +135,7 @@ By default, Ray Data automatically selects the read ``parallelism`` according to
4. The parallelism is truncated to ``min(num_files, parallelism)``.

Occasionally, it is advantageous to manually tune the parallelism to optimize the application. This can be done when loading data via the ``parallelism`` parameter.
For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created. Note that read tasks can produce multiple output
blocks per file in order to satisfy the requested parallelism.
For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created.
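
A minimal sketch of that call, assuming a hypothetical Parquet dataset at a placeholder S3 path:

    import ray

    # Request up to 1000 read tasks; the effective parallelism is still
    # truncated to min(num_files, parallelism) by the heuristics above.
    ds = ray.data.read_parquet("s3://my-bucket/my-dataset", parallelism=1000)
    print(ds.num_blocks())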

Tuning Read Resources
~~~~~~~~~~~~~~~~~~~~~
4 changes: 2 additions & 2 deletions doc/source/data/transforming-data.rst
@@ -335,9 +335,9 @@ Repartitioning data

A :class:`~ray.data.dataset.Dataset` operates on a sequence of distributed data
:term:`blocks <block>`. If you want to achieve more fine-grained parallelization,
increase the number of blocks by setting a higher ``parallelism`` at read time.
increase the number of blocks.

To change the number of blocks for an existing Dataset, call
To change the number of blocks, call
:meth:`Dataset.repartition() <ray.data.Dataset.repartition>`.
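
A minimal sketch of this repartitioning call, using a small synthetic dataset (the row and block counts are illustrative only):

    import ray

    # Redistribute 1000 rows across 10 blocks for finer-grained parallelism.
    ds = ray.data.range(1000).repartition(10)
    print(ds.materialize().num_blocks())  # expected: 10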

.. testcode::
6 changes: 3 additions & 3 deletions python/ray/air/tests/test_legacy_dataset_config.py
@@ -358,7 +358,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "RandomizeBlockOrder:" in stats, stats
assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats

ds = ray.data.range(5)
test = TestStream(
@@ -385,7 +385,7 @@ def checker(shard, results):
# we eliminate the ordering in comparison.
assert set(results[0]) == set(results[1]), results
stats = shard.stats()
assert "RandomizeBlockOrder:" in stats, stats
assert "RandomizeBlockOrder: 5/5 blocks executed" in stats, stats

ds = ray.data.range(5)
test = TestBatch(
@@ -400,7 +400,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "RandomizeBlockOrder:" in stats, stats
assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats

ds = ray.data.range(5)
test = TestStream(
8 changes: 0 additions & 8 deletions python/ray/data/BUILD
@@ -289,14 +289,6 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_splitblocks",
size = "medium",
srcs = ["tests/test_splitblocks.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_execution_optimizer",
size = "medium",
7 changes: 0 additions & 7 deletions python/ray/data/_internal/block_list.py
@@ -32,9 +32,6 @@ def __init__(
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer
# This field can be set to indicate the number of estimated output blocks,
# since each read task may produce multiple output blocks after splitting.
self._estimated_num_blocks = None

def __repr__(self):
return f"BlockList(owned_by_consumer={self._owned_by_consumer})"
@@ -220,10 +217,6 @@ def initial_num_blocks(self) -> int:
"""Returns the number of blocks of this BlockList."""
return self._num_blocks

def estimated_num_blocks(self) -> int:
"""Estimate of `executed_num_blocks()`, without triggering actual execution."""
return self._estimated_num_blocks or self._num_blocks

def executed_num_blocks(self) -> int:
"""Returns the number of output blocks after execution.
@@ -19,15 +19,12 @@ def __init__(
self,
input_data: Optional[List[RefBundle]] = None,
input_data_factory: Callable[[], List[RefBundle]] = None,
num_output_blocks: Optional[int] = None,
):
"""Create an InputDataBuffer.
Args:
input_data: The list of bundles to output from this operator.
input_data_factory: The factory to get input data, if input_data is None.
num_output_blocks: The number of output blocks. If not specified, progress
bars total will be set based on num output bundles instead.
"""
if input_data is not None:
assert input_data_factory is None
@@ -40,7 +37,6 @@ def __init__(
assert input_data_factory is not None
self._input_data_factory = input_data_factory
self._is_input_initialized = False
self._num_output_blocks = num_output_blocks
super().__init__("Input", [])

def start(self, options: ExecutionOptions) -> None:
@@ -57,7 +53,7 @@ def get_next(self) -> RefBundle:
return self._input_data.pop(0)

def num_outputs_total(self) -> Optional[int]:
return self._num_output_blocks or self._num_output_bundles
return self._num_outputs

def get_stats(self) -> StatsDict:
return {}
@@ -68,7 +64,7 @@ def add_input(self, refs, input_index) -> None:
def _initialize_metadata(self):
assert self._input_data is not None and self._is_input_initialized

self._num_output_bundles = len(self._input_data)
self._num_outputs = len(self._input_data)
block_metadata = []
for bundle in self._input_data:
block_metadata.extend([m for (_, m) in bundle.blocks])
17 changes: 1 addition & 16 deletions python/ray/data/_internal/logical/operators/read_operator.py
@@ -11,23 +11,8 @@ def __init__(
self,
datasource: Datasource,
read_tasks: List[ReadTask],
estimated_num_blocks: int,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
if len(read_tasks) == estimated_num_blocks:
suffix = ""
else:
suffix = f"->SplitBlocks({int(estimated_num_blocks / len(read_tasks))})"
super().__init__(f"Read{datasource.get_name()}{suffix}", None, ray_remote_args)
super().__init__(f"Read{datasource.get_name()}", None, ray_remote_args)
self._datasource = datasource
self._estimated_num_blocks = estimated_num_blocks
self._read_tasks = read_tasks

def fusable(self) -> bool:
"""Whether this should be fused with downstream operators.
When we are outputting multiple blocks per read task, we should disable fusion,
as fusion would prevent the blocks from being dispatched to multiple processes
for parallel processing in downstream operators.
"""
return self._estimated_num_blocks == len(self._read_tasks)
4 changes: 0 additions & 4 deletions python/ray/data/_internal/logical/rules/operator_fusion.py
@@ -24,7 +24,6 @@
Repartition,
)
from ray.data._internal.logical.operators.map_operator import AbstractUDFMap
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.stats import StatsDict
from ray.data.block import Block

@@ -133,9 +132,6 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
down_logical_op = self._op_map[down_op]
up_logical_op = self._op_map[up_op]

if isinstance(up_logical_op, Read) and not up_logical_op.fusable():
return False

# If the downstream operator takes no input, it cannot be fused with
# the upstream operator.
if not down_logical_op._input_dependencies:
2 changes: 1 addition & 1 deletion python/ray/data/_internal/plan.py
@@ -221,7 +221,7 @@ def get_plan_as_string(self, classname: str) -> str:
if dataset_blocks is None:
num_blocks = "?"
else:
num_blocks = dataset_blocks.estimated_num_blocks()
num_blocks = dataset_blocks.initial_num_blocks()
dataset_str = "{}(num_blocks={}, num_rows={}, schema={})".format(
classname, num_blocks, count, schema_str
)
4 changes: 1 addition & 3 deletions python/ray/data/_internal/planner/plan_read_op.py
@@ -58,9 +58,7 @@ def get_input_data() -> List[RefBundle]:
for read_task in read_tasks
]

inputs = InputDataBuffer(
input_data_factory=get_input_data, num_output_blocks=op._estimated_num_blocks
)
inputs = InputDataBuffer(input_data_factory=get_input_data)

def do_read(blocks: Iterator[ReadTask], _: TaskContext) -> Iterator[Block]:
for read_task in blocks:
9 changes: 4 additions & 5 deletions python/ray/data/_internal/util.py
@@ -90,7 +90,7 @@ def _autodetect_parallelism(
ctx: DataContext,
reader: Optional["Reader"] = None,
avail_cpus: Optional[int] = None,
) -> (int, int, Optional[int]):
) -> (int, int):
"""Returns parallelism to use and the min safe parallelism to avoid OOMs.
This detects parallelism using the following heuristics, applied in order:
@@ -111,9 +111,8 @@
avail_cpus: Override avail cpus detection (for testing only).
Returns:
Tuple of detected parallelism (only if -1 was specified), the min safe
parallelism (which can be used to generate warnings about large blocks),
and the estimated inmemory size of the dataset.
Tuple of detected parallelism (only if -1 was specified), and the min safe
parallelism (which can be used to generate warnings about large blocks).
"""
min_safe_parallelism = 1
max_reasonable_parallelism = sys.maxsize
@@ -141,7 +140,7 @@
f"estimated_available_cpus={avail_cpus} and "
f"estimated_data_size={mem_size}."
)
return parallelism, min_safe_parallelism, mem_size
return parallelism, min_safe_parallelism


def _estimate_avail_cpus(cur_pg: Optional["PlacementGroup"]) -> int:
10 changes: 5 additions & 5 deletions python/ray/data/dataset.py
@@ -480,7 +480,7 @@ def map_batches(
>>> ds = ds.map_batches(map_fn_with_large_output)
>>> ds
MapBatches(map_fn_with_large_output)
+- Dataset(num_blocks=..., num_rows=1, schema={item: int64})
+- Dataset(num_blocks=1, num_rows=1, schema={item: int64})
Args:
@@ -753,7 +753,7 @@ def select_columns(
>>> ds
MapBatches(<lambda>)
+- Dataset(
num_blocks=...,
num_blocks=10,
num_rows=10,
schema={col1: int64, col2: int64, col3: int64}
)
@@ -1698,7 +1698,7 @@ def groupby(self, key: Optional[str]) -> "GroupedData":
... {"A": x % 3, "B": x} for x in range(100)]).groupby(
... "A").count()
Aggregate
+- Dataset(num_blocks=..., num_rows=100, schema={A: int64, B: int64})
+- Dataset(num_blocks=100, num_rows=100, schema={A: int64, B: int64})
Time complexity: O(dataset size * log(dataset size / parallelism))
@@ -3287,7 +3287,7 @@ def to_tf(
>>> ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
>>> ds
Dataset(
num_blocks=...,
num_blocks=1,
num_rows=150,
schema={
sepal length (cm): double,
@@ -3318,7 +3318,7 @@
>>> ds
Concatenator
+- Dataset(
num_blocks=...,
num_blocks=1,
num_rows=150,
schema={
sepal length (cm): double,
35 changes: 1 addition & 34 deletions python/ray/data/datasource/datasource.py
@@ -195,7 +195,6 @@ class ReadTask(Callable[[], Iterable[Block]]):
def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata):
self._metadata = metadata
self._read_fn = read_fn
self._additional_output_splits = 1

def get_metadata(self) -> BlockMetadata:
return self._metadata
@@ -212,27 +211,13 @@ def __call__(self) -> Iterable[Block]:

if context.block_splitting_enabled:
for block in result:
yield from self._do_additional_splits(block)
yield block
else:
builder = DelegatingBlockBuilder()
for block in result:
builder.add_block(block)
yield builder.build()

def _set_additional_split_factor(self, k: int) -> None:
self._additional_output_splits = k

def _do_additional_splits(self, block: Block) -> Iterable[Block]:
if self._additional_output_splits > 1:
block = BlockAccessor.for_block(block)
offset = 0
split_sizes = _splitrange(block.num_rows(), self._additional_output_splits)
for size in split_sizes:
yield block.slice(offset, offset + size, copy=True)
offset += size
else:
yield block


@PublicAPI
class RangeDatasource(Datasource):
@@ -493,21 +478,3 @@ def make_block(count: int, num_columns: int) -> Block:
i += block_size

return read_tasks


def _splitrange(n, k):
"""Calculates array lens of np.array_split().
This is the equivalent of
`[len(x) for x in np.array_split(range(n), k)]`.
"""
base = n // k
output = [base] * k
rem = n - sum(output)
for i in range(len(output)):
if rem > 0:
output[i] += 1
rem -= 1
assert rem == 0, (rem, output, n, k)
assert sum(output) == n, (output, n, k)
return output
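
A quick check of the equivalence stated in the removed docstring, assuming NumPy is available (the concrete numbers are an illustration, not part of this change):

    import numpy as np

    # _splitrange(10, 3): base = 10 // 3 = 3 for each of the 3 chunks,
    # and the remainder of 1 is added to the first chunk -> [4, 3, 3],
    # matching np.array_split on the same range.
    print([len(x) for x in np.array_split(range(10), 3)])  # [4, 3, 3]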
