[data] Read->SplitBlocks to ensure requested read parallelism is always met #36352
Conversation
Signed-off-by: Eric Liang <ekhliang@gmail.com>
python/ray/data/_internal/execution/operators/input_data_buffer.py
Closes #31501
```diff
@@ -130,6 +131,9 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
         down_logical_op = self._op_map[down_op]
         up_logical_op = self._op_map[up_op]

+        if isinstance(up_logical_op, Read) and not up_logical_op.fusable():
```
Why the extra check if it's a `Read` op?
The `fusable` method is part of the Read class only.
I think we can define `fusable` in the base LogicalOperator class. Other ops may need it as well in the future.
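For illustration, a minimal sketch of what hoisting `fusable` into the base class could look like; the class shapes and the `split_blocks_per_task` field are assumptions for the sketch, not Ray's actual implementation:

```python
# Illustrative sketch only (not Ray Data source): a default fusable()
# on the base class removes the need for an isinstance(Read) check.

class LogicalOperator:
    def fusable(self) -> bool:
        # By default, an operator may be fused with its downstream operator.
        return True


class Read(LogicalOperator):
    def __init__(self, split_blocks_per_task: int = 1):
        # Hypothetical field: a Read whose output is split into extra
        # blocks changes the stream's partitioning, so it must not fuse.
        self._split_blocks_per_task = split_blocks_per_task

    def fusable(self) -> bool:
        return self._split_blocks_per_task <= 1


# The fusion rule can then drop the isinstance check entirely:
assert LogicalOperator().fusable()
assert not Read(split_blocks_per_task=20).fusable()
```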
Signed-off-by: Eric Liang <ekhliang@gmail.com>
This is ready for review.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Updated.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
```python
if len(read_tasks) == estimated_num_blocks:
    suffix = ""
else:
    suffix = f"->SplitBlocks({int(estimated_num_blocks / len(read_tasks))})"
```
This looks like SplitBlocks is a separate op. What about `Read(split_blocks=N)`?
Yeah, +1 for `ReadXXX(split_blocks=N)`; otherwise `Dataset.__repr__` would become confusing.
I'm not sure I understand this. The original proposal is that SplitBlocks is supposed to be a logical operator, since it only applies to the output of the read. It therefore seems clearer to use the chaining syntax of `->` instead of making it part of the Read.
Sorry if I'm missing any context, but why don't we implement `SplitBlocks` as a separate logical & physical operator? The current implementation is inside `Datasource`, so it looks like part of Read & InputDataBuffer.
I think we should, but it would get fused with Read anyway. So here we only implement it as part of Read, since we have yet to decide whether it should be a general operator.
E.g., for `dynamic_repartition()` or such.
I see, +1 to make it a general operator.
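For intuition, a standalone sketch of the splitting idea behind `SplitBlocks(N)`; the list-of-rows block format and the `split_block` helper are illustrative assumptions, not Ray Data internals:

```python
from typing import Iterable, List

# Illustrative block format: a list of row dicts. Real Ray Data blocks are
# Arrow tables or pandas DataFrames; this is an assumption for the sketch.
Block = List[dict]


def split_block(block: Block, num_splits: int) -> Iterable[Block]:
    """Split one block into up to num_splits roughly equal sub-blocks."""
    base, extra = divmod(len(block), num_splits)
    start = 0
    for i in range(num_splits):
        size = base + (1 if i < extra else 0)
        if size == 0:
            break  # more splits requested than rows; cap at one row per block
        yield block[start:start + size]
        start += size


# E.g., 100 input files at parallelism=2000: each read task's output is
# split ~20 ways, matching the "->SplitBlocks(20)" suffix in the plan name.
rows = [{"id": i} for i in range(10)]
parts = list(split_block(rows, 4))
assert [len(p) for p in parts] == [3, 3, 2, 2]
```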
```diff
@@ -480,7 +480,7 @@ def map_batches(
         >>> ds = ds.map_batches(map_fn_with_large_output)
         >>> ds
         MapBatches(map_fn_with_large_output)
-        +- Dataset(num_blocks=1, num_rows=1, schema={item: int64})
+        +- Dataset(num_blocks=..., num_rows=1, schema={item: int64})
```
For my understanding, what does this change mean?
We have a test rule where ellipsis can match any value.
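For context, the standard-library analogue of that rule is doctest's `ELLIPSIS` flag, where `...` in expected output matches any text. A minimal self-contained example (this is not Ray's test harness, just the underlying idea):

```python
import doctest


def show_repr():
    """
    >>> show_repr()  # doctest: +ELLIPSIS
    Dataset(num_blocks=..., num_rows=1, schema={item: int64})
    """
    # "7" stands in for any run-dependent value; the "..." wildcard accepts it.
    print("Dataset(num_blocks=7, num_rows=1, schema={item: int64})")


if __name__ == "__main__":
    failures, _ = doctest.testmod()
    assert failures == 0  # the ellipsis matches num_blocks=7
```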
Merging so we can test in master. Let's discuss the future of split blocks as an operator separately.
…allelism is always met (ray-project#36352)" (ray-project#36747)"

This reverts commit 96a7145.
…ys met (ray-project#36352)

Today, the number of initial blocks of a dataset is limited to the number of input files of the datasource, regardless of the requested parallelism. This is problematic: increasing the number of blocks then requires a `repartition()` call, which is not always practical in the streaming setting. This PR inserts a streaming SplitBlocks operator that is fused with read tasks in this case, allowing arbitrarily high requested parallelism (up to the number of individual records) without a blocking repartition.

Before:

```
ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000)
# -> num_blocks = 100
```

After:

```
ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000)
# -> num_blocks = 2000
```

Limitations:
- Until ray-project#36071 merges and is integrated with Ray Data, downstream operators of the read may still block until the entire file is read, even if the read would produce multiple blocks.
- The SplitBlocks operator cannot be fused with downstream Map stages, since it changes the physical partitioning of the stream. If it were fused, the parallelism increase would not be realized, as the read output could not be split across multiple processes.

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
… is always met (ray-project#36352)" (ray-project#36747)

This reverts commit 0ab00ec.

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
Why are these changes needed?
Today, the number of initial blocks of a dataset is limited to the number of input files of the datasource, regardless of the requested parallelism. This is problematic: increasing the number of blocks then requires a `repartition()` call, which is not always practical in the streaming setting.

This PR inserts a streaming SplitBlocks operator that is fused with read tasks in this case, allowing arbitrarily high requested parallelism (up to the number of individual records) without a blocking repartition.
Before:
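```
ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000)
# -> num_blocks = 100
```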
After:
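```
ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000)
# -> num_blocks = 2000
```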
Limitations:
- Until ray-project#36071 merges and is integrated with Ray Data, downstream operators of the read may still block until the entire file is read, even if the read would produce multiple blocks.
- The SplitBlocks operator cannot be fused with downstream Map stages, since it changes the physical partitioning of the stream.
Related issue number
Closes #31501 ([Dataset] Split input files to launch as many read tasks as user-specified parallelism)
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.