diff --git a/python/ray/dashboard/datacenter.py b/python/ray/dashboard/datacenter.py index b0c663733f2b..82f937b912c9 100644 --- a/python/ray/dashboard/datacenter.py +++ b/python/ray/dashboard/datacenter.py @@ -3,7 +3,10 @@ from typing import Any, List, Optional import ray.dashboard.consts as dashboard_consts -from ray._private.utils import get_or_create_event_loop +from ray._private.utils import ( + get_or_create_event_loop, + parse_pg_formatted_resources_to_original, +) from ray.dashboard.utils import ( Dict, MutableNotificationDict, @@ -264,4 +267,10 @@ async def _get_actor(actor): actor["gpus"] = actor_process_gpu_stats actor["processStats"] = actor_process_stats actor["mem"] = node_physical_stats.get("mem", []) + + required_resources = parse_pg_formatted_resources_to_original( + actor["requiredResources"] + ) + actor["requiredResources"] = required_resources + return actor diff --git a/python/ray/dashboard/modules/actor/actor_head.py b/python/ray/dashboard/modules/actor/actor_head.py index 4ce372a751a3..49c6e0f58368 100644 --- a/python/ray/dashboard/modules/actor/actor_head.py +++ b/python/ray/dashboard/modules/actor/actor_head.py @@ -35,6 +35,7 @@ def actor_table_data_to_dict(message): "taskId", "parentTaskId", "sourceActorId", + "placementGroupId", }, always_print_fields_with_no_presence=True, ) @@ -55,6 +56,7 @@ def actor_table_data_to_dict(message): "startTime", "endTime", "reprName", + "placementGroupId", } light_message = {k: v for (k, v) in orig_message.items() if k in fields} light_message["actorClass"] = orig_message["className"] diff --git a/python/ray/dashboard/modules/actor/tests/test_actor.py b/python/ray/dashboard/modules/actor/tests/test_actor.py index f150904ec9d8..d5a8fd8c89be 100644 --- a/python/ray/dashboard/modules/actor/tests/test_actor.py +++ b/python/ray/dashboard/modules/actor/tests/test_actor.py @@ -11,11 +11,21 @@ from ray._private.test_utils import format_web_url, wait_until_server_available from ray.dashboard.modules.actor import 
actor_consts from ray.dashboard.tests.conftest import * # noqa +from ray.util.placement_group import placement_group +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy logger = logging.getLogger(__name__) def test_actors(disable_aiohttp_cache, ray_start_with_dashboard): + """ + Tests the REST API dashboard calls on: + - alive actors + - infeasible actors + - dead actors + - pg actors (with pg_id set and required_resources formatted) + """ + @ray.remote class Foo: def __init__(self, num): @@ -39,17 +49,39 @@ def __repr__(self) -> str: class InfeasibleActor: pass + pg = placement_group([{"CPU": 1}]) + + @ray.remote(num_cpus=1) + class PgActor: + def __init__(self): + pass + + def do_task(self): + return 1 + + def get_placement_group_id(self): + return ray.get_runtime_context().get_placement_group_id() + foo_actors = [Foo.options(name="first").remote(4), Foo.remote(5)] infeasible_actor = InfeasibleActor.options(name="infeasible").remote() # noqa dead_actor = Foo.options(name="dead").remote(1) + pg_actor = PgActor.options( + name="pg", + scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=pg, + ), + ).remote() + ray.kill(dead_actor) - results = [actor.do_task.remote() for actor in foo_actors] # noqa + [actor.do_task.remote() for actor in foo_actors] + pg_actor.do_task.remote() webui_url = ray_start_with_dashboard["webui_url"] assert wait_until_server_available(webui_url) webui_url = format_web_url(webui_url) job_id = ray.get_runtime_context().get_job_id() node_id = ray.get(foo_actors[0].get_node_id.remote()) pid = ray.get(foo_actors[0].get_pid.remote()) + placement_group_id = ray.get(pg_actor.get_placement_group_id.remote()) timeout_seconds = 5 start_time = time.time() @@ -61,7 +93,8 @@ class InfeasibleActor: resp_json = resp.json() resp_data = resp_json["data"] actors = resp_data["actors"] - assert len(actors) == 4 + assert len(actors) == 5 + for a in actors.values(): if a["name"] == "first": actor_response = a @@ -104,6 
+137,14 @@ class InfeasibleActor: all_pids = {entry["pid"] for entry in actors.values()} assert 0 in all_pids # The infeasible actor assert len(all_pids) > 1 + + # Check the pg actor metadata. + for a in actors.values(): + if a["name"] == "pg": + pg_actor_response = a + assert pg_actor_response["placementGroupId"] == placement_group_id + assert pg_actor_response["requiredResources"] == {"CPU": 1.0} + break except Exception as ex: last_ex = ex