From 4b99e2364f67a440037c87e56c688a3e096a7495 Mon Sep 17 00:00:00 2001 From: xwjiang2010 Date: Tue, 14 Mar 2023 16:27:19 -0700 Subject: [PATCH 1/5] [air output] Update wording to "Logical Resource Usage". Update wording to "Logical Resource Usage" and remove heap/object memory stuff. Signed-off-by: xwjiang2010 --- .../ray/tune/execution/ray_trial_executor.py | 8 +++--- python/ray/tune/tests/test_trial_runner.py | 4 +-- python/ray/tune/utils/resource_updater.py | 28 ++++++++----------- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/python/ray/tune/execution/ray_trial_executor.py b/python/ray/tune/execution/ray_trial_executor.py index 18c32a38d0b9b..175e81b59de1f 100644 --- a/python/ray/tune/execution/ray_trial_executor.py +++ b/python/ray/tune/execution/ray_trial_executor.py @@ -826,7 +826,7 @@ def has_resources_for_trial(self, trial: Trial) -> bool: or self._resource_manager.has_resources_ready(resource_request) ) - def _occupied_resources(self) -> dict: + def _allocated_resources(self) -> dict: total_resources = {"CPU": 0, "GPU": 0} for allocated_resource in self._trial_to_acquired_resources.values(): resource_request = allocated_resource.resource_request @@ -837,9 +837,9 @@ def _occupied_resources(self) -> dict: def debug_string(self) -> str: """Returns a human readable message for printing to the console.""" - occupied_resources = self._occupied_resources() + allocated_resources = self._allocated_resources() - return self._resource_updater.debug_string(occupied_resources) + return self._resource_updater.debug_string(allocated_resources) def on_step_begin(self) -> None: """Before step() is called, update the available resources.""" @@ -887,7 +887,7 @@ def _cleanup_cached_actors( # (if the search ended). return - for (actor, acquired_resources) in self._actor_cache.flush_cached_objects( + for actor, acquired_resources in self._actor_cache.flush_cached_objects( force_all=force_all ): future = actor.stop.remote() diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index 4d9b41103b3ab..abd78023da4be 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -247,7 +247,7 @@ def on_trial_result(self, trial_runner, trial, result): runner.step() self.assertEqual(trials[0].status, Trial.RUNNING) - self.assertEqual(runner.trial_executor._occupied_resources().get("CPU"), 1) + self.assertEqual(runner.trial_executor._allocated_resources().get("CPU"), 1) self.assertRaises( ValueError, lambda: trials[0].update_resources(dict(cpu=2, gpu=0)) ) @@ -257,7 +257,7 @@ def on_trial_result(self, trial_runner, trial, result): self.assertEqual(trials[0].status, Trial.PAUSED) # extra step for tune loop to stage the resource requests. runner.step() - self.assertEqual(runner.trial_executor._occupied_resources().get("CPU"), 2) + self.assertEqual(runner.trial_executor._allocated_resources().get("CPU"), 2) def testQueueFilling(self): os.environ["TUNE_MAX_PENDING_TRIALS_PG"] = "1" diff --git a/python/ray/tune/utils/resource_updater.py b/python/ray/tune/utils/resource_updater.py index 38db9471d4b5f..20b4fe4fcece8 100644 --- a/python/ray/tune/utils/resource_updater.py +++ b/python/ray/tune/utils/resource_updater.py @@ -290,39 +290,35 @@ def update_avail_resources(self, num_retries=5): ) self._last_resource_refresh = time.time() - def debug_string(self, total_resources: Dict[str, Any]) -> str: + def debug_string(self, total_allocated_resources: Dict[str, Any]) -> str: """Returns a human readable message for printing to the console.""" if self._last_resource_refresh > 0: - status = ( - "Resources requested: {}/{} CPUs, {}/{} GPUs, " - "{}/{} GiB heap, {}/{} GiB objects".format( - total_resources.pop("CPU", 0), - self._avail_resources.cpu, - total_resources.pop("GPU", 0), - self._avail_resources.gpu, - _to_gb(total_resources.pop("memory", 0.0)), - _to_gb(self._avail_resources.memory), - _to_gb(total_resources.pop("object_store_memory", 0.0)), - _to_gb(self._avail_resources.object_store_memory), - ) + status = "Logical Resource Usage: {}/{} CPUs, {}/{} GPUs".format( + total_allocated_resources.pop("CPU", 0), + self._avail_resources.cpu, + total_allocated_resources.pop("GPU", 0), + self._avail_resources.gpu, ) customs = ", ".join( [ "{}/{} {}".format( - total_resources.get(name, 0.0), + total_allocated_resources.get(name, 0.0), self._avail_resources.get_res_total(name), name, ) for name in self._avail_resources.custom_resources if not name.startswith(NODE_ID_PREFIX) - and (total_resources.get(name, 0.0) > 0 or "_group_" not in name) + and ( + total_allocated_resources.get(name, 0.0) > 0 + or "_group_" not in name + ) ] ) if customs: status += f" ({customs})" return status else: - return "Resources requested: ?" + return "Logical Resource Usage: ?" def get_num_cpus(self) -> int: self.update_avail_resources() From 90f1d52b9538525b295968e645b89398866e64dc Mon Sep 17 00:00:00 2001 From: xwjiang2010 Date: Tue, 14 Mar 2023 17:26:49 -0700 Subject: [PATCH 2/5] further ax a few extra stuff. --- python/ray/tune/progress_reporter.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/python/ray/tune/progress_reporter.py b/python/ray/tune/progress_reporter.py index daa2a669bb6c6..0d9d8f72ce827 100644 --- a/python/ray/tune/progress_reporter.py +++ b/python/ray/tune/progress_reporter.py @@ -356,7 +356,6 @@ def _progress_str( messages = [ "== Status ==", _time_passed_str(self._start_time, time.time()), - _memory_debug_str(), *sys_info, ] if done: @@ -692,7 +691,6 @@ def __init__( mode: Optional[str] = None, sort_by_metric: bool = False, ): - super(CLIReporter, self).__init__( metric_columns=metric_columns, parameter_columns=parameter_columns, @@ -751,20 +749,6 @@ def _get_memory_usage() -> Tuple[float, float, Optional[str]]: ) -def _memory_debug_str() -> str: - """Generate a message to be shown to the user showing memory consumption. - - Returns: - String to be shown to the user with formatted memory consumption - stats. - """ - used_gb, total_gb, message = _get_memory_usage() - if np.isnan(used_gb): - return message - else: - return f"Memory usage on this node: {used_gb}/{total_gb} GiB {message or ''}" - - def _get_time_str(start_time: float, current_time: float) -> Tuple[str, str]: """Get strings representing the current and elapsed time. From 9fb05dc3da6327a4c0b3c8ad182964a5d03075c3 Mon Sep 17 00:00:00 2001 From: xwjiang2010 Date: Wed, 15 Mar 2023 08:42:20 -0700 Subject: [PATCH 3/5] remove runner.debug_str Signed-off-by: xwjiang2010 --- python/ray/tune/execution/trial_runner.py | 13 ------------- python/ray/tune/tests/test_cluster.py | 10 +++++----- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index 4edd35ecbfd5c..11224233260ee 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -68,7 +68,6 @@ class TrialRunner: runner.add_trial(Trial(...)) while not runner.is_finished(): runner.step() - print(runner.debug_string()) The main job of TrialRunner is scheduling trials to efficiently use cluster resources, without overloading the cluster. @@ -749,18 +748,6 @@ def add_trial(self, trial: Trial): ) self.trial_executor.mark_trial_to_checkpoint(trial) - def debug_string(self, delim="\n"): - from ray.tune.progress_reporter import _trial_progress_str - - result_keys = [list(t.last_result) for t in self.get_trials() if t.last_result] - metrics = set().union(*result_keys) - messages = [ - self._scheduler_alg.debug_string(), - self.trial_executor.debug_string(), - _trial_progress_str(self.get_trials(), metrics, force_table=True), - ] - return delim.join(messages) - def _stop_experiment_if_needed(self): """Stops all trials.""" fail_fast = self._fail_fast and self._has_errored diff --git a/python/ray/tune/tests/test_cluster.py b/python/ray/tune/tests/test_cluster.py index 1576577e407ee..041d076607519 100644 --- a/python/ray/tune/tests/test_cluster.py +++ b/python/ray/tune/tests/test_cluster.py @@ -242,7 +242,7 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable): while not runner.is_finished(): runner.step() - assert t.status == Trial.TERMINATED, runner.debug_string() + assert t.status == Trial.TERMINATED # Test recovery of trial that has been checkpointed t2 = Trial("__fake", **kwargs) @@ -255,7 +255,7 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable): cluster.wait_for_nodes() while not runner.is_finished(): runner.step() - assert t2.status == Trial.TERMINATED, runner.debug_string() + assert t2.status == Trial.TERMINATED # Test recovery of trial that won't be checkpointed kwargs = { @@ -273,7 +273,7 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable): cluster.wait_for_nodes() while not runner.is_finished(): runner.step() - assert t3.status == Trial.ERROR, runner.debug_string() + assert t3.status == Trial.ERROR with pytest.raises(TuneError): runner.step() @@ -322,7 +322,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster, tmpdir, durable): time.sleep(0.1) # Sleep so that next step() refreshes cluster resources runner.step() # Process result, dispatch save runner.step() # Process save (detect error), requeue trial - assert all(t.status == Trial.PENDING for t in trials), runner.debug_string() + assert all(t.status == Trial.PENDING for t in trials) @pytest.mark.parametrize("durable", [False, True]) @@ -377,7 +377,7 @@ def test_migration_checkpoint_removal( while not runner.is_finished(): runner.step() - assert t1.status == Trial.TERMINATED, runner.debug_string() + assert t1.status == Trial.TERMINATED @pytest.mark.parametrize("durable", [False, True]) From 63b75d435fc4cb39c1d7fd4a64a774ec68eb645f Mon Sep 17 00:00:00 2001 From: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com> Date: Wed, 15 Mar 2023 18:02:52 -0700 Subject: [PATCH 4/5] Update python/ray/tune/utils/resource_updater.py Co-authored-by: Kai Fricke Signed-off-by: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com> --- python/ray/tune/utils/resource_updater.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tune/utils/resource_updater.py b/python/ray/tune/utils/resource_updater.py index 20b4fe4fcece8..98125ce41b3a2 100644 --- a/python/ray/tune/utils/resource_updater.py +++ b/python/ray/tune/utils/resource_updater.py @@ -293,7 +293,7 @@ def update_avail_resources(self, num_retries=5): def debug_string(self, total_allocated_resources: Dict[str, Any]) -> str: """Returns a human readable message for printing to the console.""" if self._last_resource_refresh > 0: - status = "Logical Resource Usage: {}/{} CPUs, {}/{} GPUs".format( + status = "Logical resource usage: {}/{} CPUs, {}/{} GPUs".format( total_allocated_resources.pop("CPU", 0), self._avail_resources.cpu, total_allocated_resources.pop("GPU", 0), From c7c8c945d09b5a406a242a58b305e9cb38087ab9 Mon Sep 17 00:00:00 2001 From: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com> Date: Wed, 15 Mar 2023 18:02:58 -0700 Subject: [PATCH 5/5] Update python/ray/tune/utils/resource_updater.py Co-authored-by: Kai Fricke Signed-off-by: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com> --- python/ray/tune/utils/resource_updater.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tune/utils/resource_updater.py b/python/ray/tune/utils/resource_updater.py index 98125ce41b3a2..192eb5eb0ebdf 100644 --- a/python/ray/tune/utils/resource_updater.py +++ b/python/ray/tune/utils/resource_updater.py @@ -318,7 +318,7 @@ def debug_string(self, total_allocated_resources: Dict[str, Any]) -> str: status += f" ({customs})" return status else: - return "Logical Resource Usage: ?" + return "Logical resource usage: ?" def get_num_cpus(self) -> int: self.update_avail_resources()