[air output] Update wording to "Logical Resource Usage". #33312

Merged (7 commits) on Mar 16, 2023
8 changes: 4 additions & 4 deletions python/ray/tune/execution/ray_trial_executor.py
@@ -779,7 +779,7 @@ def has_resources_for_trial(self, trial: Trial) -> bool:
or self._resource_manager.has_resources_ready(resource_request)
)

- def _occupied_resources(self) -> dict:
+ def _allocated_resources(self) -> dict:
total_resources = {"CPU": 0, "GPU": 0}
for allocated_resource in self._trial_to_acquired_resources.values():
resource_request = allocated_resource.resource_request
@@ -790,9 +790,9 @@ def _occupied_resources(self) -> dict:

def debug_string(self) -> str:
"""Returns a human readable message for printing to the console."""
- occupied_resources = self._occupied_resources()
+ allocated_resources = self._allocated_resources()

- return self._resource_updater.debug_string(occupied_resources)
+ return self._resource_updater.debug_string(allocated_resources)

def on_step_begin(self) -> None:
"""Before step() is called, update the available resources."""
@@ -840,7 +840,7 @@ def _cleanup_cached_actors(
# (if the search ended).
return

- for (actor, acquired_resources) in self._actor_cache.flush_cached_objects(
+ for actor, acquired_resources in self._actor_cache.flush_cached_objects(
force_all=force_all
):
future = actor.stop.remote()
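
Note: the diff above does not show the full body of the aggregation loop in _allocated_resources. As a rough sketch only (this is not the actual Ray Tune implementation, and the `required_resources` attribute below is an assumed shape), the renamed helper sums the logical CPUs/GPUs currently held by trials:

# Illustrative sketch of the aggregation performed by the renamed helper.
from typing import Any, Dict


def allocated_resources(trial_to_acquired_resources: Dict[Any, Any]) -> dict:
    total = {"CPU": 0, "GPU": 0}
    for acquired in trial_to_acquired_resources.values():
        request = acquired.resource_request  # per-trial resource request
        # `required_resources` is assumed to be a {resource name: amount} mapping.
        for name, amount in request.required_resources.items():
            total[name] = total.get(name, 0) + amount
    return total
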
13 changes: 0 additions & 13 deletions python/ray/tune/execution/trial_runner.py
@@ -68,7 +68,6 @@ class TrialRunner:
runner.add_trial(Trial(...))
while not runner.is_finished():
runner.step()
- print(runner.debug_string())

The main job of TrialRunner is scheduling trials to efficiently use cluster
resources, without overloading the cluster.
@@ -749,18 +748,6 @@ def add_trial(self, trial: Trial):
)
self.trial_executor.mark_trial_to_checkpoint(trial)

- def debug_string(self, delim="\n"):
- from ray.tune.progress_reporter import _trial_progress_str
-
- result_keys = [list(t.last_result) for t in self.get_trials() if t.last_result]
- metrics = set().union(*result_keys)
- messages = [
- self._scheduler_alg.debug_string(),
- self.trial_executor.debug_string(),
- _trial_progress_str(self.get_trials(), metrics, force_table=True),
- ]
- return delim.join(messages)

def _stop_experiment_if_needed(self):
"""Stops all trials."""
fail_fast = self._fail_fast and self._has_errored
16 changes: 0 additions & 16 deletions python/ray/tune/progress_reporter.py
@@ -356,7 +356,6 @@ def _progress_str(
messages = [
"== Status ==",
_time_passed_str(self._start_time, time.time()),
- _memory_debug_str(),
*sys_info,
]
if done:
@@ -692,7 +691,6 @@ def __init__(
mode: Optional[str] = None,
sort_by_metric: bool = False,
):

super(CLIReporter, self).__init__(
metric_columns=metric_columns,
parameter_columns=parameter_columns,
@@ -751,20 +749,6 @@ def _get_memory_usage() -> Tuple[float, float, Optional[str]]:
)


- def _memory_debug_str() -> str:
- """Generate a message to be shown to the user showing memory consumption.
-
- Returns:
- String to be shown to the user with formatted memory consumption
- stats.
- """
- used_gb, total_gb, message = _get_memory_usage()
- if np.isnan(used_gb):
- return message
- else:
- return f"Memory usage on this node: {used_gb}/{total_gb} GiB {message or ''}"


def _get_time_str(start_time: float, current_time: float) -> Tuple[str, str]:
"""Get strings representing the current and elapsed time.

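
Note: with _memory_debug_str removed, the "== Status ==" header no longer prints a per-node memory line; _get_memory_usage itself is kept in the file. For reference, a roughly equivalent readout can be produced directly with psutil (an illustrative sketch, not part of this PR or of Ray Tune):

# Rough stand-in for the removed memory line, using psutil directly.
import psutil

vm = psutil.virtual_memory()
used_gb = round(vm.used / (1024 ** 3), 1)
total_gb = round(vm.total / (1024 ** 3), 1)
print(f"Memory usage on this node: {used_gb}/{total_gb} GiB")
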
10 changes: 5 additions & 5 deletions python/ray/tune/tests/test_cluster.py
@@ -242,7 +242,7 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable):
while not runner.is_finished():
runner.step()

- assert t.status == Trial.TERMINATED, runner.debug_string()
+ assert t.status == Trial.TERMINATED

# Test recovery of trial that has been checkpointed
t2 = Trial("__fake", **kwargs)
@@ -255,7 +255,7 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable):
cluster.wait_for_nodes()
while not runner.is_finished():
runner.step()
- assert t2.status == Trial.TERMINATED, runner.debug_string()
+ assert t2.status == Trial.TERMINATED

# Test recovery of trial that won't be checkpointed
kwargs = {
@@ -273,7 +273,7 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable):
cluster.wait_for_nodes()
while not runner.is_finished():
runner.step()
- assert t3.status == Trial.ERROR, runner.debug_string()
+ assert t3.status == Trial.ERROR

with pytest.raises(TuneError):
runner.step()
@@ -322,7 +322,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster, tmpdir, durable):
time.sleep(0.1) # Sleep so that next step() refreshes cluster resources
runner.step() # Process result, dispatch save
runner.step() # Process save (detect error), requeue trial
- assert all(t.status == Trial.PENDING for t in trials), runner.debug_string()
+ assert all(t.status == Trial.PENDING for t in trials)


@pytest.mark.parametrize("durable", [False, True])
@@ -377,7 +377,7 @@ def test_migration_checkpoint_removal(

while not runner.is_finished():
runner.step()
- assert t1.status == Trial.TERMINATED, runner.debug_string()
+ assert t1.status == Trial.TERMINATED


@pytest.mark.parametrize("durable", [False, True])
4 changes: 2 additions & 2 deletions python/ray/tune/tests/test_trial_runner.py
@@ -247,7 +247,7 @@ def on_trial_result(self, trial_runner, trial, result):

runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
- self.assertEqual(runner.trial_executor._occupied_resources().get("CPU"), 1)
+ self.assertEqual(runner.trial_executor._allocated_resources().get("CPU"), 1)
self.assertRaises(
ValueError, lambda: trials[0].update_resources(dict(cpu=2, gpu=0))
)
@@ -257,7 +257,7 @@ def on_trial_result(self, trial_runner, trial, result):
self.assertEqual(trials[0].status, Trial.PAUSED)
# extra step for tune loop to stage the resource requests.
runner.step()
- self.assertEqual(runner.trial_executor._occupied_resources().get("CPU"), 2)
+ self.assertEqual(runner.trial_executor._allocated_resources().get("CPU"), 2)

def testQueueFilling(self):
os.environ["TUNE_MAX_PENDING_TRIALS_PG"] = "1"
28 changes: 12 additions & 16 deletions python/ray/tune/utils/resource_updater.py
@@ -290,39 +290,35 @@ def update_avail_resources(self, num_retries=5):
)
self._last_resource_refresh = time.time()

- def debug_string(self, total_resources: Dict[str, Any]) -> str:
+ def debug_string(self, total_allocated_resources: Dict[str, Any]) -> str:
"""Returns a human readable message for printing to the console."""
if self._last_resource_refresh > 0:
- status = (
- "Resources requested: {}/{} CPUs, {}/{} GPUs, "
- "{}/{} GiB heap, {}/{} GiB objects".format(
- total_resources.pop("CPU", 0),
- self._avail_resources.cpu,
- total_resources.pop("GPU", 0),
- self._avail_resources.gpu,
- _to_gb(total_resources.pop("memory", 0.0)),
- _to_gb(self._avail_resources.memory),
- _to_gb(total_resources.pop("object_store_memory", 0.0)),
- _to_gb(self._avail_resources.object_store_memory),
- )
+ status = "Logical resource usage: {}/{} CPUs, {}/{} GPUs".format(
+ total_allocated_resources.pop("CPU", 0),
+ self._avail_resources.cpu,
+ total_allocated_resources.pop("GPU", 0),
+ self._avail_resources.gpu,
+ )
customs = ", ".join(
[
"{}/{} {}".format(
- total_resources.get(name, 0.0),
+ total_allocated_resources.get(name, 0.0),
self._avail_resources.get_res_total(name),
name,
)
for name in self._avail_resources.custom_resources
if not name.startswith(NODE_ID_PREFIX)
- and (total_resources.get(name, 0.0) > 0 or "_group_" not in name)
+ and (
+ total_allocated_resources.get(name, 0.0) > 0
+ or "_group_" not in name
+ )
]
)
if customs:
status += f" ({customs})"
return status
else:
return "Resources requested: ?"
return "Logical resource usage: ?"

def get_num_cpus(self) -> int:
self.update_avail_resources()
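
Note: the net effect in this file is that the console header line changes from "Resources requested: ..." (which also reported heap and object-store GiB) to a shorter "Logical resource usage: ..." line. Below is a standalone illustration of the format produced by the updated debug_string, using assumed numbers (4 of 8 CPUs, 1 of 2 GPUs, and one hypothetical custom resource):

# Illustration only: mirrors the string building in the updated debug_string
# with made-up totals; the custom resource name is an assumption.
allocated = {"CPU": 4, "GPU": 1, "accelerator_type:A100": 1}
available = {"CPU": 8, "GPU": 2, "accelerator_type:A100": 2}

status = "Logical resource usage: {}/{} CPUs, {}/{} GPUs".format(
    allocated.pop("CPU", 0),
    available["CPU"],
    allocated.pop("GPU", 0),
    available["GPU"],
)
customs = ", ".join(
    "{}/{} {}".format(allocated.get(name, 0.0), available[name], name)
    for name in allocated
    if allocated.get(name, 0.0) > 0 or "_group_" not in name
)
if customs:
    status += f" ({customs})"
print(status)
# -> Logical resource usage: 4/8 CPUs, 1/2 GPUs (1/2 accelerator_type:A100)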