-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Streaming Generator] Make it compatible with wait #36071
Merged
rkooo567
merged 124 commits into
ray-project:master
from
rkooo567:streaming-generator-wait
Jun 23, 2023
Merged
[Streaming Generator] Make it compatible with wait #36071
rkooo567
merged 124 commits into
ray-project:master
from
rkooo567:streaming-generator-wait
Jun 23, 2023
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
This reverts commit 122b705. Signed-off-by: SangBin Cho <rkooo567@gmail.com>
This reverts commit 05f468a. Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
rkooo567
requested review from
richardliaw,
krfricke,
xwjiang2010,
amogkam,
matthewdeng,
Yard1,
maxpumperla,
a team,
scv119,
c21,
scottjlee and
bveeramani
as code owners
June 23, 2023 00:48
rkooo567
changed the base branch from
streaming-generator-remove-busy-waiting
to
master
June 23, 2023 00:48
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
8 tasks
arvind-chandra
pushed a commit
to lmco/ray
that referenced
this pull request
Aug 31, 2023
…ys met (ray-project#36352) Today, the number of initial blocks of a dataset is limited to the number of input files of the datasource, regardless of the requested parallelism. This is problematic as it means to increase the number of blocks requires a `repartition()` call, which is not always practical in the streaming setting. This PR inserts a streaming SplitBlocks operator that is fused with read tasks in this case to allow for arbitrarily high requested parallelism (up to number of individual records) without needing a blocking repartition. Before: ``` ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000) # -> num_blocks = 100 ``` After: ``` ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000) # -> num_blocks = 2000 ``` Limitations: - Until ray-project#36071 merges and is integrated with Ray Data, downstream operators of the read may still block until the entire file is read, even if the read would produce multiple blocks. - The SplitBlocks operator cannot be fused with downstream Map stages, since it is changing the physical partitioning of the stream. If we fused it, then the parallelism increase would not be realized as we could not split the read output to multiple processes. Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
arvind-chandra
pushed a commit
to lmco/ray
that referenced
this pull request
Aug 31, 2023
This PR makes the streaming generator compatible with ray.wait. The semantic is as follows; def f(): for _ in range(3): yield 1 generator = f.options(num_returns="streaming").remote() # The generator will be in ready if the next reference is available. Otherwise it is in unready. # This should work with all other options from ray.wait (including fetch_local=True/False) ready, unready = ray.wait([generator]) # if the generator's next ref is not ready in 0.1 second, it will be in unready. # otherwise, it is in ready ready, unready = ray.wait([generator], timeout=0.1) # If the generator's next ref is available, it is considered as 1 return # In this case, this will return if both generator and ref is ready. ready, unready = ray.wait([generator, ref], num_returns=2) # if the generator's next ref is available, it will fetch the object to the local node ready, unready = ray.wait([generator, ref], fetch_local=True) From the previous PR ray-project#36070, we are now able to peek the object reference, and the peeked object is guaranteed to be resolved. We can always peek the next object from the generator and wait on that reference to make the generator compatible to ray.wait. Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Why are these changes needed?
This PR makes the streaming generator compatible with
ray.wait
.The semantic is as follows;
From the previous PR #36070, we are now able to peek the object reference, and the peeked object is guaranteed to be resolved. We can always peek the next object from the generator and wait on that reference to make the generator compatible to ray.wait.
Related issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.