[air] Add *path properties to Result and ResultGrid #33410

Merged 22 commits on Mar 24, 2023
31 changes: 31 additions & 0 deletions python/ray/air/checkpoint.py
@@ -246,6 +246,37 @@ def _metadata(self, metadata: _CheckpointMetadata):
for attr, value in metadata.checkpoint_state.items():
setattr(self, attr, value)

@property
def path(self) -> Optional[str]:
Contributor:

Should checkpoint.uri get deprecated given this one?

Also, what if a user wants the local path for some reason when using cloud? Can only access through private property right now.

Contributor Author:

I think we should keep URI to have counterparts to to_uri and from_uri for now. But I agree, we should probably see if we want to move to to_path and from_path instead.
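As a reference point, a minimal sketch of the existing `from_uri`/`to_uri`/`uri` round trip (the bucket locations below are hypothetical, and `to_uri` actually uploads the data, so it needs a reachable bucket):

```python
from ray.air import Checkpoint

# A checkpoint created from a URI keeps that URI on the object.
checkpoint = Checkpoint.from_uri("s3://some-bucket/some-location")
assert checkpoint.uri == "s3://some-bucket/some-location"

# to_uri is the persisting counterpart: it uploads an in-memory checkpoint
# and returns the URI it was written to.
in_memory = Checkpoint.from_dict({"data": 1})
uri = in_memory.to_uri("s3://some-bucket/other-location")
assert uri == "s3://some-bucket/other-location"
```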

Contributor Author:

Regarding access to the local path: if a checkpoint has a URI, the checkpoint object does not store a local path. There may be cached data somewhere on the local node, but there doesn't have to be (e.g. if the checkpoint was stored on a worker node). The user can call checkpoint.to_directory() to download the data to a target directory.
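A minimal sketch of that workflow (the bucket URI and target directory below are hypothetical):

```python
from ray.air import Checkpoint

# Cloud-backed checkpoint: the object stores a URI, not a local path.
checkpoint = Checkpoint.from_uri("s3://some-bucket/some-location")

# Download the checkpoint contents to a target directory on the current node.
local_dir = checkpoint.to_directory("/tmp/my_checkpoint")
print(local_dir)  # "/tmp/my_checkpoint"

# Or use the context manager form for a (possibly temporary) local copy.
with checkpoint.as_directory() as local_path:
    ...  # read files under `local_path`
```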

"""Return path to checkpoint, if available.

This will return a URI to cloud storage if this checkpoint is
persisted on cloud, or a local path if this checkpoint
is persisted on local disk and available on the current node.

In all other cases, this will return None.

Example:

>>> from ray.air import Checkpoint
>>> checkpoint = Checkpoint.from_uri("s3://some-bucket/some-location")
>>> assert checkpoint.path == "s3://some-bucket/some-location"
>>> checkpoint = Checkpoint.from_dict({"data": 1})
>>> assert checkpoint.path == None

Returns:
Checkpoint path if this checkpoint is reachable from the current node (e.g.
cloud storage or locally available directory).

"""
if self._uri:
return self._uri

if self._local_path:
return self._local_path

return None

@property
def uri(self) -> Optional[str]:
"""Return checkpoint URI, if available.
44 changes: 37 additions & 7 deletions python/ray/air/result.py
@@ -1,17 +1,19 @@
import warnings
from typing import TYPE_CHECKING
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from ray.air.checkpoint import Checkpoint
from ray.util import log_once
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
import pandas as pd


@dataclass
@PublicAPI(stability="beta")
@dataclass
class Result:
"""The final result of a ML training run or a Tune trial.

@@ -27,7 +29,6 @@ class Result:
metrics: The final metrics as reported by a Trainable.
checkpoint: The final checkpoint of the Trainable.
error: The execution error of the Trainable run, if the trial finishes in error.
log_dir: Directory where the trial logs are saved.
metrics_dataframe: The full result dataframe of the Trainable.
The dataframe is indexed by iterations and contains reported
metrics.
@@ -41,10 +42,27 @@
metrics: Optional[Dict[str, Any]]
checkpoint: Optional[Checkpoint]
error: Optional[Exception]
log_dir: Optional[Path]
metrics_dataframe: Optional["pd.DataFrame"]
best_checkpoints: Optional[List[Tuple[Checkpoint, Dict[str, Any]]]]
_items_to_repr = ["error", "metrics", "log_dir", "checkpoint"]
metrics_dataframe: Optional["pd.DataFrame"] = None
best_checkpoints: Optional[List[Tuple[Checkpoint, Dict[str, Any]]]] = None
_local_path: Optional[str] = None
_remote_path: Optional[str] = None
_items_to_repr = ["error", "metrics", "path", "checkpoint"]
# Deprecate: raise in 2.5, remove in 2.6
log_dir: Optional[Path] = None

def __post_init__(self):
if self.log_dir and log_once("result_log_dir_deprecated"):
warnings.warn(
"The `Result.log_dir` property is deprecated. "
"Use `local_path` instead."
)
self._local_path = str(self.log_dir)

# Duplicate for retrieval
self.log_dir = Path(self._local_path) if self._local_path else None
# Backwards compatibility: Make sure to cast Path to string
# Deprecate: Remove this line after 2.6
self._local_path = str(self._local_path) if self._local_path else None

@property
def config(self) -> Optional[Dict[str, Any]]:
@@ -53,11 +71,23 @@ def config(self) -> Optional[Dict[str, Any]]:
return None
return self.metrics.get("config", None)

@property
def path(self) -> str:
"""Path pointing to the result directory on persistent storage.

This can point to a remote storage location (e.g. S3) or to a local
location (path on the head node).

For instance, if your remote storage path is ``s3://bucket/location``,
this will point to ``s3://bucket/location/experiment_name/trial_name``.
"""
return self._remote_path or self._local_path

def _repr(self, indent: int = 0) -> str:
"""Construct the representation with specified number of space indent."""
from ray.tune.result import AUTO_RESULT_KEYS

shown_attributes = {k: self.__dict__[k] for k in self._items_to_repr}
shown_attributes = {k: getattr(self, k) for k in self._items_to_repr}
if self.error:
shown_attributes["error"] = type(self.error).__name__
else:
20 changes: 20 additions & 0 deletions python/ray/tune/analysis/experiment_analysis.py
@@ -115,6 +115,26 @@ def __init__(

self._remote_storage_path = remote_storage_path

@property
def _local_path(self) -> str:
return str(self._local_experiment_path)

@property
def _remote_path(self) -> Optional[str]:
return self._parse_cloud_path(self._local_path)

@property
def experiment_path(self) -> str:
"""Path pointing to the experiment directory on persistent storage.

This can point to a remote storage location (e.g. S3) or to a local
location (path on the head node).

For instance, if your remote storage path is ``s3://bucket/location``,
this will point to ``s3://bucket/location/experiment_name``.
"""
return self._remote_path or self._local_path

def _parse_cloud_path(self, local_path: str):
"""Convert local path into cloud storage path"""
if not self._remote_storage_path:
1 change: 0 additions & 1 deletion python/ray/tune/experiment/trial.py
@@ -692,7 +692,6 @@ def remote_checkpoint_dir(self) -> Optional[str]:

@property
def remote_path(self) -> Optional[str]:
assert self.local_path, "Trial {}: logdir not initialized.".format(self)
if not self._remote_experiment_path or not self.relative_logdir:
return None
uri = URI(self._remote_experiment_path)
26 changes: 24 additions & 2 deletions python/ray/tune/result_grid.py
@@ -1,5 +1,4 @@
import os
from pathlib import Path
from typing import Optional, Union

import pandas as pd
@@ -73,6 +72,28 @@ def __init__(
self._trial_to_result(trial) for trial in self._experiment_analysis.trials
]

@property
def _local_path(self) -> str:
"""Return path pointing to the experiment directory on the local disk."""
return self._experiment_analysis._local_path

@property
def _remote_path(self) -> Optional[str]:
"""Return path pointing to the experiment directory on remote storage."""
return self._experiment_analysis._remote_path

@property
def experiment_path(self) -> str:
"""Path pointing to the experiment directory on persistent storage.

This can point to a remote storage location (e.g. S3) or to a local
location (path on the head node).

For instance, if your remote storage path is ``s3://bucket/location``,
this will point to ``s3://bucket/location/experiment_name``.
"""
return self._remote_path or self._local_path

def get_best_result(
self,
metric: Optional[str] = None,
@@ -232,7 +253,8 @@ def _trial_to_result(self, trial: Trial) -> Result:
checkpoint=checkpoint,
metrics=trial.last_result.copy(),
error=self._populate_exception(trial),
log_dir=Path(trial.local_path) if trial.local_path else None,
_local_path=trial.local_path,
_remote_path=trial.remote_path,
metrics_dataframe=self._experiment_analysis.trial_dataframes.get(
trial.local_path
)
44 changes: 39 additions & 5 deletions python/ray/tune/tests/test_result_grid.py
@@ -240,15 +240,15 @@ class MockExperimentAnalysis:
Result(
metrics={"loss": 1.0},
checkpoint=Checkpoint(data_dict={"weight": 1.0}),
log_dir=Path("./log_1"),
_local_path=str(Path("./log_1")),
error=None,
metrics_dataframe=None,
best_checkpoints=None,
),
Result(
metrics={"loss": 2.0},
checkpoint=Checkpoint(data_dict={"weight": 2.0}),
log_dir=Path("./log_2"),
_local_path=str(Path("./log_2")),
error=RuntimeError(),
metrics_dataframe=None,
best_checkpoints=None,
@@ -265,13 +265,13 @@
expected_repr = """ResultGrid<[
Result(
metrics={'loss': 1.0},
log_dir=PosixPath('log_1'),
path='log_1',
checkpoint=Checkpoint(data_dict={'weight': 1.0})
),
Result(
error='RuntimeError',
metrics={'loss': 2.0},
log_dir=PosixPath('log_2'),
path='log_2',
checkpoint=Checkpoint(data_dict={'weight': 2.0})
)
]>"""
@@ -412,9 +412,25 @@ def train_func(config):
for (checkpoint, _) in result_grid[0].best_checkpoints:
assert checkpoint
assert "moved_ray_results" in checkpoint._local_path
assert checkpoint._local_path.startswith(result_grid._local_path)

checkpoint_data.append(checkpoint.to_dict()["it"])
assert set(checkpoint_data) == {5, 6}

# Check local_path property
assert Path(result_grid._local_path).parent.name == "moved_ray_results"

# No upload path, so path should point to local_path
assert result_grid._local_path == result_grid.experiment_path

# Check Result objects
for result in result_grid:
assert result._local_path.startswith(result_grid._local_path)
assert result._local_path == result.path
assert result.path.startswith(result_grid.experiment_path)
assert result.checkpoint._local_path.startswith(result._local_path)
assert result.checkpoint.path.startswith(result.path)


def test_result_grid_cloud_path(ray_start_2_cpus, tmpdir):
# Test that checkpoints returned by ResultGrid point to URI
@@ -429,7 +445,7 @@ def trainable(config):

tuner = tune.Tuner(
trainable,
run_config=air.RunConfig(sync_config=sync_config, local_dir=local_dir),
run_config=air.RunConfig(sync_config=sync_config, local_dir=str(local_dir)),
tune_config=tune.TuneConfig(
metric="metric",
mode="max",
@@ -444,6 +460,24 @@
== results._experiment_analysis.best_checkpoint.get_internal_representation()
)

# Check .remote_path property
assert results._remote_path.startswith("s3://bucket")
assert results.experiment_path.startswith("s3://bucket")
assert best_checkpoint.uri.startswith(results._remote_path)
assert best_checkpoint.path.startswith(results._remote_path)

# Upload path is set, so path should point to remote_path
assert results._remote_path == results.experiment_path

# Check Result objects
for result in results:
assert result._local_path.startswith(results._local_path)
assert result._remote_path.startswith(results._remote_path)
assert result._remote_path == result.path
assert result.path.startswith(results.experiment_path)
assert result.checkpoint.uri.startswith(result._remote_path)
assert result.checkpoint.path.startswith(result.path)


if __name__ == "__main__":
import sys
10 changes: 6 additions & 4 deletions python/ray/tune/tuner.py
@@ -106,17 +106,19 @@ def get_dataset():
}
tuner = Tuner(trainable=trainer, param_space=param_space,
run_config=RunConfig(name="my_tune_run"))
analysis = tuner.fit()
results = tuner.fit()

To retry a failed tune run, you can then do

.. code-block:: python

tuner = Tuner.restore(experiment_checkpoint_dir)
tuner = Tuner.restore(results.experiment_path)
tuner.fit()

``experiment_checkpoint_dir`` can be easily located near the end of the
console output of your first failed run.
``results.experiment_path`` can be retrieved from the
:ref:`ResultGrid object <tune-analysis-docs>`. It can
also be easily seen in the log output from your first run.

"""

# One of the following is assigned.