From 385d0b608259ce7575f5f2fa295c5b2e4df0f0e3 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Sat, 17 Jun 2023 22:47:27 -0700 Subject: [PATCH 01/15] [serve] address issue when downscale nodes with ongoing requests and stop routing to nodes without replica issue: https://github.com/anyscale/product/issues/21404 Signed-off-by: Gene Su --- dashboard/client/src/type/serve.ts | 1 + python/ray/serve/_private/common.py | 1 + python/ray/serve/_private/deployment_state.py | 24 +++++++- python/ray/serve/_private/http_proxy.py | 30 ++++++++++ python/ray/serve/_private/http_state.py | 55 +++++++++++++++---- python/ray/serve/controller.py | 15 +++++ 6 files changed, 114 insertions(+), 12 deletions(-) diff --git a/dashboard/client/src/type/serve.ts b/dashboard/client/src/type/serve.ts index ea396b329919..85b2ecd375b9 100644 --- a/dashboard/client/src/type/serve.ts +++ b/dashboard/client/src/type/serve.ts @@ -90,6 +90,7 @@ export enum ServeSystemActorStatus { STARTING = "STARTING", HEALTHY = "HEALTHY", UNHEALTHY = "UNHEALTHY", + INACTIVE = "INACTIVE", } export type ServeSystemActor = { diff --git a/python/ray/serve/_private/common.py b/python/ray/serve/_private/common.py index b5a5929b4d69..3c34c63e9c37 100644 --- a/python/ray/serve/_private/common.py +++ b/python/ray/serve/_private/common.py @@ -375,6 +375,7 @@ class HTTPProxyStatus(str, Enum): STARTING = "STARTING" HEALTHY = "HEALTHY" UNHEALTHY = "UNHEALTHY" + INACTIVE = "INACTIVE" class ServeComponentType(str, Enum): diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 2547ac6bb2be..3a00f28bf6d7 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -10,7 +10,7 @@ from collections import defaultdict, OrderedDict from copy import copy from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union import ray from ray import ObjectRef, cloudpickle @@ -1230,6 +1230,17 @@ def get_running_replica_infos(self) -> List[RunningReplicaInfo]: for replica in self._replicas.get([ReplicaState.RUNNING]) ] + def get_running_replica_node_ids(self) -> List[str]: + """Get the node ids of all running replicas in this deployment. + + This is used to determine which node has replicas and in terms determine + the active flag on the http proxy. + """ + return [ + replica.actor_node_id() + for replica in self._replicas.get([ReplicaState.RUNNING]) + ] + def list_replica_details(self) -> List[ReplicaDetails]: return [replica.actor_details for replica in self._replicas.get()] @@ -2486,3 +2497,14 @@ def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo): self._deployment_states[info.deployment_name].record_multiplexed_model_ids( info.replica_tag, info.model_ids ) + + def get_node_ids_with_running_replicas(self) -> Set[str]: + """Return set of node ids with running replicas. + + This is used to determine which node has replicas and in terms determine + the active flag on the http proxy. + """ + running_replicas = set() + for deployment_state in self._deployment_states.values(): + running_replicas.update(deployment_state.get_running_replica_node_ids()) + return running_replicas diff --git a/python/ray/serve/_private/http_proxy.py b/python/ray/serve/_private/http_proxy.py index 7cb91dd60498..97651f45674f 100644 --- a/python/ray/serve/_private/http_proxy.py +++ b/python/ray/serve/_private/http_proxy.py @@ -243,6 +243,9 @@ def get_handle(name): "status_code", ), ) + self._ongoing_requests = 0 + self._ongoing_requests_dummy_obj_ref = None + self.active = True def _update_routes(self, endpoints: Dict[EndpointTag, EndpointInfo]) -> None: self.route_info: Dict[str, Tuple[EndpointTag, List[str]]] = dict() @@ -273,6 +276,13 @@ async def _not_found(self, scope, receive, send): ) await response.send(scope, receive, send) + async def _service_unavailable(self, scope, receive, send): + response = Response( + "This node has no replica. Http proxy is unavailable.", + status_code=503, + ) + await response.send(scope, receive, send) + async def receive_asgi_messages(self, request_id: str) -> List[Message]: queue = self.asgi_receive_queues.get(request_id, None) if queue is None: @@ -297,6 +307,9 @@ async def __call__(self, scope, receive, send): route_path = scope["path"][len(root_path) :] if route_path == "/-/routes": + if not self.active: + return await self._service_unavailable(scope, receive, send) + self.request_counter.inc( tags={ "route": route_path, @@ -310,6 +323,9 @@ async def __call__(self, scope, receive, send): ) if route_path == "/-/healthz": + if not self.active: + return await self._service_unavailable(scope, receive, send) + self.request_counter.inc( tags={ "route": route_path, @@ -322,6 +338,14 @@ async def __call__(self, scope, receive, send): scope, receive, send ) + # The current autoscale logic can downscale nodes with ongoing requests if the + # node doesn't have replicas and has no object references. This counter and + # the dummy object reference will have to keep the node alive while draining + # requests, so they are not dropped unintentionally. + self._ongoing_requests += 1 + if self._ongoing_requests > 0 and self._ongoing_requests_dummy_obj_ref is None: + self._ongoing_requests_dummy_obj_ref = ray.put("ongoing_requests") + route_prefix, handle, app_name = self.prefix_router.match_route(route_path) if route_prefix is None: self.request_error_counter.inc( @@ -417,6 +441,12 @@ async def __call__(self, scope, receive, send): } ) + # Decrement the ongoing request counter and drop the dummy object reference + # signaling that the node can be downscaled safely. + self._ongoing_requests -= 1 + if self._ongoing_requests == 0: + self._ongoing_requests_dummy_obj_ref = None + async def send_request_to_replica_unary( self, handle: RayServeHandle, diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 3ce02692c2b0..27821c1769d4 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -4,7 +4,7 @@ import random import time import traceback -from typing import Dict, List, Tuple +from typing import Dict, List, Set, Tuple import ray from ray.actor import ActorHandle @@ -76,6 +76,15 @@ def set_status(self, status: HTTPProxyStatus) -> None: self._status = status self.update_actor_details(status=self._status) + def set_active_flag(self, node_id: str, active: bool): + """Set the active flag on the http proxy. + + Set the active flag on the http proxy. Also log when active state changes. + """ + if self._actor_handle.app.active != active: + logger.info(f"Setting active flag on node {node_id} to {active}.") + self._actor_handle.app.active = active + def try_update_status(self, status: HTTPProxyStatus): """Try update with the new status and only update when the conditions are met. @@ -120,15 +129,18 @@ def update(self): 1) When the HTTP proxy is already shutting down, do nothing. 2) When the HTTP proxy is starting, check ready object reference. If ready - object reference returns a successful call, set status to HEALTHY. If the call - to ready() on the HTTP Proxy actor has any exception or timeout, increment the - consecutive health check failure counter and retry on the next update call. The - status is only set to UNHEALTHY when all retries have exhausted. + object reference returns a successful call and the http proxy is active, set + status to HEALTHY. If the http proxy is not active, set status to inactive. + If the call to ready() on the HTTP Proxy actor has any exception or timeout, + increment the consecutive health check failure counter and retry on the next + update call. The status is only set to UNHEALTHY when all retries have + exhausted. 3) When the HTTP proxy already has an in-progress health check. If health check - object returns a successful call, set status to HEALTHY. If the call has any - exception or timeout, count towards 1 of the consecutive health check failures - and retry on the next update call. The status is only set to UNHEALTHY when all - retries have exhausted. + object returns a successful call and the http proxy is active, set status to + HEALTHY. If the http proxy is not active, set status to inactive. If the call + has any exception or timeout, count towards 1 of the consecutive health check + failures and retry on the next update call. The status is only set to UNHEALTHY + when all retries have exhausted. 4) When the HTTP proxy need to setup another health check (when none of the above met and the time since the last health check is longer than PROXY_HEALTH_CHECK_PERIOD_S with some margin). Reset @@ -143,7 +155,12 @@ def update(self): if finished: try: worker_id, log_file_path = json.loads(ray.get(finished[0])) - self.try_update_status(HTTPProxyStatus.HEALTHY) + _status = ( + HTTPProxyStatus.HEALTHY + if self._actor_handle.app.active + else HTTPProxyStatus.INACTIVE + ) + self.try_update_status(_status) self.update_actor_details( worker_id=worker_id, log_file_path=log_file_path, @@ -173,7 +190,12 @@ def update(self): self._health_check_obj_ref = None try: ray.get(finished[0]) - self.try_update_status(HTTPProxyStatus.HEALTHY) + _status = ( + HTTPProxyStatus.HEALTHY + if self._actor_handle.app.active + else HTTPProxyStatus.INACTIVE + ) + self.try_update_status(_status) except Exception as e: logger.warning( f"Health check for HTTP proxy {self._actor_name} failed: {e}" @@ -404,3 +426,14 @@ async def ensure_http_route_exists(self, endpoint: EndpointTag, timeout_s: float for proxy in self._proxy_states.values() ] ) + + def update_active_flags(self, active_nodes: Set[str]): + """Update the active states of all HTTP proxies. + + Given a set of active nodes, set the active flag of all HTTP proxies. + """ + for node_id, proxy_state in self._proxy_states.items(): + if node_id in active_nodes: + proxy_state.set_active(node_id=node_id, active=True) + else: + proxy_state.set_active(node_id=node_id, active=False) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index b1bdee46b98f..c27b55bc9169 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -271,6 +271,19 @@ def get_http_proxy_names(self) -> bytes: ) return actor_name_list.SerializeToString() + def _update_http_proxy_active_flag(self): + """Update the active flag of all http proxies. + + Get a list of node ids that have running replicas and update the active on http + proxies. + """ + if self.http_state is None: + return + + self.http_state.update_active_flags( + self.deployment_state_manager.get_node_ids_with_running_replicas() + ) + async def run_control_loop(self) -> None: # NOTE(edoakes): we catch all exceptions here and simply log them, # because an unhandled exception would cause the main control loop to @@ -309,6 +322,8 @@ async def run_control_loop(self) -> None: except Exception: logger.exception("Exception updating application state.") + self._update_http_proxy_active_flag() + try: self._put_serve_snapshot() except Exception: From 6944a4eac9918d381e3575dcf61fd3942f254b74 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Sun, 18 Jun 2023 09:21:00 -0700 Subject: [PATCH 02/15] manual test and fixes issues Signed-off-by: Gene Su --- python/ray/serve/_private/deployment_state.py | 2 +- python/ray/serve/_private/http_proxy.py | 10 ++++++ python/ray/serve/_private/http_state.py | 34 ++++++++----------- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 3a00f28bf6d7..573cdc859833 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1237,7 +1237,7 @@ def get_running_replica_node_ids(self) -> List[str]: the active flag on the http proxy. """ return [ - replica.actor_node_id() + replica.actor_node_id for replica in self._replicas.get([ReplicaState.RUNNING]) ] diff --git a/python/ray/serve/_private/http_proxy.py b/python/ray/serve/_private/http_proxy.py index 97651f45674f..9a5b7eff83ec 100644 --- a/python/ray/serve/_private/http_proxy.py +++ b/python/ray/serve/_private/http_proxy.py @@ -742,3 +742,13 @@ async def check_health(self): async def receive_asgi_messages(self, request_id: str) -> bytes: return pickle.dumps(await self.app.receive_asgi_messages(request_id)) + + async def set_active_flag(self, node_id: str, active: bool): + """Set the active flag on the http proxy. + + Set the active flag on the http proxy to signal `/-/healthz` and `/-/routes` + endpoints returns 503 on inactive proxies. Also log when active state changes. + """ + if self.app.active != active: + logger.info(f"Setting active flag on node {node_id} to {active}.") + self.app.active = active diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 27821c1769d4..1bac1fa9798a 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -79,11 +79,12 @@ def set_status(self, status: HTTPProxyStatus) -> None: def set_active_flag(self, node_id: str, active: bool): """Set the active flag on the http proxy. - Set the active flag on the http proxy. Also log when active state changes. + Set the active flag on the http proxy. When the flag is set to false, also + update status to from HEALTHY to INACTIVE to display on the dashboard. """ - if self._actor_handle.app.active != active: - logger.info(f"Setting active flag on node {node_id} to {active}.") - self._actor_handle.app.active = active + self._actor_handle.set_active_flag.remote(node_id=node_id, active=active) + if self._status == HTTPProxyStatus.HEALTHY and not active: + self.try_update_status(HTTPProxyStatus.INACTIVE) def try_update_status(self, status: HTTPProxyStatus): """Try update with the new status and only update when the conditions are met. @@ -155,12 +156,7 @@ def update(self): if finished: try: worker_id, log_file_path = json.loads(ray.get(finished[0])) - _status = ( - HTTPProxyStatus.HEALTHY - if self._actor_handle.app.active - else HTTPProxyStatus.INACTIVE - ) - self.try_update_status(_status) + self.try_update_status(HTTPProxyStatus.HEALTHY) self.update_actor_details( worker_id=worker_id, log_file_path=log_file_path, @@ -190,12 +186,7 @@ def update(self): self._health_check_obj_ref = None try: ray.get(finished[0]) - _status = ( - HTTPProxyStatus.HEALTHY - if self._actor_handle.app.active - else HTTPProxyStatus.INACTIVE - ) - self.try_update_status(_status) + self.try_update_status(HTTPProxyStatus.HEALTHY) except Exception as e: logger.warning( f"Health check for HTTP proxy {self._actor_name} failed: {e}" @@ -430,10 +421,15 @@ async def ensure_http_route_exists(self, endpoint: EndpointTag, timeout_s: float def update_active_flags(self, active_nodes: Set[str]): """Update the active states of all HTTP proxies. - Given a set of active nodes, set the active flag of all HTTP proxies. + Given a set of active nodes, set the active flag of all HTTP proxies, except + for head node. Head node will always be active. """ for node_id, proxy_state in self._proxy_states.items(): + # Head node will always be active. + if node_id == self._head_node_id: + continue + if node_id in active_nodes: - proxy_state.set_active(node_id=node_id, active=True) + proxy_state.set_active_flag(node_id=node_id, active=True) else: - proxy_state.set_active(node_id=node_id, active=False) + proxy_state.set_active_flag(node_id=node_id, active=False) From 59b3b5c5a8dce6b2bd8ca833813712700991a129 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Mon, 19 Jun 2023 17:29:00 -0700 Subject: [PATCH 03/15] add unit test for get_running_replica_node_ids() and get_node_ids_with_running_replicas() Signed-off-by: Gene Su --- .../ray/serve/tests/test_deployment_state.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index 411f42785904..92b08d991f89 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -230,6 +230,11 @@ def check_health(self): self.health_check_called = True return self.healthy + def set_scheduling_strategy( + self, scheduling_strategy: NodeAffinitySchedulingStrategy + ): + self._scheduling_strategy = scheduling_strategy + class MockKVStore: def __init__(self): @@ -2685,5 +2690,62 @@ def test_recover(self): assert actor_replica._actor_handle.is_allocated_called +@patch.object(DriverDeploymentState, "_get_all_node_ids") +def test_get_node_ids_with_running_replicas( + mock_get_all_node_ids, mock_deployment_state_manager_full +): + """Test get_running_replica_node_ids() and get_node_ids_with_running_replicas() are + collecting the correct node ids + + When there are no running replicas, both methods should return empty results. When + the replicas are in the RUNNING state, get_running_replica_node_ids() should return + a list of all node ids. `get_node_ids_with_running_replicas()` should return a set + of all node ids. + """ + node_ids = ("node1", "node2", "node2") + mock_get_all_node_ids.return_value = [node_ids] + + tag = "test_deployment" + create_deployment_state_manager, _ = mock_deployment_state_manager_full + deployment_state_manager = create_deployment_state_manager() + + # Deploy deployment with version "1" and 3 replicas + info1, version1 = deployment_info(version="1", num_replicas=3) + updating = deployment_state_manager.deploy(tag, info1) + deployment_state = deployment_state_manager._deployment_states[tag] + assert updating + + # When the replicas are in the STARTING state, both `get_running_replica_node_ids()` + # and `get_node_ids_with_running_replicas()` should returning empty results. + deployment_state_manager.update() + check_counts( + deployment_state, + total=3, + version=version1, + by_state=[(ReplicaState.STARTING, 3)], + ) + mocked_replicas = deployment_state._replicas.get() + assert deployment_state.get_running_replica_node_ids() == [] + assert deployment_state_manager.get_node_ids_with_running_replicas() == set() + + # When the replicas are in RUNNING state, `get_running_replica_node_ids()` should + # return the same results as `node_ids` in a list. + # `get_node_ids_with_running_replicas()` should return a set of `node_ids`. + for idx, mocked_replica in enumerate(mocked_replicas): + mocked_replica._actor.set_ready() + mocked_replica._actor.set_scheduling_strategy( + NodeAffinitySchedulingStrategy(node_id=node_ids[idx], soft=True) + ) + deployment_state_manager.update() + check_counts( + deployment_state, + total=3, + version=version1, + by_state=[(ReplicaState.RUNNING, 3)], + ) + assert deployment_state.get_running_replica_node_ids() == list(node_ids) + assert deployment_state_manager.get_node_ids_with_running_replicas() == set(node_ids) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) From f21839408846d620ef463000accf7e7062587399 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Tue, 20 Jun 2023 08:44:31 -0700 Subject: [PATCH 04/15] add test to test http state and proxies Signed-off-by: Gene Su --- python/ray/serve/_private/http_state.py | 19 ++++---- python/ray/serve/tests/test_http_state.py | 57 ++++++++++++++++++++++- 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 1bac1fa9798a..5e453322d2a7 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -130,18 +130,15 @@ def update(self): 1) When the HTTP proxy is already shutting down, do nothing. 2) When the HTTP proxy is starting, check ready object reference. If ready - object reference returns a successful call and the http proxy is active, set - status to HEALTHY. If the http proxy is not active, set status to inactive. - If the call to ready() on the HTTP Proxy actor has any exception or timeout, - increment the consecutive health check failure counter and retry on the next - update call. The status is only set to UNHEALTHY when all retries have - exhausted. + object reference returns a successful call, set status to HEALTHY. If the call + to ready() on the HTTP Proxy actor has any exception or timeout, increment the + consecutive health check failure counter and retry on the next update call. The + status is only set to UNHEALTHY when all retries have exhausted. 3) When the HTTP proxy already has an in-progress health check. If health check - object returns a successful call and the http proxy is active, set status to - HEALTHY. If the http proxy is not active, set status to inactive. If the call - has any exception or timeout, count towards 1 of the consecutive health check - failures and retry on the next update call. The status is only set to UNHEALTHY - when all retries have exhausted. + object returns a successful call, set status to HEALTHY. If the call has any + exception or timeout, count towards 1 of the consecutive health check failures + and retry on the next update call. The status is only set to UNHEALTHY when all + retries have exhausted. 4) When the HTTP proxy need to setup another health check (when none of the above met and the time since the last health check is longer than PROXY_HEALTH_CHECK_PERIOD_S with some margin). Reset diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index 1c51e26b2fdc..98b1284395c4 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -10,6 +10,7 @@ from ray.serve.config import DeploymentMode, HTTPOptions from ray.serve._private.common import HTTPProxyStatus from ray.serve._private.http_state import HTTPState, HTTPProxyState +from ray.serve._private.http_proxy import HTTPProxyActor HEAD_NODE_ID = "node_id-index-head" @@ -54,9 +55,11 @@ async def check_health(self): def _create_http_proxy_state( proxy_actor_class: Any = MockHTTPProxyActor, status: HTTPProxyStatus = HTTPProxyStatus.STARTING, + node_id: str = "mock_node_id", + **kwargs, ) -> HTTPProxyState: - proxy = proxy_actor_class.options(lifetime="detached").remote() - state = HTTPProxyState(proxy, "alice", "mock_node_id", "mock_node_ip") + proxy = proxy_actor_class.options(lifetime="detached").remote(**kwargs) + state = HTTPProxyState(proxy, "alice", node_id, "mock_node_ip") state.set_status(status=status) print(f"The http proxy state created has the status of: {state.status}") return state @@ -531,6 +534,56 @@ def test_http_proxy_state_update_unhealthy_check_health_succeed(): assert proxy_state._consecutive_health_check_failures == 0 +def test_update_active_flags(mock_get_all_node_ids): + """Test update_active_flags() method + + When update nodes to inactive, head node http proxy should always be active while + worker node http proxy should change to inactive. + """ + worker_node_id = "worker-node-id-0" + state = _make_http_state(HTTPOptions(location=DeploymentMode.EveryNode)) + + # Setup http proxy for head node + state._proxy_states[HEAD_NODE_ID] = _create_http_proxy_state( + proxy_actor_class=HTTPProxyActor, + status=HTTPProxyStatus.HEALTHY, + node_id=HEAD_NODE_ID, + host="localhost", + port=8000, + root_path='/', + controller_name='mock_controller_name', + node_ip_address='foo', + ) + # Setup http proxy for a worker node + state._proxy_states[worker_node_id] = _create_http_proxy_state( + proxy_actor_class=HTTPProxyActor, + status=HTTPProxyStatus.HEALTHY, + node_id=worker_node_id, + host="localhost", + port=8000, + root_path='/', + controller_name='mock_controller_name', + node_ip_address='bar', + ) + + # Update flag for no nodes active + state.update_active_flags(set()) + + # Head node proxy should continue to be active + wait_for_condition( + condition_predictor=_update_and_check_proxy_status, + state=state._proxy_states[HEAD_NODE_ID], + status=HTTPProxyStatus.HEALTHY, + ) + + # Worker node proxy should turn inactive + wait_for_condition( + condition_predictor=_update_and_check_proxy_status, + state=state._proxy_states[worker_node_id], + status=HTTPProxyStatus.INACTIVE, + ) + + if __name__ == "__main__": import sys From fc593f09291826ce77f0c18f3118a68389b878f5 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 21 Jun 2023 09:54:35 -0700 Subject: [PATCH 05/15] add test for expected responses on head and worker node routes Signed-off-by: Gene Su --- python/ray/serve/_private/http_proxy.py | 3 +- python/ray/serve/_private/http_state.py | 17 ++- .../ray/serve/tests/test_deployment_state.py | 4 +- python/ray/serve/tests/test_http_state.py | 12 +-- python/ray/serve/tests/test_standalone3.py | 100 +++++++++++++++++- 5 files changed, 125 insertions(+), 11 deletions(-) diff --git a/python/ray/serve/_private/http_proxy.py b/python/ray/serve/_private/http_proxy.py index 9a5b7eff83ec..c0df372ddbd5 100644 --- a/python/ray/serve/_private/http_proxy.py +++ b/python/ray/serve/_private/http_proxy.py @@ -297,7 +297,6 @@ async def __call__(self, scope, receive, send): See details at: https://asgi.readthedocs.io/en/latest/specs/index.html. """ - assert scope["type"] in {"http", "websocket"} method = scope.get("method", "websocket").upper() @@ -664,7 +663,7 @@ def __init__( self.wrapped_app = middleware.cls(self.wrapped_app, **middleware.options) # Start running the HTTP server on the event loop. - # This task should be running forever. We track it in case of failure. + # This task should be running forever. we track it in case of failure. self.running_task = get_or_create_event_loop().create_task(self.run()) async def ready(self): diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 5e453322d2a7..70955625fac8 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import os import random import time import traceback @@ -335,6 +336,20 @@ def _generate_actor_name(self, node_id: str) -> str: def _start_proxy( self, name: str, node_id: str, node_ip_address: str ) -> ActorHandle: + port = self._config.port + + # This is used for test. Setting up `TEST_WORKER_NODE_PORT` env var will help + # head node and worker nodes to be opening on different ports. + if ( + node_id != self._head_node_id + and os.getenv("TEST_WORKER_NODE_PORT") is not None + ): + logger.warning( + f"`TEST_WORKER_NODE_PORT` env var is set. " + f"Using it for worker node {node_id}." + ) + port = int(os.getenv("TEST_WORKER_NODE_PORT")) + proxy = HTTPProxyActor.options( num_cpus=self._config.num_cpus, name=name, @@ -345,7 +360,7 @@ def _start_proxy( scheduling_strategy=NodeAffinitySchedulingStrategy(node_id, soft=False), ).remote( self._config.host, - self._config.port, + port, self._config.root_path, controller_name=self._controller_name, node_ip_address=node_ip_address, diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index 92b08d991f89..755504cff3f9 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -2744,7 +2744,9 @@ def test_get_node_ids_with_running_replicas( by_state=[(ReplicaState.RUNNING, 3)], ) assert deployment_state.get_running_replica_node_ids() == list(node_ids) - assert deployment_state_manager.get_node_ids_with_running_replicas() == set(node_ids) + assert deployment_state_manager.get_node_ids_with_running_replicas() == set( + node_ids + ) if __name__ == "__main__": diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index 98b1284395c4..6d04f070656b 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -550,9 +550,9 @@ def test_update_active_flags(mock_get_all_node_ids): node_id=HEAD_NODE_ID, host="localhost", port=8000, - root_path='/', - controller_name='mock_controller_name', - node_ip_address='foo', + root_path="/", + controller_name="mock_controller_name", + node_ip_address="foo", ) # Setup http proxy for a worker node state._proxy_states[worker_node_id] = _create_http_proxy_state( @@ -561,9 +561,9 @@ def test_update_active_flags(mock_get_all_node_ids): node_id=worker_node_id, host="localhost", port=8000, - root_path='/', - controller_name='mock_controller_name', - node_ip_address='bar', + root_path="/", + controller_name="mock_controller_name", + node_ip_address="bar", ) # Update flag for no nodes active diff --git a/python/ray/serve/tests/test_standalone3.py b/python/ray/serve/tests/test_standalone3.py index 116ca6236f6b..972bdea77f96 100644 --- a/python/ray/serve/tests/test_standalone3.py +++ b/python/ray/serve/tests/test_standalone3.py @@ -17,7 +17,7 @@ wait_for_condition, SignalActor, ) -from ray.cluster_utils import AutoscalingCluster +from ray.cluster_utils import AutoscalingCluster, Cluster from ray.exceptions import RayActorError from ray.serve._private.constants import ( SYNC_HANDLE_IN_DAG_FEATURE_FLAG_ENV_KEY, @@ -378,6 +378,9 @@ def ready(self): lambda: len(list(filter(lambda n: n["Alive"], ray.nodes()))) == 1 ) + # Clean up serve. + serve.shutdown() + def test_legacy_sync_handle_env_var(call_ray_stop_only): # noqa: F811 script = """ @@ -415,5 +418,100 @@ def predict(self, inp): ) +def test_healthz_and_routes_on_head_and_worker_nodes( + shutdown_ray, call_ray_stop_only # noqa: F811 +): + """Test `/-/healthz` and `/-/routes` return the correct responses for head and + worker nodes. + + When there are replicas on all nodes, `/-/routes` and `/-/routes` on all nodes + should return 200. When there are no replicas on any nodes, `/-/routes` and + `/-/routes` on the head node should continue to return 200. `/-/routes` and + `/-/routes` on the worker node should start to return 503 + """ + # Setup worker http proxy to be pointing to port 8001. Head node http proxy will + # continue to be pointing to the default port 8000. + os.environ["TEST_WORKER_NODE_PORT"] = "8001" + + # Setup a cluster with 2 nodes + cluster = Cluster() + cluster.add_node(num_cpus=3) + cluster.add_node(num_cpus=3) + cluster.wait_for_nodes() + ray.init(address=cluster.address) + serve.start(http_options={"location": "EveryNode"}) + + # Deploy 2 replicas, one to each node + @serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 2}) + class HelloModel: + def __call__(self): + return "hello" + + model = HelloModel.bind() + serve.run(target=model) + + # Ensure total actors of 2 proxies, 1 controller, and 2 replicas, and 2 nodes exist. + wait_for_condition(lambda: len(ray._private.state.actors()) == 5) + assert len(ray.nodes()) == 2 + + # Ensure `/-/healthz` and `/-/routes` return 200 and expected responses + # on both nodes. + assert requests.get("http://127.0.0.1:8000/-/healthz").status_code == 200 + assert requests.get("http://127.0.0.1:8000/-/healthz").text == "success" + assert requests.get("http://127.0.0.1:8000/-/routes").status_code == 200 + assert ( + requests.get("http://127.0.0.1:8000/-/routes").text + == '{"/":"default_HelloModel"}' + ) + assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 200 + assert requests.get("http://127.0.0.1:8001/-/healthz").text == "success" + assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 200 + assert ( + requests.get("http://127.0.0.1:8001/-/routes").text + == '{"/":"default_HelloModel"}' + ) + + # Delete the deployment should bring the active actors down to 3 and drop + # replicas on all nodes. + serve.delete(name="default") + + def _check(): + _actors = ray._private.state.actors().values() + return ( + len( + list( + filter( + lambda a: a["State"] == "ALIVE", + _actors, + ) + ) + ) + == 3 + ) + + wait_for_condition(_check) + + # Ensure head node `/-/healthz` and `/-/routes` continue to return 200 and expected + # responses. Also, the worker node `/-/healthz` and `/-/routes` should return 503 + # and unavailable responses. + assert requests.get("http://127.0.0.1:8000/-/healthz").text == "success" + assert requests.get("http://127.0.0.1:8000/-/healthz").status_code == 200 + assert requests.get("http://127.0.0.1:8000/-/routes").text == "{}" + assert requests.get("http://127.0.0.1:8000/-/routes").status_code == 200 + assert ( + requests.get("http://127.0.0.1:8001/-/healthz").text + == "This node has no replica. Http proxy is unavailable." + ) + assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 503 + assert ( + requests.get("http://127.0.0.1:8001/-/routes").text + == "This node has no replica. Http proxy is unavailable." + ) + assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 503 + + # Clean up serve. + serve.shutdown() + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) From 5cf7163f9ff2f57eb45380d0d6de5ed3451d043f Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 21 Jun 2023 10:20:59 -0700 Subject: [PATCH 06/15] optimize set active call Signed-off-by: Gene Su --- python/ray/serve/_private/http_state.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 70955625fac8..63fe97cd9d9f 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -441,7 +441,5 @@ def update_active_flags(self, active_nodes: Set[str]): if node_id == self._head_node_id: continue - if node_id in active_nodes: - proxy_state.set_active_flag(node_id=node_id, active=True) - else: - proxy_state.set_active_flag(node_id=node_id, active=False) + active = node_id in active_nodes + proxy_state.set_active_flag(node_id=node_id, active=active) From a86ed1963e1e9b74a12713e926b48871e12b7e0b Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 21 Jun 2023 10:36:05 -0700 Subject: [PATCH 07/15] add new INACTIVE to sort order and color map Signed-off-by: Gene Su --- dashboard/client/src/components/StatusChip.tsx | 1 + dashboard/client/src/pages/serve/hook/useServeApplications.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/dashboard/client/src/components/StatusChip.tsx b/dashboard/client/src/components/StatusChip.tsx index 4437321ac7e2..d138363f13bd 100644 --- a/dashboard/client/src/components/StatusChip.tsx +++ b/dashboard/client/src/components/StatusChip.tsx @@ -78,6 +78,7 @@ const colorMap = { [ServeSystemActorStatus.HEALTHY]: green, [ServeSystemActorStatus.UNHEALTHY]: red, [ServeSystemActorStatus.STARTING]: orange, + [ServeSystemActorStatus.INACTIVE]: white, }, serveController: { [ServeSystemActorStatus.HEALTHY]: green, diff --git a/dashboard/client/src/pages/serve/hook/useServeApplications.ts b/dashboard/client/src/pages/serve/hook/useServeApplications.ts index 1f7fdfb759ca..f48463609e82 100644 --- a/dashboard/client/src/pages/serve/hook/useServeApplications.ts +++ b/dashboard/client/src/pages/serve/hook/useServeApplications.ts @@ -13,6 +13,7 @@ const SERVE_HTTP_PROXY_STATUS_SORT_ORDER: Record< [ServeSystemActorStatus.UNHEALTHY]: 0, [ServeSystemActorStatus.STARTING]: 1, [ServeSystemActorStatus.HEALTHY]: 2, + [ServeSystemActorStatus.INACTIVE]: 3, }; export const useServeApplications = () => { From c04edc4c91a483c42f119580d9325e07a0318324 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 21 Jun 2023 10:54:55 -0700 Subject: [PATCH 08/15] use blueGrey for inactive Signed-off-by: Gene Su --- dashboard/client/src/components/StatusChip.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dashboard/client/src/components/StatusChip.tsx b/dashboard/client/src/components/StatusChip.tsx index d138363f13bd..4b325c3ed42f 100644 --- a/dashboard/client/src/components/StatusChip.tsx +++ b/dashboard/client/src/components/StatusChip.tsx @@ -78,7 +78,7 @@ const colorMap = { [ServeSystemActorStatus.HEALTHY]: green, [ServeSystemActorStatus.UNHEALTHY]: red, [ServeSystemActorStatus.STARTING]: orange, - [ServeSystemActorStatus.INACTIVE]: white, + [ServeSystemActorStatus.INACTIVE]: blueGrey, }, serveController: { [ServeSystemActorStatus.HEALTHY]: green, From f37d17d6ea78855381216ca35aa5dd66a13095b8 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 21 Jun 2023 14:00:03 -0700 Subject: [PATCH 09/15] address comments Signed-off-by: Gene Su --- .../client/src/components/StatusChip.tsx | 2 +- .../pages/serve/hook/useServeApplications.ts | 2 +- dashboard/client/src/type/serve.ts | 2 +- python/ray/serve/_private/common.py | 2 +- python/ray/serve/_private/deployment_state.py | 28 ++++---- python/ray/serve/_private/http_proxy.py | 71 +++++++++++-------- python/ray/serve/_private/http_state.py | 37 +++++----- python/ray/serve/controller.py | 12 ++-- .../ray/serve/tests/test_deployment_state.py | 4 +- python/ray/serve/tests/test_http_state.py | 18 ++--- python/ray/serve/tests/test_standalone3.py | 4 +- 11 files changed, 102 insertions(+), 80 deletions(-) diff --git a/dashboard/client/src/components/StatusChip.tsx b/dashboard/client/src/components/StatusChip.tsx index 4b325c3ed42f..4919d5465b08 100644 --- a/dashboard/client/src/components/StatusChip.tsx +++ b/dashboard/client/src/components/StatusChip.tsx @@ -78,7 +78,7 @@ const colorMap = { [ServeSystemActorStatus.HEALTHY]: green, [ServeSystemActorStatus.UNHEALTHY]: red, [ServeSystemActorStatus.STARTING]: orange, - [ServeSystemActorStatus.INACTIVE]: blueGrey, + [ServeSystemActorStatus.DRAINING]: blueGrey, }, serveController: { [ServeSystemActorStatus.HEALTHY]: green, diff --git a/dashboard/client/src/pages/serve/hook/useServeApplications.ts b/dashboard/client/src/pages/serve/hook/useServeApplications.ts index f48463609e82..0d35dc6d2794 100644 --- a/dashboard/client/src/pages/serve/hook/useServeApplications.ts +++ b/dashboard/client/src/pages/serve/hook/useServeApplications.ts @@ -13,7 +13,7 @@ const SERVE_HTTP_PROXY_STATUS_SORT_ORDER: Record< [ServeSystemActorStatus.UNHEALTHY]: 0, [ServeSystemActorStatus.STARTING]: 1, [ServeSystemActorStatus.HEALTHY]: 2, - [ServeSystemActorStatus.INACTIVE]: 3, + [ServeSystemActorStatus.DRAINING]: 3, }; export const useServeApplications = () => { diff --git a/dashboard/client/src/type/serve.ts b/dashboard/client/src/type/serve.ts index c6cf0cc7dc62..5f848dca3942 100644 --- a/dashboard/client/src/type/serve.ts +++ b/dashboard/client/src/type/serve.ts @@ -91,7 +91,7 @@ export enum ServeSystemActorStatus { STARTING = "STARTING", HEALTHY = "HEALTHY", UNHEALTHY = "UNHEALTHY", - INACTIVE = "INACTIVE", + DRAINING = "DRAINING", } export type ServeSystemActor = { diff --git a/python/ray/serve/_private/common.py b/python/ray/serve/_private/common.py index ae168d9b90f4..628ba2336eff 100644 --- a/python/ray/serve/_private/common.py +++ b/python/ray/serve/_private/common.py @@ -376,7 +376,7 @@ class HTTPProxyStatus(str, Enum): STARTING = "STARTING" HEALTHY = "HEALTHY" UNHEALTHY = "UNHEALTHY" - INACTIVE = "INACTIVE" + DRAINING = "DRAINING" class ServeComponentType(str, Enum): diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index bcbf94b0ed69..f9ee5f7af0e9 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1227,16 +1227,18 @@ def get_running_replica_infos(self) -> List[RunningReplicaInfo]: for replica in self._replicas.get([ReplicaState.RUNNING]) ] - def get_running_replica_node_ids(self) -> List[str]: + def get_running_replica_node_ids(self) -> Set[str]: """Get the node ids of all running replicas in this deployment. - This is used to determine which node has replicas and in terms determine - the active flag on the http proxy. + This is used to determine which node has replicas. Only nodes with replicas and + head node should have active proxies. """ - return [ - replica.actor_node_id - for replica in self._replicas.get([ReplicaState.RUNNING]) - ] + return set( + [ + replica.actor_node_id + for replica in self._replicas.get([ReplicaState.RUNNING]) + ] + ) def list_replica_details(self) -> List[ReplicaDetails]: return [replica.actor_details for replica in self._replicas.get()] @@ -2497,12 +2499,12 @@ def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo): ) def get_node_ids_with_running_replicas(self) -> Set[str]: - """Return set of node ids with running replicas. + """Return set of node ids with running replicas of any deployment. - This is used to determine which node has replicas and in terms determine - the active flag on the http proxy. + This is used to determine which node has replicas. Only nodes with replicas and + head node should have active proxies. """ - running_replicas = set() + node_ids = set() for deployment_state in self._deployment_states.values(): - running_replicas.update(deployment_state.get_running_replica_node_ids()) - return running_replicas + node_ids.update(deployment_state.get_running_replica_node_ids()) + return node_ids diff --git a/python/ray/serve/_private/http_proxy.py b/python/ray/serve/_private/http_proxy.py index c0df372ddbd5..65d143be2223 100644 --- a/python/ray/serve/_private/http_proxy.py +++ b/python/ray/serve/_private/http_proxy.py @@ -244,8 +244,8 @@ def get_handle(name): ), ) self._ongoing_requests = 0 - self._ongoing_requests_dummy_obj_ref = None - self.active = True + self._prevent_node_downscale_ref = None + self.draining = False def _update_routes(self, endpoints: Dict[EndpointTag, EndpointInfo]) -> None: self.route_info: Dict[str, Tuple[EndpointTag, List[str]]] = dict() @@ -276,9 +276,9 @@ async def _not_found(self, scope, receive, send): ) await response.send(scope, receive, send) - async def _service_unavailable(self, scope, receive, send): + async def _draining_response(self, scope, receive, send): response = Response( - "This node has no replica. Http proxy is unavailable.", + "This node is being drained.", status_code=503, ) await response.send(scope, receive, send) @@ -291,6 +291,30 @@ async def receive_asgi_messages(self, request_id: str) -> List[Message]: await queue.wait_for_message() return queue.get_messages_nowait() + def _ongoing_requests_start(self): + """Ongoing requests start. + + The current autoscale logic can downscale nodes with ongoing requests if the + node doesn't have replicas and has no object references. This counter and + the dummy object reference will have to keep the node alive while draining + requests, so they are not dropped unintentionally. + """ + self._ongoing_requests += 1 + if self._ongoing_requests > 0 and self._prevent_node_downscale_ref is None: + logger.info("Putting keep alive object reference to prevent downscaling.") + self._prevent_node_downscale_ref = ray.put("ongoing_requests") + + def _ongoing_requests_end(self): + """Ongoing requests end. + + Decrement the ongoing request counter and drop the dummy object reference + signaling that the node can be downscaled safely. + """ + self._ongoing_requests -= 1 + if self._ongoing_requests == 0: + logger.info("Dropping keep alive object reference to allow downscaling.") + self._prevent_node_downscale_ref = None + async def __call__(self, scope, receive, send): """Implements the ASGI protocol. @@ -306,8 +330,8 @@ async def __call__(self, scope, receive, send): route_path = scope["path"][len(root_path) :] if route_path == "/-/routes": - if not self.active: - return await self._service_unavailable(scope, receive, send) + if self.draining: + return await self._draining_response(scope, receive, send) self.request_counter.inc( tags={ @@ -322,8 +346,8 @@ async def __call__(self, scope, receive, send): ) if route_path == "/-/healthz": - if not self.active: - return await self._service_unavailable(scope, receive, send) + if self.draining: + return await self._draining_response(scope, receive, send) self.request_counter.inc( tags={ @@ -337,13 +361,7 @@ async def __call__(self, scope, receive, send): scope, receive, send ) - # The current autoscale logic can downscale nodes with ongoing requests if the - # node doesn't have replicas and has no object references. This counter and - # the dummy object reference will have to keep the node alive while draining - # requests, so they are not dropped unintentionally. - self._ongoing_requests += 1 - if self._ongoing_requests > 0 and self._ongoing_requests_dummy_obj_ref is None: - self._ongoing_requests_dummy_obj_ref = ray.put("ongoing_requests") + self._ongoing_requests_start() route_prefix, handle, app_name = self.prefix_router.match_route(route_path) if route_prefix is None: @@ -440,11 +458,7 @@ async def __call__(self, scope, receive, send): } ) - # Decrement the ongoing request counter and drop the dummy object reference - # signaling that the node can be downscaled safely. - self._ongoing_requests -= 1 - if self._ongoing_requests == 0: - self._ongoing_requests_dummy_obj_ref = None + self._ongoing_requests_end() async def send_request_to_replica_unary( self, @@ -663,7 +677,7 @@ def __init__( self.wrapped_app = middleware.cls(self.wrapped_app, **middleware.options) # Start running the HTTP server on the event loop. - # This task should be running forever. we track it in case of failure. + # This task should be running forever. We track it in case of failure. self.running_task = get_or_create_event_loop().create_task(self.run()) async def ready(self): @@ -742,12 +756,13 @@ async def check_health(self): async def receive_asgi_messages(self, request_id: str) -> bytes: return pickle.dumps(await self.app.receive_asgi_messages(request_id)) - async def set_active_flag(self, node_id: str, active: bool): - """Set the active flag on the http proxy. + async def set_draining_flag(self, node_id: str, draining: bool): + """Set the draining flag on the http proxy. - Set the active flag on the http proxy to signal `/-/healthz` and `/-/routes` - endpoints returns 503 on inactive proxies. Also log when active state changes. + Set the draining flag on the http proxy to signal `/-/healthz` and `/-/routes` + endpoints returns 503 on draining proxies. Also log when draining state + changes. """ - if self.app.active != active: - logger.info(f"Setting active flag on node {node_id} to {active}.") - self.app.active = active + if self.app.draining != draining: + logger.info(f"Setting draining flag on node {node_id} to {draining}.") + self.app.draining = draining diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 63fe97cd9d9f..94ac5a29cdf6 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -77,15 +77,15 @@ def set_status(self, status: HTTPProxyStatus) -> None: self._status = status self.update_actor_details(status=self._status) - def set_active_flag(self, node_id: str, active: bool): - """Set the active flag on the http proxy. + def set_draining_flag(self, node_id: str, draining: bool): + """Set the draining flag on the http proxy. - Set the active flag on the http proxy. When the flag is set to false, also - update status to from HEALTHY to INACTIVE to display on the dashboard. + Set the draining flag on the http proxy. When the flag is set to false, also + update status to from HEALTHY to DRAINING to display on the dashboard. """ - self._actor_handle.set_active_flag.remote(node_id=node_id, active=active) - if self._status == HTTPProxyStatus.HEALTHY and not active: - self.try_update_status(HTTPProxyStatus.INACTIVE) + self._actor_handle.set_draining_flag.remote(node_id=node_id, draining=draining) + if self._status == HTTPProxyStatus.HEALTHY and draining: + self.try_update_status(HTTPProxyStatus.DRAINING) def try_update_status(self, status: HTTPProxyStatus): """Try update with the new status and only update when the conditions are met. @@ -336,10 +336,15 @@ def _generate_actor_name(self, node_id: str) -> str: def _start_proxy( self, name: str, node_id: str, node_ip_address: str ) -> ActorHandle: + """Helper to start a single HTTP proxy. + + Takes the name of the proxy, the node id, and the node ip address. and creates a + new HTTPProxyActor actor handle for the proxy. Also, setting up + `TEST_WORKER_NODE_PORT` env var will help head node and worker nodes to be + opening on different ports. + """ port = self._config.port - # This is used for test. Setting up `TEST_WORKER_NODE_PORT` env var will help - # head node and worker nodes to be opening on different ports. if ( node_id != self._head_node_id and os.getenv("TEST_WORKER_NODE_PORT") is not None @@ -430,16 +435,16 @@ async def ensure_http_route_exists(self, endpoint: EndpointTag, timeout_s: float ] ) - def update_active_flags(self, active_nodes: Set[str]): - """Update the active states of all HTTP proxies. + def update_draining_flags(self, active_nodes: Set[str]): + """Update the draining states of all HTTP proxies. - Given a set of active nodes, set the active flag of all HTTP proxies, except - for head node. Head node will always be active. + Given a set of active nodes, set the draining flag of all HTTP proxies, except + for head node. Head node will never be draining. """ for node_id, proxy_state in self._proxy_states.items(): - # Head node will always be active. + # Head node will always be draining. if node_id == self._head_node_id: continue - active = node_id in active_nodes - proxy_state.set_active_flag(node_id=node_id, active=active) + draining = node_id not in active_nodes + proxy_state.set_draining_flag(node_id=node_id, draining=draining) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 45e949e98d45..d0ceedb5f519 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -271,16 +271,16 @@ def get_http_proxy_names(self) -> bytes: ) return actor_name_list.SerializeToString() - def _update_http_proxy_active_flag(self): - """Update the active flag of all http proxies. + def _update_http_proxy_draining_flag(self): + """Update the draining flag of all http proxies. - Get a list of node ids that have running replicas and update the active on http - proxies. + Get a list of node ids that have running replicas and update the draining flag + on http proxies. """ if self.http_state is None: return - self.http_state.update_active_flags( + self.http_state.update_draining_flags( self.deployment_state_manager.get_node_ids_with_running_replicas() ) @@ -322,7 +322,7 @@ async def run_control_loop(self) -> None: except Exception: logger.exception("Exception updating application state.") - self._update_http_proxy_active_flag() + self._update_http_proxy_draining_flag() try: self._put_serve_snapshot() diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index da0b5dc8994a..e330ae3be38d 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -2722,7 +2722,7 @@ def test_get_node_ids_with_running_replicas( by_state=[(ReplicaState.STARTING, 3)], ) mocked_replicas = deployment_state._replicas.get() - assert deployment_state.get_running_replica_node_ids() == [] + assert deployment_state.get_running_replica_node_ids() == set() assert deployment_state_manager.get_node_ids_with_running_replicas() == set() # When the replicas are in RUNNING state, `get_running_replica_node_ids()` should @@ -2740,7 +2740,7 @@ def test_get_node_ids_with_running_replicas( version=version1, by_state=[(ReplicaState.RUNNING, 3)], ) - assert deployment_state.get_running_replica_node_ids() == list(node_ids) + assert deployment_state.get_running_replica_node_ids() == set(node_ids) assert deployment_state_manager.get_node_ids_with_running_replicas() == set( node_ids ) diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index 6d04f070656b..a5c29ac67de3 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -534,11 +534,11 @@ def test_http_proxy_state_update_unhealthy_check_health_succeed(): assert proxy_state._consecutive_health_check_failures == 0 -def test_update_active_flags(mock_get_all_node_ids): - """Test update_active_flags() method +def test_update_draining_flags(mock_get_all_node_ids): + """Test update_draining_flags() method - When update nodes to inactive, head node http proxy should always be active while - worker node http proxy should change to inactive. + When update nodes to inactive, head node http proxy should never be draining while + worker node http proxy should change to draining. """ worker_node_id = "worker-node-id-0" state = _make_http_state(HTTPOptions(location=DeploymentMode.EveryNode)) @@ -566,21 +566,21 @@ def test_update_active_flags(mock_get_all_node_ids): node_ip_address="bar", ) - # Update flag for no nodes active - state.update_active_flags(set()) + # Update flag for no nodes draining + state.update_draining_flags(set()) - # Head node proxy should continue to be active + # Head node proxy should continue to be HEALTHY wait_for_condition( condition_predictor=_update_and_check_proxy_status, state=state._proxy_states[HEAD_NODE_ID], status=HTTPProxyStatus.HEALTHY, ) - # Worker node proxy should turn inactive + # Worker node proxy should turn DRAINING wait_for_condition( condition_predictor=_update_and_check_proxy_status, state=state._proxy_states[worker_node_id], - status=HTTPProxyStatus.INACTIVE, + status=HTTPProxyStatus.DRAINING, ) diff --git a/python/ray/serve/tests/test_standalone3.py b/python/ray/serve/tests/test_standalone3.py index 972bdea77f96..d27f725404d5 100644 --- a/python/ray/serve/tests/test_standalone3.py +++ b/python/ray/serve/tests/test_standalone3.py @@ -500,12 +500,12 @@ def _check(): assert requests.get("http://127.0.0.1:8000/-/routes").status_code == 200 assert ( requests.get("http://127.0.0.1:8001/-/healthz").text - == "This node has no replica. Http proxy is unavailable." + == "This node is being drained." ) assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 503 assert ( requests.get("http://127.0.0.1:8001/-/routes").text - == "This node has no replica. Http proxy is unavailable." + == "This node is being drained." ) assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 503 From d60036b95aabd9f0b48848b411458f58d25e9360 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 21 Jun 2023 14:10:04 -0700 Subject: [PATCH 10/15] use set comprehension Signed-off-by: Gene Su --- python/ray/serve/_private/deployment_state.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index f9ee5f7af0e9..6d015c14f447 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1234,10 +1234,8 @@ def get_running_replica_node_ids(self) -> Set[str]: head node should have active proxies. """ return set( - [ - replica.actor_node_id - for replica in self._replicas.get([ReplicaState.RUNNING]) - ] + replica.actor_node_id + for replica in self._replicas.get([ReplicaState.RUNNING]) ) def list_replica_details(self) -> List[ReplicaDetails]: From d8887fac102aa4d1ca7b6b70f59c2a13b90e7f3f Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 21 Jun 2023 14:53:29 -0700 Subject: [PATCH 11/15] use {} syntax Signed-off-by: Gene Su --- python/ray/serve/_private/deployment_state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 6d015c14f447..a0f9ab5c9a91 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1233,10 +1233,10 @@ def get_running_replica_node_ids(self) -> Set[str]: This is used to determine which node has replicas. Only nodes with replicas and head node should have active proxies. """ - return set( + return { replica.actor_node_id for replica in self._replicas.get([ReplicaState.RUNNING]) - ) + } def list_replica_details(self) -> List[ReplicaDetails]: return [replica.actor_details for replica in self._replicas.get()] From da718afdf22af859b36edd322cca66d12199a747 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 22 Jun 2023 17:54:26 -0700 Subject: [PATCH 12/15] address comments and use long poll to pass active nodes Signed-off-by: Gene Su --- python/ray/serve/_private/deployment_state.py | 17 +- python/ray/serve/_private/http_proxy.py | 256 ++++++++++-------- python/ray/serve/_private/http_state.py | 83 +++--- python/ray/serve/_private/long_poll.py | 1 + python/ray/serve/controller.py | 28 +- .../ray/serve/tests/test_deployment_state.py | 45 +-- python/ray/serve/tests/test_http_state.py | 79 ++++-- 7 files changed, 308 insertions(+), 201 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index a0f9ab5c9a91..80152f4a3272 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1227,16 +1227,19 @@ def get_running_replica_infos(self) -> List[RunningReplicaInfo]: for replica in self._replicas.get([ReplicaState.RUNNING]) ] - def get_running_replica_node_ids(self) -> Set[str]: + def get_active_node_ids(self) -> Set[str]: """Get the node ids of all running replicas in this deployment. This is used to determine which node has replicas. Only nodes with replicas and head node should have active proxies. """ - return { - replica.actor_node_id - for replica in self._replicas.get([ReplicaState.RUNNING]) - } + active_states = [ + ReplicaState.STARTING, + ReplicaState.UPDATING, + ReplicaState.RECOVERING, + ReplicaState.RUNNING, + ] + return {replica.actor_node_id for replica in self._replicas.get(active_states)} def list_replica_details(self) -> List[ReplicaDetails]: return [replica.actor_details for replica in self._replicas.get()] @@ -2496,7 +2499,7 @@ def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo): info.replica_tag, info.model_ids ) - def get_node_ids_with_running_replicas(self) -> Set[str]: + def get_active_node_ids(self) -> Set[str]: """Return set of node ids with running replicas of any deployment. This is used to determine which node has replicas. Only nodes with replicas and @@ -2504,5 +2507,5 @@ def get_node_ids_with_running_replicas(self) -> Set[str]: """ node_ids = set() for deployment_state in self._deployment_states.values(): - node_ids.update(deployment_state.get_running_replica_node_ids()) + node_ids.update(deployment_state.get_active_node_ids()) return node_ids diff --git a/python/ray/serve/_private/http_proxy.py b/python/ray/serve/_private/http_proxy.py index 65d143be2223..23dedef5f0af 100644 --- a/python/ray/serve/_private/http_proxy.py +++ b/python/ray/serve/_private/http_proxy.py @@ -6,7 +6,7 @@ import pickle import socket import time -from typing import Callable, Dict, List, Optional, Tuple +from typing import Callable, Dict, List, Optional, Set, Tuple import uvicorn import starlette.responses @@ -28,7 +28,7 @@ Response, set_socket_reuse_port, ) -from ray.serve._private.common import EndpointInfo, EndpointTag, ApplicationName +from ray.serve._private.common import EndpointInfo, EndpointTag, ApplicationName, NodeId from ray.serve._private.constants import ( SERVE_LOGGER_NAME, SERVE_MULTIPLEXED_MODEL_ID, @@ -165,7 +165,8 @@ class HTTPProxy: >>> uvicorn.run(HTTPProxy(controller_name)) # doctest: +SKIP """ - def __init__(self, controller_name: str): + def __init__(self, controller_name: str, node_id: NodeId): + self._node_id = node_id # Set the controller name so that serve will connect to the # controller instance this proxy is running in. ray.serve.context._set_internal_replica_context( @@ -198,6 +199,7 @@ def get_handle(name): ray.get_actor(controller_name, namespace=SERVE_NAMESPACE), { LongPollNamespace.ROUTE_TABLE: self._update_routes, + LongPollNamespace.ACTIVE_NODES: self._update_draining, }, call_in_event_loop=get_or_create_event_loop(), ) @@ -243,9 +245,14 @@ def get_handle(name): "status_code", ), ) + # `self._ongoing_requests` is used to count the number of ongoing requests + # and determine whether to set/unset `self._prevent_node_downscale_ref` self._ongoing_requests = 0 + # `self._prevent_node_downscale_ref` is used to prevent the node from being + # downscaled when there are ongoing requests self._prevent_node_downscale_ref = None - self.draining = False + # `self._draining` is used to indicate whether the node is the draining state. + self._draining = False def _update_routes(self, endpoints: Dict[EndpointTag, EndpointInfo]) -> None: self.route_info: Dict[str, Tuple[EndpointTag, List[str]]] = dict() @@ -255,6 +262,24 @@ def _update_routes(self, endpoints: Dict[EndpointTag, EndpointInfo]) -> None: self.prefix_router.update_routes(endpoints) + def _update_draining(self, active_nodes: Set[str]): + """Update draining flag on http proxy. + + This is a callback for when controller detects there being a change in active + nodes. Each http proxy will check if it's nodes is still active and set + draining flag accordingly. Also, log a message when the draining flag is + changed. + """ + draining = self._node_id not in active_nodes + if draining != self._draining: + logger.info(f"Setting draining flag on node {self._node_id} to {draining}.") + self._draining = draining + + # Since the draining flag is changed, we need to check if + # `self._prevent_node_downscale_ref` is set to prevent the node from being + # downscaled when there are ongoing requests. + self._try_set_prevent_downscale_ref() + async def block_until_endpoint_exists( self, endpoint: EndpointTag, timeout_s: float ): @@ -291,18 +316,36 @@ async def receive_asgi_messages(self, request_id: str) -> List[Message]: await queue.wait_for_message() return queue.get_messages_nowait() + def _try_set_prevent_downscale_ref(self): + """Try to set put a primary copy of object in the object store to prevent node + from downscale. + + The only time we need to put the object store is when there are ongoing + requests, the node is not draining, and the object reference is not set yet. + This should be checked when either `self._ongoing_requests` or `self._draining` + is changed. Also, log when the object reference is set. + """ + if ( + self._ongoing_requests > 0 + and self._draining + and self._prevent_node_downscale_ref is None + ): + logger.info("Putting keep alive object reference to prevent downscaling.") + self._prevent_node_downscale_ref = ray.put("ongoing_requests") + def _ongoing_requests_start(self): """Ongoing requests start. The current autoscale logic can downscale nodes with ongoing requests if the - node doesn't have replicas and has no object references. This counter and - the dummy object reference will have to keep the node alive while draining - requests, so they are not dropped unintentionally. + node doesn't have replicas and has no primary copies of objects in the object + store. The counter and the dummy object reference will help to keep the node + alive while draining requests, so they are not dropped unintentionally. """ self._ongoing_requests += 1 - if self._ongoing_requests > 0 and self._prevent_node_downscale_ref is None: - logger.info("Putting keep alive object reference to prevent downscaling.") - self._prevent_node_downscale_ref = ray.put("ongoing_requests") + # Since the ongoing request is changed, we need to check if + # `self._prevent_node_downscale_ref` is set to prevent the node from being + # downscaled when the draining flag is true. + self._try_set_prevent_downscale_ref() def _ongoing_requests_end(self): """Ongoing requests end. @@ -330,7 +373,7 @@ async def __call__(self, scope, receive, send): route_path = scope["path"][len(root_path) :] if route_path == "/-/routes": - if self.draining: + if self._draining: return await self._draining_response(scope, receive, send) self.request_counter.inc( @@ -346,7 +389,7 @@ async def __call__(self, scope, receive, send): ) if route_path == "/-/healthz": - if self.draining: + if self._draining: return await self._draining_response(scope, receive, send) self.request_counter.inc( @@ -361,104 +404,107 @@ async def __call__(self, scope, receive, send): scope, receive, send ) - self._ongoing_requests_start() - - route_prefix, handle, app_name = self.prefix_router.match_route(route_path) - if route_prefix is None: - self.request_error_counter.inc( - tags={ - "route": route_path, - "error_code": "404", - "method": method, - } - ) - self.request_counter.inc( - tags={ - "route": route_path, - "method": method, - "application": "", - "status_code": "404", - } - ) - return await self._not_found(scope, receive, send) - - # Modify the path and root path so that reverse lookups and redirection - # work as expected. We do this here instead of in replicas so it can be - # changed without restarting the replicas. - if route_prefix != "/": - assert not route_prefix.endswith("/") - scope["path"] = route_path.replace(route_prefix, "", 1) - scope["root_path"] = root_path + route_prefix - - request_id = get_random_letters(10) - request_context_info = { - "route": route_path, - "request_id": request_id, - "app_name": app_name, - } - start_time = time.time() - for key, value in scope.get("headers", []): - if key.decode() == SERVE_MULTIPLEXED_MODEL_ID: - request_context_info["multiplexed_model_id"] = value.decode() - break - ray.serve.context._serve_request_context.set( - ray.serve.context.RequestContext(**request_context_info) - ) - - if RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING: - status_code = await self.send_request_to_replica_streaming( - request_id, handle, scope, receive, send - ) - else: - status_code = await self.send_request_to_replica_unary( - handle, scope, receive, send - ) - - self.request_counter.inc( - tags={ + try: + self._ongoing_requests_start() + + route_prefix, handle, app_name = self.prefix_router.match_route(route_path) + if route_prefix is None: + self.request_error_counter.inc( + tags={ + "route": route_path, + "error_code": "404", + "method": method, + } + ) + self.request_counter.inc( + tags={ + "route": route_path, + "method": method, + "application": "", + "status_code": "404", + } + ) + return await self._not_found(scope, receive, send) + + # Modify the path and root path so that reverse lookups and redirection + # work as expected. We do this here instead of in replicas so it can be + # changed without restarting the replicas. + if route_prefix != "/": + assert not route_prefix.endswith("/") + scope["path"] = route_path.replace(route_prefix, "", 1) + scope["root_path"] = root_path + route_prefix + + request_id = get_random_letters(10) + request_context_info = { "route": route_path, - "method": method, - "application": app_name, - "status_code": status_code, + "request_id": request_id, + "app_name": app_name, } - ) + start_time = time.time() + for key, value in scope.get("headers", []): + if key.decode() == SERVE_MULTIPLEXED_MODEL_ID: + request_context_info["multiplexed_model_id"] = value.decode() + break + ray.serve.context._serve_request_context.set( + ray.serve.context.RequestContext(**request_context_info) + ) - latency_ms = (time.time() - start_time) * 1000.0 - self.processing_latency_tracker.observe( - latency_ms, - tags={ - "route": route_path, - "application": app_name, - "status_code": status_code, - }, - ) - logger.info( - access_log_msg( - method=method, - status=str(status_code), - latency_ms=latency_ms, - ), - extra={"log_to_stderr": False}, - ) - if status_code != "200": - self.request_error_counter.inc( + if RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING: + status_code = await self.send_request_to_replica_streaming( + request_id, handle, scope, receive, send + ) + else: + status_code = await self.send_request_to_replica_unary( + handle, scope, receive, send + ) + + self.request_counter.inc( tags={ "route": route_path, - "error_code": status_code, "method": method, + "application": app_name, + "status_code": status_code, } ) - self.deployment_request_error_counter.inc( + + latency_ms = (time.time() - start_time) * 1000.0 + self.processing_latency_tracker.observe( + latency_ms, tags={ - "deployment": handle.deployment_name, - "error_code": status_code, - "method": method, "route": route_path, "application": app_name, - } + "status_code": status_code, + }, ) - - self._ongoing_requests_end() + logger.info( + access_log_msg( + method=method, + status=str(status_code), + latency_ms=latency_ms, + ), + extra={"log_to_stderr": False}, + ) + if status_code != "200": + self.request_error_counter.inc( + tags={ + "route": route_path, + "error_code": status_code, + "method": method, + } + ) + self.deployment_request_error_counter.inc( + tags={ + "deployment": handle.deployment_name, + "error_code": status_code, + "method": method, + "route": route_path, + "application": app_name, + } + ) + finally: + # If anything during the request failed, we still want to ensure the ongoing + # request counter is decremented and possibly reset the keep alive object. + self._ongoing_requests_end() async def send_request_to_replica_unary( self, @@ -655,6 +701,7 @@ def __init__( root_path: str, controller_name: str, node_ip_address: str, + node_id: NodeId, http_middlewares: Optional[List["starlette.middleware.Middleware"]] = None, ): # noqa: F821 configure_component_logger( @@ -670,7 +717,7 @@ def __init__( self.setup_complete = asyncio.Event() - self.app = HTTPProxy(controller_name) + self.app = HTTPProxy(controller_name, node_id) self.wrapped_app = self.app for middleware in http_middlewares: @@ -755,14 +802,3 @@ async def check_health(self): async def receive_asgi_messages(self, request_id: str) -> bytes: return pickle.dumps(await self.app.receive_asgi_messages(request_id)) - - async def set_draining_flag(self, node_id: str, draining: bool): - """Set the draining flag on the http proxy. - - Set the draining flag on the http proxy to signal `/-/healthz` and `/-/routes` - endpoints returns 503 on draining proxies. Also log when draining state - changes. - """ - if self.app.draining != draining: - logger.info(f"Setting draining flag on node {node_id} to {draining}.") - self.app.draining = draining diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 94ac5a29cdf6..671d2bec3a6f 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -10,9 +10,11 @@ import ray from ray.actor import ActorHandle from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +from ray._private.utils import get_or_create_event_loop from ray._raylet import GcsClient from ray.serve.config import HTTPOptions, DeploymentMode +from ray.serve._private.long_poll import LongPollClient, LongPollNamespace from ray.serve._private.constants import ( ASYNC_CONCURRENCY, DEFAULT_HEALTH_CHECK_TIMEOUT_S, @@ -36,7 +38,12 @@ class HTTPProxyState: def __init__( - self, actor_handle: ActorHandle, actor_name: str, node_id: str, node_ip: str + self, + actor_handle: ActorHandle, + actor_name: str, + node_id: str, + node_ip: str, + controller_name: str, ): self._actor_handle = actor_handle self._actor_name = actor_name @@ -55,6 +62,14 @@ def __init__( actor_name=self._actor_name, status=self._status, ) + self._long_poll_client = LongPollClient( + ray.get_actor(controller_name, namespace=SERVE_NAMESPACE), + { + LongPollNamespace.ACTIVE_NODES: self._update_draining, + }, + call_in_event_loop=get_or_create_event_loop(), + ) + self._draining = False @property def actor_handle(self) -> ActorHandle: @@ -77,15 +92,14 @@ def set_status(self, status: HTTPProxyStatus) -> None: self._status = status self.update_actor_details(status=self._status) - def set_draining_flag(self, node_id: str, draining: bool): - """Set the draining flag on the http proxy. + def _update_draining(self, active_nodes: Set[NodeId]): + """Update draining flag on http proxy state. - Set the draining flag on the http proxy. When the flag is set to false, also - update status to from HEALTHY to DRAINING to display on the dashboard. + This is a callback for when controller detects there being a change in active + nodes. Each http proxy state will check if it's nodes is still active and set + draining flag accordingly. DRAINING status will be set in the update() call. """ - self._actor_handle.set_draining_flag.remote(node_id=node_id, draining=draining) - if self._status == HTTPProxyStatus.HEALTHY and draining: - self.try_update_status(HTTPProxyStatus.DRAINING) + self._draining = self._node_id not in active_nodes def try_update_status(self, status: HTTPProxyStatus): """Try update with the new status and only update when the conditions are met. @@ -105,8 +119,8 @@ def try_update_status(self, status: HTTPProxyStatus): self._consecutive_health_check_failures += 1 return - # Reset self._consecutive_health_check_failures when status is set to HEALTHY. - if status == HTTPProxyStatus.HEALTHY: + # Reset self._consecutive_health_check_failures when status is not UNHEALTHY. + if status != HTTPProxyStatus.UNHEALTHY: self._consecutive_health_check_failures = 0 self.set_status(status=status) @@ -131,15 +145,17 @@ def update(self): 1) When the HTTP proxy is already shutting down, do nothing. 2) When the HTTP proxy is starting, check ready object reference. If ready - object reference returns a successful call, set status to HEALTHY. If the call - to ready() on the HTTP Proxy actor has any exception or timeout, increment the - consecutive health check failure counter and retry on the next update call. The - status is only set to UNHEALTHY when all retries have exhausted. + object reference returns a successful call and the draining flag is false, set + status to HEALTHY. If the draining flag is true, set status to DRAINING. If the + call to ready() on the HTTP Proxy actor has any exception or timeout, increment + the consecutive health check failure counter and retry on the next update call. + The status is only set to UNHEALTHY when all retries have exhausted. 3) When the HTTP proxy already has an in-progress health check. If health check - object returns a successful call, set status to HEALTHY. If the call has any - exception or timeout, count towards 1 of the consecutive health check failures - and retry on the next update call. The status is only set to UNHEALTHY when all - retries have exhausted. + object returns a successful call and the draining flag is false, set status to + HEALTHY. If the draining flag is true, set status to DRAINING. If the call has + any exception or timeout, count towards 1 of the consecutive health check + failures and retry on the next update call. The status is only set to UNHEALTHY + when all retries have exhausted. 4) When the HTTP proxy need to setup another health check (when none of the above met and the time since the last health check is longer than PROXY_HEALTH_CHECK_PERIOD_S with some margin). Reset @@ -154,7 +170,12 @@ def update(self): if finished: try: worker_id, log_file_path = json.loads(ray.get(finished[0])) - self.try_update_status(HTTPProxyStatus.HEALTHY) + status = ( + HTTPProxyStatus.HEALTHY + if not self._draining + else HTTPProxyStatus.DRAINING + ) + self.try_update_status(status) self.update_actor_details( worker_id=worker_id, log_file_path=log_file_path, @@ -184,7 +205,12 @@ def update(self): self._health_check_obj_ref = None try: ray.get(finished[0]) - self.try_update_status(HTTPProxyStatus.HEALTHY) + status = ( + HTTPProxyStatus.HEALTHY + if not self._draining + else HTTPProxyStatus.DRAINING + ) + self.try_update_status(status) except Exception as e: logger.warning( f"Health check for HTTP proxy {self._actor_name} failed: {e}" @@ -369,6 +395,7 @@ def _start_proxy( self._config.root_path, controller_name=self._controller_name, node_ip_address=node_ip_address, + node_id=node_id, http_middlewares=self._config.middlewares, ) return proxy @@ -396,7 +423,7 @@ def _start_proxies_if_needed(self) -> None: ) self._proxy_states[node_id] = HTTPProxyState( - proxy, name, node_id, node_ip_address + proxy, name, node_id, node_ip_address, self._controller_name ) def _stop_proxies_if_needed(self) -> bool: @@ -434,17 +461,3 @@ async def ensure_http_route_exists(self, endpoint: EndpointTag, timeout_s: float for proxy in self._proxy_states.values() ] ) - - def update_draining_flags(self, active_nodes: Set[str]): - """Update the draining states of all HTTP proxies. - - Given a set of active nodes, set the draining flag of all HTTP proxies, except - for head node. Head node will never be draining. - """ - for node_id, proxy_state in self._proxy_states.items(): - # Head node will always be draining. - if node_id == self._head_node_id: - continue - - draining = node_id not in active_nodes - proxy_state.set_draining_flag(node_id=node_id, draining=draining) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index 0c4f49e56068..bd116bfa6a35 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -44,6 +44,7 @@ def __repr__(self): RUNNING_REPLICAS = auto() ROUTE_TABLE = auto() + ACTIVE_NODES = auto() @dataclass diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index d0ceedb5f519..0543ab3e2a5c 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -48,7 +48,7 @@ configure_component_logger, get_component_logger_file_path, ) -from ray.serve._private.long_poll import LongPollHost +from ray.serve._private.long_poll import LongPollHost, LongPollNamespace from ray.serve.exceptions import RayServeException from ray.serve.schema import ( ServeApplicationSchema, @@ -178,6 +178,9 @@ async def __init__( run_background_task(self.run_control_loop()) self._recover_config_from_checkpoint() + self._head_node_id = head_node_id + self._active_nodes = set() + self._update_active_nodes() def check_alive(self) -> None: """No-op to check if this controller is alive.""" @@ -271,18 +274,19 @@ def get_http_proxy_names(self) -> bytes: ) return actor_name_list.SerializeToString() - def _update_http_proxy_draining_flag(self): - """Update the draining flag of all http proxies. + def _update_active_nodes(self): + """Update the active nodes set. - Get a list of node ids that have running replicas and update the draining flag - on http proxies. + Controller keeps the state of active nodes (head node and nodes with deployment + replicas). If the active nodes set changes, it will notify the long poll client. """ - if self.http_state is None: - return - - self.http_state.update_draining_flags( - self.deployment_state_manager.get_node_ids_with_running_replicas() - ) + new_active_nodes = self.deployment_state_manager.get_active_node_ids() + new_active_nodes.add(self._head_node_id) + if self._active_nodes != new_active_nodes: + self._active_nodes = new_active_nodes + self.long_poll_host.notify_changed( + LongPollNamespace.ACTIVE_NODES, self._active_nodes + ) async def run_control_loop(self) -> None: # NOTE(edoakes): we catch all exceptions here and simply log them, @@ -322,7 +326,7 @@ async def run_control_loop(self) -> None: except Exception: logger.exception("Exception updating application state.") - self._update_http_proxy_draining_flag() + self._update_active_nodes() try: self._put_serve_snapshot() diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index e330ae3be38d..d5ba9538a7ba 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -2688,15 +2688,12 @@ def test_recover(self): @patch.object(DriverDeploymentState, "_get_all_node_ids") -def test_get_node_ids_with_running_replicas( - mock_get_all_node_ids, mock_deployment_state_manager_full -): - """Test get_running_replica_node_ids() and get_node_ids_with_running_replicas() are - collecting the correct node ids +def test_get_active_node_ids(mock_get_all_node_ids, mock_deployment_state_manager_full): + """Test get_active_node_ids() are collecting the correct node ids When there are no running replicas, both methods should return empty results. When the replicas are in the RUNNING state, get_running_replica_node_ids() should return - a list of all node ids. `get_node_ids_with_running_replicas()` should return a set + a list of all node ids. `get_active_node_ids()` should return a set of all node ids. """ node_ids = ("node1", "node2", "node2") @@ -2712,8 +2709,8 @@ def test_get_node_ids_with_running_replicas( deployment_state = deployment_state_manager._deployment_states[tag] assert updating - # When the replicas are in the STARTING state, both `get_running_replica_node_ids()` - # and `get_node_ids_with_running_replicas()` should returning empty results. + # When the replicas are in the STARTING state, `get_active_node_ids()` should + # return a set of node ids. deployment_state_manager.update() check_counts( deployment_state, @@ -2722,17 +2719,17 @@ def test_get_node_ids_with_running_replicas( by_state=[(ReplicaState.STARTING, 3)], ) mocked_replicas = deployment_state._replicas.get() - assert deployment_state.get_running_replica_node_ids() == set() - assert deployment_state_manager.get_node_ids_with_running_replicas() == set() - - # When the replicas are in RUNNING state, `get_running_replica_node_ids()` should - # return the same results as `node_ids` in a list. - # `get_node_ids_with_running_replicas()` should return a set of `node_ids`. for idx, mocked_replica in enumerate(mocked_replicas): - mocked_replica._actor.set_ready() mocked_replica._actor.set_scheduling_strategy( NodeAffinitySchedulingStrategy(node_id=node_ids[idx], soft=True) ) + assert deployment_state.get_active_node_ids() == set(node_ids) + assert deployment_state_manager.get_active_node_ids() == set(node_ids) + + # When the replicas are in RUNNING state, `get_active_node_ids()` should + # return a set of `node_ids`. + for mocked_replica in mocked_replicas: + mocked_replica._actor.set_ready() deployment_state_manager.update() check_counts( deployment_state, @@ -2740,10 +2737,22 @@ def test_get_node_ids_with_running_replicas( version=version1, by_state=[(ReplicaState.RUNNING, 3)], ) - assert deployment_state.get_running_replica_node_ids() == set(node_ids) - assert deployment_state_manager.get_node_ids_with_running_replicas() == set( - node_ids + assert deployment_state.get_active_node_ids() == set(node_ids) + assert deployment_state_manager.get_active_node_ids() == set(node_ids) + + # When the replicas are in the STOPPING state, `get_active_node_ids()` should + # return empty set. + for _ in mocked_replicas: + deployment_state._stop_one_running_replica_for_testing() + deployment_state_manager.update() + check_counts( + deployment_state, + total=3, + version=version1, + by_state=[(ReplicaState.STOPPING, 3)], ) + assert deployment_state.get_active_node_ids() == set() + assert deployment_state_manager.get_active_node_ids() == set() if __name__ == "__main__": diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index a5c29ac67de3..77cc3b324a13 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -11,6 +11,9 @@ from ray.serve._private.common import HTTPProxyStatus from ray.serve._private.http_state import HTTPState, HTTPProxyState from ray.serve._private.http_proxy import HTTPProxyActor +from ray.serve._private.constants import SERVE_CONTROLLER_NAME, SERVE_NAMESPACE +from ray.serve.controller import ServeController + HEAD_NODE_ID = "node_id-index-head" @@ -20,7 +23,7 @@ def _make_http_state( head_node_id: str = HEAD_NODE_ID, ) -> HTTPState: return HTTPState( - "mock_controller_name", + SERVE_CONTROLLER_NAME, detached=True, config=http_options, head_node_id=head_node_id, @@ -43,6 +46,31 @@ def mock_get_all_node_ids(all_nodes): yield +@pytest.fixture() +def setup_controller(): + try: + controller = ray.get_actor(SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE) + except ValueError: + controller = ServeController.options( + name=SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE + ).remote( + SERVE_CONTROLLER_NAME, + http_config=None, + head_node_id=HEAD_NODE_ID, + detached=True, + _disable_http_proxy=True, + ) + controller_actor_id = controller._ray_actor_id.hex() + + def check_controller_alive(): + controller_actor_info = ray._private.state.actors(controller_actor_id) + controller_actor_state = controller_actor_info["State"] + return controller_actor_state == "ALIVE" + + wait_for_condition(check_controller_alive) + yield + + @ray.remote(num_cpus=0) class MockHTTPProxyActor: async def ready(self): @@ -58,8 +86,12 @@ def _create_http_proxy_state( node_id: str = "mock_node_id", **kwargs, ) -> HTTPProxyState: + if kwargs: + kwargs["node_id"] = node_id proxy = proxy_actor_class.options(lifetime="detached").remote(**kwargs) - state = HTTPProxyState(proxy, "alice", node_id, "mock_node_ip") + state = HTTPProxyState( + proxy, "alice", node_id, "mock_node_ip", SERVE_CONTROLLER_NAME + ) state.set_status(status=status) print(f"The http proxy state created has the status of: {state.status}") return state @@ -109,7 +141,9 @@ def test_node_selection(all_nodes, mock_get_all_node_ids): assert set(another_seed) != set(selected_nodes) -def test_http_state_update_restarts_unhealthy_proxies(mock_get_all_node_ids): +def test_http_state_update_restarts_unhealthy_proxies( + mock_get_all_node_ids, setup_controller +): """Test the update method in HTTPState would kill and restart unhealthy proxies. Set up a HTTPProxyState with UNHEALTHY status. Calls the update method on the @@ -149,7 +183,7 @@ def _update_state_and_check_proxy_status( assert new_proxy != old_proxy -def test_http_proxy_state_update_shutting_down(): +def test_http_proxy_state_update_shutting_down(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is shutting down. @@ -168,7 +202,7 @@ def test_http_proxy_state_update_shutting_down(): assert previous_status == current_status -def test_http_proxy_state_update_starting_ready_succeed(): +def test_http_proxy_state_update_starting_ready_succeed(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is STARTING and when the ready call succeeded. @@ -188,7 +222,7 @@ def test_http_proxy_state_update_starting_ready_succeed(): ) -def test_http_proxy_state_update_starting_ready_failed_once(): +def test_http_proxy_state_update_starting_ready_failed_once(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is STARTING and when the ready call failed once and succeeded for the following call. @@ -229,7 +263,7 @@ async def check_health(self): ) -def test_http_proxy_state_update_starting_ready_always_fails(): +def test_http_proxy_state_update_starting_ready_always_fails(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is STARTING and when the ready call is always failing. @@ -263,7 +297,7 @@ async def check_health(self): @patch("ray.serve._private.http_state.PROXY_READY_CHECK_TIMEOUT_S", 1) -def test_http_proxy_state_update_starting_ready_always_timeout(): +def test_http_proxy_state_update_starting_ready_always_timeout(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is STARTING and when the ready call always timed out. @@ -294,7 +328,7 @@ async def check_health(self): @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) -def test_http_proxy_state_update_healthy_check_health_succeed(): +def test_http_proxy_state_update_healthy_check_health_succeed(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is HEALTHY and when the check_health call succeeded @@ -325,7 +359,7 @@ def test_http_proxy_state_update_healthy_check_health_succeed(): @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) -def test_http_proxy_state_update_healthy_check_health_failed_once(): +def test_http_proxy_state_update_healthy_check_health_failed_once(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is HEALTHY and when the check_health call failed once and succeeded for the following call. @@ -376,7 +410,7 @@ async def check_health(self): @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) -def test_http_proxy_state_update_healthy_check_health_always_fails(): +def test_http_proxy_state_update_healthy_check_health_always_fails(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is HEALTHY and when the check_health call is always failing. @@ -421,7 +455,9 @@ async def check_health(self): @patch("ray.serve._private.http_state.DEFAULT_HEALTH_CHECK_TIMEOUT_S", 0.1) @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) -def test_http_proxy_state_check_health_always_timeout_timeout_eq_period(): +def test_http_proxy_state_check_health_always_timeout_timeout_eq_period( + setup_controller, +): """Test calling update method on HTTPProxyState when the proxy state is HEALTHY and when the ready call always timed out and health check timeout and period equals. @@ -466,7 +502,9 @@ async def check_health(self): @patch("ray.serve._private.http_state.DEFAULT_HEALTH_CHECK_TIMEOUT_S", 1) @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) -def test_http_proxy_state_check_health_always_timeout_timeout_greater_than_period(): +def test_http_proxy_state_check_health_always_timeout_timeout_greater_than_period( + setup_controller, +): """Test calling update method on HTTPProxyState when the proxy state is HEALTHY and when the ready call always timed out and health check timeout greater than period. @@ -510,7 +548,7 @@ async def check_health(self): @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) -def test_http_proxy_state_update_unhealthy_check_health_succeed(): +def test_http_proxy_state_update_unhealthy_check_health_succeed(setup_controller): """Test calling update method on HTTPProxyState when the proxy state is UNHEALTHY and when the check_health call succeeded. @@ -534,8 +572,8 @@ def test_http_proxy_state_update_unhealthy_check_health_succeed(): assert proxy_state._consecutive_health_check_failures == 0 -def test_update_draining_flags(mock_get_all_node_ids): - """Test update_draining_flags() method +def test_update_draining(mock_get_all_node_ids, setup_controller): + """Test _update_draining() method When update nodes to inactive, head node http proxy should never be draining while worker node http proxy should change to draining. @@ -551,7 +589,7 @@ def test_update_draining_flags(mock_get_all_node_ids): host="localhost", port=8000, root_path="/", - controller_name="mock_controller_name", + controller_name=SERVE_CONTROLLER_NAME, node_ip_address="foo", ) # Setup http proxy for a worker node @@ -562,12 +600,15 @@ def test_update_draining_flags(mock_get_all_node_ids): host="localhost", port=8000, root_path="/", - controller_name="mock_controller_name", + controller_name=SERVE_CONTROLLER_NAME, node_ip_address="bar", ) # Update flag for no nodes draining - state.update_draining_flags(set()) + for node_id in [HEAD_NODE_ID, worker_node_id]: + proxy = state._proxy_states[node_id] + proxy._update_draining(set(HEAD_NODE_ID)) + proxy.update() # Head node proxy should continue to be HEALTHY wait_for_condition( From 6965e39bea600e20755a08a37966e2f64d8e9b47 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 22 Jun 2023 21:04:48 -0700 Subject: [PATCH 13/15] fix tests Signed-off-by: Gene Su --- python/ray/serve/tests/test_callback.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/serve/tests/test_callback.py b/python/ray/serve/tests/test_callback.py index 4ca3d2a34580..837b41a92bdd 100644 --- a/python/ray/serve/tests/test_callback.py +++ b/python/ray/serve/tests/test_callback.py @@ -162,6 +162,7 @@ def test_callback_fail(ray_instance): root_path="/", controller_name="controller", node_ip_address="127.0.0.1", + node_id="123", ) with pytest.raises(RayActorError, match="this is from raise_error_callback"): ray.get(handle.ready.remote()) @@ -195,6 +196,7 @@ def test_http_proxy_return_aribitary_objects(ray_instance): root_path="/", controller_name="controller", node_ip_address="127.0.0.1", + node_id="123", ) with pytest.raises( RayActorError, match="must return a list of Starlette middlewares" From b443d2210521219c6a56793a4ac873283e4328eb Mon Sep 17 00:00:00 2001 From: Gene Su Date: Fri, 23 Jun 2023 10:52:57 -0700 Subject: [PATCH 14/15] drop the long poll client on http state and use update to set the draining state Signed-off-by: Gene Su --- python/ray/serve/_private/http_state.py | 31 ++------ python/ray/serve/controller.py | 2 +- python/ray/serve/tests/test_http_state.py | 89 +++++++++++++---------- 3 files changed, 59 insertions(+), 63 deletions(-) diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 4559a5d785ba..58147015d0ea 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -62,14 +62,6 @@ def __init__( actor_name=self._actor_name, status=self._status, ) - self._long_poll_client = LongPollClient( - ray.get_actor(controller_name, namespace=SERVE_NAMESPACE), - { - LongPollNamespace.ACTIVE_NODES: self._update_draining, - }, - call_in_event_loop=get_or_create_event_loop(), - ) - self._draining = False @property def actor_handle(self) -> ActorHandle: @@ -92,15 +84,6 @@ def set_status(self, status: HTTPProxyStatus) -> None: self._status = status self.update_actor_details(status=self._status) - def _update_draining(self, active_nodes: Set[NodeId]): - """Update draining flag on http proxy state. - - This is a callback for when controller detects there being a change in active - nodes. Each http proxy state will check if it's nodes is still active and set - draining flag accordingly. DRAINING status will be set in the update() call. - """ - self._draining = self._node_id not in active_nodes - def try_update_status(self, status: HTTPProxyStatus): """Try update with the new status and only update when the conditions are met. @@ -140,7 +123,7 @@ def update_actor_details(self, **kwargs) -> None: details_kwargs.update(kwargs) self._actor_details = HTTPProxyDetails(**details_kwargs) - def update(self): + def update(self, draining: bool = False): """Update the status of the current HTTP proxy. 1) When the HTTP proxy is already shutting down, do nothing. @@ -172,7 +155,7 @@ def update(self): worker_id, log_file_path = json.loads(ray.get(finished[0])) status = ( HTTPProxyStatus.HEALTHY - if not self._draining + if not draining else HTTPProxyStatus.DRAINING ) self.try_update_status(status) @@ -207,7 +190,7 @@ def update(self): ray.get(finished[0]) status = ( HTTPProxyStatus.HEALTHY - if not self._draining + if not draining else HTTPProxyStatus.DRAINING ) self.try_update_status(status) @@ -306,7 +289,7 @@ def get_http_proxy_details(self) -> Dict[NodeId, HTTPProxyDetails]: for node_id, state in self._proxy_states.items() } - def update(self): + def update(self, active_nodes: Set[NodeId] = set()): """Update the state of all HTTP proxies. Start proxies on all nodes if not already exist and stop the proxies on nodes @@ -315,8 +298,10 @@ def update(self): """ self._start_proxies_if_needed() self._stop_proxies_if_needed() - for proxy_state in self._proxy_states.values(): - proxy_state.update() + for node_id, proxy_state in self._proxy_states.items(): + draining = node_id not in active_nodes + draining = draining if self._head_node_id != node_id else False + proxy_state.update(draining) def _get_target_nodes(self) -> List[Tuple[str, str]]: """Return the list of (node_id, ip_address) to deploy HTTP servers on.""" diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index f4fbe9a722b4..e2898ac1c94d 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -318,7 +318,7 @@ async def run_control_loop(self) -> None: # info about available deployments & their replicas. if self.http_state and self.done_recovering_event.is_set(): try: - self.http_state.update() + self.http_state.update(active_nodes=self._active_nodes) except Exception: logger.exception("Exception updating HTTP state.") diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index f0eebcc05e3d..99239dc009cb 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -102,6 +102,17 @@ def _update_and_check_proxy_status(state: HTTPProxyState, status: HTTPProxyStatu return state.status == status +def _update_and_check_http_state( + http_state: HTTPState, + node_ids: List[str], + statuses: List[HTTPProxyStatus], + **kwargs, +): + http_state.update(**kwargs) + proxy_states = http_state._proxy_states + return all([proxy_states[node_ids[idx]].status == statuses[idx] for idx in range(len(node_ids))]) + + def test_node_selection(all_nodes, mock_get_all_node_ids): # Test NoServer state = _make_http_state(HTTPOptions(location=DeploymentMode.NoServer)) @@ -572,56 +583,56 @@ def test_http_proxy_state_update_unhealthy_check_health_succeed(setup_controller assert proxy_state._consecutive_health_check_failures == 0 -def test_update_draining(mock_get_all_node_ids, setup_controller): - """Test _update_draining() method +@patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) +def test_update_draining(mock_get_all_node_ids, setup_controller, all_nodes): + """Test update draining logics. When update nodes to inactive, head node http proxy should never be draining while - worker node http proxy should change to draining. + worker node http proxy should change to draining. When update nodes to active, head + node http proxy should continue to be healthy while worker node http proxy should + be healthy. """ - worker_node_id = "worker-node-id-0" + worker_node_id = all_nodes[1][0] state = _make_http_state(HTTPOptions(location=DeploymentMode.EveryNode)) - # Setup http proxy for head node - state._proxy_states[HEAD_NODE_ID] = _create_http_proxy_state( - proxy_actor_class=HTTPProxyActor, - status=HTTPProxyStatus.HEALTHY, - node_id=HEAD_NODE_ID, - host="localhost", - port=8000, - root_path="/", - controller_name=SERVE_CONTROLLER_NAME, - node_ip_address="foo", - ) - # Setup http proxy for a worker node - state._proxy_states[worker_node_id] = _create_http_proxy_state( - proxy_actor_class=HTTPProxyActor, - status=HTTPProxyStatus.HEALTHY, - node_id=worker_node_id, - host="localhost", - port=8000, - root_path="/", - controller_name=SERVE_CONTROLLER_NAME, - node_ip_address="bar", - ) + for node_id, node_ip_address in all_nodes: + state._proxy_states[node_id] = _create_http_proxy_state( + proxy_actor_class=HTTPProxyActor, + status=HTTPProxyStatus.HEALTHY, + node_id=node_id, + host="localhost", + port=8000, + root_path="/", + controller_name=SERVE_CONTROLLER_NAME, + node_ip_address=node_ip_address, + ) - # Update flag for no nodes draining - for node_id in [HEAD_NODE_ID, worker_node_id]: - proxy = state._proxy_states[node_id] - proxy._update_draining(set(HEAD_NODE_ID)) - proxy.update() + # No active nodes + active_nodes = set() - # Head node proxy should continue to be HEALTHY + # Head node proxy should continue to be HEALTHY. + # Worker node proxy should turn DRAINING. wait_for_condition( - condition_predictor=_update_and_check_proxy_status, - state=state._proxy_states[HEAD_NODE_ID], - status=HTTPProxyStatus.HEALTHY, + condition_predictor=_update_and_check_http_state, + timeout=15, + http_state=state, + node_ids=[HEAD_NODE_ID, worker_node_id], + statuses=[HTTPProxyStatus.HEALTHY, HTTPProxyStatus.DRAINING], + active_nodes=active_nodes, ) - # Worker node proxy should turn DRAINING + # All nodes are active + active_nodes = set([node_id for node_id, _ in all_nodes]) + + # Head node proxy should continue to be HEALTHY. + # Worker node proxy should turn HEALTHY. wait_for_condition( - condition_predictor=_update_and_check_proxy_status, - state=state._proxy_states[worker_node_id], - status=HTTPProxyStatus.DRAINING, + condition_predictor=_update_and_check_http_state, + timeout=15, + http_state=state, + node_ids=[HEAD_NODE_ID, worker_node_id], + statuses=[HTTPProxyStatus.HEALTHY, HTTPProxyStatus.HEALTHY], + active_nodes=active_nodes, ) From fdc68e5dddf4781bf7a41470a7bc5f562461d3f1 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Fri, 23 Jun 2023 10:58:53 -0700 Subject: [PATCH 15/15] linting Signed-off-by: Gene Su --- python/ray/serve/_private/http_state.py | 11 +++++++---- python/ray/serve/tests/test_http_state.py | 9 +++++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 58147015d0ea..0f161ee59513 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -10,11 +10,9 @@ import ray from ray.actor import ActorHandle from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy -from ray._private.utils import get_or_create_event_loop from ray._raylet import GcsClient from ray.serve.config import HTTPOptions, DeploymentMode -from ray.serve._private.long_poll import LongPollClient, LongPollNamespace from ray.serve._private.constants import ( ASYNC_CONCURRENCY, PROXY_HEALTH_CHECK_TIMEOUT_S, @@ -289,18 +287,23 @@ def get_http_proxy_details(self) -> Dict[NodeId, HTTPProxyDetails]: for node_id, state in self._proxy_states.items() } - def update(self, active_nodes: Set[NodeId] = set()): + def update(self, active_nodes: Set[NodeId] = None): """Update the state of all HTTP proxies. Start proxies on all nodes if not already exist and stop the proxies on nodes that are no longer exist. Update all proxy states. Kill and restart unhealthy proxies. """ + # Ensure head node is always active. + if active_nodes is None: + active_nodes = {self._head_node_id} + else: + active_nodes.add(self._head_node_id) + self._start_proxies_if_needed() self._stop_proxies_if_needed() for node_id, proxy_state in self._proxy_states.items(): draining = node_id not in active_nodes - draining = draining if self._head_node_id != node_id else False proxy_state.update(draining) def _get_target_nodes(self) -> List[Tuple[str, str]]: diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index 99239dc009cb..37d7d7f42028 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -110,7 +110,12 @@ def _update_and_check_http_state( ): http_state.update(**kwargs) proxy_states = http_state._proxy_states - return all([proxy_states[node_ids[idx]].status == statuses[idx] for idx in range(len(node_ids))]) + return all( + [ + proxy_states[node_ids[idx]].status == statuses[idx] + for idx in range(len(node_ids)) + ] + ) def test_node_selection(all_nodes, mock_get_all_node_ids): @@ -622,7 +627,7 @@ def test_update_draining(mock_get_all_node_ids, setup_controller, all_nodes): ) # All nodes are active - active_nodes = set([node_id for node_id, _ in all_nodes]) + active_nodes = {node_id for node_id, _ in all_nodes} # Head node proxy should continue to be HEALTHY. # Worker node proxy should turn HEALTHY.