Skip to content

Commit

Permalink
[core] Suppress harmless ObjectRefStreamEndOfStreamError when using a…
Browse files Browse the repository at this point in the history
…syncio (#37062) (#37200)

The last ref returned by a streaming generator is a sentinel ObjectRef that contains the end-of-stream error. This suppresses an error from asyncio that the exception is never retrieved (which is expected).
Related issue number

Closes #36956.

---------

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
  • Loading branch information
stephanie-wang authored Jul 7, 2023
1 parent 12a569f commit de579a6
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 95 deletions.
3 changes: 3 additions & 0 deletions python/ray/_private/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
TaskUnschedulableError,
WorkerCrashedError,
OutOfMemoryError,
ObjectRefStreamEndOfStreamError,
)
from ray.util import serialization_addons
from ray.util import inspect_serializability
Expand Down Expand Up @@ -359,6 +360,8 @@ def _deserialize_object(self, data, metadata, object_ref):
elif error_type == ErrorType.Value("ACTOR_UNSCHEDULABLE_ERROR"):
error_info = self._deserialize_error_info(data, metadata_fields)
return ActorUnschedulableError(error_info.error_message)
elif error_type == ErrorType.Value("END_OF_STREAMING_GENERATOR"):
return ObjectRefStreamEndOfStreamError()
else:
return RaySystemError("Unrecognized error type " + str(error_type))
elif data:
Expand Down
18 changes: 13 additions & 5 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ from ray.exceptions import (
AsyncioActorExit,
PendingCallsLimitExceeded,
RpcError,
ObjectRefStreamEndOfStreamError,
)
from ray._private import external_storage
from ray.util.scheduling_strategies import (
Expand Down Expand Up @@ -220,10 +221,6 @@ class ObjectRefGenerator:
return len(self._refs)


class ObjectRefStreamEndOfStreamError(RayError):
pass


class StreamingObjectRefGenerator:
def __init__(self, generator_ref: ObjectRef, worker: "Worker"):
# The reference to a generator task.
Expand Down Expand Up @@ -323,6 +320,16 @@ class StreamingObjectRefGenerator:
raise StopIteration
return ref

async def suppress_exceptions(self, ref: ObjectRef):
# Wrap a streamed ref to avoid asyncio warnings about not retrieving
# the exception when we are just waiting for the ref to become ready.
# The exception will get returned (or warned) to the user once they
# actually await the ref.
try:
await ref
except Exception:
pass

async def _next_async(
self,
timeout_s: Optional[float] = None,
Expand All @@ -334,7 +341,8 @@ class StreamingObjectRefGenerator:
ref = core_worker.peek_object_ref_stream(
self._generator_ref)
# TODO(swang): Avoid fetching the value.
ready, unready = await asyncio.wait([ref], timeout=timeout_s)
ready, unready = await asyncio.wait([self.suppress_exceptions(ref)],
timeout=timeout_s)
if len(unready) > 0:
return ObjectRef.nil()

Expand Down
9 changes: 9 additions & 0 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,15 @@ def __str__(self):
return f"The actor is not schedulable: {self.error_message}"


@DeveloperAPI
class ObjectRefStreamEndOfStreamError(RayError):
"""Raised by streaming generator tasks when there are no more ObjectRefs to
read.
"""

pass


RAY_EXCEPTION_TYPES = [
PlasmaObjectNotAvailable,
RayError,
Expand Down
90 changes: 0 additions & 90 deletions python/ray/tests/test_streaming_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,96 +144,6 @@ def test_streaming_object_ref_generator_task_failed_unit(mocked_worker):
ref = generator._next_sync(timeout_s=0)


@pytest.mark.asyncio
async def test_streaming_object_ref_generator_unit_async(mocked_worker):
"""
Verify the basic case:
create a generator -> read values -> nothing more to read -> delete.
"""
c = mocked_worker.core_worker
generator_ref = ray.ObjectRef.from_random()
generator = StreamingObjectRefGenerator(generator_ref, mocked_worker)
c.try_read_next_object_ref_stream.return_value = ray.ObjectRef.nil()

# Test when there's no new ref, it returns a nil.
next_ref = ray.ObjectRef.from_random()

async def coro_ref():
await asyncio.sleep(1)
return next_ref

c.peek_object_ref_stream.return_value = coro_ref()
ref = await generator._next_async(timeout_s=0)
assert ref.is_nil()

# When the new ref is available, next should return it.
for _ in range(3):
next_ref = ray.ObjectRef.from_random()

async def coro_ref():
return next_ref

c.peek_object_ref_stream.return_value = coro_ref()
c.try_read_next_object_ref_stream.return_value = next_ref
ref = await generator._next_async(timeout_s=0)
assert next_ref == ref

# When try_read_next_object_ref_stream raises a
# ObjectRefStreamEndOfStreamError, it should raise a stop iteration.

async def coro_ref():
return next_ref

c.peek_object_ref_stream.return_value = coro_ref()
generator._generator_ref = coro_ref()
c.try_read_next_object_ref_stream.side_effect = ObjectRefStreamEndOfStreamError(
""
) # noqa
with pytest.raises(StopAsyncIteration):
ref = await generator._next_async(timeout_s=0)


@pytest.mark.asyncio
async def test_async_ref_generator_task_failed_unit(mocked_worker):
"""
Verify when a task is failed by a system error,
the generator ref is returned.
"""
c = mocked_worker.core_worker
generator_ref = ray.ObjectRef.from_random()
generator = StreamingObjectRefGenerator(generator_ref, mocked_worker)

# Simulate the worker failure happens.
next_ref = ray.ObjectRef.from_random()

async def coro_ref():
return next_ref

c.peek_object_ref_stream.return_value = coro_ref()

# generator ref should raise an exception when a task fails.

async def generator_ref_coro():
raise WorkerCrashedError()

generator_coro = generator_ref_coro()
generator._generator_ref = generator_coro
c.try_read_next_object_ref_stream.side_effect = ObjectRefStreamEndOfStreamError(
""
) # noqa
ref = await generator._next_async(timeout_s=0)
# If the generator task fails by a systsem error,
# meaning the ref will raise an exception
# it should be returned.
assert ref == generator_coro

# Once exception is raised, it should always
# raise stopIteration regardless of what
# the ref contains now.
with pytest.raises(StopAsyncIteration):
ref = await generator._next_async(timeout_s=0)


def test_generator_basic(shutdown_only):
ray.init(num_cpus=1)

Expand Down
7 changes: 7 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
local_raylet_client_,
options_.check_signals,
[this](const RayObject &obj) {
rpc::ErrorType error_type;
if (obj.IsException(&error_type) &&
error_type == rpc::ErrorType::END_OF_STREAMING_GENERATOR) {
// End-of-stream ObjectRefs are sentinels and should never get
// returned to the caller.
return;
}
// Run this on the event loop to avoid calling back into the language runtime
// from the middle of user operations.
io_service_.post(
Expand Down

0 comments on commit de579a6

Please sign in to comment.