Skip to content

Commit

Permalink
[Serve] Run __del__ even if constructor is still in-progress (#45882)
Browse files Browse the repository at this point in the history
  • Loading branch information
shrekris-anyscale authored Jun 18, 2024
1 parent 179b3d5 commit b585ef0
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
23 changes: 20 additions & 3 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,19 @@ async def perform_graceful_shutdown(self):
# can skip the wait period.
if self._user_callable_initialized:
await self._drain_ongoing_requests()

try:
await self._user_callable_wrapper.call_destructor()
except: # noqa: E722
# We catch a blanket exception since the constructor may still be
# running, so instance variables used by the destructor may not exist.
if self._user_callable_initialized:
logger.exception(
"__del__ ran before replica finished initializing, and "
"raised an exception."
)
else:
logger.exception("__del__ raised an exception.")

await self._metrics_manager.shutdown()

Expand Down Expand Up @@ -1159,10 +1171,15 @@ async def call_user_method(
async def call_destructor(self):
"""Explicitly call the `__del__` method of the user callable.
Calling this multiple times has no effect; only the first call will actually
call the destructor.
Calling this multiple times has no effect; only the first call will
actually call the destructor.
"""
self._raise_if_not_initialized("call_destructor")
if self._callable is None:
logger.info(
"This replica has not yet started running user code. "
"Skipping __del__."
)
return

# Only run the destructor once. This is safe because there is no `await` between
# checking the flag here and flipping it to `True` below.
Expand Down
46 changes: 46 additions & 0 deletions python/ray/serve/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,52 @@ def g():
assert requests.get("http://127.0.0.1:8000/app_g").text == "got g"


@pytest.mark.asyncio
async def test_delete_while_initializing(serve_instance):
"""Test that __del__ runs when a replica terminates while initializing."""

@ray.remote
class Counter:
def __init__(self):
self.count = 0

def incr(self):
self.count += 1

def get_count(self) -> int:
return self.count

signal = SignalActor.remote()
counter = Counter.remote()

@serve.deployment(graceful_shutdown_timeout_s=0.01)
class HangingStart:
async def __init__(
self, signal: ray.actor.ActorHandle, counter: ray.actor.ActorHandle
):
self.signal = signal
self.counter = counter
await signal.send.remote()
print("HangingStart set the EventHolder.")
await asyncio.sleep(10000)

async def __del__(self):
print("Running __del__")
await self.counter.incr.remote()

serve._run(HangingStart.bind(signal, counter), _blocking=False)

print("Waiting for the deployment to start initialization.")
await signal.wait.remote()

print("Calling serve.delete().")
serve.delete(name=SERVE_DEFAULT_APP_NAME)

# Ensure that __del__ ran once, even though the deployment terminated
# during initialization.
assert (await counter.get_count.remote()) == 1


def test_deployment_name_with_app_name(serve_instance):
"""Test replica name with app name as prefix"""

Expand Down
3 changes: 0 additions & 3 deletions python/ray/serve/tests/unit/test_user_callable_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ async def test_calling_methods_before_initialize():
with pytest.raises(RuntimeError):
await user_callable_wrapper.call_reconfigure(None)

with pytest.raises(RuntimeError):
await user_callable_wrapper.call_destructor()


@pytest.mark.asyncio
async def test_basic_class_callable():
Expand Down

0 comments on commit b585ef0

Please sign in to comment.