Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Generator] Make it compatible with wait #36071

Merged
merged 124 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
124 commits
Select commit Hold shift + click to select a range
452ed1f
initial version
rkooo567 May 12, 2023
3ebe327
in progress.
rkooo567 May 12, 2023
c140a5c
finished basics.
rkooo567 May 12, 2023
b83af80
fix cpp error
rkooo567 May 13, 2023
509b311
working now.
rkooo567 May 13, 2023
d0795e5
Merge branch 'master' into streaming-generator-1
rkooo567 May 13, 2023
f8a90f6
fix a bug
rkooo567 May 13, 2023
0a9169d
Basic version finished.
rkooo567 May 14, 2023
05f468a
[Please Revert] Work e2e.
rkooo567 May 14, 2023
122b705
[Revert Please] Support core worker APIs and a generator.
rkooo567 May 14, 2023
7a8fe2c
fix a bug
rkooo567 May 14, 2023
d880763
Revert "[Revert Please] Support core worker APIs and a generator."
rkooo567 May 14, 2023
f501c22
Revert "[Please Revert] Work e2e."
rkooo567 May 14, 2023
1942394
Merge branch 'master' into streaming-generator-1
rkooo567 May 15, 2023
3e0212e
Fix failing tests.
rkooo567 May 15, 2023
c9a932e
Merge branch 'master' into streaming-generator-2
rkooo567 May 15, 2023
ffe20fd
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 15, 2023
0e89ad7
Merge branch 'master' into streaming-generator-3
rkooo567 May 15, 2023
d520e47
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 15, 2023
7610474
Fix
rkooo567 May 15, 2023
aaa0582
Fix a broken test.
rkooo567 May 15, 2023
a52f74b
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 15, 2023
37c3bdd
Merge branch 'master' into streaming-generator-1
rkooo567 May 15, 2023
fd83edd
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 15, 2023
ef08b64
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 15, 2023
74a2e31
Finished async actor.
rkooo567 May 16, 2023
8b9ba39
Add a unit test.
rkooo567 May 16, 2023
a4b62ac
done
rkooo567 May 16, 2023
d350b5d
Merge branch 'master' into streaming-generator-1
rkooo567 May 16, 2023
9ed05d9
Addressed code review.
rkooo567 May 16, 2023
e2f1980
removed a test file
rkooo567 May 16, 2023
de41fbe
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 16, 2023
177ff88
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 16, 2023
805c7bb
Updated
rkooo567 May 16, 2023
fba4a5d
Merge branch 'streaming-generator-3' into streaming-generator-4
rkooo567 May 16, 2023
fa7fe24
done
rkooo567 May 16, 2023
5dc6b98
Fixed a unit test.
rkooo567 May 16, 2023
6e51a5e
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 16, 2023
7c449be
fix apis
rkooo567 May 17, 2023
5afd081
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 17, 2023
0e87d73
Merge branch 'streaming-generator-3' into streaming-generator-4
rkooo567 May 17, 2023
d7ebad1
lint.
rkooo567 May 17, 2023
2b046b6
Ready for a benchmark.
rkooo567 May 17, 2023
985f59c
in progress.
rkooo567 May 17, 2023
c726484
Made it work.
rkooo567 May 18, 2023
1151a28
done.
rkooo567 May 18, 2023
f5d3956
Merge branch 'master' into streaming-generator-2
rkooo567 May 18, 2023
1f79ad8
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 18, 2023
5e09c4c
Merge branch 'streaming-generator-3' into streaming-generator-4
rkooo567 May 18, 2023
7113e3d
Merge branch 'streaming-generator-4' into streaming-generator-5
rkooo567 May 18, 2023
bc4f49e
Merge branch 'streaming-generator-5' into streaming-generator-6
rkooo567 May 18, 2023
c72dc03
Merge branch 'master' into streaming-generator-2
rkooo567 May 19, 2023
b7be576
Addressed code review.
rkooo567 May 19, 2023
e0e74cb
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 19, 2023
11686d4
Addressed code review.
rkooo567 May 19, 2023
98d7292
lint
rkooo567 May 19, 2023
4d55f33
Merge branch 'streaming-generator-3' into streaming-generator-4
rkooo567 May 20, 2023
5397b78
Merge branch 'streaming-generator-4' into streaming-generator-5
rkooo567 May 20, 2023
391eb0f
addressed
rkooo567 May 20, 2023
253b465
Merge branch 'streaming-generator-5' into streaming-generator-6
rkooo567 May 21, 2023
3f2e4b0
in progress.
rkooo567 May 21, 2023
f4415d1
Merge branch 'master' into streaming-generator-5
rkooo567 May 22, 2023
e49d1a5
working
rkooo567 May 22, 2023
ce1e79d
[Revert] Add more complicated tests
rkooo567 May 22, 2023
9a39db9
Merge branch 'streaming-generator-5' into streaming-generator-6
rkooo567 May 22, 2023
6c0448b
Addressed code review.
rkooo567 May 23, 2023
6b3d4f4
Merge branch 'master' into streaming-generator-6
rkooo567 May 24, 2023
9450a07
Merge branch 'master' into streaming-generator-5
rkooo567 May 24, 2023
82b07c8
Merge branch 'streaming-generator-5' into streaming-generator-6
rkooo567 May 24, 2023
0bb370f
Fix a bug that caused check failures.
rkooo567 May 25, 2023
c912f8a
Merge branch 'master' into streaming-generator-6
rkooo567 May 25, 2023
8b188f7
in progress
rkooo567 May 25, 2023
1a5827a
Merge branch 'streaming-generator-fix-exception-cannot-be-caught-bug'…
rkooo567 May 25, 2023
fd7ce2c
in progress 2
rkooo567 May 25, 2023
927ccbc
Working now.
rkooo567 May 25, 2023
11223cb
clean up.
rkooo567 May 25, 2023
e7b3881
Merge branch 'master' into streaming-generator-6
rkooo567 May 25, 2023
909f8f6
Merge branch 'master' into streaming-generator-6
rkooo567 May 25, 2023
a4bc814
Fix a test failure.
rkooo567 May 26, 2023
0c4f7a2
skip unless it is linux
rkooo567 May 26, 2023
045efa6
Merge branch 'master' into streaming-generator-6
rkooo567 May 29, 2023
7fee5df
Skip stale reports.
rkooo567 May 29, 2023
2672c72
Remove close RPCs
rkooo567 May 30, 2023
19907c3
Finished removing busy waiting
rkooo567 Jun 5, 2023
10290b6
Add a test
rkooo567 Jun 5, 2023
1abb3b0
wait working.
rkooo567 Jun 5, 2023
bf4f3ad
Merge branch 'master' into streaming-generator-6
rkooo567 Jun 5, 2023
c60bd86
Fix a cpp error
rkooo567 Jun 5, 2023
3196eeb
Fixed an edge case and add more tests.
rkooo567 Jun 5, 2023
16905bb
Merge branch 'streaming-generator-last' into streaming-generator-remo…
rkooo567 Jun 5, 2023
e2941f0
Merge branch 'streaming-generator-remove-busy-waiting' into streaming…
rkooo567 Jun 6, 2023
1f96561
Fix a compiler failure.
rkooo567 Jun 6, 2023
e380ae7
Merge branch 'streaming-generator-6' into streaming-generator-last
rkooo567 Jun 6, 2023
80b0334
handle an edge case where exception is raised upon lineage reconstruc…
rkooo567 Jun 6, 2023
60864df
Merge branch 'streaming-generator-last' into streaming-generator-remo…
rkooo567 Jun 6, 2023
07533cf
Add a new test
rkooo567 Jun 6, 2023
7618dab
Merge branch 'streaming-generator-remove-busy-waiting' into streaming…
rkooo567 Jun 6, 2023
ad5e930
Add a stress test
rkooo567 Jun 6, 2023
b96b9d8
lint
rkooo567 Jun 6, 2023
8efacb4
Merge branch 'master' into streaming-generator-6
rkooo567 Jun 8, 2023
36aebd5
Addressed code reivew + add max return value tests.
rkooo567 Jun 8, 2023
080a923
Merge branch 'streaming-generator-6' into streaming-generator-last
rkooo567 Jun 8, 2023
b612553
Merge branch 'streaming-generator-last' into streaming-generator-remo…
rkooo567 Jun 8, 2023
9e7e198
Merge branch 'streaming-generator-remove-busy-waiting' into streaming…
rkooo567 Jun 8, 2023
6e4d501
comments
stephanie-wang Jun 15, 2023
3df98b6
comment
stephanie-wang Jun 16, 2023
6baefd6
cleanup
stephanie-wang Jun 16, 2023
c89332b
fix
stephanie-wang Jun 16, 2023
8e9a821
fix
stephanie-wang Jun 16, 2023
a945412
Update python/ray/tests/test_streaming_generator.py
stephanie-wang Jun 16, 2023
60d3c42
unit test
stephanie-wang Jun 20, 2023
0a8148e
comment
stephanie-wang Jun 20, 2023
7d87a7b
lint
stephanie-wang Jun 20, 2023
1d7a934
Merge remote-tracking branch 'upstream/master' into HEAD
stephanie-wang Jun 20, 2023
749ff4e
Merge branch 'master' into streaming-generator-remove-busy-waiting
rkooo567 Jun 20, 2023
34612d7
Fixed broken tests.
rkooo567 Jun 22, 2023
c24e57f
Merge branch 'streaming-generator-remove-busy-waiting' into streaming…
rkooo567 Jun 22, 2023
a160bfb
Fixed.
rkooo567 Jun 22, 2023
256d648
Removed unnecessary logs
rkooo567 Jun 22, 2023
18a690c
Merge branch 'master' into streaming-generator-remove-busy-waiting
rkooo567 Jun 22, 2023
12d73ac
Fixed a test failure.
rkooo567 Jun 22, 2023
a0218f3
Merge branch 'streaming-generator-remove-busy-waiting' into streaming…
rkooo567 Jun 22, 2023
c58a9fa
Merge branch 'master' into streaming-generator-wait
rkooo567 Jun 23, 2023
b26e86f
Fix a test failure.
rkooo567 Jun 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import ray.job_config
import ray.remote_function
from ray import ActorID, JobID, Language, ObjectRef
from ray._raylet import StreamingObjectRefGenerator
from ray._private import ray_option_utils
from ray._private.client_mode_hook import client_mode_hook
from ray._private.function_manager import FunctionActorManager
Expand Down Expand Up @@ -2463,7 +2464,7 @@ def get(
with profiling.profile("ray.get"):
# TODO(sang): Should make StreamingObjectRefGenerator
# compatible to ray.get for dataset.
if isinstance(object_refs, ray._raylet.StreamingObjectRefGenerator):
if isinstance(object_refs, StreamingObjectRefGenerator):
return object_refs

is_individual_id = isinstance(object_refs, ray.ObjectRef)
Expand Down Expand Up @@ -2605,8 +2606,9 @@ def wait(
- :doc:`/ray-core/patterns/ray-get-submission-order`

Args:
object_refs: List of object refs for objects that may
or may not be ready. Note that these IDs must be unique.
object_refs: List of :class:`~ObjectRefs` or
:class:`~StreamingObjectRefGenerators` for objects that may or may
not be ready. Note that these must be unique.
num_returns: The number of object refs that should be returned.
timeout: The maximum amount of time in seconds to wait before
returning.
Expand Down Expand Up @@ -2637,14 +2639,20 @@ def wait(
)
blocking_wait_inside_async_warned = True

if isinstance(object_refs, ObjectRef):
if isinstance(object_refs, ObjectRef) or isinstance(
object_refs, StreamingObjectRefGenerator
):
raise TypeError(
"wait() expected a list of ray.ObjectRef, got a single ray.ObjectRef"
"wait() expected a list of ray.ObjectRef or ray.StreamingObjectRefGenerator"
", got a single ray.ObjectRef or ray.StreamingObjectRefGenerator "
f"{object_refs}"
)

if not isinstance(object_refs, list):
raise TypeError(
"wait() expected a list of ray.ObjectRef, " f"got {type(object_refs)}"
"wait() expected a list of ray.ObjectRef or "
"ray.StreamingObjectRefGenerator, "
f"got {type(object_refs)}"
)

if timeout is not None and timeout < 0:
Expand All @@ -2653,13 +2661,16 @@ def wait(
)

for object_ref in object_refs:
if not isinstance(object_ref, ObjectRef):
if not isinstance(object_ref, ObjectRef) and not isinstance(
object_ref, StreamingObjectRefGenerator
):
raise TypeError(
stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved
"wait() expected a list of ray.ObjectRef, "
"wait() expected a list of ray.ObjectRef or "
"ray.StreamingObjectRefGenerator, "
f"got list containing {type(object_ref)}"
)

worker.check_connected()

# TODO(swang): Check main thread.
with profiling.profile("ray.wait"):
# TODO(rkn): This is a temporary workaround for
Expand Down
42 changes: 30 additions & 12 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,15 @@ class StreamingObjectRefGenerator:
self._generator_task_exception = None
# Ray's worker class. ray._private.worker.global_worker
self.worker = worker
self.worker.check_connected()
assert hasattr(worker, "core_worker")

def get_next_ref(self) -> ObjectRef:
self.worker.check_connected()
core_worker = self.worker.core_worker
return core_worker.peek_object_ref_stream(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the code for peek_object_ref_stream, I couldn't find it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is in the previous PR

self._generator_ref)

def __iter__(self) -> "StreamingObjectRefGenerator":
return self

Expand Down Expand Up @@ -284,10 +291,7 @@ class StreamingObjectRefGenerator:
timeout_s: If the next object is not ready within
this timeout, it returns the nil object ref.
"""
if not hasattr(self.worker, "core_worker"):
raise ValueError(
"Cannot access the core worker. "
"Did you already shutdown Ray via ray.shutdown()?")
self.worker.check_connected()
core_worker = self.worker.core_worker

# Wait for the next ObjectRef to become ready.
Expand Down Expand Up @@ -325,10 +329,7 @@ class StreamingObjectRefGenerator:
timeout_s: Optional[float] = None,
sleep_interval_s: float = 0.0001):
"""Same API as _next_sync, but it is for async context."""
if not hasattr(self.worker, "core_worker"):
raise ValueError(
"Cannot access the core worker. "
"Did you already shutdown Ray via ray.shutdown()?")
self.worker.check_connected()
core_worker = self.worker.core_worker

ref = core_worker.peek_object_ref_stream(
Expand Down Expand Up @@ -2941,13 +2942,30 @@ cdef class CoreWorker:

return c_object_id.Binary()

def wait(self, object_refs, int num_returns, int64_t timeout_ms,
def wait(self, object_refs_or_generators, int num_returns, int64_t timeout_ms,
TaskID current_task_id, c_bool fetch_local):
cdef:
c_vector[CObjectID] wait_ids
c_vector[c_bool] results
CTaskID c_task_id = current_task_id.native()

object_refs = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider pre-allocating array size to avoid high amortized O(1) append? we may see perf regression in ray.wait release test, it waits 10k object refs in a loop IIRC

object_refs = [None] * len(object_refs_or_generator)
for i, ref_or_generator in enumerate(object_refs_or_generator):
  ...
  object_refs[i] = ...

fine to ignore, don't want to prematurely optimize

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I feel like if the test perf is heavily affected by just this, it should be re-written... (feel like it is not a good test) but I can run a test and see

for ref_or_generator in object_refs_or_generators:
if (not isinstance(ref_or_generator, ObjectRef)
and not isinstance(ref_or_generator, StreamingObjectRefGenerator)):
raise TypeError(
"wait() expected a list of ray.ObjectRef "
"or StreamingObjectRefGenerator, "
f"got list containing {type(ref_or_generator)}"
)

if isinstance(ref_or_generator, StreamingObjectRefGenerator):
# Before calling wait,
# get the next reference from a generator.
object_refs.append(ref_or_generator.get_next_ref())
else:
object_refs.append(ref_or_generator)

wait_ids = ObjectRefsToVector(object_refs)
with nogil:
op_status = CCoreWorkerProcess.GetCoreWorker().Wait(
Expand All @@ -2957,11 +2975,11 @@ cdef class CoreWorker:
assert len(results) == len(object_refs)

ready, not_ready = [], []
for i, object_ref in enumerate(object_refs):
for i, object_ref_or_generator in enumerate(object_refs_or_generators):
if results[i]:
ready.append(object_ref)
ready.append(object_ref_or_generator)
else:
not_ready.append(object_ref)
not_ready.append(object_ref_or_generator)

return ready, not_ready

Expand Down
Loading