From 6caf83e1a7cf41c956bbb25f6180a1bcde4325ac Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Thu, 23 Mar 2023 23:51:10 -0700 Subject: [PATCH] [air] Add `*path` properties to `Result` and `ResultGrid` (#33410) Following #33370, this PR adds `Result.path` and `ResultGrid.experiment_path` to the respective classes. Further, we remove the public facing `ExperimentAnalysis.local_path` and `ExperimentAnalysis.remote_path` in favor of a unified `ExperimentAnalysis.experiment_path`. Signed-off-by: Kai Fricke Signed-off-by: Jack He --- python/ray/air/checkpoint.py | 31 +++++++++++++ python/ray/air/result.py | 44 ++++++++++++++++--- .../ray/tune/analysis/experiment_analysis.py | 20 +++++++++ python/ray/tune/experiment/trial.py | 1 - python/ray/tune/result_grid.py | 26 ++++++++++- python/ray/tune/tests/test_result_grid.py | 44 ++++++++++++++++--- python/ray/tune/tuner.py | 10 +++-- 7 files changed, 157 insertions(+), 19 deletions(-) diff --git a/python/ray/air/checkpoint.py b/python/ray/air/checkpoint.py index f103ec2b6ad6b..60fefca9505ea 100644 --- a/python/ray/air/checkpoint.py +++ b/python/ray/air/checkpoint.py @@ -246,6 +246,37 @@ def _metadata(self, metadata: _CheckpointMetadata): for attr, value in metadata.checkpoint_state.items(): setattr(self, attr, value) + @property + def path(self) -> Optional[str]: + """Return path to checkpoint, if available. + + This will return a URI to cloud storage if this checkpoint is + persisted on cloud, or a local path if this checkpoint + is persisted on local disk and available on the current node. + + In all other cases, this will return None. + + Example: + + >>> from ray.air import Checkpoint + >>> checkpoint = Checkpoint.from_uri("s3://some-bucket/some-location") + >>> assert checkpoint.path == "s3://some-bucket/some-location" + >>> checkpoint = Checkpoint.from_dict({"data": 1}) + >>> assert checkpoint.path == None + + Returns: + Checkpoint path if this checkpoint is reachable from the current node (e.g. 
+ cloud storage or locally available directory). + + """ + if self._uri: + return self._uri + + if self._local_path: + return self._local_path + + return None + @property def uri(self) -> Optional[str]: """Return checkpoint URI, if available. diff --git a/python/ray/air/result.py b/python/ray/air/result.py index d49d569f63895..28b469efa225b 100644 --- a/python/ray/air/result.py +++ b/python/ray/air/result.py @@ -1,17 +1,19 @@ +import warnings from typing import TYPE_CHECKING from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from ray.air.checkpoint import Checkpoint +from ray.util import log_once from ray.util.annotations import PublicAPI if TYPE_CHECKING: import pandas as pd -@dataclass @PublicAPI(stability="beta") +@dataclass class Result: """The final result of a ML training run or a Tune trial. @@ -27,7 +29,6 @@ class Result: metrics: The final metrics as reported by an Trainable. checkpoint: The final checkpoint of the Trainable. error: The execution error of the Trainable run, if the trial finishes in error. - log_dir: Directory where the trial logs are saved. metrics_dataframe: The full result dataframe of the Trainable. The dataframe is indexed by iterations and contains reported metrics. 
@@ -41,10 +42,27 @@ class Result: metrics: Optional[Dict[str, Any]] checkpoint: Optional[Checkpoint] error: Optional[Exception] - log_dir: Optional[Path] - metrics_dataframe: Optional["pd.DataFrame"] - best_checkpoints: Optional[List[Tuple[Checkpoint, Dict[str, Any]]]] - _items_to_repr = ["error", "metrics", "log_dir", "checkpoint"] + metrics_dataframe: Optional["pd.DataFrame"] = None + best_checkpoints: Optional[List[Tuple[Checkpoint, Dict[str, Any]]]] = None + _local_path: Optional[str] = None + _remote_path: Optional[str] = None + _items_to_repr = ["error", "metrics", "path", "checkpoint"] + # Deprecate: raise in 2.5, remove in 2.6 + log_dir: Optional[Path] = None + + def __post_init__(self): + if self.log_dir and log_once("result_log_dir_deprecated"): + warnings.warn( + "The `Result.log_dir` property is deprecated. " + "Use `local_path` instead." + ) + self._local_path = str(self.log_dir) + + # Duplicate for retrieval + self.log_dir = Path(self._local_path) if self._local_path else None + # Backwards compatibility: Make sure to cast Path to string + # Deprecate: Remove this line after 2.6 + self._local_path = str(self._local_path) if self._local_path else None @property def config(self) -> Optional[Dict[str, Any]]: @@ -53,11 +71,23 @@ def config(self) -> Optional[Dict[str, Any]]: return None return self.metrics.get("config", None) + @property + def path(self) -> str: + """Path pointing to the result directory on persistent storage. + + This can point to a remote storage location (e.g. S3) or to a local + location (path on the head node). + + For instance, if your remote storage path is ``s3://bucket/location``, + this will point to ``s3://bucket/location/experiment_name/trial_name``. 
+ """ + return self._remote_path or self._local_path + def _repr(self, indent: int = 0) -> str: """Construct the representation with specified number of space indent.""" from ray.tune.result import AUTO_RESULT_KEYS - shown_attributes = {k: self.__dict__[k] for k in self._items_to_repr} + shown_attributes = {k: getattr(self, k) for k in self._items_to_repr} if self.error: shown_attributes["error"] = type(self.error).__name__ else: diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index f9ce5fa5e46de..293bfdf69aa0a 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -115,6 +115,26 @@ def __init__( self._remote_storage_path = remote_storage_path + @property + def _local_path(self) -> str: + return str(self._local_experiment_path) + + @property + def _remote_path(self) -> Optional[str]: + return self._parse_cloud_path(self._local_path) + + @property + def experiment_path(self) -> str: + """Path pointing to the experiment directory on persistent storage. + + This can point to a remote storage location (e.g. S3) or to a local + location (path on the head node). + + For instance, if your remote storage path is ``s3://bucket/location``, + this will point to ``s3://bucket/location/experiment_name``. 
+ """ + return self._remote_path or self._local_path + def _parse_cloud_path(self, local_path: str): """Convert local path into cloud storage path""" if not self._remote_storage_path: diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py index a2327471ceeee..fa75aa98ea79a 100644 --- a/python/ray/tune/experiment/trial.py +++ b/python/ray/tune/experiment/trial.py @@ -692,7 +692,6 @@ def remote_checkpoint_dir(self) -> Optional[str]: @property def remote_path(self) -> Optional[str]: - assert self.local_path, "Trial {}: logdir not initialized.".format(self) if not self._remote_experiment_path or not self.relative_logdir: return None uri = URI(self._remote_experiment_path) diff --git a/python/ray/tune/result_grid.py b/python/ray/tune/result_grid.py index f8337c9056e01..0980356274509 100644 --- a/python/ray/tune/result_grid.py +++ b/python/ray/tune/result_grid.py @@ -1,5 +1,4 @@ import os -from pathlib import Path from typing import Optional, Union import pandas as pd @@ -73,6 +72,28 @@ def __init__( self._trial_to_result(trial) for trial in self._experiment_analysis.trials ] + @property + def _local_path(self) -> str: + """Return path pointing to the experiment directory on the local disk.""" + return self._experiment_analysis._local_path + + @property + def _remote_path(self) -> Optional[str]: + """Return path pointing to the experiment directory on remote storage.""" + return self._experiment_analysis._remote_path + + @property + def experiment_path(self) -> str: + """Path pointing to the experiment directory on persistent storage. + + This can point to a remote storage location (e.g. S3) or to a local + location (path on the head node). + + For instance, if your remote storage path is ``s3://bucket/location``, + this will point to ``s3://bucket/location/experiment_name``. 
+ """ + return self._remote_path or self._local_path + def get_best_result( self, metric: Optional[str] = None, @@ -232,7 +253,8 @@ def _trial_to_result(self, trial: Trial) -> Result: checkpoint=checkpoint, metrics=trial.last_result.copy(), error=self._populate_exception(trial), - log_dir=Path(trial.local_path) if trial.local_path else None, + _local_path=trial.local_path, + _remote_path=trial.remote_path, metrics_dataframe=self._experiment_analysis.trial_dataframes.get( trial.local_path ) diff --git a/python/ray/tune/tests/test_result_grid.py b/python/ray/tune/tests/test_result_grid.py index 075c56ceb4a57..10b34bbbb3b29 100644 --- a/python/ray/tune/tests/test_result_grid.py +++ b/python/ray/tune/tests/test_result_grid.py @@ -240,7 +240,7 @@ class MockExperimentAnalysis: Result( metrics={"loss": 1.0}, checkpoint=Checkpoint(data_dict={"weight": 1.0}), - log_dir=Path("./log_1"), + _local_path=str(Path("./log_1")), error=None, metrics_dataframe=None, best_checkpoints=None, @@ -248,7 +248,7 @@ class MockExperimentAnalysis: Result( metrics={"loss": 2.0}, checkpoint=Checkpoint(data_dict={"weight": 2.0}), - log_dir=Path("./log_2"), + _local_path=str(Path("./log_2")), error=RuntimeError(), metrics_dataframe=None, best_checkpoints=None, @@ -265,13 +265,13 @@ class MockExperimentAnalysis: expected_repr = """ResultGrid<[ Result( metrics={'loss': 1.0}, - log_dir=PosixPath('log_1'), + path='log_1', checkpoint=Checkpoint(data_dict={'weight': 1.0}) ), Result( error='RuntimeError', metrics={'loss': 2.0}, - log_dir=PosixPath('log_2'), + path='log_2', checkpoint=Checkpoint(data_dict={'weight': 2.0}) ) ]>""" @@ -412,9 +412,25 @@ def train_func(config): for (checkpoint, _) in result_grid[0].best_checkpoints: assert checkpoint assert "moved_ray_results" in checkpoint._local_path + assert checkpoint._local_path.startswith(result_grid._local_path) + checkpoint_data.append(checkpoint.to_dict()["it"]) assert set(checkpoint_data) == {5, 6} + # Check local_path property + assert 
Path(result_grid._local_path).parent.name == "moved_ray_results"
+
+    # No upload path, so path should point to local_path
+    assert result_grid._local_path == result_grid.experiment_path
+
+    # Check Result objects
+    for result in result_grid:
+        assert result._local_path.startswith(result_grid._local_path)
+        assert result._local_path == result.path
+        assert result.path.startswith(result_grid.experiment_path)
+        assert result.checkpoint._local_path.startswith(result._local_path)
+        assert result.checkpoint.path.startswith(result.path)
+
 def test_result_grid_cloud_path(ray_start_2_cpus, tmpdir):
     # Test that checkpoints returned by ResultGrid point to URI
@@ -429,7 +445,7 @@ def trainable(config):
 
     tuner = tune.Tuner(
         trainable,
-        run_config=air.RunConfig(sync_config=sync_config, local_dir=local_dir),
+        run_config=air.RunConfig(sync_config=sync_config, local_dir=str(local_dir)),
         tune_config=tune.TuneConfig(
             metric="metric",
             mode="max",
@@ -444,6 +460,24 @@ def trainable(config):
         == results._experiment_analysis.best_checkpoint.get_internal_representation()
     )
 
+    # Check .remote_path property
+    assert results._remote_path.startswith("s3://bucket")
+    assert results.experiment_path.startswith("s3://bucket")
+    assert best_checkpoint.uri.startswith(results._remote_path)
+    assert best_checkpoint.path.startswith(results._remote_path)
+
+    # Upload path, so path should point to remote_path
+    assert results._remote_path == results.experiment_path
+
+    # Check Result objects
+    for result in results:
+        assert result._local_path.startswith(results._local_path)
+        assert result._remote_path.startswith(results._remote_path)
+        assert result._remote_path == result.path
+        assert result.path.startswith(results.experiment_path)
+        assert result.checkpoint.uri.startswith(result._remote_path)
+        assert result.checkpoint.path.startswith(result.path)
+
 
 if __name__ == "__main__":
     import sys
diff --git a/python/ray/tune/tuner.py b/python/ray/tune/tuner.py
index 2ca1a253cc3b0..c184e39871fe6 100644
--- 
a/python/ray/tune/tuner.py +++ b/python/ray/tune/tuner.py @@ -106,17 +106,19 @@ def get_dataset(): } tuner = Tuner(trainable=trainer, param_space=param_space, run_config=RunConfig(name="my_tune_run")) - analysis = tuner.fit() + results = tuner.fit() To retry a failed tune run, you can then do .. code-block:: python - tuner = Tuner.restore(experiment_checkpoint_dir) + tuner = Tuner.restore(results.experiment_path) tuner.fit() - ``experiment_checkpoint_dir`` can be easily located near the end of the - console output of your first failed run. + ``results.experiment_path`` can be retrieved from the + :ref:`ResultGrid object `. It can + also be easily seen in the log output from your first run. + """ # One of the following is assigned.