Merge branch 'master' into refactor/dumplicate-metrics
Borda authored Dec 11, 2020
2 parents ba529f8 + 7755572 commit 648a891
Showing 22 changed files with 223 additions and 104 deletions.
36 changes: 36 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,42 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).


## [unreleased.Features] - YYYY-MM-DD

### Added


### Changed


### Deprecated


### Removed


### Fixed



## [unreleased.BugFix] - YYYY-MM-DD

### Added


### Changed


### Deprecated


### Removed


### Fixed



## [1.1.0] - 2020-12-09

### Added
4 changes: 2 additions & 2 deletions docs/source/multi_gpu.rst
@@ -593,9 +593,9 @@ Below are the possible configurations we support.

Implement Your Own Distributed (DDP) training
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you need your own way to init PyTorch DDP you can override :meth:`pytorch_lightning.core.LightningModule.`.
If you need your own way to init PyTorch DDP you can override :meth:`pytorch_lightning.plugins.ddp_plugin.DDPPlugin.init_ddp_connection`.

If you also need to use your own DDP implementation, override: :meth:`pytorch_lightning.core.LightningModule.configure_ddp`.
If you also need to use your own DDP implementation, override: :meth:`pytorch_lightning.plugins.ddp_plugin.DDPPlugin.configure_ddp`.


----------
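As context for the corrected references above, a minimal override might look like the following sketch (the subclass body and the `plugins` argument usage are assumptions based on the 1.1-era API, not code from this commit):

.. code-block:: python

    from pytorch_lightning import Trainer
    from pytorch_lightning.plugins.ddp_plugin import DDPPlugin


    class MyDDPPlugin(DDPPlugin):
        def init_ddp_connection(self, *args, **kwargs):
            # Set up the process group your own way here, e.g. export a custom
            # MASTER_ADDR/MASTER_PORT before deferring to the default behavior.
            super().init_ddp_connection(*args, **kwargs)


    trainer = Trainer(gpus=2, accelerator="ddp", plugins=[MyDDPPlugin()])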
2 changes: 1 addition & 1 deletion docs/source/weights_loading.rst
@@ -46,7 +46,7 @@ You can customize the checkpointing behavior to monitor any quantity of your tra
1. Calculate any metric or other quantity you wish to monitor, such as validation loss.
2. Log the quantity using the :func:`~pytorch_lightning.core.lightning.LightningModule.log` method, with a key such as `val_loss`.
3. Initialize the :class:`~pytorch_lightning.callbacks.ModelCheckpoint` callback, and set `monitor` to the key of your quantity.
4. Pass the callback to `checkpoint_callback` :class:`~pytorch_lightning.trainer.Trainer` flag.
4. Pass the callback to the `callbacks` :class:`~pytorch_lightning.trainer.Trainer` flag.

.. code-block:: python
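    # The example body is truncated in this diff view; what follows is a minimal
    # sketch consistent with steps 1-4 above (LitModel is a hypothetical module):
    import torch
    from pytorch_lightning import LightningModule, Trainer
    from pytorch_lightning.callbacks import ModelCheckpoint


    class LitModel(LightningModule):
        def validation_step(self, batch, batch_idx):
            x, y = batch
            loss = torch.nn.functional.cross_entropy(self(x), y)
            # Step 2: log the monitored quantity under the key `val_loss`.
            self.log("val_loss", loss)


    # Step 3: set `monitor` to the logged key.
    checkpoint_callback = ModelCheckpoint(monitor="val_loss")
    # Step 4: pass the callback via the Trainer's `callbacks` flag.
    trainer = Trainer(callbacks=[checkpoint_callback])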
8 changes: 6 additions & 2 deletions pytorch_lightning/accelerators/accelerator.py
@@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import contextmanager
from enum import Enum
from typing import Any, Optional, Union

import torch
import torch.distributed as torch_distrib
from torch.optim import Optimizer

from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.plugins.ddp_plugin import DDPPlugin
from pytorch_lightning.plugins.rpc_plugin import RPCPlugin
from pytorch_lightning.utilities.apply_func import move_data_to_device
from pytorch_lightning.utilities.parsing import AttributeDict
@@ -33,7 +34,10 @@ class ReduceOp:

class Accelerator(object):

def __init__(self, trainer=None, cluster_environment=None, ddp_plugin=None):
def __init__(self,
trainer: Optional = None,
cluster_environment: Optional[ClusterEnvironment] = None,
ddp_plugin: Optional[DDPPlugin] = None):
self.trainer = trainer
self.nickname = None
self.cluster_environment = cluster_environment
28 changes: 11 additions & 17 deletions pytorch_lightning/accelerators/cpu_accelerator.py
@@ -11,18 +11,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Optional, Union
from typing import Any, Optional, Union, Callable

import torch

from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.utilities import AMPType, rank_zero_warn
from pytorch_lightning.utilities.exceptions import MisconfigurationException


class CPUAccelerator(Accelerator):

def __init__(self, trainer, cluster_environment=None):
def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None):
"""
Runs training on CPU
@@ -61,29 +62,22 @@ def train(self):
results = self.train_or_test()
return results

def training_step(self, args):
def _step(self, model_step: Callable, args):
if self.trainer.amp_backend == AMPType.NATIVE:
with torch.cuda.amp.autocast():
output = self.trainer.model.training_step(*args)
output = model_step(*args)
else:
output = self.trainer.model.training_step(*args)
output = model_step(*args)
return output

def training_step(self, args):
return self._step(self.trainer.model.training_step, args)

def validation_step(self, args):
if self.trainer.amp_backend == AMPType.NATIVE:
with torch.cuda.amp.autocast():
output = self.trainer.model.validation_step(*args)
else:
output = self.trainer.model.validation_step(*args)
return output
return self._step(self.trainer.model.validation_step, args)

def test_step(self, args):
if self.trainer.amp_backend == AMPType.NATIVE:
with torch.cuda.amp.autocast():
output = self.trainer.model.test_step(*args)
else:
output = self.trainer.model.test_step(*args)
return output
return self._step(self.trainer.model.test_step, args)

def sync_tensor(self,
tensor: Union[torch.Tensor],
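The `_step` refactor above collapses three nearly identical AMP branches into one helper that receives the bound step method as a callable. A standalone sketch of the same pattern outside Lightning (names here are illustrative, not from the commit):

from typing import Any, Callable, Tuple

import torch


def _step(model_step: Callable, args: Tuple, use_native_amp: bool) -> Any:
    # Route every step type through one place so the autocast branch is
    # written once rather than repeated per training/validation/test step.
    if use_native_amp:
        with torch.cuda.amp.autocast():
            return model_step(*args)
    return model_step(*args)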
9 changes: 7 additions & 2 deletions pytorch_lightning/accelerators/ddp2_accelerator.py
@@ -20,12 +20,14 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.core.step_result import Result
from pytorch_lightning.distributed.dist import LightningDistributed
from pytorch_lightning.plugins.ddp_plugin import DDPPlugin
from pytorch_lightning.plugins.rpc_plugin import RPCPlugin
from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType
from pytorch_lightning.utilities.distributed import rank_zero_only, sync_ddp_if_available, all_gather_ddp_if_available
from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available, rank_zero_only, sync_ddp_if_available

if HYDRA_AVAILABLE:
from hydra.core.hydra_config import HydraConfig
@@ -34,7 +36,10 @@

class DDP2Accelerator(Accelerator):

def __init__(self, trainer, cluster_environment=None, ddp_plugin=None):
def __init__(self,
trainer,
cluster_environment: Optional[ClusterEnvironment] = None,
ddp_plugin: Optional[DDPPlugin] = None):
"""
Runs training using DDP2 strategy on a cluster
15 changes: 12 additions & 3 deletions pytorch_lightning/accelerators/ddp_accelerator.py
@@ -25,12 +25,18 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.distributed.dist import LightningDistributed
from pytorch_lightning.plugins.ddp_plugin import DDPPlugin
from pytorch_lightning.plugins.rpc_plugin import RPCPlugin
from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType
from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available
from pytorch_lightning.utilities.distributed import find_free_network_port, rank_zero_only, sync_ddp_if_available
from pytorch_lightning.utilities.distributed import (
all_gather_ddp_if_available,
find_free_network_port,
rank_zero_only,
sync_ddp_if_available,
)
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from pytorch_lightning.utilities.seed import seed_everything

@@ -41,7 +47,10 @@

class DDPAccelerator(Accelerator):

def __init__(self, trainer=None, cluster_environment=None, ddp_plugin=None):
def __init__(self,
trainer: Optional = None,
cluster_environment: Optional[ClusterEnvironment] = None,
ddp_plugin: Optional[DDPPlugin] = None):
"""
Runs training using DDP strategy on a single machine (manually, not via cluster start)
12 changes: 10 additions & 2 deletions pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py
@@ -11,17 +11,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
from typing import Optional

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.ddp_hpc_accelerator import DDPHPCAccelerator
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.plugins.ddp_plugin import DDPPlugin
from pytorch_lightning.utilities import HYDRA_AVAILABLE

if HYDRA_AVAILABLE:
from hydra.utils import to_absolute_path, get_original_cwd
from hydra.core.hydra_config import HydraConfig
from hydra.utils import get_original_cwd, to_absolute_path


class DDPCPUHPCAccelerator(DDPHPCAccelerator):

def __init__(self, trainer, cluster_environment=None, ddp_plugin=None):
def __init__(self,
trainer,
cluster_environment: Optional[ClusterEnvironment] = None,
ddp_plugin: Optional[DDPPlugin] = None):
"""
Runs training using DDP (with CPUs) strategy on a cluster
15 changes: 10 additions & 5 deletions pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py
@@ -16,22 +16,23 @@

import torch
import torch.distributed as torch_distrib
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.distributed.dist import LightningDistributed
from pytorch_lightning.plugins.ddp_plugin import DDPPlugin
from pytorch_lightning.plugins.rpc_plugin import RPCPlugin
from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType
from pytorch_lightning.utilities.distributed import (
all_gather_ddp_if_available,
find_free_network_port,
rank_zero_only,
rank_zero_warn,
sync_ddp_if_available,
all_gather_ddp_if_available,
)

if HYDRA_AVAILABLE:
@@ -41,7 +42,11 @@

class DDPCPUSpawnAccelerator(Accelerator):

def __init__(self, trainer, nprocs, cluster_environment=None, ddp_plugin=None):
def __init__(self,
trainer,
nprocs: int,
cluster_environment: Optional[ClusterEnvironment] = None,
ddp_plugin: Optional[DDPPlugin] = None):
"""
Runs training using DDP (on a single machine or manually on multiple machines), using mp.spawn
@@ -197,8 +202,8 @@ def broadcast(self, obj, src=0):

def early_stopping_should_stop(self, pl_module):
stop = torch.tensor(int(self.trainer.should_stop), device=pl_module.device)
dist.all_reduce(stop, op=dist.reduce_op.SUM)
dist.barrier()
torch_distrib.all_reduce(stop, op=torch_distrib.reduce_op.SUM)
torch_distrib.barrier()
should_stop = stop == self.trainer.world_size
return should_stop

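The `early_stopping_should_stop` hook above implements an all-rank consensus: each process contributes 0 or 1, and training stops only when the reduced sum equals the world size. A minimal standalone sketch of that pattern (assumes torch.distributed is already initialized; `ReduceOp.SUM` is the non-deprecated spelling of the `reduce_op.SUM` used in the diff):

import torch
import torch.distributed as torch_distrib


def all_ranks_agree_to_stop(local_should_stop: bool, world_size: int, device: torch.device) -> bool:
    # Each rank votes 0 or 1; after the SUM all_reduce, the total equals
    # world_size only if every rank voted to stop.
    stop = torch.tensor(int(local_should_stop), device=device)
    torch_distrib.all_reduce(stop, op=torch_distrib.ReduceOp.SUM)
    torch_distrib.barrier()
    return int(stop.item()) == world_size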
9 changes: 7 additions & 2 deletions pytorch_lightning/accelerators/ddp_hpc_accelerator.py
@@ -21,11 +21,13 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.distributed.dist import LightningDistributed
from pytorch_lightning.plugins.ddp_plugin import DDPPlugin
from pytorch_lightning.plugins.rpc_plugin import RPCPlugin
from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType
from pytorch_lightning.utilities.distributed import rank_zero_only, sync_ddp_if_available, all_gather_ddp_if_available
from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available, rank_zero_only, sync_ddp_if_available

if HYDRA_AVAILABLE:
from hydra.core.hydra_config import HydraConfig
@@ -34,7 +36,10 @@

class DDPHPCAccelerator(Accelerator):

def __init__(self, trainer, cluster_environment=None, ddp_plugin=None):
def __init__(self,
trainer,
cluster_environment: Optional[ClusterEnvironment] = None,
ddp_plugin: Optional[DDPPlugin] = None):
"""
Runs training using DDP on an HPC cluster
15 changes: 10 additions & 5 deletions pytorch_lightning/accelerators/ddp_spawn_accelerator.py
@@ -17,24 +17,25 @@

import torch
import torch.distributed as torch_distrib
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.distributed import LightningDistributed
from pytorch_lightning.plugins.ddp_plugin import DDPPlugin
from pytorch_lightning.plugins.rpc_plugin import RPCPlugin
from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType
from pytorch_lightning.utilities.cloud_io import atomic_save
from pytorch_lightning.utilities.cloud_io import load as pl_load
from pytorch_lightning.utilities.distributed import (
all_gather_ddp_if_available,
find_free_network_port,
rank_zero_only,
rank_zero_warn,
sync_ddp_if_available,
all_gather_ddp_if_available,
)
from pytorch_lightning.utilities.seed import seed_everything

@@ -45,7 +46,11 @@

class DDPSpawnAccelerator(Accelerator):

def __init__(self, trainer, nprocs, cluster_environment=None, ddp_plugin=None):
def __init__(self,
trainer,
nprocs: int,
cluster_environment: Optional[ClusterEnvironment] = None,
ddp_plugin: Optional[DDPPlugin] = None):
"""
Runs training using DDP using mp.spawn via manual launch (not cluster launch)
@@ -226,8 +231,8 @@ def barrier(self, name: Optional[str] = None):

def early_stopping_should_stop(self, pl_module):
stop = torch.tensor(int(self.trainer.should_stop), device=pl_module.device)
dist.all_reduce(stop, op=dist.reduce_op.SUM)
dist.barrier()
torch_distrib.all_reduce(stop, op=torch_distrib.reduce_op.SUM)
torch_distrib.barrier()
should_stop = stop == self.trainer.world_size
return should_stop

(Diff truncated: the remaining 11 of the 22 changed files are not shown.)
