From be0dcb86b0ab6c68274ed6e6d02ce002580c72c1 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Tue, 27 Jun 2023 13:37:10 -0700 Subject: [PATCH 01/22] [Serve] fix flaky healthz test Signed-off-by: Gene Su --- python/ray/serve/tests/test_standalone3.py | 24 ++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/python/ray/serve/tests/test_standalone3.py b/python/ray/serve/tests/test_standalone3.py index a88144747b2c..7ccfe9a7866a 100644 --- a/python/ray/serve/tests/test_standalone3.py +++ b/python/ray/serve/tests/test_standalone3.py @@ -440,14 +440,14 @@ def test_healthz_and_routes_on_head_and_worker_nodes( # Setup a cluster with 2 nodes cluster = Cluster() - cluster.add_node(num_cpus=3) - cluster.add_node(num_cpus=3) + cluster.add_node(num_cpus=1) + cluster.add_node(num_cpus=2) cluster.wait_for_nodes() ray.init(address=cluster.address) serve.start(http_options={"location": "EveryNode"}) - # Deploy 2 replicas, one to each node - @serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 2}) + # Deploy 3 replicas, at least one on each node. + @serve.deployment(num_replicas=3, ray_actor_options={"num_cpus": 0.9}) class HelloModel: def __call__(self): return "hello" @@ -455,8 +455,20 @@ def __call__(self): model = HelloModel.bind() serve.run(target=model) - # Ensure total actors of 2 proxies, 1 controller, and 2 replicas, and 2 nodes exist. - wait_for_condition(lambda: len(ray._private.state.actors()) == 5) + # Ensure each node has at least one replica. + def check_replicas_on_all_nodes(): + _actors = ray._private.state.actors().values() + replica_nodes = [ + a["Address"]["NodeID"] + for a in _actors + if a["ActorClassName"].startswith("ServeReplica") + ] + return len(set(replica_nodes)) == 2 + + wait_for_condition(check_replicas_on_all_nodes) + + # Ensure total actors of 2 proxies, 1 controller, and 3 replicas, and 2 nodes exist. + wait_for_condition(lambda: len(ray._private.state.actors()) == 6) assert len(ray.nodes()) == 2 # Ensure `/-/healthz` and `/-/routes` return 200 and expected responses From fd1b9d391c5bc8d198e2f9b54ed981436b986bed Mon Sep 17 00:00:00 2001 From: Gene Su Date: Tue, 27 Jun 2023 15:27:45 -0700 Subject: [PATCH 02/22] make all replicas running on the worker node Signed-off-by: Gene Su --- python/ray/serve/tests/test_standalone3.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/ray/serve/tests/test_standalone3.py b/python/ray/serve/tests/test_standalone3.py index 7ccfe9a7866a..e39ccb48fdc9 100644 --- a/python/ray/serve/tests/test_standalone3.py +++ b/python/ray/serve/tests/test_standalone3.py @@ -440,14 +440,14 @@ def test_healthz_and_routes_on_head_and_worker_nodes( # Setup a cluster with 2 nodes cluster = Cluster() - cluster.add_node(num_cpus=1) + cluster.add_node(num_cpus=0) cluster.add_node(num_cpus=2) cluster.wait_for_nodes() ray.init(address=cluster.address) serve.start(http_options={"location": "EveryNode"}) - # Deploy 3 replicas, at least one on each node. - @serve.deployment(num_replicas=3, ray_actor_options={"num_cpus": 0.9}) + # Deploy 2 replicas, both should be on the worker node. + @serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.9}) class HelloModel: def __call__(self): return "hello" @@ -455,20 +455,20 @@ def __call__(self): model = HelloModel.bind() serve.run(target=model) - # Ensure each node has at least one replica. - def check_replicas_on_all_nodes(): + # Ensure worker node has both replicas. + def check_replicas_on_worker_nodes(): _actors = ray._private.state.actors().values() replica_nodes = [ a["Address"]["NodeID"] for a in _actors if a["ActorClassName"].startswith("ServeReplica") ] - return len(set(replica_nodes)) == 2 + return len(set(replica_nodes)) == 1 - wait_for_condition(check_replicas_on_all_nodes) + wait_for_condition(check_replicas_on_worker_nodes) - # Ensure total actors of 2 proxies, 1 controller, and 3 replicas, and 2 nodes exist. - wait_for_condition(lambda: len(ray._private.state.actors()) == 6) + # Ensure total actors of 2 proxies, 1 controller, and 2 replicas, and 2 nodes exist. + wait_for_condition(lambda: len(ray._private.state.actors()) == 5) assert len(ray.nodes()) == 2 # Ensure `/-/healthz` and `/-/routes` return 200 and expected responses From 3a8234d6584971c12b13a1cc7a9caed88b5c285e Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 28 Jun 2023 12:57:40 -0700 Subject: [PATCH 03/22] drop controller requires head node during init and do direct head node look up in http state Signed-off-by: Gene Su --- python/ray/serve/_private/api.py | 6 -- python/ray/serve/_private/deployment_state.py | 6 +- python/ray/serve/_private/http_state.py | 6 +- python/ray/serve/_private/utils.py | 16 ++++ python/ray/serve/controller.py | 15 ++-- python/ray/serve/tests/test_callback.py | 1 - .../ray/serve/tests/test_deployment_state.py | 64 ++++++++++++++++ python/ray/serve/tests/test_http_state.py | 18 +++-- python/ray/serve/tests/test_standalone3.py | 74 ++++++++++++------- python/ray/serve/tests/test_util.py | 31 ++++++++ 10 files changed, 188 insertions(+), 49 deletions(-) diff --git a/python/ray/serve/_private/api.py b/python/ray/serve/_private/api.py index 5b8147f93893..1a6604daab46 100644 --- a/python/ray/serve/_private/api.py +++ b/python/ray/serve/_private/api.py @@ -145,10 +145,6 @@ def _start_controller( controller_name = SERVE_CONTROLLER_NAME else: controller_name = format_actor_name(get_random_letters(), SERVE_CONTROLLER_NAME) - - # Used for scheduling things to the head node explicitly. - # Assumes that `serve.start` runs on the head node. - head_node_id = ray.get_runtime_context().get_node_id() controller_actor_options = { "num_cpus": 1 if dedicated_cpu else 0, "name": controller_name, @@ -164,7 +160,6 @@ def _start_controller( controller = ServeController.options(**controller_actor_options).remote( controller_name, http_config=http_options, - head_node_id=head_node_id, detached=detached, _disable_http_proxy=True, ) @@ -186,7 +181,6 @@ def _start_controller( controller = ServeController.options(**controller_actor_options).remote( controller_name, http_config=http_options, - head_node_id=head_node_id, detached=detached, ) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 29e0c8811fe2..0f4b6c9cd27a 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1239,7 +1239,11 @@ def get_active_node_ids(self) -> Set[str]: ReplicaState.RECOVERING, ReplicaState.RUNNING, ] - return {replica.actor_node_id for replica in self._replicas.get(active_states)} + return { + replica.actor_node_id + for replica in self._replicas.get(active_states) + if replica.actor_node_id is not None + } def list_replica_details(self) -> List[ReplicaDetails]: return [replica.actor_details for replica in self._replicas.get()] diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 895d35989eb4..1730842d6b62 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -27,6 +27,7 @@ from ray.serve._private.utils import ( format_actor_name, get_all_node_ids, + get_head_node_id, ) from ray.serve._private.common import EndpointTag, NodeId, HTTPProxyStatus from ray.serve.schema import HTTPProxyDetails @@ -242,7 +243,6 @@ def __init__( controller_name: str, detached: bool, config: HTTPOptions, - head_node_id: str, gcs_client: GcsClient, # Used by unit testing _start_proxies_on_init: bool = True, @@ -254,11 +254,11 @@ def __init__( else: self._config = HTTPOptions() self._proxy_states: Dict[NodeId, HTTPProxyState] = dict() - self._head_node_id: str = head_node_id + self._head_node_id: str = get_head_node_id() self._gcs_client = gcs_client - assert isinstance(head_node_id, str) + assert isinstance(self._head_node_id, str) # Will populate self.proxy_actors with existing actors. if _start_proxies_on_init: diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index fad624f9ba33..d6c58daf9731 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -42,6 +42,7 @@ from ray._raylet import MessagePackSerializer from ray._private.utils import import_attr from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag +from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME import __main__ @@ -672,3 +673,18 @@ def call_function_from_import_path(import_path: str) -> Any: return callback_func() except Exception as e: raise RuntimeError(f"The function {import_path} raised an exception: {e}") + + +def get_head_node_id() -> str: + """Get the head node id. + + Iterate through all nodes in the ray cluster and return the node id of the first + alive node with head node resource. + """ + head_node_id = "" + for node in ray.nodes(): + if HEAD_NODE_RESOURCE_NAME in node["Resources"] and node["Alive"]: + head_node_id = node["NodeID"] + break + + return head_node_id diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index df4f033e8a6b..13de7e33ee98 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -64,6 +64,7 @@ DEFAULT, override_runtime_envs_except_env_vars, call_function_from_import_path, + get_head_node_id, ) from ray.serve._private.application_state import ApplicationStateManager @@ -108,10 +109,14 @@ async def __init__( controller_name: str, *, http_config: HTTPOptions, - head_node_id: str, detached: bool = False, _disable_http_proxy: bool = False, ): + controller_node_id = ray.get_runtime_context().get_node_id() + assert ( + controller_node_id == get_head_node_id() + ), "Controller must be on the head node." + configure_component_logger( component_name="controller", component_id=str(os.getpid()) ) @@ -143,7 +148,6 @@ async def __init__( controller_name, detached, http_config, - head_node_id, gcs_client, ) @@ -186,7 +190,6 @@ async def __init__( run_background_task(self.run_control_loop()) self._recover_config_from_checkpoint() - self._head_node_id = head_node_id self._active_nodes = set() self._update_active_nodes() @@ -289,7 +292,8 @@ def _update_active_nodes(self): replicas). If the active nodes set changes, it will notify the long poll client. """ new_active_nodes = self.deployment_state_manager.get_active_node_ids() - new_active_nodes.add(self._head_node_id) + head_node_id = ray.get_runtime_context().node_id + new_active_nodes.add(head_node_id) if self._active_nodes != new_active_nodes: self._active_nodes = new_active_nodes self.long_poll_host.notify_changed( @@ -932,8 +936,6 @@ def __init__( except ValueError: self._controller = None if self._controller is None: - # Used for scheduling things to the head node explicitly. - head_node_id = ray.get_runtime_context().get_node_id() http_config = HTTPOptions() http_config.port = http_proxy_port self._controller = ServeController.options( @@ -948,7 +950,6 @@ def __init__( ).remote( controller_name, http_config=http_config, - head_node_id=head_node_id, detached=detached, ) diff --git a/python/ray/serve/tests/test_callback.py b/python/ray/serve/tests/test_callback.py index 837b41a92bdd..12396a9486a9 100644 --- a/python/ray/serve/tests/test_callback.py +++ b/python/ray/serve/tests/test_callback.py @@ -171,7 +171,6 @@ def test_callback_fail(ray_instance): handle = actor_def.remote( "controller", http_config={}, - head_node_id="123", ) with pytest.raises(RayActorError, match="cannot be imported"): ray.get(handle.check_alive.remote()) diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index d5ba9538a7ba..187ebe5a5ca2 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -73,6 +73,7 @@ def __init__( deployment_name: str, version: DeploymentVersion, scheduling_strategy="SPREAD", + node_id=None, ): self._actor_name = actor_name self._replica_tag = replica_tag @@ -99,6 +100,8 @@ def __init__( self._is_cross_language = False self._scheduling_strategy = scheduling_strategy self._actor_handle = MockActorHandle() + self._node_id = node_id + self._node_id_is_set = False @property def is_cross_language(self) -> bool: @@ -146,6 +149,8 @@ def worker_id(self) -> Optional[str]: @property def node_id(self) -> Optional[str]: + if self._node_id_is_set: + return self._node_id if isinstance(self._scheduling_strategy, NodeAffinitySchedulingStrategy): return self._scheduling_strategy.node_id if self.ready == ReplicaStartupStatus.SUCCEEDED or self.started: @@ -180,6 +185,10 @@ def set_starting_version(self, version: DeploymentVersion): """Mocked deployment_worker return version from reconfigure()""" self.starting_version = version + def set_node_id(self, node_id: str): + self._node_id = node_id + self._node_id_is_set = True + def start(self, deployment_info: DeploymentInfo): self.started = True @@ -2755,5 +2764,60 @@ def test_get_active_node_ids(mock_get_all_node_ids, mock_deployment_state_manage assert deployment_state_manager.get_active_node_ids() == set() +@patch.object(DriverDeploymentState, "_get_all_node_ids") +def test_get_active_node_ids_none( + mock_get_all_node_ids, mock_deployment_state_manager_full +): + """Test get_active_node_ids() are not collecting none node ids. + + When the running replicas has None as the node id, `get_active_node_ids()` should + not include it in the set. + """ + node_ids = ("node1", "node2", "node2") + mock_get_all_node_ids.return_value = [node_ids] + + tag = "test_deployment" + create_deployment_state_manager, _ = mock_deployment_state_manager_full + deployment_state_manager = create_deployment_state_manager() + + # Deploy deployment with version "1" and 3 replicas + info1, version1 = deployment_info(version="1", num_replicas=3) + updating = deployment_state_manager.deploy(tag, info1) + deployment_state = deployment_state_manager._deployment_states[tag] + assert updating + + # When the replicas are in the STARTING state, `get_active_node_ids()` should + # return a set of node ids. + deployment_state_manager.update() + check_counts( + deployment_state, + total=3, + version=version1, + by_state=[(ReplicaState.STARTING, 3)], + ) + mocked_replicas = deployment_state._replicas.get() + for idx, mocked_replica in enumerate(mocked_replicas): + mocked_replica._actor.set_scheduling_strategy( + NodeAffinitySchedulingStrategy(node_id=node_ids[idx], soft=True) + ) + assert deployment_state.get_active_node_ids() == set(node_ids) + assert deployment_state_manager.get_active_node_ids() == set(node_ids) + + # When the replicas are in the RUNNING state and are having None node id, + # `get_active_node_ids()` should return empty set. + for mocked_replica in mocked_replicas: + mocked_replica._actor.set_node_id(None) + mocked_replica._actor.set_ready() + deployment_state_manager.update() + check_counts( + deployment_state, + total=3, + version=version1, + by_state=[(ReplicaState.RUNNING, 3)], + ) + assert None not in deployment_state.get_active_node_ids() + assert None not in deployment_state_manager.get_active_node_ids() + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index 37d7d7f42028..ab4ece722f8b 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -20,13 +20,11 @@ def _make_http_state( http_options: HTTPOptions, - head_node_id: str = HEAD_NODE_ID, ) -> HTTPState: return HTTPState( SERVE_CONTROLLER_NAME, detached=True, config=http_options, - head_node_id=head_node_id, gcs_client=None, _start_proxies_on_init=False, ) @@ -46,6 +44,13 @@ def mock_get_all_node_ids(all_nodes): yield +@pytest.fixture() +def mock_get_head_node_id(all_nodes): + with patch("ray.serve._private.http_state.get_head_node_id") as func: + func.return_value = HEAD_NODE_ID + yield + + @pytest.fixture() def setup_controller(): try: @@ -56,7 +61,6 @@ def setup_controller(): ).remote( SERVE_CONTROLLER_NAME, http_config=None, - head_node_id=HEAD_NODE_ID, detached=True, _disable_http_proxy=True, ) @@ -118,7 +122,7 @@ def _update_and_check_http_state( ) -def test_node_selection(all_nodes, mock_get_all_node_ids): +def test_node_selection(all_nodes, mock_get_all_node_ids, mock_get_head_node_id): # Test NoServer state = _make_http_state(HTTPOptions(location=DeploymentMode.NoServer)) assert state._get_target_nodes() == [] @@ -158,7 +162,7 @@ def test_node_selection(all_nodes, mock_get_all_node_ids): def test_http_state_update_restarts_unhealthy_proxies( - mock_get_all_node_ids, setup_controller + mock_get_all_node_ids, setup_controller, mock_get_head_node_id ): """Test the update method in HTTPState would kill and restart unhealthy proxies. @@ -589,7 +593,9 @@ def test_http_proxy_state_update_unhealthy_check_health_succeed(setup_controller @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) -def test_update_draining(mock_get_all_node_ids, setup_controller, all_nodes): +def test_update_draining( + mock_get_all_node_ids, setup_controller, all_nodes, mock_get_head_node_id +): """Test update draining logics. When update nodes to inactive, head node http proxy should never be draining while diff --git a/python/ray/serve/tests/test_standalone3.py b/python/ray/serve/tests/test_standalone3.py index 9ec70582f33c..28ab6e640c04 100644 --- a/python/ray/serve/tests/test_standalone3.py +++ b/python/ray/serve/tests/test_standalone3.py @@ -438,7 +438,7 @@ def test_healthz_and_routes_on_head_and_worker_nodes( serve.start(http_options={"location": "EveryNode"}) # Deploy 2 replicas, both should be on the worker node. - @serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.9}) + @serve.deployment(num_replicas=2) class HelloModel: def __call__(self): return "hello" @@ -464,19 +464,33 @@ def check_replicas_on_worker_nodes(): # Ensure `/-/healthz` and `/-/routes` return 200 and expected responses # on both nodes. - assert requests.get("http://127.0.0.1:8000/-/healthz").status_code == 200 - assert requests.get("http://127.0.0.1:8000/-/healthz").text == "success" - assert requests.get("http://127.0.0.1:8000/-/routes").status_code == 200 - assert ( - requests.get("http://127.0.0.1:8000/-/routes").text - == '{"/":"default_HelloModel"}' + def check_request(url: str, expected_code: int, expected_text: str): + req = requests.get(url) + return req.status_code == expected_code and req.text == expected_text + + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8000/-/healthz", + expected_code=200, + expected_text="success", ) - assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 200 - assert requests.get("http://127.0.0.1:8001/-/healthz").text == "success" - assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 200 - assert ( - requests.get("http://127.0.0.1:8001/-/routes").text - == '{"/":"default_HelloModel"}' + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8000/-/routes", + expected_code=200, + expected_text='{"/":"default_HelloModel"}', + ) + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8001/-/healthz", + expected_code=200, + expected_text="success", + ) + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8001/-/routes", + expected_code=200, + expected_text='{"/":"default_HelloModel"}', ) # Delete the deployment should bring the active actors down to 3 and drop @@ -502,20 +516,30 @@ def _check(): # Ensure head node `/-/healthz` and `/-/routes` continue to return 200 and expected # responses. Also, the worker node `/-/healthz` and `/-/routes` should return 503 # and unavailable responses. - assert requests.get("http://127.0.0.1:8000/-/healthz").text == "success" - assert requests.get("http://127.0.0.1:8000/-/healthz").status_code == 200 - assert requests.get("http://127.0.0.1:8000/-/routes").text == "{}" - assert requests.get("http://127.0.0.1:8000/-/routes").status_code == 200 - assert ( - requests.get("http://127.0.0.1:8001/-/healthz").text - == "This node is being drained." + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8000/-/healthz", + expected_code=200, + expected_text="success", ) - assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 503 - assert ( - requests.get("http://127.0.0.1:8001/-/routes").text - == "This node is being drained." + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8000/-/routes", + expected_code=200, + expected_text="{}", + ) + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8001/-/healthz", + expected_code=503, + expected_text="This node is being drained.", + ) + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8001/-/routes", + expected_code=503, + expected_text="This node is being drained.", ) - assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 503 # Clean up serve. serve.shutdown() diff --git a/python/ray/serve/tests/test_util.py b/python/ray/serve/tests/test_util.py index 5237ee7bf98b..533b4c861d27 100644 --- a/python/ray/serve/tests/test_util.py +++ b/python/ray/serve/tests/test_util.py @@ -4,6 +4,7 @@ import sys import tempfile from copy import deepcopy +from unittest.mock import patch import numpy as np import pytest @@ -20,7 +21,9 @@ msgpack_deserialize, snake_to_camel_case, dict_keys_snake_to_camel_case, + get_head_node_id, ) +from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME def test_serialize(): @@ -531,6 +534,34 @@ def test_shallow_copy(self): assert camel_dict["nested"]["list2"] is list2 +def test_get_head_node_id(): + """Test get_head_node_id() returning the correct head node id. + + When there are woker node, dead head node, and other alive head nodes, + get_head_node_id() should return the node id of the first alive head node. + """ + nodes = [ + {"NodeID": "worker_node1", "Alive": True, "Resources": {"CPU": 1}}, + { + "NodeID": "dead_head_node1", + "Alive": False, + "Resources": {"CPU": 1, HEAD_NODE_RESOURCE_NAME: 1.0}, + }, + { + "NodeID": "alive_head_node1", + "Alive": True, + "Resources": {"CPU": 1, HEAD_NODE_RESOURCE_NAME: 1.0}, + }, + { + "NodeID": "alive_head_node2", + "Alive": True, + "Resources": {"CPU": 1, HEAD_NODE_RESOURCE_NAME: 1.0}, + }, + ] + with patch("ray.nodes", return_value=nodes): + assert get_head_node_id() == "alive_head_node1" + + if __name__ == "__main__": import sys From cc0daf4c6e9176487653d5066e512b5b013a946a Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 28 Jun 2023 13:56:43 -0700 Subject: [PATCH 04/22] address somments Signed-off-by: Gene Su --- python/ray/serve/_private/http_state.py | 6 +-- python/ray/serve/_private/utils.py | 3 +- python/ray/serve/controller.py | 8 ++-- python/ray/serve/tests/test_http_state.py | 17 +++----- python/ray/serve/tests/test_standalone3.py | 51 ++++++++-------------- python/ray/serve/tests/test_util.py | 5 +++ 6 files changed, 38 insertions(+), 52 deletions(-) diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 1730842d6b62..895d35989eb4 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -27,7 +27,6 @@ from ray.serve._private.utils import ( format_actor_name, get_all_node_ids, - get_head_node_id, ) from ray.serve._private.common import EndpointTag, NodeId, HTTPProxyStatus from ray.serve.schema import HTTPProxyDetails @@ -243,6 +242,7 @@ def __init__( controller_name: str, detached: bool, config: HTTPOptions, + head_node_id: str, gcs_client: GcsClient, # Used by unit testing _start_proxies_on_init: bool = True, @@ -254,11 +254,11 @@ def __init__( else: self._config = HTTPOptions() self._proxy_states: Dict[NodeId, HTTPProxyState] = dict() - self._head_node_id: str = get_head_node_id() + self._head_node_id: str = head_node_id self._gcs_client = gcs_client - assert isinstance(self._head_node_id, str) + assert isinstance(head_node_id, str) # Will populate self.proxy_actors with existing actors. if _start_proxies_on_init: diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index d6c58daf9731..e4b58f53c596 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -681,10 +681,11 @@ def get_head_node_id() -> str: Iterate through all nodes in the ray cluster and return the node id of the first alive node with head node resource. """ - head_node_id = "" + head_node_id = None for node in ray.nodes(): if HEAD_NODE_RESOURCE_NAME in node["Resources"] and node["Alive"]: head_node_id = node["NodeID"] break + assert head_node_id is not None, "Cannot find alive head node." return head_node_id diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 13de7e33ee98..e7be1deea8fc 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -112,9 +112,9 @@ async def __init__( detached: bool = False, _disable_http_proxy: bool = False, ): - controller_node_id = ray.get_runtime_context().get_node_id() + self._controller_node_id = ray.get_runtime_context().get_node_id() assert ( - controller_node_id == get_head_node_id() + self._controller_node_id == get_head_node_id() ), "Controller must be on the head node." configure_component_logger( @@ -148,6 +148,7 @@ async def __init__( controller_name, detached, http_config, + self._controller_node_id, gcs_client, ) @@ -292,8 +293,7 @@ def _update_active_nodes(self): replicas). If the active nodes set changes, it will notify the long poll client. """ new_active_nodes = self.deployment_state_manager.get_active_node_ids() - head_node_id = ray.get_runtime_context().node_id - new_active_nodes.add(head_node_id) + new_active_nodes.add(self._controller_node_id) if self._active_nodes != new_active_nodes: self._active_nodes = new_active_nodes self.long_poll_host.notify_changed( diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index ab4ece722f8b..6a75f67858f6 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -20,11 +20,13 @@ def _make_http_state( http_options: HTTPOptions, + head_node_id: str = HEAD_NODE_ID, ) -> HTTPState: return HTTPState( SERVE_CONTROLLER_NAME, detached=True, config=http_options, + head_node_id=head_node_id, gcs_client=None, _start_proxies_on_init=False, ) @@ -44,13 +46,6 @@ def mock_get_all_node_ids(all_nodes): yield -@pytest.fixture() -def mock_get_head_node_id(all_nodes): - with patch("ray.serve._private.http_state.get_head_node_id") as func: - func.return_value = HEAD_NODE_ID - yield - - @pytest.fixture() def setup_controller(): try: @@ -122,7 +117,7 @@ def _update_and_check_http_state( ) -def test_node_selection(all_nodes, mock_get_all_node_ids, mock_get_head_node_id): +def test_node_selection(all_nodes, mock_get_all_node_ids): # Test NoServer state = _make_http_state(HTTPOptions(location=DeploymentMode.NoServer)) assert state._get_target_nodes() == [] @@ -162,7 +157,7 @@ def test_node_selection(all_nodes, mock_get_all_node_ids, mock_get_head_node_id) def test_http_state_update_restarts_unhealthy_proxies( - mock_get_all_node_ids, setup_controller, mock_get_head_node_id + mock_get_all_node_ids, setup_controller ): """Test the update method in HTTPState would kill and restart unhealthy proxies. @@ -593,9 +588,7 @@ def test_http_proxy_state_update_unhealthy_check_health_succeed(setup_controller @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) -def test_update_draining( - mock_get_all_node_ids, setup_controller, all_nodes, mock_get_head_node_id -): +def test_update_draining(mock_get_all_node_ids, setup_controller, all_nodes): """Test update draining logics. When update nodes to inactive, head node http proxy should never be draining while diff --git a/python/ray/serve/tests/test_standalone3.py b/python/ray/serve/tests/test_standalone3.py index 28ab6e640c04..71f910781776 100644 --- a/python/ray/serve/tests/test_standalone3.py +++ b/python/ray/serve/tests/test_standalone3.py @@ -474,23 +474,17 @@ def check_request(url: str, expected_code: int, expected_text: str): expected_code=200, expected_text="success", ) - wait_for_condition( - condition_predictor=check_request, - url="http://127.0.0.1:8000/-/routes", - expected_code=200, - expected_text='{"/":"default_HelloModel"}', - ) - wait_for_condition( - condition_predictor=check_request, - url="http://127.0.0.1:8001/-/healthz", - expected_code=200, - expected_text="success", + assert requests.get("http://127.0.0.1:8000/-/routes").status_code == 200 + assert ( + requests.get("http://127.0.0.1:8000/-/routes").text + == '{"/":"default_HelloModel"}' ) - wait_for_condition( - condition_predictor=check_request, - url="http://127.0.0.1:8001/-/routes", - expected_code=200, - expected_text='{"/":"default_HelloModel"}', + assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 200 + assert requests.get("http://127.0.0.1:8001/-/healthz").text == "success" + assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 200 + assert ( + requests.get("http://127.0.0.1:8001/-/routes").text + == '{"/":"default_HelloModel"}' ) # Delete the deployment should bring the active actors down to 3 and drop @@ -522,23 +516,16 @@ def _check(): expected_code=200, expected_text="success", ) - wait_for_condition( - condition_predictor=check_request, - url="http://127.0.0.1:8000/-/routes", - expected_code=200, - expected_text="{}", - ) - wait_for_condition( - condition_predictor=check_request, - url="http://127.0.0.1:8001/-/healthz", - expected_code=503, - expected_text="This node is being drained.", + assert requests.get("http://127.0.0.1:8000/-/routes").text == "{}" + assert requests.get("http://127.0.0.1:8000/-/routes").status_code == 200 + assert ( + requests.get("http://127.0.0.1:8001/-/healthz").text + == "This node is being drained." ) - wait_for_condition( - condition_predictor=check_request, - url="http://127.0.0.1:8001/-/routes", - expected_code=503, - expected_text="This node is being drained.", + assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 503 + assert ( + requests.get("http://127.0.0.1:8001/-/routes").text + == "This node is being drained." ) # Clean up serve. diff --git a/python/ray/serve/tests/test_util.py b/python/ray/serve/tests/test_util.py index 533b4c861d27..a5e477476b96 100644 --- a/python/ray/serve/tests/test_util.py +++ b/python/ray/serve/tests/test_util.py @@ -539,6 +539,7 @@ def test_get_head_node_id(): When there are woker node, dead head node, and other alive head nodes, get_head_node_id() should return the node id of the first alive head node. + When there are no alive head nodes, get_head_node_id() should raise assertion error. """ nodes = [ {"NodeID": "worker_node1", "Alive": True, "Resources": {"CPU": 1}}, @@ -561,6 +562,10 @@ def test_get_head_node_id(): with patch("ray.nodes", return_value=nodes): assert get_head_node_id() == "alive_head_node1" + with patch("ray.nodes", return_value=[]): + with pytest.raises(AssertionError): + get_head_node_id() + if __name__ == "__main__": import sys From c4664614009d1d5336c1e4aa2989b68fe70ca82e Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 28 Jun 2023 14:04:45 -0700 Subject: [PATCH 05/22] add missing status code assertion Signed-off-by: Gene Su --- python/ray/serve/tests/test_standalone3.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/serve/tests/test_standalone3.py b/python/ray/serve/tests/test_standalone3.py index 71f910781776..4ea6b599f0e6 100644 --- a/python/ray/serve/tests/test_standalone3.py +++ b/python/ray/serve/tests/test_standalone3.py @@ -527,6 +527,7 @@ def _check(): requests.get("http://127.0.0.1:8001/-/routes").text == "This node is being drained." ) + assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 503 # Clean up serve. serve.shutdown() From 4c18bc1b30fc8ec8a483f921ebbd2a6d6af14809 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 28 Jun 2023 15:25:34 -0700 Subject: [PATCH 06/22] wait for condition to flip for worker node proxies Signed-off-by: Gene Su --- python/ray/serve/tests/test_standalone3.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/python/ray/serve/tests/test_standalone3.py b/python/ray/serve/tests/test_standalone3.py index 4ea6b599f0e6..444d1ee9d020 100644 --- a/python/ray/serve/tests/test_standalone3.py +++ b/python/ray/serve/tests/test_standalone3.py @@ -479,8 +479,12 @@ def check_request(url: str, expected_code: int, expected_text: str): requests.get("http://127.0.0.1:8000/-/routes").text == '{"/":"default_HelloModel"}' ) - assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 200 - assert requests.get("http://127.0.0.1:8001/-/healthz").text == "success" + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8001/-/healthz", + expected_code=200, + expected_text="success", + ) assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 200 assert ( requests.get("http://127.0.0.1:8001/-/routes").text @@ -516,18 +520,19 @@ def _check(): expected_code=200, expected_text="success", ) - assert requests.get("http://127.0.0.1:8000/-/routes").text == "{}" assert requests.get("http://127.0.0.1:8000/-/routes").status_code == 200 - assert ( - requests.get("http://127.0.0.1:8001/-/healthz").text - == "This node is being drained." + assert requests.get("http://127.0.0.1:8000/-/routes").text == "{}" + wait_for_condition( + condition_predictor=check_request, + url="http://127.0.0.1:8001/-/healthz", + expected_code=503, + expected_text="This node is being drained.", ) - assert requests.get("http://127.0.0.1:8001/-/healthz").status_code == 503 + assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 503 assert ( requests.get("http://127.0.0.1:8001/-/routes").text == "This node is being drained." ) - assert requests.get("http://127.0.0.1:8001/-/routes").status_code == 503 # Clean up serve. serve.shutdown() From b8a5d6c2cf04d098d30f5d4e66308ff4d918394a Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 28 Jun 2023 22:13:18 -0700 Subject: [PATCH 07/22] try to debug test_serve_ha Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 87fe6211a313..98998c3e3c50 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -85,6 +85,7 @@ def test_ray_serve_basic(docker_cluster): output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") + print(output.output) assert output.exit_code == 0 # Kill the head node From 5564f879164ed2c2699a05021c76dc0cb74ff3b4 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Wed, 28 Jun 2023 22:15:09 -0700 Subject: [PATCH 08/22] add more logging Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 98998c3e3c50..723f4fbc3fd3 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -79,13 +79,15 @@ def test_ray_serve_basic(docker_cluster): output = worker.exec_run(cmd=f"python -c '{scripts.format(num_replicas=1)}'") assert output.exit_code == 0 assert b"Adding 1 replica to deployment Counter." in output.output + print("SERVE SA OUTPUT1", output.output) + # somehow this is not working and the port is not exposed to the host. # worker_cli = worker.client() # print(worker_cli.request("GET", "/api/incr")) output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") - print(output.output) + print("SERVE SA OUTPUT2", output.output) assert output.exit_code == 0 # Kill the head node From ef7108d389a580a2c2f4d80da4b5ea5d80c3f8a0 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 07:37:29 -0700 Subject: [PATCH 09/22] add more loggings Signed-off-by: Gene Su --- python/ray/serve/_private/api.py | 3 +++ python/ray/serve/controller.py | 5 +++++ python/ray/serve/tests/test_serve_ha.py | 2 ++ 3 files changed, 10 insertions(+) diff --git a/python/ray/serve/_private/api.py b/python/ray/serve/_private/api.py index 1a6604daab46..0a88934f2efd 100644 --- a/python/ray/serve/_private/api.py +++ b/python/ray/serve/_private/api.py @@ -145,6 +145,9 @@ def _start_controller( controller_name = SERVE_CONTROLLER_NAME else: controller_name = format_actor_name(get_random_letters(), SERVE_CONTROLLER_NAME) + + head_node_id = ray.get_runtime_context().get_node_id() + logger.info(f"NODE ID FROM API {head_node_id}") controller_actor_options = { "num_cpus": 1 if dedicated_cpu else 0, "name": controller_name, diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index af9c7cf32213..aa750c4cf5e2 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -116,6 +116,7 @@ async def __init__( assert ( self._controller_node_id == get_head_node_id() ), "Controller must be on the head node." + logger.info(f"CONTROLLER NODE ID: {self._controller_node_id}") configure_component_logger( component_name="controller", component_id=str(os.getpid()) @@ -940,6 +941,10 @@ def __init__( except ValueError: self._controller = None if self._controller is None: + head_node_id = ray.get_runtime_context().get_node_id() + logger.info( + f"CREATING CONTROLLER ACTOR {controller_name} on {head_node_id}" + ) http_config = HTTPOptions() http_config.port = http_proxy_port self._controller = ServeController.options( diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 723f4fbc3fd3..563a22f27fd8 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -40,6 +40,8 @@ def pid(self): serve.start(detached=True) Counter.options(num_replicas={num_replicas}).deploy() + +print(ray.nodes()) """ check_script = """ From 47ced871643afd430c39312f8910d2b70de300ac Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 12:51:37 -0700 Subject: [PATCH 10/22] ping healthz for health check Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 563a22f27fd8..822ee009d785 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -47,6 +47,16 @@ def pid(self): check_script = """ import requests import json +import time + + +start_time = time.time() +while time.time() - start_time < 30: + healthz = requests.get("http://127.0.0.1:8000/-/healthz/").text + print(f"HEALTHZ: {healthz}") + if healthz == "success": + break + if {num_replicas} == 1: b = json.loads(requests.get("http://127.0.0.1:8000/api/").text)["count"] for i in range(5): From 53aa78ef1855e6b3f960b0a015247d669fdebd06 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 14:03:35 -0700 Subject: [PATCH 11/22] escape {} Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 822ee009d785..b891382cf054 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -53,7 +53,7 @@ def pid(self): start_time = time.time() while time.time() - start_time < 30: healthz = requests.get("http://127.0.0.1:8000/-/healthz/").text - print(f"HEALTHZ: {healthz}") + print(f"HEALTHZ: {{healthz}}") if healthz == "success": break From 200e625e0823212b5518e403cb138d8cddf5c768 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 15:03:35 -0700 Subject: [PATCH 12/22] comment out http state assertion Signed-off-by: Gene Su --- python/ray/serve/_private/http_state.py | 8 ++++---- python/ray/serve/tests/test_serve_ha.py | 6 +++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 45cd98003e14..7483cb2b23a4 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -321,10 +321,10 @@ def _get_target_nodes(self) -> List[Tuple[str, str]]: for node_id, ip_address in target_nodes if node_id == self._head_node_id ] - assert len(nodes) == 1, ( - f"Head node not found! Head node id: {self._head_node_id}, " - f"all nodes: {target_nodes}." - ) + # assert len(nodes) == 1, ( + # f"Head node not found! Head node id: {self._head_node_id}, " + # f"all nodes: {target_nodes}." + # ) return nodes if location == DeploymentMode.FixedNumber: diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index b891382cf054..e6eeb31f5d53 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -41,10 +41,14 @@ def pid(self): Counter.options(num_replicas={num_replicas}).deploy() -print(ray.nodes()) +print("ray nodes in setup script", ray.nodes()) """ check_script = """ +import ray +print("ray nodes in check script", ray.nodes()) + + import requests import json import time From 2ebf27d6c5834c1726a120fafb99935f020d6aee Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 16:05:44 -0700 Subject: [PATCH 13/22] init ray before calling ray.nodes() Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index e6eeb31f5d53..07aec8850c6c 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -46,6 +46,7 @@ def pid(self): check_script = """ import ray +ray.init(address="auto", namespace="g") print("ray nodes in check script", ray.nodes()) From 2cd61f138edb713af93f0d7fa7a2fe4eb54c848f Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 17:46:21 -0700 Subject: [PATCH 14/22] revert http state assertion and add optional head node id on controller Signed-off-by: Gene Su --- python/ray/serve/_private/api.py | 2 ++ python/ray/serve/_private/http_state.py | 8 ++++---- python/ray/serve/controller.py | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/_private/api.py b/python/ray/serve/_private/api.py index 0a88934f2efd..77a49072b255 100644 --- a/python/ray/serve/_private/api.py +++ b/python/ray/serve/_private/api.py @@ -163,6 +163,7 @@ def _start_controller( controller = ServeController.options(**controller_actor_options).remote( controller_name, http_config=http_options, + head_node_id=head_node_id, detached=detached, _disable_http_proxy=True, ) @@ -184,6 +185,7 @@ def _start_controller( controller = ServeController.options(**controller_actor_options).remote( controller_name, http_config=http_options, + head_node_id=head_node_id, detached=detached, ) diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 7483cb2b23a4..45cd98003e14 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -321,10 +321,10 @@ def _get_target_nodes(self) -> List[Tuple[str, str]]: for node_id, ip_address in target_nodes if node_id == self._head_node_id ] - # assert len(nodes) == 1, ( - # f"Head node not found! Head node id: {self._head_node_id}, " - # f"all nodes: {target_nodes}." - # ) + assert len(nodes) == 1, ( + f"Head node not found! Head node id: {self._head_node_id}, " + f"all nodes: {target_nodes}." + ) return nodes if location == DeploymentMode.FixedNumber: diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index aa750c4cf5e2..e8432833fdd0 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -109,6 +109,7 @@ async def __init__( controller_name: str, *, http_config: HTTPOptions, + head_node_id: str = "", detached: bool = False, _disable_http_proxy: bool = False, ): @@ -117,6 +118,7 @@ async def __init__( self._controller_node_id == get_head_node_id() ), "Controller must be on the head node." logger.info(f"CONTROLLER NODE ID: {self._controller_node_id}") + logger.indo(f"PASSED HEAD NODE ID: {head_node_id}") configure_component_logger( component_name="controller", component_id=str(os.getpid()) From c7e31d20d58253d2767c6d80719d2661dff61462 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 20:38:18 -0700 Subject: [PATCH 15/22] fix typo Signed-off-by: Gene Su --- python/ray/serve/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index e8432833fdd0..bbdbdbe78d3a 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -118,7 +118,7 @@ async def __init__( self._controller_node_id == get_head_node_id() ), "Controller must be on the head node." logger.info(f"CONTROLLER NODE ID: {self._controller_node_id}") - logger.indo(f"PASSED HEAD NODE ID: {head_node_id}") + logger.info(f"PASSED HEAD NODE ID: {head_node_id}") configure_component_logger( component_name="controller", component_id=str(os.getpid()) From ea19d953ecc3c4c95859784e01587bef23f2bf39 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 22:03:59 -0700 Subject: [PATCH 16/22] try to deploy on header Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 07aec8850c6c..8e7e7dad4994 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -93,7 +93,7 @@ def test_ray_serve_basic(docker_cluster): # - Make sure no task can run in the raylet where GCS is deployed. head, worker = docker_cluster - output = worker.exec_run(cmd=f"python -c '{scripts.format(num_replicas=1)}'") + output = head.exec_run(cmd=f"python -c '{scripts.format(num_replicas=1)}'") assert output.exit_code == 0 assert b"Adding 1 replica to deployment Counter." in output.output print("SERVE SA OUTPUT1", output.output) From 55fabb658d26e1c9260c75f6165bc59e015ad019 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Thu, 29 Jun 2023 23:25:50 -0700 Subject: [PATCH 17/22] call check scripts on the head node Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 8e7e7dad4994..c2b6d30e0879 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -93,7 +93,7 @@ def test_ray_serve_basic(docker_cluster): # - Make sure no task can run in the raylet where GCS is deployed. head, worker = docker_cluster - output = head.exec_run(cmd=f"python -c '{scripts.format(num_replicas=1)}'") + output = worker.exec_run(cmd=f"python -c '{scripts.format(num_replicas=1)}'") assert output.exit_code == 0 assert b"Adding 1 replica to deployment Counter." in output.output print("SERVE SA OUTPUT1", output.output) @@ -102,7 +102,7 @@ def test_ray_serve_basic(docker_cluster): # worker_cli = worker.client() # print(worker_cli.request("GET", "/api/incr")) - output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") + output = head.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") print("SERVE SA OUTPUT2", output.output) assert output.exit_code == 0 @@ -111,7 +111,7 @@ def test_ray_serve_basic(docker_cluster): head.kill() # Make sure serve is still working - output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") + output = head.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") assert output.exit_code == 0 # Script is running on another thread so that it won't block the main thread. @@ -129,7 +129,7 @@ def reconfig(): t.join() - output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=2)}'") + output = head.exec_run(cmd=f"python -c '{check_script.format(num_replicas=2)}'") assert output.exit_code == 0 # Make sure the serve controller still runs on the head node after restart From ed9aa0362ee4fc7c48d438634524d0bf85b43e2a Mon Sep 17 00:00:00 2001 From: Gene Su Date: Fri, 30 Jun 2023 08:00:08 -0700 Subject: [PATCH 18/22] revert to run check script on worker, log http proxy logs, and test worker.client() Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index c2b6d30e0879..55f0893b9401 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -57,7 +57,7 @@ def pid(self): start_time = time.time() while time.time() - start_time < 30: - healthz = requests.get("http://127.0.0.1:8000/-/healthz/").text + healthz = requests.get("http://localhost:8000/-/healthz/").text print(f"HEALTHZ: {{healthz}}") if healthz == "success": break @@ -102,16 +102,25 @@ def test_ray_serve_basic(docker_cluster): # worker_cli = worker.client() # print(worker_cli.request("GET", "/api/incr")) - output = head.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") + output = head.exec_run(cmd="grep -r 'HTTP proxy' /tmp/ray/session_latest/logs/") + print("head node logs", output.output) + output = worker.exec_run(cmd="grep -r 'HTTP proxy' /tmp/ray/session_latest/logs/") + print("worker node logs", output.output) + + output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") print("SERVE SA OUTPUT2", output.output) + + # test if this works + worker_cli = worker.client() + print(worker_cli.request("GET", "/api/incr")) assert output.exit_code == 0 # Kill the head node head.kill() # Make sure serve is still working - output = head.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") + output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") assert output.exit_code == 0 # Script is running on another thread so that it won't block the main thread. @@ -129,7 +138,7 @@ def reconfig(): t.join() - output = head.exec_run(cmd=f"python -c '{check_script.format(num_replicas=2)}'") + output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=2)}'") assert output.exit_code == 0 # Make sure the serve controller still runs on the head node after restart From 5495ea9264d37d0e5d654a6e424ad5357ba16bd0 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Fri, 30 Jun 2023 09:24:12 -0700 Subject: [PATCH 19/22] log nodes in get_all_node_ids Signed-off-by: Gene Su --- python/ray/serve/_private/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index e4b58f53c596..192734a7ff21 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -181,6 +181,8 @@ def get_all_node_ids(gcs_client) -> List[Tuple[str, str]]: passed into the Ray SchedulingPolicy API. """ nodes = gcs_client.get_all_node_info(timeout=RAY_GCS_RPC_TIMEOUT_S) + logger.info(f"HTTP proxy all nodes: {nodes}") + logger.info(f"HTTP proxy ray.nodes: {ray.nodes()}") node_ids = [ (ray.NodeID.from_binary(node_id).hex(), node["node_name"].decode("utf-8")) for (node_id, node) in nodes.items() From 52bee0095a38e29cef6288bcc502a141f6bac5b8 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Fri, 30 Jun 2023 09:46:24 -0700 Subject: [PATCH 20/22] revert debug changes and add location to serve.start() Signed-off-by: Gene Su --- python/ray/serve/_private/api.py | 4 ---- python/ray/serve/_private/utils.py | 2 -- python/ray/serve/controller.py | 7 ------ python/ray/serve/tests/test_serve_ha.py | 29 +------------------------ 4 files changed, 1 insertion(+), 41 deletions(-) diff --git a/python/ray/serve/_private/api.py b/python/ray/serve/_private/api.py index 77a49072b255..db04df80e7d6 100644 --- a/python/ray/serve/_private/api.py +++ b/python/ray/serve/_private/api.py @@ -146,8 +146,6 @@ def _start_controller( else: controller_name = format_actor_name(get_random_letters(), SERVE_CONTROLLER_NAME) - head_node_id = ray.get_runtime_context().get_node_id() - logger.info(f"NODE ID FROM API {head_node_id}") controller_actor_options = { "num_cpus": 1 if dedicated_cpu else 0, "name": controller_name, @@ -163,7 +161,6 @@ def _start_controller( controller = ServeController.options(**controller_actor_options).remote( controller_name, http_config=http_options, - head_node_id=head_node_id, detached=detached, _disable_http_proxy=True, ) @@ -185,7 +182,6 @@ def _start_controller( controller = ServeController.options(**controller_actor_options).remote( controller_name, http_config=http_options, - head_node_id=head_node_id, detached=detached, ) diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index 192734a7ff21..e4b58f53c596 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -181,8 +181,6 @@ def get_all_node_ids(gcs_client) -> List[Tuple[str, str]]: passed into the Ray SchedulingPolicy API. """ nodes = gcs_client.get_all_node_info(timeout=RAY_GCS_RPC_TIMEOUT_S) - logger.info(f"HTTP proxy all nodes: {nodes}") - logger.info(f"HTTP proxy ray.nodes: {ray.nodes()}") node_ids = [ (ray.NodeID.from_binary(node_id).hex(), node["node_name"].decode("utf-8")) for (node_id, node) in nodes.items() diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index bbdbdbe78d3a..af9c7cf32213 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -109,7 +109,6 @@ async def __init__( controller_name: str, *, http_config: HTTPOptions, - head_node_id: str = "", detached: bool = False, _disable_http_proxy: bool = False, ): @@ -117,8 +116,6 @@ async def __init__( assert ( self._controller_node_id == get_head_node_id() ), "Controller must be on the head node." - logger.info(f"CONTROLLER NODE ID: {self._controller_node_id}") - logger.info(f"PASSED HEAD NODE ID: {head_node_id}") configure_component_logger( component_name="controller", component_id=str(os.getpid()) @@ -943,10 +940,6 @@ def __init__( except ValueError: self._controller = None if self._controller is None: - head_node_id = ray.get_runtime_context().get_node_id() - logger.info( - f"CREATING CONTROLLER ACTOR {controller_name} on {head_node_id}" - ) http_config = HTTPOptions() http_config.port = http_proxy_port self._controller = ServeController.options( diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 55f0893b9401..0d2577316ba4 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -37,31 +37,16 @@ def pid(self): import os return {{"pid": os.getpid()}} -serve.start(detached=True) +serve.start(detached=True, location="EveryNode") Counter.options(num_replicas={num_replicas}).deploy() - -print("ray nodes in setup script", ray.nodes()) """ check_script = """ -import ray -ray.init(address="auto", namespace="g") -print("ray nodes in check script", ray.nodes()) - - import requests import json -import time -start_time = time.time() -while time.time() - start_time < 30: - healthz = requests.get("http://localhost:8000/-/healthz/").text - print(f"HEALTHZ: {{healthz}}") - if healthz == "success": - break - if {num_replicas} == 1: b = json.loads(requests.get("http://127.0.0.1:8000/api/").text)["count"] for i in range(5): @@ -96,24 +81,12 @@ def test_ray_serve_basic(docker_cluster): output = worker.exec_run(cmd=f"python -c '{scripts.format(num_replicas=1)}'") assert output.exit_code == 0 assert b"Adding 1 replica to deployment Counter." in output.output - print("SERVE SA OUTPUT1", output.output) - # somehow this is not working and the port is not exposed to the host. # worker_cli = worker.client() # print(worker_cli.request("GET", "/api/incr")) - output = head.exec_run(cmd="grep -r 'HTTP proxy' /tmp/ray/session_latest/logs/") - print("head node logs", output.output) - output = worker.exec_run(cmd="grep -r 'HTTP proxy' /tmp/ray/session_latest/logs/") - print("worker node logs", output.output) - output = worker.exec_run(cmd=f"python -c '{check_script.format(num_replicas=1)}'") - print("SERVE SA OUTPUT2", output.output) - - # test if this works - worker_cli = worker.client() - print(worker_cli.request("GET", "/api/incr")) assert output.exit_code == 0 # Kill the head node From f9c86ba2a9f762677ad22e94b48e6df03f6b7a6e Mon Sep 17 00:00:00 2001 From: Gene Su Date: Fri, 30 Jun 2023 09:54:47 -0700 Subject: [PATCH 21/22] fix http_options config Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 0d2577316ba4..8bfe9248ae0c 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -37,7 +37,7 @@ def pid(self): import os return {{"pid": os.getpid()}} -serve.start(detached=True, location="EveryNode") +serve.start(detached=True, http_options={"location": "EveryNode"}) Counter.options(num_replicas={num_replicas}).deploy() """ @@ -45,8 +45,6 @@ def pid(self): check_script = """ import requests import json - - if {num_replicas} == 1: b = json.loads(requests.get("http://127.0.0.1:8000/api/").text)["count"] for i in range(5): From 0459a1b86137ca98b5cb46c0fed68e2c760db5f6 Mon Sep 17 00:00:00 2001 From: Gene Su Date: Fri, 30 Jun 2023 10:50:35 -0700 Subject: [PATCH 22/22] escape http_options Signed-off-by: Gene Su --- python/ray/serve/tests/test_serve_ha.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 8bfe9248ae0c..8cef448f52ad 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -37,7 +37,7 @@ def pid(self): import os return {{"pid": os.getpid()}} -serve.start(detached=True, http_options={"location": "EveryNode"}) +serve.start(detached=True, http_options={{"location": "EveryNode"}}) Counter.options(num_replicas={num_replicas}).deploy() """