From eced1dd54b243f6ad83259c93ac29719e839645a Mon Sep 17 00:00:00 2001 From: rickyyx Date: Wed, 29 Mar 2023 00:35:59 +0000 Subject: [PATCH 1/4] init Signed-off-by: rickyyx --- dashboard/modules/job/tests/test_job_agent.py | 58 ++++++++++++++++++- python/ray/_private/node.py | 8 +++ python/ray/_private/ray_constants.py | 10 ++++ python/ray/_private/services.py | 2 + python/ray/_private/test_utils.py | 21 +++++++ python/ray/tests/test_ray_init_2.py | 47 ++++++++++++++- 6 files changed, 144 insertions(+), 2 deletions(-) diff --git a/dashboard/modules/job/tests/test_job_agent.py b/dashboard/modules/job/tests/test_job_agent.py index 1d2af565390ea..62c939e41d30f 100644 --- a/dashboard/modules/job/tests/test_job_agent.py +++ b/dashboard/modules/job/tests/test_job_agent.py @@ -1,6 +1,7 @@ import logging import os -from ray._private.utils import get_or_create_event_loop +import ray + import requests import shutil import sys @@ -11,6 +12,7 @@ import pytest import yaml +from ray._private.utils import get_or_create_event_loop from ray._private.gcs_utils import GcsAioClient from ray._private.runtime_env.working_dir import upload_working_dir_if_needed from ray._private.runtime_env.py_modules import upload_py_modules_if_needed @@ -21,6 +23,8 @@ wait_until_server_available, wait_for_condition, run_string_as_driver_nonblocking, + get_current_unused_port, + async_wait_for_condition_async_predicate, ) from ray.dashboard.modules.job.common import JobSubmitRequest from ray.dashboard.modules.job.utils import ( @@ -518,5 +522,57 @@ def test_agent_logs_not_streamed_to_drivers(): assert "(raylet)" not in err_str +@pytest.mark.asyncio +async def test_non_default_dashboard_agent_http_port(tmp_path): + """Test that we can connect to the dashboard agent with a non-default + http port. + """ + import subprocess + + cmd = ( + "ray start --head --block " + f"--dashboard-agent-listen-port {get_current_unused_port()}" + ) + proc = subprocess.Popen(cmd.split(" ")) + + async def verify(): + address_info = ray.init("auto", ignore_reinit_error=True).address_info + + ip, _ = address_info["webui_url"].split(":") + dashboard_agent_listen_port = address_info["dashboard_agent_listen_port"] + agent_address = f"{ip}:{dashboard_agent_listen_port}" + print(agent_address) + + agent_client = JobAgentSubmissionClient(format_web_url(agent_address)) + + assert wait_until_server_available(agent_address) + + # Submit a job through the agent. + runtime_env = RuntimeEnv().to_dict() + request = validate_request_type( + { + "runtime_env": runtime_env, + "entrypoint": "echo hello", + }, + JobSubmitRequest, + ) + + submit_result = await agent_client.submit_job_internal(request) + + job_id = submit_result.submission_id + print(job_id) + resp = await agent_client.get_job_logs_internal(job_id) + assert "hello" in resp.logs, resp.logs + + return True + + try: + await async_wait_for_condition_async_predicate(verify, retry_interval_ms=2000) + finally: + proc.terminate() + proc.wait() + subprocess.check_output("ray stop --force", shell=True) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index a01a29cc145c5..83ef223ed065b 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -263,10 +263,15 @@ def __init__( self._metrics_export_port = self._get_cached_port( "metrics_export_port", default_port=ray_params.metrics_export_port ) + self._dashboard_agent_listen_port = self._get_cached_port( + "dashboard_agent_listen_port", + default_port=ray_params.dashboard_agent_listen_port, + ) ray_params.update_if_absent( metrics_agent_port=self.metrics_agent_port, metrics_export_port=self._metrics_export_port, + dashboard_agent_listen_port=self._dashboard_agent_listen_port, ) # Pick a GCS server port. @@ -807,6 +812,9 @@ def _get_cached_port( """ file_path = os.path.join(self.get_session_dir_path(), "ports_by_node.json") + # Make sure only the ports in RAY_CACHED_PORTS are cached. + assert port_name in ray_constants.RAY_ALLOWED_CACHED_PORTS + # Maps a Node.unique_id to a dict that maps port names to port numbers. ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 52d86c4a35da4..68a834d6980b4 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -390,3 +390,13 @@ def gcs_actor_scheduling_enabled(): ENABLE_RAY_CLUSTERS_ENV_VAR, not (sys.platform == "darwin" or sys.platform == "win32"), ) + + +# The allowed cached ports in Ray. Refer to Port configuration for more details: +# https://docs.ray.io/en/latest/ray-core/configure.html#ports-configurations +RAY_ALLOWED_CACHED_PORTS = { + "metrics_agent_port", + "metrics_export_port", + "dashboard_agent_listen_port", + "gcs_server_port", +} diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 384819492f7e9..21cd6bbe98cfb 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1416,6 +1416,8 @@ def start_raylet( redis_password: The password to use when connecting to Redis. metrics_agent_port: The port where metrics agent is bound to. metrics_export_port: The port at which metrics are exposed to. + dashboard_agent_listen_port: The port at which the dashboard agent + listens to for http. use_valgrind: True if the raylet should be started inside of valgrind. If this is True, use_profiler must be False. use_profiler: True if the raylet should be started inside diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 8a73a0e60b266..034ae2ea57206 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1821,3 +1821,24 @@ def safe_write_to_results_json( os.replace(test_output_json_tmp, test_output_json) logger.info(f"Wrote results to {test_output_json}") logger.info(json.dumps(result)) + + +def get_current_unused_port(): + """ + Returns a port number that is not currently in use. + + This is useful for testing when we need to bind to a port but don't + care which one. + + Returns: + A port number that is not currently in use. (Note that this port + might become used by the time you try to bind to it.) + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Bind the socket to a local address with a random port number + sock.bind(("localhost", 0)) + + port = sock.getsockname()[1] + sock.close() + return port diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py index e593c7e7e6450..ddf0e66782b20 100644 --- a/python/ray/tests/test_ray_init_2.py +++ b/python/ray/tests/test_ray_init_2.py @@ -9,7 +9,11 @@ from ray._private.ray_constants import RAY_OVERRIDE_DASHBOARD_URL, DEFAULT_RESOURCES import ray._private.services from ray.dashboard.utils import ray_address_to_api_server_url -from ray._private.test_utils import run_string_as_driver +from ray._private.test_utils import ( + get_current_unused_port, + run_string_as_driver, + wait_for_condition, +) from ray.util.client.ray_client_helpers import ray_start_client_server @@ -232,6 +236,47 @@ def test_ports_assignment(ray_start_cluster): ) +def test_non_default_ports_visible_on_init(shutdown_only): + import subprocess + + ports = { + "dashboard_agent_grpc_port": get_current_unused_port(), + "metrics_export_port": get_current_unused_port(), + "dashboard_agent_listen_port": get_current_unused_port(), + "port": get_current_unused_port(), # gcs_server_port + "node_manager_port": get_current_unused_port(), + } + # Start a ray head node with customized ports. + cmd = "ray start --head --block".split(" ") + for port_name, port in ports.items(): + # replace "_" with "-" + port_name = port_name.replace("_", "-") + cmd += ["--" + port_name, str(port)] + + print(" ".join(cmd)) + proc = subprocess.Popen(cmd) + + # From the connected node + def verify(): + # Connect to the node and check ports + print(ray.init("auto", ignore_reinit_error=True)) + + node = ray.worker.global_worker.node + assert node.metrics_agent_port == ports["dashboard_agent_grpc_port"] + assert node.metrics_export_port == ports["metrics_export_port"] + assert node.dashboard_agent_listen_port == ports["dashboard_agent_listen_port"] + assert str(ports["port"]) in node.gcs_address + assert node.node_manager_port == ports["node_manager_port"] + return True + + try: + wait_for_condition(verify, timeout=10, retry_interval_ms=2000) + finally: + proc.terminate() + proc.wait() + subprocess.check_output("ray stop --force", shell=True) + + @pytest.mark.skipif(sys.platform != "linux", reason="skip except linux") def test_ray_init_from_workers(ray_start_cluster): cluster = ray_start_cluster From b98145a7ae6dafa7de7ed076b92de154287ddd88 Mon Sep 17 00:00:00 2001 From: rickyyx Date: Wed, 29 Mar 2023 00:39:05 +0000 Subject: [PATCH 2/4] commment Signed-off-by: rickyyx --- python/ray/_private/ray_constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 68a834d6980b4..2f967f45102ab 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -398,5 +398,5 @@ def gcs_actor_scheduling_enabled(): "metrics_agent_port", "metrics_export_port", "dashboard_agent_listen_port", - "gcs_server_port", + "gcs_server_port", # the `port` option for gcs port. } From 9398af347121771ea77f44dff63bf7e2bd25e7f8 Mon Sep 17 00:00:00 2001 From: rickyyx Date: Wed, 29 Mar 2023 01:01:02 +0000 Subject: [PATCH 3/4] lint Signed-off-by: rickyyx --- python/ray/_private/ray_constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 2f967f45102ab..de3bd1f0e014b 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -398,5 +398,5 @@ def gcs_actor_scheduling_enabled(): "metrics_agent_port", "metrics_export_port", "dashboard_agent_listen_port", - "gcs_server_port", # the `port` option for gcs port. + "gcs_server_port", # the `port` option for gcs port. } From c512e95fbc5578b91dc5a77d60f27a5c1f608eda Mon Sep 17 00:00:00 2001 From: rickyyx Date: Fri, 31 Mar 2023 00:55:34 +0000 Subject: [PATCH 4/4] fix Signed-off-by: rickyyx --- dashboard/modules/job/tests/test_job_agent.py | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/dashboard/modules/job/tests/test_job_agent.py b/dashboard/modules/job/tests/test_job_agent.py index 62c939e41d30f..159577d02544a 100644 --- a/dashboard/modules/job/tests/test_job_agent.py +++ b/dashboard/modules/job/tests/test_job_agent.py @@ -530,20 +530,21 @@ async def test_non_default_dashboard_agent_http_port(tmp_path): import subprocess cmd = ( - "ray start --head --block " - f"--dashboard-agent-listen-port {get_current_unused_port()}" + "ray start --head " f"--dashboard-agent-listen-port {get_current_unused_port()}" ) - proc = subprocess.Popen(cmd.split(" ")) + subprocess.check_output(cmd, shell=True) - async def verify(): + try: + # We will need to wait for the ray to be started in the subprocess. address_info = ray.init("auto", ignore_reinit_error=True).address_info ip, _ = address_info["webui_url"].split(":") dashboard_agent_listen_port = address_info["dashboard_agent_listen_port"] agent_address = f"{ip}:{dashboard_agent_listen_port}" - print(agent_address) + print("agent address = ", agent_address) agent_client = JobAgentSubmissionClient(format_web_url(agent_address)) + head_client = JobSubmissionClient(format_web_url(address_info["webui_url"])) assert wait_until_server_available(agent_address) @@ -556,21 +557,28 @@ async def verify(): }, JobSubmitRequest, ) - submit_result = await agent_client.submit_job_internal(request) - job_id = submit_result.submission_id - print(job_id) - resp = await agent_client.get_job_logs_internal(job_id) - assert "hello" in resp.logs, resp.logs - return True + async def verify(): + # Wait for job finished. + wait_for_condition( + partial( + _check_job, + client=head_client, + job_id=job_id, + status=JobStatus.SUCCEEDED, + ), + timeout=10, + ) + + resp = await agent_client.get_job_logs_internal(job_id) + assert "hello" in resp.logs, resp.logs + + return True - try: await async_wait_for_condition_async_predicate(verify, retry_interval_ms=2000) finally: - proc.terminate() - proc.wait() subprocess.check_output("ray stop --force", shell=True)