Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix non default dashboard_agent_listen_port not used when connected to the node #33834

Merged
merged 5 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion dashboard/modules/job/tests/test_job_agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
from ray._private.utils import get_or_create_event_loop
import ray

import requests
import shutil
import sys
Expand All @@ -11,6 +12,7 @@
import pytest
import yaml

from ray._private.utils import get_or_create_event_loop
from ray._private.gcs_utils import GcsAioClient
from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
from ray._private.runtime_env.py_modules import upload_py_modules_if_needed
Expand All @@ -21,6 +23,8 @@
wait_until_server_available,
wait_for_condition,
run_string_as_driver_nonblocking,
get_current_unused_port,
async_wait_for_condition_async_predicate,
)
from ray.dashboard.modules.job.common import JobSubmitRequest
from ray.dashboard.modules.job.utils import (
Expand Down Expand Up @@ -518,5 +522,65 @@ def test_agent_logs_not_streamed_to_drivers():
assert "(raylet)" not in err_str


@pytest.mark.asyncio
async def test_non_default_dashboard_agent_http_port(tmp_path):
"""Test that we can connect to the dashboard agent with a non-default
http port.

A head node is started via `ray start --head` with
`--dashboard-agent-listen-port` set to a currently unused port. The test
then connects with `ray.init("auto")`, builds the agent address from the
reported `dashboard_agent_listen_port`, submits a job through the agent
client and checks that the job succeeds and its logs contain "hello".
The cluster is always stopped with `ray stop --force` at the end.
"""
import subprocess

# Adjacent string literals are concatenated into a single shell command.
cmd = (
"ray start --head " f"--dashboard-agent-listen-port {get_current_unused_port()}"
)
subprocess.check_output(cmd, shell=True)

try:
# Connect to the cluster that was started by the subprocess above.
address_info = ray.init("auto", ignore_reinit_error=True).address_info

# Only the host part of the dashboard URL is reused; the agent port comes
# from the address info rather than from the dashboard URL.
ip, _ = address_info["webui_url"].split(":")
dashboard_agent_listen_port = address_info["dashboard_agent_listen_port"]
agent_address = f"{ip}:{dashboard_agent_listen_port}"
print("agent address = ", agent_address)

# One client talks to the agent on the custom port, the other to the
# dashboard head at `webui_url`.
agent_client = JobAgentSubmissionClient(format_web_url(agent_address))
head_client = JobSubmissionClient(format_web_url(address_info["webui_url"]))

assert wait_until_server_available(agent_address)

# Submit a job through the agent.
runtime_env = RuntimeEnv().to_dict()
request = validate_request_type(
{
"runtime_env": runtime_env,
"entrypoint": "echo hello",
},
JobSubmitRequest,
)
submit_result = await agent_client.submit_job_internal(request)
job_id = submit_result.submission_id

async def verify():
# Wait for job finished.
# NOTE(review): wait_for_condition is called without `await` inside this
# async predicate, and the status wait is repeated on every retry of
# `verify` -- confirm whether only the log check below needs retrying.
wait_for_condition(
partial(
_check_job,
client=head_client,
job_id=job_id,
status=JobStatus.SUCCEEDED,
),
timeout=10,
)

# The job's entrypoint is `echo hello`, so "hello" must appear in its logs.
resp = await agent_client.get_job_logs_internal(job_id)
assert "hello" in resp.logs, resp.logs

return True

await async_wait_for_condition_async_predicate(verify, retry_interval_ms=2000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which part do we really intend to retry here? Does it make sense to do the setup once, and only retry the part that checks if the logs have appeared yet?

finally:
# Always tear down the cluster started by `ray start`, even on failure.
subprocess.check_output("ray stop --force", shell=True)


# Allow running this test module directly: delegate to pytest in verbose mode
# and use its return value as the process exit status.
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
8 changes: 8 additions & 0 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,15 @@ def __init__(
self._metrics_export_port = self._get_cached_port(
"metrics_export_port", default_port=ray_params.metrics_export_port
)
self._dashboard_agent_listen_port = self._get_cached_port(
"dashboard_agent_listen_port",
default_port=ray_params.dashboard_agent_listen_port,
)

ray_params.update_if_absent(
metrics_agent_port=self.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
dashboard_agent_listen_port=self._dashboard_agent_listen_port,
)

# Pick a GCS server port.
Expand Down Expand Up @@ -807,6 +812,9 @@ def _get_cached_port(
"""
file_path = os.path.join(self.get_session_dir_path(), "ports_by_node.json")

# Make sure only the ports in RAY_ALLOWED_CACHED_PORTS are cached.
assert port_name in ray_constants.RAY_ALLOWED_CACHED_PORTS

# Maps a Node.unique_id to a dict that maps port names to port numbers.
ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict)

Expand Down
10 changes: 10 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,13 @@ def gcs_actor_scheduling_enabled():
ENABLE_RAY_CLUSTERS_ENV_VAR,
not (sys.platform == "darwin" or sys.platform == "win32"),
)


# Names of the ports that Ray allows to be cached. See the "Ports
# configurations" section of the Ray documentation for more details:
# https://docs.ray.io/en/latest/ray-core/configure.html#ports-configurations
RAY_ALLOWED_CACHED_PORTS = {
    "dashboard_agent_listen_port",
    "gcs_server_port",  # the `port` option for gcs port.
    "metrics_agent_port",
    "metrics_export_port",
}
2 changes: 2 additions & 0 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,8 @@ def start_raylet(
redis_password: The password to use when connecting to Redis.
metrics_agent_port: The port where metrics agent is bound to.
metrics_export_port: The port at which metrics are exposed to.
dashboard_agent_listen_port: The port on which the dashboard agent
listens for HTTP requests.
use_valgrind: True if the raylet should be started inside
of valgrind. If this is True, use_profiler must be False.
use_profiler: True if the raylet should be started inside
Expand Down
21 changes: 21 additions & 0 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1821,3 +1821,24 @@ def safe_write_to_results_json(
os.replace(test_output_json_tmp, test_output_json)
logger.info(f"Wrote results to {test_output_json}")
logger.info(json.dumps(result))


def get_current_unused_port():
    """
    Returns a port number that is not currently in use.

    This is useful for testing when we need to bind to a port but don't
    care which one.

    Returns:
        A port number that is not currently in use. (Note that this port
        might become used by the time you try to bind to it.)
    """
    # Use the socket as a context manager so it is closed even if bind() or
    # getsockname() raises; otherwise the file descriptor would leak.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Bind the socket to a local address with a random port number
        # (port 0 lets the OS choose a free one).
        sock.bind(("localhost", 0))
        return sock.getsockname()[1]
47 changes: 46 additions & 1 deletion python/ray/tests/test_ray_init_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from ray._private.ray_constants import RAY_OVERRIDE_DASHBOARD_URL, DEFAULT_RESOURCES
import ray._private.services
from ray.dashboard.utils import ray_address_to_api_server_url
from ray._private.test_utils import run_string_as_driver
from ray._private.test_utils import (
get_current_unused_port,
run_string_as_driver,
wait_for_condition,
)
from ray.util.client.ray_client_helpers import ray_start_client_server


Expand Down Expand Up @@ -232,6 +236,47 @@ def test_ports_assignment(ray_start_cluster):
)


def test_non_default_ports_visible_on_init(shutdown_only):
    """Check that custom ports passed to `ray start` are visible after init.

    A head node is launched in a subprocess with a currently unused port for
    each option listed below. The test then connects with `ray.init("auto")`
    and compares the ports reported by the connected node against the ones
    that were requested. The head process and the cluster are always torn
    down afterwards.
    """
    import subprocess

    option_names = (
        "dashboard_agent_grpc_port",
        "metrics_export_port",
        "dashboard_agent_listen_port",
        "port",  # gcs_server_port
        "node_manager_port",
    )
    ports = {name: get_current_unused_port() for name in option_names}

    # Start a ray head node with customized ports. The CLI flags use "-"
    # where the option names above use "_".
    cmd = "ray start --head --block".split(" ")
    for name, value in ports.items():
        flag = "--" + name.replace("_", "-")
        cmd.extend([flag, str(value)])

    print(" ".join(cmd))
    head_proc = subprocess.Popen(cmd)

    def verify():
        # Connect to the node, then compare each port seen from the
        # connected node with the one requested on the command line.
        print(ray.init("auto", ignore_reinit_error=True))

        node = ray.worker.global_worker.node
        assert node.metrics_agent_port == ports["dashboard_agent_grpc_port"]
        assert node.metrics_export_port == ports["metrics_export_port"]
        assert node.dashboard_agent_listen_port == ports["dashboard_agent_listen_port"]
        assert str(ports["port"]) in node.gcs_address
        assert node.node_manager_port == ports["node_manager_port"]
        return True

    try:
        wait_for_condition(verify, timeout=10, retry_interval_ms=2000)
    finally:
        # Stop the blocking `ray start` process first, then the cluster.
        head_proc.terminate()
        head_proc.wait()
        subprocess.check_output("ray stop --force", shell=True)


@pytest.mark.skipif(sys.platform != "linux", reason="skip except linux")
def test_ray_init_from_workers(ray_start_cluster):
cluster = ray_start_cluster
Expand Down