Add support for DDP fork #13405

Merged: 69 commits merged into master from feature/ddp-fork2 on Jul 22, 2022
Commits
782ca4a
fork
awaelchli Jun 22, 2022
aefac45
dont set device
awaelchli Jun 23, 2022
efce3c4
parallel dev
awaelchli Jun 23, 2022
810c0ba
add cuda
awaelchli Jun 24, 2022
5fd9cda
update device count
awaelchli Jun 24, 2022
c1b4fd0
fork
awaelchli Jun 24, 2022
daa07ee
cuda available
awaelchli Jun 24, 2022
679f363
set device
awaelchli Jun 24, 2022
c43f827
update
awaelchli Jun 24, 2022
b7e529e
update
awaelchli Jun 24, 2022
b51d172
cuda available
awaelchli Jun 25, 2022
9b41941
formatting
awaelchli Jun 25, 2022
9cea979
unused import
awaelchli Jun 25, 2022
daccd21
test fixes
awaelchli Jun 27, 2022
0ccc3b9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 27, 2022
a080ec0
add docstring
awaelchli Jun 27, 2022
9a914be
Merge remote-tracking branch 'origin/feature/ddp-fork2' into feature/…
awaelchli Jun 27, 2022
eae67cd
update
awaelchli Jun 27, 2022
167a710
update
awaelchli Jun 27, 2022
1bdd79d
fix mocks in tests
awaelchli Jun 27, 2022
297b55a
refactor
awaelchli Jun 27, 2022
6c5b769
fix test
awaelchli Jun 27, 2022
2671810
update lite and enums
awaelchli Jun 27, 2022
ff2a825
typo
awaelchli Jun 27, 2022
0879751
update docs
awaelchli Jun 27, 2022
fe16575
add validation for forking on platforms
awaelchli Jun 27, 2022
582872c
debug no breaking change for devices=1
awaelchli Jun 27, 2022
da70271
fix typo in test
awaelchli Jun 27, 2022
3f9a872
update docstring
awaelchli Jun 27, 2022
7291fa3
added windows test for device parser
awaelchli Jun 27, 2022
785c830
add changelog
awaelchli Jun 27, 2022
dd043ad
add test
awaelchli Jun 27, 2022
da843ee
add tests
awaelchli Jun 27, 2022
1a63662
update error message
awaelchli Jun 27, 2022
093a52e
Comparison section
awaelchli Jun 28, 2022
3be3c17
Merge branch 'master' into feature/ddp-fork2
awaelchli Jun 29, 2022
7b3c132
fork docs
awaelchli Jun 29, 2022
1b95954
typing
awaelchli Jun 29, 2022
df031b3
Merge branch 'master' into feature/ddp-fork2
awaelchli Jun 29, 2022
6855e50
Merge branch 'master' into feature/ddp-fork2
awaelchli Jun 30, 2022
8df3457
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 30, 2022
d5c28b9
fix tests
awaelchli Jul 1, 2022
0152c39
Merge branch 'master' into feature/ddp-fork2
awaelchli Jul 1, 2022
a6a0d09
Update docs/source-pytorch/accelerators/gpu_intermediate.rst
awaelchli Jul 3, 2022
4ede4cb
Update docs/source-pytorch/accelerators/gpu_intermediate.rst
awaelchli Jul 3, 2022
877ed07
Update docs/source-pytorch/accelerators/gpu_intermediate.rst
awaelchli Jul 3, 2022
bf36259
reviews
awaelchli Jul 3, 2022
6263636
Merge remote-tracking branch 'origin/feature/ddp-fork2' into feature/…
awaelchli Jul 3, 2022
ca9a0b3
Merge branch 'master' into feature/ddp-fork2
awaelchli Jul 19, 2022
c9b2601
handle start methods
awaelchli Jul 19, 2022
cca1606
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 19, 2022
e6b19a1
update tests
awaelchli Jul 19, 2022
59c3735
Merge remote-tracking branch 'origin/feature/ddp-fork2' into feature/…
awaelchli Jul 19, 2022
cc06e12
update type
awaelchli Jul 19, 2022
b7ffaef
Merge branch 'master' into feature/ddp-fork2
awaelchli Jul 19, 2022
c5480a1
fix merge errors
awaelchli Jul 19, 2022
87cd344
update tests
awaelchli Jul 19, 2022
b686b3b
remove unused import
awaelchli Jul 19, 2022
2d05ac3
revert weird change
awaelchli Jul 20, 2022
78e542e
remove redundant start method attribute
awaelchli Jul 20, 2022
0b13047
they insist
awaelchli Jul 20, 2022
17257a6
Merge branch 'master' into feature/ddp-fork2
awaelchli Jul 20, 2022
3d7095d
update tests
awaelchli Jul 20, 2022
a5c3592
Revert "update tests"
awaelchli Jul 20, 2022
8c74f9a
insist
awaelchli Jul 20, 2022
1832c3e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 20, 2022
5c2e6ea
Update docs/source-pytorch/accelerators/gpu_intermediate.rst
awaelchli Jul 21, 2022
59f21fc
Merge branch 'master' into feature/ddp-fork2
Borda Jul 22, 2022
8c83d1a
Merge branch 'master' into feature/ddp-fork2
awaelchli Jul 22, 2022
Files changed
61 changes: 59 additions & 2 deletions docs/source-pytorch/accelerators/gpu_intermediate.rst
@@ -21,8 +21,10 @@ Lightning supports multiple ways of doing distributed training.
|

- Data Parallel (``strategy='dp'``) (multiple-gpus, 1 machine)
- DistributedDataParallel (``strategy='ddp'``) (multiple-gpus across many machines (python script based)).
- DistributedDataParallel (``strategy='ddp_spawn'``) (multiple-gpus across many machines (spawn based)).
- DistributedDataParallel (multiple-gpus across many machines)
- Regular (``strategy='ddp'``)
- Spawn (``strategy='ddp_spawn'``)
- Fork (``strategy='ddp_fork'``)
- Horovod (``strategy='horovod'``) (multi-machine, multi-gpu, configured at runtime)
- Bagua (``strategy='bagua'``) (multiple-gpus across many machines with advanced training algorithms)

@@ -199,6 +201,61 @@ You can then call your scripts anywhere
python some_file.py --accelerator 'gpu' --devices 8 --strategy 'ddp'


Distributed Data Parallel Fork
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

DDP Fork is an alternative to Spawn that can be used in interactive Python sessions and in Jupyter notebooks, Google Colab, Kaggle notebooks, and so on:

.. code-block:: python

# train on 8 GPUs in a Jupyter notebook
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp_fork")

Data Parallel (``strategy="dp"``) is the only other strategy supported in interactive environments, but it is slower, is discouraged by PyTorch, and has other limitations.
Among the native distributed strategies, regular DDP (``strategy="ddp"``) remains the recommended go-to over Spawn and Fork for its speed and stability, but it can only be used with scripts.
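
The same flag is accepted by Lightning Lite, whose supported strategies this PR also extends. A minimal sketch, assuming the Lite API of this release:

.. code-block:: python

    from pytorch_lightning.lite import LightningLite


    class Lite(LightningLite):
        def run(self):
            # set up models, optimizers and dataloaders via self.setup(...) here
            ...


    # forks 8 worker processes; also works from inside a notebook
    Lite(accelerator="gpu", devices=8, strategy="ddp_fork").run()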


Comparison of DDP variants and tradeoffs
****************************************

.. list-table:: DDP variants and their tradeoffs
:widths: 40 20 20 20
:header-rows: 1

* -
- DDP
- DDP Spawn
- DDP Fork
* - Works in Jupyter notebooks / IPython environments
- No
- No
- Yes
* - Supports multi-node
- Yes
- Yes
- Yes
* - Supported platforms
- Linux, Mac, Win
- Linux, Mac, Win
- Linux, Mac
* - Requires all objects to be picklable
- No
- Yes
- No
* - Is the guard ``if __name__ == "__main__"`` required?
- Yes
- Yes
- No
* - Limitations in the main process
- None
- None
- GPU operations such as moving tensors to the GPU or calling ``torch.cuda`` functions before invoking ``Trainer.fit`` are not allowed (see the example below this table).
* - Process creation time
- Slow
- Slow
- Fast
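
The Fork limitation above means the main process must not initialize CUDA before the workers are created. A minimal sketch of what to avoid, where ``model`` stands in for any ``LightningModule``:

.. code-block:: python

    import torch

    from pytorch_lightning import Trainer

    # Not allowed before ``Trainer.fit`` with strategy="ddp_fork": anything that
    # initializes CUDA in the main process, e.g.
    #   x = torch.zeros(1, device="cuda")
    #   torch.cuda.synchronize()

    # Allowed: keep tensors on the CPU until the workers are forked
    x = torch.zeros(1)

    trainer = Trainer(accelerator="gpu", devices=2, strategy="ddp_fork")
    trainer.fit(model)  # CUDA is initialized inside the forked worker processes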


Horovod
^^^^^^^
`Horovod <http://horovod.ai>`_ allows the same training script to be used for single-GPU,
2 changes: 2 additions & 0 deletions src/pytorch_lightning/CHANGELOG.md
@@ -99,6 +99,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added Apple Silicon Support via `MPSAccelerator` ([#13123](https://github.com/PyTorchLightning/pytorch-lightning/pull/13123))


- Added support for DDP Fork ([#13405](https://github.com/PyTorchLightning/pytorch-lightning/pull/13405))


### Changed

8 changes: 4 additions & 4 deletions src/pytorch_lightning/accelerators/cuda.py
@@ -52,7 +52,7 @@ def setup(self, trainer: "pl.Trainer") -> None:
def set_nvidia_flags(local_rank: int) -> None:
# set the correct cuda visible devices (using pci order)
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
all_gpu_ids = ",".join(str(x) for x in range(torch.cuda.device_count()))
all_gpu_ids = ",".join(str(x) for x in range(device_parser.num_cuda_devices()))
devices = os.getenv("CUDA_VISIBLE_DEVICES", all_gpu_ids)
_log.info(f"LOCAL_RANK: {local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]")

@@ -84,11 +84,11 @@ def get_parallel_devices(devices: List[int]) -> List[torch.device]:
@staticmethod
def auto_device_count() -> int:
"""Get the devices when set to auto."""
return torch.cuda.device_count()
return device_parser.num_cuda_devices()

@staticmethod
def is_available() -> bool:
return torch.cuda.device_count() > 0
return device_parser.num_cuda_devices() > 0

@classmethod
def register_accelerators(cls, accelerator_registry: Dict) -> None:
@@ -162,6 +162,6 @@ def _to_float(x: str) -> float:
def _get_gpu_id(device_id: int) -> str:
"""Get the unmasked real GPU IDs."""
# All devices if `CUDA_VISIBLE_DEVICES` unset
default = ",".join(str(i) for i in range(torch.cuda.device_count()))
default = ",".join(str(i) for i in range(device_parser.num_cuda_devices()))
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES", default=default).split(",")
return cuda_visible_devices[device_id].strip()
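
Why the calls to ``torch.cuda.device_count()`` are replaced: that call initializes CUDA in the main process, and a process with an initialized CUDA context cannot safely fork workers later. ``device_parser.num_cuda_devices()`` avoids this by querying the count without touching the main process's CUDA state. An illustrative sketch of the idea (not necessarily the exact implementation in ``device_parser``):

    import multiprocessing

    import torch


    def _num_cuda_devices() -> int:
        # Ask a short-lived child process for the count so that CUDA is only
        # ever initialized in the child, never in the fork-able parent.
        # (On platforms without "fork" one would fall back to torch.cuda.device_count().)
        with multiprocessing.get_context("fork").Pool(1) as pool:
            return pool.apply(torch.cuda.device_count)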
1 change: 1 addition & 0 deletions src/pytorch_lightning/lite/lite.py
@@ -468,6 +468,7 @@ def _supported_strategy_types() -> Sequence[_StrategyType]:
_StrategyType.DP,
_StrategyType.DDP,
_StrategyType.DDP_SPAWN,
_StrategyType.DDP_FORK,
_StrategyType.DEEPSPEED,
_StrategyType.DDP_SHARDED,
_StrategyType.DDP_SHARDED_SPAWN,
3 changes: 2 additions & 1 deletion src/pytorch_lightning/profilers/pytorch.py
@@ -24,6 +24,7 @@
from torch.autograd.profiler import record_function

from pytorch_lightning.profilers.profiler import Profiler
from pytorch_lightning.utilities.device_parser import is_cuda_available
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from pytorch_lightning.utilities.imports import _KINETO_AVAILABLE
from pytorch_lightning.utilities.rank_zero import rank_zero_warn
@@ -368,7 +369,7 @@ def _default_activities(self) -> List["ProfilerActivity"]:
return activities
if self._profiler_kwargs.get("use_cpu", True):
activities.append(ProfilerActivity.CPU)
if self._profiler_kwargs.get("use_cuda", torch.cuda.is_available()):
if self._profiler_kwargs.get("use_cuda", is_cuda_available()):
activities.append(ProfilerActivity.CUDA)
return activities

30 changes: 18 additions & 12 deletions src/pytorch_lightning/strategies/ddp_spawn.py
@@ -22,6 +22,7 @@
from torch.distributed.constants import default_pg_timeout
from torch.nn import Module
from torch.nn.parallel.distributed import DistributedDataParallel
from typing_extensions import Literal

import pytorch_lightning as pl
from pytorch_lightning.overrides import LightningDistributedModule
@@ -71,6 +72,7 @@ def __init__(
ddp_comm_wrapper: Optional[callable] = None,
process_group_backend: Optional[str] = None,
timeout: Optional[timedelta] = default_pg_timeout,
start_method: Literal["spawn", "fork", "forkserver"] = "spawn",
**kwargs: Any,
):
super().__init__(
@@ -88,6 +90,7 @@ def __init__(
self._local_rank = 0
self._process_group_backend: Optional[str] = process_group_backend
self._timeout: Optional[timedelta] = timeout
self._start_method = start_method

@property
def num_nodes(self) -> int:
@@ -124,7 +127,7 @@ def process_group_backend(self) -> Optional[str]:
return self._process_group_backend

def _configure_launcher(self):
self._launcher = _SpawnLauncher(self)
self._launcher = _SpawnLauncher(self, start_method=self._start_method)

def setup(self, trainer: "pl.Trainer") -> None:
os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)
@@ -280,17 +283,20 @@ def post_training_step(self):

@classmethod
def register_strategies(cls, strategy_registry: Dict) -> None:
strategy_registry.register(
"ddp_spawn_find_unused_parameters_false",
cls,
description="DDPSpawn Strategy with `find_unused_parameters` as False",
find_unused_parameters=False,
)
strategy_registry.register(
cls.strategy_name,
cls,
description=f"{cls.__class__.__name__}",
)
for start_method in ("spawn", "fork"):
strategy_registry.register(
f"ddp_{start_method}_find_unused_parameters_false",
cls,
description=f"DDP {start_method.title()} strategy with `find_unused_parameters` as False",
find_unused_parameters=False,
start_method=start_method,
)
strategy_registry.register(
f"ddp_{start_method}",
cls,
description=f"DDP {start_method.title()} strategy",
start_method=start_method,
)

def teardown(self) -> None:
log.detail(f"{self.__class__.__name__}: tearing down strategy")
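
For reference, the registration loop above makes both start methods selectable by name. A usage sketch, assuming the registry entries shown in this diff:

    from pytorch_lightning import Trainer

    # Registered names: "ddp_spawn", "ddp_spawn_find_unused_parameters_false",
    #                   "ddp_fork",  "ddp_fork_find_unused_parameters_false"
    trainer = Trainer(accelerator="gpu", devices=2, strategy="ddp_fork_find_unused_parameters_false")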
30 changes: 22 additions & 8 deletions src/pytorch_lightning/strategies/launchers/spawn.py
@@ -20,6 +20,7 @@
import torch
import torch.multiprocessing as mp
from torch import Tensor
from typing_extensions import Literal

import pytorch_lightning as pl
from pytorch_lightning.strategies.launchers.base import _Launcher
@@ -34,27 +35,40 @@ class _SpawnLauncher(_Launcher):
r"""Spawns processes that run a given function in parallel, and joins them all at the end.

The main process in which this launcher is invoked creates N so-called worker processes (using
:func:`torch.multiprocessing.spawn`) that run the given function.
:func:`torch.multiprocessing.start_processes`) that run the given function.
Worker processes have a rank that ranges from 0 to N - 1.

Note:
- This launcher requires all objects to be pickleable.
- It is important that the entry point to the program/script is guarded by ``if __name__ == "__main__"``.
- With start method 'fork' the user must ensure that no CUDA context gets created in the main process before
the launcher is invoked. E.g., one should avoid creating cuda tensors or calling ``torch.cuda.*`` functions
before calling ``Trainer.fit``.

Args:
strategy: A reference to the strategy that is used together with this launcher.
start_method: The method how to start the processes.
- 'spawn': The default start method. Requires all objects to be pickleable.
- 'fork': Preferable for IPython/Jupyter environments where 'spawn' is not available. It is not available on
the Windows platform, for example.
- 'forkserver': Alternative implementation to 'fork'.
"""

def __init__(self, strategy: Strategy) -> None:
def __init__(self, strategy: Strategy, start_method: Literal["spawn", "fork", "forkserver"] = "spawn") -> None:
self._strategy = strategy
self._start_method = "spawn"
self._start_method = start_method
if start_method not in mp.get_all_start_methods():
raise ValueError(
f"The start method '{self._start_method}' is not available on this platform. Available methods are:"
f" {', '.join(mp.get_all_start_methods())}"
)

@property
def is_interactive_compatible(self) -> bool:
# The start method 'spawn' is currently the only one that works with DDP and CUDA support
# The start method 'fork' is the only one supported in Jupyter environments but not compatible with CUDA
# For more context, see https://github.com/Lightning-AI/lightning/issues/7550
return self._start_method == "fork" and self._strategy.root_device.type != "cuda"
# The start method 'spawn' is not supported in interactive environments
# The start method 'fork' is the only one supported in Jupyter environments, with constraints around CUDA
# initialization. For more context, see https://github.com/Lightning-AI/lightning/issues/7550
return self._start_method == "fork"

def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any:
"""Spawns processes that run the given function in parallel.
@@ -75,7 +89,7 @@ def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"]
os.environ["MASTER_PORT"] = str(self._strategy.cluster_environment.main_port)
context = mp.get_context(self._start_method)
return_queue = context.SimpleQueue()
mp.spawn(
mp.start_processes(
self._wrapping_function,
args=(trainer, function, args, kwargs, return_queue),
nprocs=self._strategy.num_processes,
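
The launcher constructor above rejects start methods the current platform does not offer. A quick way to check what is available locally (a sketch; the output differs by OS):

    import torch.multiprocessing as mp

    # e.g. ['fork', 'spawn', 'forkserver'] on Linux, ['spawn'] on Windows
    print(mp.get_all_start_methods())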
3 changes: 1 addition & 2 deletions src/pytorch_lightning/strategies/launchers/xla_spawn.py
@@ -50,8 +50,7 @@ class _XLASpawnLauncher(_SpawnLauncher):
"""

def __init__(self, strategy: "Strategy") -> None:
super().__init__(strategy)
self._start_method = "fork"
super().__init__(strategy=strategy, start_method="fork")

@property
def is_interactive_compatible(self) -> bool:
3 changes: 1 addition & 2 deletions src/pytorch_lightning/strategies/tpu_spawn.py
@@ -70,9 +70,9 @@ def __init__(
cluster_environment=XLAEnvironment(),
checkpoint_io=checkpoint_io,
precision_plugin=precision_plugin,
start_method="fork",
)
self.debug = debug
self.start_method = "fork"

@property
def root_device(self) -> torch.device:
@@ -113,7 +113,6 @@ def _configure_launcher(self):
self._launcher = _XLASpawnLauncher(self)

def setup(self, trainer: "pl.Trainer") -> None:
self.start_method = "fork"
self.accelerator.setup(trainer)

if self.debug:
src/pytorch_lightning/trainer/connectors/accelerator_connector.py
@@ -267,7 +267,7 @@ def _check_config_and_set_final_flags(
if strategy == "ddp_cpu":
raise MisconfigurationException(
"`Trainer(strategy='ddp_cpu')` is not a valid strategy,"
" you can use `Trainer(strategy='ddp'|'ddp_spawn', accelerator='cpu')` instead."
" you can use `Trainer(strategy='ddp'|'ddp_spawn'|'ddp_fork', accelerator='cpu')` instead."
)
if strategy == "tpu_spawn":
raise MisconfigurationException(
@@ -496,7 +496,7 @@ def _choose_accelerator(self) -> str:
return "hpu"
if MPSAccelerator.is_available():
return "mps"
if torch.cuda.is_available() and torch.cuda.device_count() > 0:
if CUDAAccelerator.is_available():
return "cuda"
return "cpu"

@@ -614,7 +614,14 @@ def _check_strategy_and_fallback(self) -> None:
f"You selected strategy to be `{DDPFullyShardedNativeStrategy.strategy_name}`, "
"but GPU accelerator is not used."
)

if (
strategy_flag in ("ddp_fork", "ddp_fork_find_unused_parameters_false")
and "fork" not in torch.multiprocessing.get_all_start_methods()
):
raise ValueError(
f"You selected `Trainer(strategy='{strategy_flag}')` but process forking is not supported on this"
f" platform. We recommed `Trainer(strategy='ddp_spawn')` instead."
)
if strategy_flag:
self._strategy_flag = strategy_flag

2 changes: 1 addition & 1 deletion src/pytorch_lightning/trainer/trainer.py
@@ -1758,7 +1758,7 @@ def _log_device_info(self) -> None:
rank_zero_info(f"HPU available: {_HPU_AVAILABLE}, using: {num_hpus} HPUs")

# TODO: Integrate MPS Accelerator here, once gpu maps to both
if torch.cuda.is_available() and not isinstance(self.accelerator, CUDAAccelerator):
if CUDAAccelerator.is_available() and not isinstance(self.accelerator, CUDAAccelerator):
rank_zero_warn(
"GPU available but not used. Set `accelerator` and `devices` using"
f" `Trainer(accelerator='gpu', devices={CUDAAccelerator.auto_device_count()})`.",
5 changes: 3 additions & 2 deletions src/pytorch_lightning/tuner/auto_gpu_select.py
@@ -15,6 +15,7 @@

import torch

from pytorch_lightning.utilities import device_parser
from pytorch_lightning.utilities.exceptions import MisconfigurationException


@@ -31,7 +32,7 @@ def pick_multiple_gpus(nb: int) -> List[int]:
" Please select a valid number of GPU resources when using auto_select_gpus."
)

num_gpus = torch.cuda.device_count()
num_gpus = device_parser.num_cuda_devices()
if nb > num_gpus:
raise MisconfigurationException(f"You requested {nb} GPUs but your machine only has {num_gpus} GPUs.")
nb = num_gpus if nb == -1 else nb
@@ -51,7 +52,7 @@ def pick_single_gpu(exclude_gpus: List[int]) -> int:
"""
previously_used_gpus = []
unused_gpus = []
for i in range(torch.cuda.device_count()):
for i in range(device_parser.num_cuda_devices()):
if i in exclude_gpus:
continue
