[AIR/Train] Add Trainer.restore API for train experiment-level fault tolerance (ray-project#31920)

This PR introduces a `Trainer.restore` API that resumes a Train experiment that crashed or was interrupted. Previously, `resume_from_checkpoint` only allowed starting a _new_ experiment, which writes to a completely new log directory and a new set of results.
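
A minimal usage sketch (the experiment directory path and choice of trainer are illustrative, not taken from this PR's tests):

```python
from ray.train.torch import TorchTrainer  # any built-in trainer exposes the same classmethod

# Resume the interrupted run in place: results keep appending to the original
# experiment directory instead of being written to a new one.
trainer = TorchTrainer.restore("~/ray_results/TorchTrainer_2023-02-15_00-46-58")
result = trainer.fit()
```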

Signed-off-by: Justin Yu <justinvyu@berkeley.edu>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com>
2 people authored and peytondmurray committed Mar 22, 2023
1 parent 11ca107 commit c606aa2
Showing 12 changed files with 1,016 additions and 44 deletions.
54 changes: 50 additions & 4 deletions doc/source/train/api/api.rst
@@ -26,8 +26,8 @@ Trainer Base Classes
    ~train.data_parallel_trainer.DataParallelTrainer
    ~train.gbdt_trainer.GBDTTrainer

``BaseTrainer`` Methods
************************
``BaseTrainer`` API
*******************

.. autosummary::
    :toctree: doc/
@@ -40,7 +40,7 @@ Trainer Base Classes


Train Backend Base Classes
~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~

.. _train-backend:
.. _train-backend-config:
@@ -170,10 +170,56 @@ Mosaic


Reinforcement Learning (RLlib)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autosummary::
    :toctree: doc/

    ~train.rl.RLTrainer
    ~train.rl.RLCheckpoint


.. _trainer-restore:

Ray Train Experiment Restoration
--------------------------------

.. autosummary::
    :toctree: doc/

    train.trainer.BaseTrainer.restore

.. note::

    All trainer classes have a `restore` method that takes in a path
    pointing to the directory of the experiment to be restored.
    `restore` also exposes a subset of constructor arguments that can be re-specified.
    See :ref:`train-framework-specific-restore` below for details on `restore`
    arguments for different AIR trainer integrations.

.. _train-framework-specific-restore:

Restoration API for Built-in Trainers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autosummary::
    :toctree: doc/

    train.data_parallel_trainer.DataParallelTrainer.restore

.. autosummary::

    train.huggingface.HuggingFaceTrainer.restore

.. note::

    `TorchTrainer.restore`, `TensorflowTrainer.restore`, and `HorovodTrainer.restore`
    can take in the same parameters as their parent class's
    :meth:`DataParallelTrainer.restore <ray.train.data_parallel_trainer.DataParallelTrainer.restore>`.

    Unless otherwise specified, other trainers will accept the same parameters as
    :meth:`BaseTrainer.restore <ray.train.trainer.BaseTrainer.restore>`.

.. seealso::

    See :ref:`train-restore-faq` for more details on when and how trainer restore should be used.
102 changes: 102 additions & 0 deletions doc/source/train/faq.rst
@@ -27,6 +27,108 @@ you can initialize the ``Trainer`` with ``resources_per_worker`` specified in ``
currently assume each worker is allocated exactly 1 GPU. The partial GPU and multi GPU use-cases
can still be run with Ray Train today without these functions.

.. _train-restore-faq:

How do I restore a Ray Train experiment?
----------------------------------------

A Train experiment may be interrupted due to one of the following reasons:

- The experiment was manually interrupted (e.g., Ctrl+C, or pre-empted head node instance).
- The head node crashed (e.g., OOM or some other runtime error).
- The entire cluster went down (e.g., network error affecting all nodes).

In these cases, a Trainer :ref:`can be restored <trainer-restore>` for the experiment to resume.

Since this is applicable to all of Ray Train's built-in trainers,
we'll use `FrameworkTrainer` to refer to a generic trainer for the remainder of this answer.

To restore an experiment, first find the experiment directory that your previous
run was saved to. If you saved locally, this will look like ``{local_dir}/{name}``,
where ``local_dir`` may be ``~/ray_results``, and ``name`` is something
like ``FrameworkTrainer_2023-xxx``.

Note that ``local_dir`` and ``name`` are the same parameters that you pass through :class:`~ray.air.RunConfig`.

.. code-block:: python

    datasets = {"train": ray.data.from_items([{"x": i, "y": 2 * i} for i in range(10)])}

    restored_trainer = FrameworkTrainer.restore(
        path="~/ray_results/FrameworkTrainer_2023-02-15_00-46-58",
        datasets=datasets,
    )

It's also possible to restore from a remote path (e.g., from an experiment directory
stored in an S3 bucket).

.. code-block:: python

    datasets = {"train": ray.data.from_items([{"x": i, "y": 2 * i} for i in range(10)])}

    restored_trainer = FrameworkTrainer.restore(
        path="s3://results-bucket/FrameworkTrainer_2023-02-15_00-46-58",
        datasets=datasets,
    )

.. note::

    `FrameworkTrainer.restore` may allow more parameters to be re-specified depending
    on which trainer you're using. See :ref:`train-framework-specific-restore` for more details.


Single Script for Automatic Restoration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Adding the branching logic below allows you to run the same script after an interruption,
picking up training where you left off in the previous run. Notice that we use the
:meth:`FrameworkTrainer.can_restore <ray.train.trainer.BaseTrainer.can_restore>` utility method
to determine the existence and validity of the given experiment directory.

.. code-block:: python

    # run_train_experiment.py

    # Load datasets, define a preprocessor, etc.
    # datasets = { ... }
    # preprocessor = ...

    experiment_name = "train_experiment"
    experiment_dir = f"~/ray_results/{experiment_name}"

    if FrameworkTrainer.can_restore(experiment_dir):
        trainer = FrameworkTrainer.restore(
            experiment_dir,
            datasets=datasets,
        )
    else:
        trainer = FrameworkTrainer(
            datasets=datasets,
            preprocessor=preprocessor,
            scaling_config=air.ScalingConfig(num_workers=2, use_gpu=False),
            run_config=air.RunConfig(
                name=experiment_name,
                local_dir="~/ray_results",
                failure_config=air.FailureConfig(max_failures=3),
                stop={"training_iteration": 10},
            ),
        )

.. seealso::

    See the :meth:`BaseTrainer.restore <ray.train.trainer.BaseTrainer.restore>` docstring
    for a full example.

.. note::

    `FrameworkTrainer.restore` is different from
    :class:`FrameworkTrainer(..., resume_from_checkpoint=...) <ray.train.trainer.BaseTrainer>`.
    `resume_from_checkpoint` is meant to be used to start a *new* Train experiment,
    which writes results to a new directory and starts over from iteration 0.

    `FrameworkTrainer.restore` is used to continue an existing experiment, where
    new results will continue to be appended to existing logs.
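
For contrast, here is a minimal sketch of seeding a *new* run with ``resume_from_checkpoint``
(the checkpoint path and experiment name are hypothetical):

.. code-block:: python

    from ray.air import Checkpoint

    # Starts a brand-new experiment (new log directory, iteration count reset),
    # initialized from a previously saved checkpoint.
    new_trainer = FrameworkTrainer(
        datasets=datasets,
        scaling_config=air.ScalingConfig(num_workers=2),
        run_config=air.RunConfig(name="new_experiment_from_checkpoint"),
        resume_from_checkpoint=Checkpoint.from_directory("/path/to/saved/checkpoint"),
    )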


My multi-node PyTorch GPU training is hanging or giving me obscure NCCL errors. What do I do?
---------------------------------------------------------------------------------------------
12 changes: 12 additions & 0 deletions python/ray/train/BUILD
@@ -584,6 +584,18 @@ py_test(
deps = [":train_lib"]
)

py_test(
    name = "test_trainer_restore",
    size = "medium",
    srcs = ["tests/test_trainer_restore.py"],
    tags = [
        "exclusive",
        "ray_air",
        "team:ml",
    ],
    deps = [":train_lib"],
)

# This is a dummy test dependency that causes the above tests to be
# re-run if any of these files changes.
py_library(
11 changes: 8 additions & 3 deletions python/ray/train/_internal/dataset_spec.py
@@ -5,12 +5,12 @@
from ray.air.config import DatasetConfig

from ray.data import Dataset, DatasetPipeline
from ray.data.preprocessor import Preprocessor
from ray.data.preprocessors import Chain
from ray.air._internal.util import _estimate_avail_object_store_memory

if TYPE_CHECKING:
    from ray.data import DatasetIterator
    from ray.data.preprocessor import Preprocessor

RayDataset = Union["Dataset", "DatasetPipeline"]

@@ -113,7 +113,9 @@ def __init__(self, dataset_config: Dict[str, DatasetConfig]):
self.preprocessor: Optional["Preprocessor"] = None

def preprocess_datasets(
self, prep: "Preprocessor", datasets: Dict[str, "Dataset"]
self,
prep: "Preprocessor",
datasets: Dict[str, "Dataset"],
) -> Dict[str, "Dataset"]:
"""Preprocess the given datasets.
@@ -142,7 +144,10 @@ def preprocess_datasets(
                    continue
                if conf.fit:
                    ds_to_fit = datasets[k]
            if ds_to_fit:
            if ds_to_fit and prep.fit_status() in (
                Preprocessor.FitStatus.NOT_FITTED,
                Preprocessor.FitStatus.PARTIALLY_FITTED,
            ):
                prep.fit(ds_to_fit)
        new_datasets = {}

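The intent of the added `fit_status` check, as a standalone sketch (the helper function is
hypothetical): when a restored run already carries a fitted preprocessor, fitting is skipped
rather than repeated.

```python
from ray.data.preprocessor import Preprocessor

def maybe_fit(prep: Preprocessor, dataset) -> None:
    """Fit the preprocessor only if it has not already been (fully) fitted."""
    if prep.fit_status() in (
        Preprocessor.FitStatus.NOT_FITTED,
        Preprocessor.FitStatus.PARTIALLY_FITTED,
    ):
        prep.fit(dataset)
```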
