Skip to content

Commit

Permalink
[Serve] Recover PENDING_INITIALIZATION status actor & Fix the actor r…
Browse files Browse the repository at this point in the history
…ecover bug

Signed-off-by: Sihan Wang <sihanwang41@gmail.com>
  • Loading branch information
sihanwang41 committed Mar 30, 2023
1 parent 816ef72 commit 7221899
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
6 changes: 4 additions & 2 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,6 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion]
# TODO(simon): fully implement reconfigure for Java replicas.
if self._is_cross_language:
return ReplicaStartupStatus.SUCCEEDED, None

deployment_config, version = ray.get(self._ready_obj_ref)
self._max_concurrent_queries = deployment_config.max_concurrent_queries
self._graceful_shutdown_timeout_s = (
Expand All @@ -497,7 +496,10 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion]
self._allocated_obj_ref
)
except Exception:
logger.exception(f"Exception in deployment '{self._deployment_name}'")
logger.exception(
f"Exception in replica '{self._replica_tag}',"
"the replica will be stopped."
)
return ReplicaStartupStatus.FAILED, None
if self._deployment_is_cross_language:
# todo: The replica's userconfig which java client created
Expand Down
11 changes: 9 additions & 2 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ async def initialize_replica():
# or, alternatively, create an async get_replica() method?
self.replica = None
self._initialize_replica = initialize_replica
self._replica_tag = replica_tag

@ray.method(num_returns=2)
async def handle_request(
Expand Down Expand Up @@ -243,9 +244,15 @@ async def reconfigure(
if user_config is not None:
await self.replica.reconfigure(user_config)

return self.get_metadata()
return await self.get_metadata()

def get_metadata(self) -> Tuple[DeploymentConfig, DeploymentVersion]:
async def get_metadata(
    self,
) -> Tuple[Optional[DeploymentConfig], Optional[DeploymentVersion]]:
    """Return the deployment config and version of the wrapped replica.

    If ``self.replica`` has not been set yet, this coroutine polls every
    0.2 seconds until it is. There is no timeout here on purpose.

    Returns:
        A ``(deployment_config, version)`` tuple read from ``self.replica``.
    """
    while True:
        # A missing replica most likely means initialization is still in
        # progress; timing out a stuck replica is left to the
        # DeploymentStateManager.
        if self.replica is not None:
            return self.replica.deployment_config, self.replica.version
        await asyncio.sleep(0.2)

async def prepare_for_shutdown(self):
Expand Down
43 changes: 43 additions & 0 deletions python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,48 @@ def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1):
make_nonblocking_calls({"2": 2}, num_returns=2)


def test_controller_recover_initializing_actor(serve_instance):
    """Check that a replica still running ``__init__`` survives a controller kill.

    The deployment's constructor blocks on a signal, so the replica actor is
    still initializing when the controller is killed. Once the replica is
    unblocked the deployment must become healthy, and the named actor found
    for it must be the same one that existed before the controller was killed.
    """

    signal = SignalActor.remote()
    signal2 = SignalActor.remote()
    client = serve_instance

    @ray.remote
    def pending_init_indicator():
        # Completes only after the replica constructor has sent `signal2`.
        ray.get(signal2.wait.remote())
        return True

    @serve.deployment
    class V1:
        def __init__(self):
            # Announce that initialization has started, then block until the
            # test releases `signal`.
            signal2.send.remote()
            ray.get(signal.wait.remote())

        def __call__(self, request):
            return f"1|{os.getpid()}"

    serve.run(V1.bind(), _blocking=False)
    # Wait until the replica is inside its (blocked) constructor.
    ray.get(pending_init_indicator.remote())

    def get_actor_tag(name: str):
        """Return the first named actor whose name contains `name`, or None."""
        named_actors = ray.util.list_named_actors(all_namespaces=True)
        return next(
            (actor["name"] for actor in named_actors if name in actor["name"]),
            None,
        )

    actor_tag = get_actor_tag(V1.name)
    # Guard against a vacuous pass: if no actor is found both before and
    # after the controller is killed, the final equality would compare
    # None == None.
    assert (
        actor_tag is not None
    ), f"No named actor found for deployment '{V1.name}' before killing the controller"

    # no_restart=False leaves the controller eligible to be restarted.
    ray.kill(serve.context._global_client._controller, no_restart=False)

    # Let the actor proceed with initialization.
    signal.send.remote()
    client._wait_for_deployment_healthy(V1.name)
    # The actor that existed before the controller was killed must still be
    # the one serving the deployment.
    actor_tag2 = get_actor_tag(V1.name)
    assert actor_tag == actor_tag2


if __name__ == "__main__":
    # When run as a script, execute this file's tests with pytest and use
    # pytest's result as the process exit status.
    exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_code)

0 comments on commit 7221899

Please sign in to comment.