
Commit

Merge branch 'master' into refactor/running_stage
awaelchli committed Feb 20, 2021
2 parents cbdf2a8 + 97a81c3 commit ce6b96e
Showing 15 changed files with 182 additions and 44 deletions.
25 changes: 25 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,31 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).


## [UnReleased] - 2021-MM-DD

### Added


### Changed


### Deprecated


### Removed


### Fixed

- Fixed incorrect yield logic for the amp autocast context manager ([#6080](https://github.com/PyTorchLightning/pytorch-lightning/pull/6080))


- Made the `Plugin.reduce` method more consistent across all Plugins to reflect a mean-reduction by default ([#6011](https://github.com/PyTorchLightning/pytorch-lightning/pull/6011))


- Fixed priority of plugin/accelerator when setting distributed mode ([#6089](https://github.com/PyTorchLightning/pytorch-lightning/pull/6089))


## [1.2.0] - 2021-02-18

### Added
9 changes: 5 additions & 4 deletions azure-pipelines.yml
@@ -66,8 +66,9 @@ jobs:
     pip list
   displayName: 'Install dependencies'

-  - script: |
+  - bash: |
     python tests/collect_env_details.py
     python -c "import torch ; mgpu = torch.cuda.device_count() ; assert mgpu >= 2, f'GPU: {mgpu}'"
   displayName: 'Env details'

   - bash: |
@@ -76,7 +77,7 @@
     ls -l legacy/checkpoints/
   displayName: 'Get legacy checkpoints'

-  - script: |
+  - bash: |
     python -m coverage run --source pytorch_lightning -m pytest pytorch_lightning tests -v --durations=50
   displayName: 'Testing: standard'

@@ -90,11 +91,11 @@
     codecov --token=$(CODECOV_TOKEN) --flags=gpu,pytest --name="GPU-coverage" --env=linux,azure
   displayName: 'Statistics'

-  - script: |
+  - bash: |
     python -m pytest benchmarks pl_examples -v --maxfail=2 --durations=0
   displayName: 'Testing: extended'

-  - script: |
+  - bash: |
     python setup.py install --user --quiet
     bash pl_examples/run_ddp-example.sh
     pip uninstall -y pytorch-lightning
2 changes: 1 addition & 1 deletion pytorch_lightning/__init__.py
@@ -5,7 +5,7 @@
import time

_this_year = time.strftime("%Y")
-__version__ = '1.2.0'
+__version__ = '1.3.0dev'
__author__ = 'William Falcon et al.'
__author_email__ = 'waf2107@columbia.edu'
__license__ = 'Apache-2.0'
3 changes: 2 additions & 1 deletion pytorch_lightning/plugins/precision/native_amp.py
@@ -91,4 +91,5 @@ def post_optimizer_step(self, optimizer: Optimizer, optimizer_idx: int) -> None:
@contextmanager
def train_step_context(self) -> Generator[autocast, None, None]:
"""Enable autocast context"""
-        yield torch.cuda.amp.autocast()
+        with torch.cuda.amp.autocast():
+            yield
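For readers skimming the diff: the old version yielded the `autocast` object itself, so `__enter__` was never called and mixed precision was silently disabled during the training step. A minimal sketch of the two patterns (the helper names below are illustrative, not part of the codebase):

```python
import torch
from contextlib import contextmanager

@contextmanager
def broken_autocast_context():
    # Bug: hands the autocast object to the caller without entering it,
    # so code inside `with broken_autocast_context():` runs in full precision.
    yield torch.cuda.amp.autocast()

@contextmanager
def fixed_autocast_context():
    # Fix: enter autocast first, then yield control; the caller's block
    # now genuinely executes with autocast enabled.
    with torch.cuda.amp.autocast():
        yield
```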
20 changes: 16 additions & 4 deletions pytorch_lightning/plugins/training_type/ddp.py
@@ -278,10 +278,22 @@ def model_to_device(self):
torch.cuda.set_device(self.root_device)
self.model.to(self.root_device)

-    def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None):
-        if isinstance(output, torch.Tensor):
-            output = sync_ddp_if_available(output, group, reduce_op)
-        return output
+    def reduce(self, tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"):
+        """
+        Reduces a tensor from several distributed processes to one aggregated tensor.
+
+        Args:
+            tensor: the tensor to sync and reduce
+            group: the process group to gather results from. Defaults to all processes (world)
+            reduce_op: the reduction operation. Defaults to 'mean'/'avg'.
+                Can also be a string 'sum' to calculate the sum during reduction.
+
+        Return:
+            reduced value, except when the input was not a tensor the output remains unchanged
+        """
+        if isinstance(tensor, torch.Tensor):
+            tensor = sync_ddp_if_available(tensor, group, reduce_op=(reduce_op or "mean"))
+        return tensor

def training_step(self, *args, **kwargs):
return self.model(*args, **kwargs)
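As a usage sketch of the new default (the `trainer` and `metric` names below are placeholders, not part of this diff): calling `reduce` on a tensor now averages it across processes unless `'sum'` is requested explicitly.

```python
import torch

def sync_metric(trainer, metric: torch.Tensor) -> torch.Tensor:
    # The training-type plugin is reachable through the accelerator,
    # as the test added at the bottom of this commit does.
    plugin = trainer.accelerator.training_type_plugin
    mean_value = plugin.reduce(metric)               # 'mean' is now the default
    total = plugin.reduce(metric, reduce_op="sum")   # explicit sum still supported
    return mean_value
```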
24 changes: 18 additions & 6 deletions pytorch_lightning/plugins/training_type/ddp2.py
@@ -25,14 +25,26 @@ def setup(self, model):
self.task_idx = self.cluster_environment.local_rank()
# the difference to DDP is that we don't call children processes here

-    def reduce(self, output, *args, **kwargs):
-        if isinstance(output, Result):
-            output.dp_reduce()
-
-        elif isinstance(output, torch.Tensor):
-            output = output.mean()
-
-        return output
+    def reduce(self, tensor, *args, **kwargs):
+        """
+        Reduces a tensor from all processes to one aggregated tensor.
+        In DDP2, the reduction here is only across local devices within the node.
+
+        Args:
+            tensor: the tensor to sync and reduce
+            *args: ignored for DDP2
+            **kwargs: ignored for DDP2
+
+        Return:
+            reduced value, except when the input was not a tensor the output remains unchanged
+        """
+        if isinstance(tensor, Result):
+            tensor.dp_reduce()
+
+        elif isinstance(tensor, torch.Tensor):
+            tensor = tensor.mean()
+
+        return tensor

@property
def root_device(self):
20 changes: 16 additions & 4 deletions pytorch_lightning/plugins/training_type/ddp_spawn.py
@@ -256,10 +256,22 @@ def pre_backward(self, closure_loss: torch.Tensor, should_accumulate: bool, opti
if not self.lightning_module.automatic_optimization and self.model.require_backward_grad_sync:
prepare_for_backward(self.model, closure_loss)

-    def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None):
-        if isinstance(output, torch.Tensor):
-            output = sync_ddp_if_available(output, group, reduce_op)
-        return output
+    def reduce(self, tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"):
+        """
+        Reduces a tensor from several distributed processes to one aggregated tensor.
+
+        Args:
+            tensor: the tensor to sync and reduce
+            group: the process group to gather results from. Defaults to all processes (world)
+            reduce_op: the reduction operation. Defaults to 'mean'/'avg'.
+                Can also be a string 'sum' to calculate the sum during reduction.
+
+        Return:
+            reduced value, except when the input was not a tensor the output remains unchanged
+        """
+        if isinstance(tensor, torch.Tensor):
+            tensor = sync_ddp_if_available(tensor, group, reduce_op=(reduce_op or "mean"))
+        return tensor

def training_step(self, *args, **kwargs):
return self.model(*args, **kwargs)
23 changes: 17 additions & 6 deletions pytorch_lightning/plugins/training_type/dp.py
@@ -31,14 +31,25 @@ def setup(self, model):
model.to(self.root_device)
self._model = DataParallel(LightningParallelModule(model), self.parallel_devices)

-    def reduce(self, output, *args, **kwargs):
-        if isinstance(output, Result):
-            output.dp_reduce()
-
-        elif isinstance(output, torch.Tensor):
-            output = output.mean()
-
-        return output
+    def reduce(self, tensor, *args, **kwargs):
+        """
+        Reduces a tensor from all parallel processes to one aggregated tensor.
+
+        Args:
+            tensor: the tensor to sync and reduce
+            *args: ignored for DP
+            **kwargs: ignored for DP
+
+        Return:
+            reduced value, except when the input was not a tensor the output remains unchanged
+        """
+        if isinstance(tensor, Result):
+            tensor.dp_reduce()
+
+        elif isinstance(tensor, torch.Tensor):
+            tensor = tensor.mean()
+
+        return tensor

@property
def root_device(self):
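To make the DP behavior concrete (values invented for illustration): each device contributes one entry, and `reduce` collapses them with a plain mean.

```python
import torch

# Two hypothetical per-GPU losses gathered by DataParallel:
per_device_loss = torch.tensor([0.9, 1.1])
reduced = per_device_loss.mean()  # tensor(1.0), matching the plugin's reduction
```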
22 changes: 17 additions & 5 deletions pytorch_lightning/plugins/training_type/horovod.py
@@ -127,23 +127,35 @@ def model_to_device(self):
torch.cuda.set_device(self.root_device)
self.model.to(self.root_device)

-    def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None):
+    def reduce(self, tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"):
+        """
+        Reduces a tensor from several distributed processes to one aggregated tensor.
+
+        Args:
+            tensor: the tensor to sync and reduce
+            group: the process group to gather results from. Defaults to all processes (world)
+            reduce_op: the reduction operation. Defaults to 'mean'/'avg'.
+                Can also be a string 'sum' to calculate the sum during reduction.
+
+        Return:
+            reduced value, except when the input was not a tensor the output remains unchanged
+        """
         if group is not None:
             raise ValueError(
                 "Horovod does not support allreduce using a subcommunicator at this time. "
                 "Unset `group`."
             )

-        if reduce_op is None or reduce_op == "sum":
-            reduce_op = hvd.Sum
-        elif isinstance(reduce_op, str) and reduce_op in ("avg", "mean"):
+        if reduce_op in (None, "avg", "mean"):
             reduce_op = hvd.Average
+        elif reduce_op == "sum":
+            reduce_op = hvd.Sum
         else:
             raise ValueError(f"unrecognized `reduce_op`: {reduce_op}")

         # sync all processes before reduction
         hvd.join()
-        return hvd.allreduce(output, op=reduce_op)
+        return hvd.allreduce(tensor, op=reduce_op)

def gather_all_tensors(self, result: Union[torch.Tensor], group: Optional[Any] = None):
if group is not None:
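Note the behavioral change hidden in this hunk: a `reduce_op` of `None` previously fell into the `Sum` branch, whereas it now maps to `Average`, aligning Horovod with the mean-by-default contract of the other plugins. A sketch assuming a two-process Horovod run with invented values:

```python
import horovod.torch as hvd
import torch

hvd.init()
t = torch.tensor(2.0 * (hvd.rank() + 1))  # rank 0 holds 2.0, rank 1 holds 4.0

avg = hvd.allreduce(t, op=hvd.Average)  # tensor(3.) everywhere: the new default path
total = hvd.allreduce(t, op=hvd.Sum)    # tensor(6.) everywhere: the old default path
```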
16 changes: 14 additions & 2 deletions pytorch_lightning/plugins/training_type/single_device.py
@@ -19,8 +19,20 @@ def on_tpu(self) -> bool:
def on_gpu(self) -> bool:
return self.device.type == "cuda" and torch.cuda.is_available()

-    def reduce(self, output: Union[Any, torch.Tensor], *args: Any, **kwargs: Any) -> Union[Any, torch.Tensor]:
-        return output
+    def reduce(self, tensor: Union[Any, torch.Tensor], *args: Any, **kwargs: Any) -> Union[Any, torch.Tensor]:
+        """
+        Reduces a tensor from several distributed processes to one aggregated tensor.
+        As this plugin only operates with a single device, the reduction is simply the identity.
+
+        Args:
+            tensor: the tensor to sync and reduce
+            *args: ignored
+            **kwargs: ignored
+
+        Return:
+            the unmodified input as reduction is not needed for single process operation
+        """
+        return tensor

@property
def root_device(self) -> torch.device:
11 changes: 9 additions & 2 deletions pytorch_lightning/plugins/training_type/training_type_plugin.py
@@ -55,8 +55,15 @@ def is_global_zero(self) -> bool:
"""Whether the current process is the rank zero process not only on the local node, but for all nodes."""

@abstractmethod
-    def reduce(self, output: Union[torch.Tensor, Any], *args: Any, **kwargs: Any) -> Union[torch.Tensor, Any]:
-        """Reduces the given output (e.g. across GPUs/Processes)"""
+    def reduce(self, tensor: Union[torch.Tensor, Any], *args: Any, **kwargs: Any) -> Union[torch.Tensor, Any]:
+        """
+        Reduces the given tensor (e.g. across GPUs/processes).
+
+        Args:
+            tensor: the tensor to sync and reduce
+            *args: plugin-specific positional arguments
+            **kwargs: plugin-specific keyword arguments
+        """

@abstractmethod
def barrier(self, name: Optional[str] = None) -> None:
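For third-party plugin authors, the sharpened contract looks like this in practice; a hypothetical subclass overriding only `reduce` (the remaining abstract methods, omitted for brevity, must also be implemented before the class can be instantiated):

```python
from typing import Any, Union

import torch

from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin


class SumReducePlugin(TrainingTypePlugin):  # hypothetical example, not part of this commit
    def reduce(self, tensor: Union[torch.Tensor, Any], *args: Any, **kwargs: Any) -> Union[torch.Tensor, Any]:
        # Deliberately deviates from the library-wide mean default: always sums.
        if isinstance(tensor, torch.Tensor):
            return tensor.sum()
        return tensor
```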
pytorch_lightning/trainer/connectors/accelerator_connector.py
@@ -163,6 +163,9 @@ def handle_given_plugins(

         for plug in plugins:
             if isinstance(plug, str):
+                # Reset the distributed type as the user has overridden training type
+                # via the plugins argument
+                self._distrib_type = None
                 self.set_distributed_mode(plug)

             elif isinstance(plug, TrainingTypePlugin):
@@ -196,7 +199,6 @@ def handle_given_plugins(
)

         self._training_type_plugin = training_type
-        self._training_type_plugin = self.training_type_plugin
self._precision_plugin = precision
self._cluster_environment = cluster_environment or self.select_cluster_environment()

24 changes: 23 additions & 1 deletion tests/accelerators/test_accelerator_connector.py
@@ -23,7 +23,14 @@
from pytorch_lightning.accelerators.cpu import CPUAccelerator
from pytorch_lightning.accelerators.gpu import GPUAccelerator
from pytorch_lightning.callbacks import Callback
-from pytorch_lightning.plugins import DDP2Plugin, DDPPlugin, DDPSpawnPlugin, PrecisionPlugin, SingleDevicePlugin
+from pytorch_lightning.plugins import (
+    DDP2Plugin,
+    DDPPlugin,
+    DDPShardedPlugin,
+    DDPSpawnPlugin,
+    PrecisionPlugin,
+    SingleDevicePlugin,
+)
from pytorch_lightning.plugins.environments import ClusterEnvironment, SLURMEnvironment, TorchElasticEnvironment
from tests.helpers.boring_model import BoringModel

@@ -378,3 +385,18 @@ def on_fit_start(self, trainer, pl_module):

with pytest.raises(SystemExit):
trainer.fit(model)


@pytest.mark.parametrize(
["accelerator", "plugin"],
[('ddp_spawn', 'ddp_sharded'), (None, 'ddp_sharded')],
)
def test_plugin_accelerator_choice(accelerator, plugin):
"""
    Ensure that when a plugin and an accelerator are passed in, the plugin takes precedence.
"""
trainer = Trainer(accelerator=accelerator, plugins=plugin, num_processes=2)
assert isinstance(trainer.accelerator.training_type_plugin, DDPShardedPlugin)

trainer = Trainer(plugins=plugin, num_processes=2)
assert isinstance(trainer.accelerator.training_type_plugin, DDPShardedPlugin)
1 change: 1 addition & 0 deletions tests/checkpointing/test_legacy_checkpoints.py
@@ -52,6 +52,7 @@
"1.1.6",
"1.1.7",
"1.1.8",
"1.2.0",
]
)
def test_resume_legacy_checkpoints(tmpdir, pl_version):