[Serve] Fix serve non atomic shutdown #36927

Merged 29 commits on Jul 7, 2023
Changes from all commits
Commits (29)
da916a7
WIP: added some logics to ensure atomic shutdown. Still need to clean…
GeneDer Jun 27, 2023
c4fc300
linting
GeneDer Jun 27, 2023
a40b537
WIP: implement is_shutdown on each resources
GeneDer Jun 28, 2023
aed0602
clean up
GeneDer Jun 28, 2023
2089027
cleanup debug comment
GeneDer Jun 29, 2023
3db1291
Merge branch 'master' into fix-serve-non-atomic-shutdown
GeneDer Jun 29, 2023
56e5db7
add some unit tests to app state and dep state
GeneDer Jun 29, 2023
a2e8b69
add tests for http state
GeneDer Jun 29, 2023
1f4b042
linting
GeneDer Jun 29, 2023
e1c6bcd
try add back client waiting shutddown and some cleanup
GeneDer Jun 29, 2023
7f88970
add more test
GeneDer Jun 30, 2023
6041270
finished testing and log related logs to file only
GeneDer Jun 30, 2023
06f668d
check all actors have shut down
GeneDer Jun 30, 2023
ef2b6a0
Merge branch 'master' into fix-serve-non-atomic-shutdown
GeneDer Jun 30, 2023
64a95d1
linting
GeneDer Jun 30, 2023
b6e1db7
Merge branch 'master' into fix-serve-non-atomic-shutdown
GeneDer Jun 30, 2023
d00ae83
Merge branch 'master' into fix-serve-non-atomic-shutdown
GeneDer Jul 3, 2023
0a99341
address PR comments
GeneDer Jul 3, 2023
94756e1
linting
GeneDer Jul 3, 2023
f490b2e
Merge branch 'master' into fix-serve-non-atomic-shutdown
GeneDer Jul 5, 2023
c19ea39
Merge branch 'master' into fix-serve-non-atomic-shutdown
GeneDer Jul 5, 2023
e5a098f
use medium for test_http_state
GeneDer Jul 5, 2023
a2e4a72
use simpler logic to send shutdown signal and wait for controller shu…
GeneDer Jul 5, 2023
812a5a8
Merge branch 'master' into fix-serve-non-atomic-shutdown
GeneDer Jul 5, 2023
21c74ee
update test comments
GeneDer Jul 5, 2023
bba6366
cleanup commented code
GeneDer Jul 5, 2023
5866b2b
added client timeout and test and make logging less frequent
GeneDer Jul 6, 2023
7372a0d
Merge branch 'master' into fix-serve-non-atomic-shutdown
GeneDer Jul 6, 2023
913ff35
use ray.get to execute health check and catch timeout error signaling…
GeneDer Jul 7, 2023
8 changes: 8 additions & 0 deletions python/ray/serve/BUILD
@@ -672,3 +672,11 @@ py_test(
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)

py_test(
name = "test_endpoint_state",
size = "small",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
25 changes: 23 additions & 2 deletions python/ray/serve/_private/application_state.py
@@ -177,9 +177,20 @@ def _delete_deployment(self, name):

def delete(self):
"""Delete the application"""
logger.info(f"Deleting application '{self._name}'")
logger.info(
f"Deleting application '{self._name}'",
extra={"log_to_stderr": False},
)
self._set_target_state(deleting=True)

def is_deleted(self) -> bool:
"""Check whether the application is already deleted.

For an application to be considered deleted, the target state has to be set to
deleting and all deployments have to be deleted.
"""
return self._target_state.deleting and len(self._get_live_deployments()) == 0

def apply_deployment_info(
self, deployment_name: str, deployment_info: DeploymentInfo
) -> None:
@@ -412,7 +423,7 @@ def update(self) -> bool:

# Check if app is ready to be deleted
if self._target_state.deleting:
return len(self._get_live_deployments()) == 0
return self.is_deleted()
return False

def get_checkpoint_data(self) -> ApplicationTargetState:
@@ -639,6 +650,16 @@ def shutdown(self) -> None:
for app_state in self._application_states.values():
app_state.delete()

def is_ready_for_shutdown(self) -> bool:
"""Return whether all applications have shut down.

Iterate through all application states and check whether each application is deleted.
"""
return all(
app_state.is_deleted() for app_state in self._application_states.values()
)

def _save_checkpoint_func(
self, *, writeahead_checkpoints: Optional[Dict[str, ApplicationTargetState]]
) -> None:
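The pattern introduced here, and repeated for the other state managers below, is that `shutdown()` only requests deletion, while a separate `is_deleted()` / `is_ready_for_shutdown()` query reports when deletion has actually finished. A minimal standalone sketch of that pattern, using hypothetical class names (`AppState`, `AppStateManager` are not the real Serve classes):

```python
from typing import Dict


class AppState:
    def __init__(self) -> None:
        self.deleting = False
        self.live_deployments = {"d1", "d2"}

    def delete(self) -> None:
        self.deleting = True
        # The real application state tears deployments down asynchronously; we
        # clear them immediately to keep the sketch self-contained.
        self.live_deployments.clear()

    def is_deleted(self) -> bool:
        return self.deleting and len(self.live_deployments) == 0


class AppStateManager:
    def __init__(self, states: Dict[str, AppState]) -> None:
        self._states = states

    def shutdown(self) -> None:
        # Request deletion of every application; completion is checked separately.
        for state in self._states.values():
            state.delete()

    def is_ready_for_shutdown(self) -> bool:
        # Ready only once every application reports it is fully deleted.
        return all(state.is_deleted() for state in self._states.values())


manager = AppStateManager({"default": AppState()})
manager.shutdown()
assert manager.is_ready_for_shutdown()
```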
59 changes: 11 additions & 48 deletions python/ray/serve/_private/client.py
@@ -20,7 +20,6 @@
CLIENT_POLLING_INTERVAL_S,
CLIENT_CHECK_CREATION_POLLING_INTERVAL_S,
MAX_CACHED_HANDLES,
SERVE_NAMESPACE,
SERVE_DEFAULT_APP_NAME,
)
from ray.serve._private.deploy_utils import get_deploy_args
@@ -95,7 +94,7 @@ def __del__(self):
def __reduce__(self):
raise RayServeException(("Ray Serve client cannot be serialized."))

def shutdown(self) -> None:
def shutdown(self, timeout_s: float = 30.0) -> None:
"""Completely shut down the connected Serve instance.

Shuts down all processes and deletes all state associated with the
@@ -107,53 +106,17 @@ def shutdown(self) -> None:
del self.handle_cache[k]

if ray.is_initialized() and not self._shutdown:
ray.get(self._controller.shutdown.remote())
self._wait_for_deployments_shutdown()

ray.kill(self._controller, no_restart=True)

# Wait for the named actor entry gets removed as well.
started = time.time()
while True:
try:
ray.get_actor(self._controller_name, namespace=SERVE_NAMESPACE)
if time.time() - started > 5:
logger.warning(
"Waited 5s for Serve to shutdown gracefully but "
"the controller is still not cleaned up. "
"You can ignore this warning if you are shutting "
"down the Ray cluster."
)
break
except ValueError: # actor name is removed
break

self._shutdown = True

def _wait_for_deployments_shutdown(self, timeout_s: int = 60):
"""Waits for all deployments to be shut down and deleted.

Raises TimeoutError if this doesn't happen before timeout_s.
"""
start = time.time()
while time.time() - start < timeout_s:
deployment_statuses = self.get_all_deployment_statuses()
if len(deployment_statuses) == 0:
break
else:
logger.debug(
f"Waiting for shutdown, {len(deployment_statuses)} "
"deployments still alive."
try:
ray.get(self._controller.graceful_shutdown.remote(), timeout=timeout_s)
except ray.exceptions.RayActorError:
# Controller has been shut down.
pass
except TimeoutError:
logger.warning(
f"Controller failed to shut down within {timeout_s}s. "
"Check controller logs for more details."
)
time.sleep(CLIENT_POLLING_INTERVAL_S)
else:
live_names = [
deployment_status.name for deployment_status in deployment_statuses
]
raise TimeoutError(
f"Shutdown didn't complete after {timeout_s}s. "
f"Deployments still alive: {live_names}."
)
self._shutdown = True

def _wait_for_deployment_healthy(self, name: str, timeout_s: int = -1):
"""Waits for the named deployment to enter "HEALTHY" status.
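With this change the client no longer kills the controller and polls deployment statuses itself; it delegates to the controller's new `graceful_shutdown` RPC and only handles the two expected failure modes (controller already dead, or timeout). A hedged sketch of that calling pattern, with a hypothetical helper name (`wait_for_controller_shutdown` is not part of this PR):

```python
import ray
from ray.exceptions import GetTimeoutError, RayActorError


def wait_for_controller_shutdown(controller, timeout_s: float = 30.0) -> bool:
    """Return True once the controller confirms shutdown or is already dead."""
    try:
        # graceful_shutdown() only returns after the controller has torn down
        # all Serve resources (see the controller.py changes below).
        ray.get(controller.graceful_shutdown.remote(), timeout=timeout_s)
        return True
    except RayActorError:
        # The controller killed itself before replying; shutdown completed.
        return True
    except GetTimeoutError:
        # Still alive after timeout_s; the caller should warn and give up,
        # which is what ServeControllerClient.shutdown() does above.
        return False
```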
15 changes: 14 additions & 1 deletion python/ray/serve/_private/deployment_state.py
@@ -1279,7 +1279,10 @@ def _set_target_state_deleting(self) -> None:
self._curr_status_info = DeploymentStatusInfo(
self._name, DeploymentStatus.UPDATING
)
logger.info(f"Deleting deployment {self._name}.")
logger.info(
f"Deleting deployment {self._name}.",
extra={"log_to_stderr": False},
)

def _set_target_state(self, target_info: DeploymentInfo) -> None:
"""Set the target state for the deployment to the provided info."""
@@ -2284,6 +2287,16 @@ def shutdown(self):
# TODO(jiaodong): Need to add some logic to prevent new replicas
# from being created once shutdown signal is sent.

def is_ready_for_shutdown(self) -> bool:
"""Return whether all deployments are shutdown.

Check there are no deployment states and no checkpoints.
"""
return (
len(self._deployment_states) == 0
and self._kv_store.get(CHECKPOINT_KEY) is None
)

def _save_checkpoint_func(
self, *, writeahead_checkpoints: Optional[Dict[str, Tuple]]
) -> None:
8 changes: 8 additions & 0 deletions python/ray/serve/_private/endpoint_state.py
@@ -34,6 +34,14 @@ def __init__(self, kv_store: KVStoreBase, long_poll_host: LongPollHost):
def shutdown(self):
self._kv_store.delete(CHECKPOINT_KEY)

def is_ready_for_shutdown(self) -> bool:
"""Returns whether the endpoint checkpoint has been deleted.

Get the endpoint checkpoint from the kv store. If it is None, then it has been
deleted.
"""
return self._kv_store.get(CHECKPOINT_KEY) is None

def _checkpoint(self):
self._kv_store.put(CHECKPOINT_KEY, cloudpickle.dumps(self._endpoints))

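Both the deployment and endpoint state managers use the same readiness signal: their `shutdown()` removes a checkpoint from the KV store, and `is_ready_for_shutdown()` reports completion once a read returns `None`. A self-contained sketch with a hypothetical in-memory store (not Serve's real `KVStoreBase` implementation or checkpoint key):

```python
CHECKPOINT_KEY = "serve-checkpoint"  # stand-in for the real constant


class InMemoryKVStore:
    """Minimal dict-backed stand-in for the controller's KV store."""

    def __init__(self) -> None:
        self._data = {}

    def put(self, key, value) -> None:
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def delete(self, key) -> None:
        self._data.pop(key, None)


store = InMemoryKVStore()
store.put(CHECKPOINT_KEY, b"endpoints")

# shutdown() deletes the checkpoint ...
store.delete(CHECKPOINT_KEY)
# ... and is_ready_for_shutdown() simply observes that it is gone.
assert store.get(CHECKPOINT_KEY) is None
```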
32 changes: 32 additions & 0 deletions python/ray/serve/_private/http_state.py
@@ -230,6 +230,27 @@ def shutdown(self):
self._shutting_down = True
ray.kill(self.actor_handle, no_restart=True)

def is_ready_for_shutdown(self) -> bool:
"""Return whether the HTTP proxy actor is shutdown.

For an HTTP proxy actor to be considered shutdown, it must be marked as
_shutting_down and the actor must be dead. If the actor is dead, the health
check will return RayActorError.
"""
if not self._shutting_down:
return False

try:
ray.get(self._actor_handle.check_health.remote(), timeout=0.001)
except ray.exceptions.RayActorError:
# The actor is dead, so it's ready for shutdown.
return True
except ray.exceptions.GetTimeoutError:
# The actor is still alive, so it's not ready for shutdown.
return False

return False


class HTTPState:
"""Manages all state for HTTP proxies in the system.
@@ -269,6 +290,17 @@ def shutdown(self) -> None:
for proxy_state in self._proxy_states.values():
proxy_state.shutdown()

def is_ready_for_shutdown(self) -> bool:
"""Return whether all proxies are shutdown.

Iterate through all proxy states and check if all their proxy actors
are shutdown.
"""
return all(
proxy_state.is_ready_for_shutdown()
for proxy_state in self._proxy_states.values()
)

def get_config(self):
return self._config

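`ProxyState.is_ready_for_shutdown` probes actor liveness with a health check bounded by a tiny timeout: a dead actor raises `RayActorError`, while a live one either answers or times out. A standalone sketch of the same probe against a hypothetical `Probe` actor (names are illustrative, not Serve's):

```python
import ray


@ray.remote
class Probe:
    def check_health(self) -> str:
        return "ok"


def actor_is_dead(handle) -> bool:
    """Distinguish a dead actor from a live one using a near-zero timeout."""
    try:
        ray.get(handle.check_health.remote(), timeout=0.001)
    except ray.exceptions.RayActorError:
        return True   # dead actors fail the call outright
    except ray.exceptions.GetTimeoutError:
        return False  # alive, just didn't answer within the tiny timeout
    return False      # alive and answered immediately


probe = Probe.remote()
print(actor_is_dead(probe))  # False while the actor is running (or starting up)
ray.kill(probe, no_restart=True)
# Once the kill has propagated, actor_is_dead(probe) returns True.
```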
98 changes: 95 additions & 3 deletions python/ray/serve/controller.py
@@ -187,6 +187,9 @@ async def __init__(
worker_id=ray.get_runtime_context().get_worker_id(),
log_file_path=get_component_logger_file_path(),
)
self._shutting_down = False
self._shutdown = asyncio.Event()
self._shutdown_start_time = None

run_background_task(self.run_control_loop())

@@ -307,6 +310,12 @@ async def run_control_loop(self) -> None:
recovering_timeout = RECOVERING_LONG_POLL_BROADCAST_TIMEOUT_S
start_time = time.time()
while True:
if self._shutting_down:
try:
self.shutdown()
except Exception:
logger.exception("Exception during shutdown.")

if (
not self.done_recovering_event.is_set()
and time.time() - start_time > recovering_timeout
@@ -439,15 +448,84 @@ def get_root_url(self):
)
return http_config.root_url

def config_checkpoint_deleted(self) -> bool:
"""Returns whether the config checkpoint has been deleted.

Get the config checkpoint from the kv store. If it is None, then it has been
deleted.
"""
return self.kv_store.get(CONFIG_CHECKPOINT_KEY) is None

def shutdown(self):
"""Shuts down the serve instance completely."""
"""Shuts down the serve instance completely.

This method will only be triggered when `self._shutting_down` is true. It
deletes the config checkpoint from the kv store, sets application state to deleting,
deletes all deployments, and shuts down all HTTP proxies. Once all these
resources are released, it then kills the controller actor.
"""
if not self._shutting_down:
return

if self._shutdown_start_time is None:
self._shutdown_start_time = time.time()

logger.info("Controller shutdown started!", extra={"log_to_stderr": False})
self.kv_store.delete(CONFIG_CHECKPOINT_KEY)
self.application_state_manager.shutdown()
self.deployment_state_manager.shutdown()
self.endpoint_state.shutdown()
if self.http_state:
self.http_state.shutdown()

config_checkpoint_deleted = self.config_checkpoint_deleted()
application_is_shutdown = self.application_state_manager.is_ready_for_shutdown()
deployment_is_shutdown = self.deployment_state_manager.is_ready_for_shutdown()
endpoint_is_shutdown = self.endpoint_state.is_ready_for_shutdown()
http_state_is_shutdown = (
self.http_state is None or self.http_state.is_ready_for_shutdown()
)
if (
config_checkpoint_deleted
and application_is_shutdown
and deployment_is_shutdown
and endpoint_is_shutdown
and http_state_is_shutdown
):
logger.warning(
"All resources have shut down, shutting down controller!",
extra={"log_to_stderr": False},
)
_controller_actor = ray.get_runtime_context().current_actor
self._shutdown.set()
ray.kill(_controller_actor, no_restart=True)
elif time.time() - self._shutdown_start_time > 10:
if not config_checkpoint_deleted:
logger.warning(
f"{CONFIG_CHECKPOINT_KEY} not yet deleted",
extra={"log_to_stderr": False},
)
if not application_is_shutdown:
logger.warning(
"application not yet shutdown",
extra={"log_to_stderr": False},
)
if not deployment_is_shutdown:
logger.warning(
"deployment not yet shutdown",
extra={"log_to_stderr": False},
)
if not endpoint_is_shutdown:
logger.warning(
"endpoint not yet shutdown",
extra={"log_to_stderr": False},
)
if not http_state_is_shutdown:
logger.warning(
"http_state not yet shutdown",
extra={"log_to_stderr": False},
)

def deploy(
self,
name: str,
@@ -835,6 +913,20 @@ def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo):
"""
self.deployment_state_manager.record_multiplexed_replica_info(info)

async def graceful_shutdown(self, wait: bool = True):
"""Set the shutting down flag on controller to signal shutdown in
run_control_loop().

This is used to signal to the controller that it should proceed with shutdown
process, so it can shut down gracefully. It also waits until the shutdown
event is triggered if wait is true.
"""
self._shutting_down = True
if not wait:
return

await self._shutdown.wait()


@ray.remote(num_cpus=0, max_calls=1)
def deploy_serve_application(
@@ -943,7 +1035,7 @@ def __init__(
http_proxy_port: int = 8000,
):
try:
self._controller = ray.get_actor(controller_name, namespace="serve")
self._controller = ray.get_actor(controller_name, namespace=SERVE_NAMESPACE)
except ValueError:
self._controller = None
if self._controller is None:
@@ -956,7 +1048,7 @@
max_restarts=-1,
max_task_retries=-1,
resources={HEAD_NODE_RESOURCE_NAME: 0.001},
namespace="serve",
namespace=SERVE_NAMESPACE,
max_concurrency=CONTROLLER_MAX_CONCURRENCY,
).remote(
controller_name,
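Putting the controller changes together: `graceful_shutdown()` only flips a flag and waits on an `asyncio.Event`; the control loop then calls `shutdown()` on every iteration until each state manager reports ready, at which point the event is set and the real controller kills its own actor. A simplified, self-contained sketch of that handshake using a hypothetical `MiniController` (not the real `ServeController`, which also checkpoints, logs progress, and kills itself via `ray.kill`):

```python
import asyncio


class MiniController:
    """Simplified stand-in for ServeController's shutdown handshake."""

    def __init__(self) -> None:
        self._shutting_down = False
        self._shutdown = asyncio.Event()
        # Stand-in for the per-resource is_ready_for_shutdown() checks.
        self._pending = {"applications", "deployments", "endpoints", "proxies"}

    def shutdown(self) -> None:
        # The real code re-issues deletions and re-checks every state manager
        # each tick; here one resource "finishes" per control-loop iteration.
        if self._pending:
            self._pending.pop()
        if not self._pending:
            self._shutdown.set()

    async def run_control_loop(self) -> None:
        while not self._shutdown.is_set():
            if self._shutting_down:
                self.shutdown()
            await asyncio.sleep(0.01)

    async def graceful_shutdown(self, wait: bool = True) -> None:
        self._shutting_down = True
        if not wait:
            return
        await self._shutdown.wait()


async def main() -> None:
    controller = MiniController()
    loop = asyncio.create_task(controller.run_control_loop())
    await controller.graceful_shutdown()  # returns only after every resource drains
    await loop


asyncio.run(main())
```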